00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042 #include <stdlib.h>
00043 #include <stdio.h>
00044 #include <assert.h>
00045
00046 #include <set>
00047 #include <sstream>
00048
00049 #include "OSGConfig.h"
00050 #include "OSGLog.h"
00051 #include "OSGBaseThread.h"
00052 #include "OSGSocketSelection.h"
00053 #include "OSGBinaryMessage.h"
00054 #include "OSGGroupMCastConnection.h"
00055 #include "OSGConnectionType.h"
00056
00057 #define USE_EARLY_SEND
00058
00059 OSG_USING_NAMESPACE
00060
00065
00066
00067
00071 GroupMCastConnection::GroupMCastConnection():
00072 Inherited(),
00073 _sendQueueThread(NULL),
00074 _seqNumber(0),
00075 _initialized(false)
00076 {
00077 char lockName[256];
00078 sprintf(lockName,"GroupMCastConnection%p",this);
00079
00080
00081 _lock = Lock::get(lockName);
00082
00083 for(UInt32 dI = 0 ; dI < OSG_DGRAM_QUEUE_LEN ; ++dI)
00084 _free.put(new Dgram());
00085
00086 _mcastSocket.open();
00087 _mcastSocket.bind();
00088 _mcastSocket.setReadBufferSize(262144);
00089
00090 _windowSize = _mcastSocket.getReadBufferSize()/(OSG_DGRAM_LEN) - 1;
00091 _windowSize = osgMin((UInt32)13,_windowSize);
00092 }
00093
00096 GroupMCastConnection::~GroupMCastConnection(void)
00097 {
00098 Dgram *dgram;
00099 _sendQueueThreadStop = true;
00100
00101
00102 _lock->aquire();
00103 dgram = _free.get(_lock);
00104 dgram->setSize(0);
00105 _queue.put(dgram);
00106 _lock->release();
00107
00108 if(_sendQueueThread)
00109 BaseThread::join(_sendQueueThread);
00110
00111 _mcastSocket.close();
00112
00113 _lock->aquire();
00114 while(!_free.empty())
00115 delete _free.get(_lock);
00116 while(!_queue.empty())
00117 delete _queue.get(_lock);
00118 _lock->release();
00119 }
00120
00123 const ConnectionType *GroupMCastConnection::getType()
00124 {
00125 return &_type;
00126 }
00127
00128
00129
00130
00134 GroupConnection::Channel GroupMCastConnection::connectPoint(
00135 const std::string &address,
00136 Time timeout)
00137 {
00138 Channel channel = Inherited::connectPoint(address,timeout);
00139 return channel;
00140 }
00141
00144 void GroupMCastConnection::disconnect(Channel channel)
00145 {
00146 Inherited::disconnect(channel);
00147 _lock->aquire();
00148 _destination.erase(_destination.begin()+channelToIndex(channel));
00149 _lock->release();
00150 }
00151
00155 GroupConnection::Channel GroupMCastConnection::acceptPoint(Time timeout)
00156 {
00157 Connection::Channel channel = Inherited::acceptPoint(timeout);
00158 return channel;
00159 }
00160
00163 void GroupMCastConnection::setParams(const std::string ¶ms)
00164 {
00165 if(params.empty())
00166 return;
00167
00168 Inherited::setParams(params);
00169
00170 std::string option = "TTL=";
00171 std::string::size_type i = 0;
00172 if((i=params.find(option)) != std::string::npos)
00173 {
00174 std::string str = params.substr(i + option.size());
00175
00176 std::stringstream ss;
00177 std::string::size_type j = 0;
00178 while(j < str.length() && str[j] != ',' && isdigit(str[j]))
00179 {
00180 ss << str[j++];
00181 }
00182 UInt32 ttl;
00183 ss >> ttl;
00184 if(ttl > 255)
00185 ttl = 255;
00186 _mcastSocket.setTTL((unsigned char) ttl);
00187 FINFO(("GroupMCastConnection::setParams : setting ttl to %u.\n", ttl));
00188 }
00189 }
00190
00191
00192
00193
00196 bool GroupMCastConnection::wait(Time timeout) throw (ReadError)
00197 {
00198
00199 return Inherited::wait(timeout);
00200 }
00201
00204 void GroupMCastConnection::signal(void) throw (WriteError)
00205 {
00206 UInt32 tag=314156;
00207 putValue(tag);
00208 flush();
00209 }
00210
00211
00212
00216 GroupConnection *GroupMCastConnection::create(void)
00217 {
00218 return new GroupMCastConnection();
00219 }
00220
00221
00222
00223
00231 void GroupMCastConnection::write(MemoryHandle mem,UInt32 size)
00232 {
00233 Dgram *dgram = NULL;;
00234 UInt32 pos = 0;
00235 char *buffer = (char*)mem;
00236
00237 if(!_initialized)
00238 initialize();
00239
00240 while(size)
00241 {
00242
00243 _lock->aquire();
00244 dgram = _free.get(_lock);
00245 _lock->release();
00246
00247 dgram->setSize(osgMin(size,dgram->getCapacity()));
00248 memcpy(dgram->getData(),buffer+pos,dgram->getSize());
00249
00250 dgram->setId( _seqNumber++ );
00251
00252 size -= dgram->getSize();
00253 pos += dgram->getSize();
00254
00255 _lock->aquire();
00256 if(!_sendQueueThreadRunning)
00257 {
00258 _lock->release();
00259 BaseThread::join(_sendQueueThread);
00260 throw WriteError("Channel closed");
00261 }
00262 #ifdef USE_EARLY_SEND
00263 if(_queue.waiting())
00264 {
00265 dgram->setEarlySend(true);
00266 _mcastSocket.sendTo(dgram->getBuffer(),
00267 dgram->getBufferSize(),
00268 _mcastAddress);
00269 }
00270 #endif
00271 _queue.put(dgram);
00272 _lock->release();
00273 }
00274 }
00275
00281 void GroupMCastConnection::writeBuffer(void)
00282 {
00283 UInt32 size = writeBufBegin()->getDataSize();
00284 MemoryHandle buffer = writeBufBegin()->getMem();
00285
00286 write(buffer,size);
00287 }
00288
00289
00290
00291
00294 bool GroupMCastConnection::checkChannels(void)
00295 {
00296 SocketSelection selection;
00297 UInt32 index;
00298 bool valid=true;
00299
00300 _lock->aquire();
00301 for(index = 0 ; index < _sockets.size() ; ++index)
00302 selection.setRead(_sockets[index]);
00303 if(selection.select(0))
00304 {
00305 UInt32 len;
00306 char buffer;
00307 for(index = 0 ; index < _sockets.size() ; ++index)
00308 {
00309 if(selection.isSetRead(_sockets[index]))
00310 {
00311 try
00312 {
00313 _sockets[index].send(&buffer,1);
00314 }
00315 catch(SocketException &e)
00316 {
00317 valid = false;
00318 FWARNING(("Socket error:%s\n",e.what()))
00319 break;
00320 }
00321 }
00322 }
00323 }
00324 _lock->release();
00325 return valid;
00326 }
00327
00330 bool GroupMCastConnection::sendQueue(void)
00331 {
00332 std::vector<Dgram*> dgram;
00333 std::vector<std::set<SocketAddress> > missing;
00334 UInt32 count = 0;
00335 UInt32 maxCount = _windowSize-1;
00336 UInt32 ack = 0;
00337 UInt32 end = 0;
00338 UInt32 send = 0;
00339 UInt32 channel;
00340 UInt32 index;
00341 Dgram response;
00342 UInt32 m;
00343 bool readable = false;
00344 Time waitStart = getSystemTime();
00345 Time lastAckTime=0;
00346 UInt32 len;
00347 SocketAddress fromAddress;
00348 Dgram ackRequest;
00349 const Time ackTimeout = 0.01;
00350 UInt16 sendId=0;
00351 UInt16 lastNak=0;
00352 Time lastNakTime=0;
00353 bool stopAfterSend=false;
00354
00355
00356 dgram.resize(_windowSize);
00357 missing.resize(_windowSize);
00358
00359 #ifdef TEST_LOST_DGRAM_RATE
00360 srand48((int)getSystemTime());
00361 #endif
00362 do
00363 {
00364
00365 if(count < maxCount)
00366 {
00367 _lock->aquire();
00368 while(count < maxCount &&
00369 (count == 0 || !_queue.empty()))
00370 {
00371 dgram[end] = _queue.get(_lock);
00372
00373 if(!dgram[end]->getSize())
00374 {
00375 if(count)
00376 {
00377
00378 stopAfterSend = true;
00379 break;
00380 }
00381 else
00382 {
00383
00384 _lock->release();
00385 return true;
00386 }
00387 }
00388
00389 for(index=0 ; index<_waitFor.size() ; ++index)
00390 missing[end].insert(_waitFor[index]);
00391 end = (end+1) % _windowSize;
00392 if(!count)
00393 lastAckTime = getSystemTime();
00394 count++;
00395 }
00396 _lock->release();
00397 }
00398
00399
00400 for( ; send != end ; send = (send+1) % _windowSize)
00401 {
00402 #ifdef TEST_LOST_DGRAM_RATE
00403 if(drand48()>TEST_LOST_DGRAM_RATE)
00404 #endif
00405 {
00406 if(!dgram[send]->getEarlySend())
00407 _mcastSocket.sendTo(dgram[send]->getBuffer(),
00408 dgram[send]->getBufferSize(),
00409 _mcastAddress);
00410 dgram[send]->setEarlySend(false);
00411 }
00412 sendId = dgram[send]->getId();
00413
00414 }
00415
00416
00417
00418
00419
00420 while(count &&
00421 ( ( readable = _mcastSocket.waitReadable(0) ) ||
00422 ( count == maxCount && send == end ) ||
00423 ( getSystemTime() - lastAckTime > ackTimeout) ) )
00424 {
00425 if(!readable && !_mcastSocket.waitReadable(ackTimeout))
00426 {
00427 #if 0
00428 printf("count %d\n",count);
00429 printf("missing %d\n",missing[ack].size());
00430
00431 printf("readable %d\n",readable);
00432 printf("send %d end %d\n",send,end);
00433 printf("lastack %lf\n",getSystemTime() - lastAckTime);
00434 #endif
00435 FDEBUG(("timeout count %d %d missing %d\n",count,sendId,missing[ack].size()))
00436
00437
00438 ackRequest.setSize(0);
00439 ackRequest.setId(sendId);
00440
00441
00442 _mcastSocket.sendTo(ackRequest.getBuffer(),
00443 ackRequest.getBufferSize(),
00444 _mcastAddress);
00445
00446
00447 _mcastSocket.waitReadable(0.05);
00448
00449
00450 if(getSystemTime() - lastAckTime > 0.5)
00451 {
00452 if(!checkChannels())
00453 return false;
00454 }
00455 if(_sendQueueThreadStop &&
00456 getSystemTime() - lastAckTime > 1)
00457
00458 break;
00459 else
00460
00461 continue;
00462 }
00463
00464
00465 len = _mcastSocket.recvFrom(response.getBuffer(),
00466 response.getBufferCapacity(),
00467 fromAddress);
00468 lastAckTime = getSystemTime();
00469
00470
00471 response.setResponseSize();
00472 if(len != response.getBufferSize())
00473 {
00474 FDEBUG(("Wrong response len %d\n",len))
00475 continue;
00476 }
00477
00478
00479 if(!Dgram::less(response.getId(),dgram[ack]->getId()))
00480 {
00481
00482 if(response.getResponseAck() == true)
00483 {
00484
00485 for(m = ack ;
00486 dgram[m]->getId() != response.getId() ;
00487 m=(m+1) % _windowSize)
00488 missing[m].erase(fromAddress);
00489 missing[m].erase(fromAddress);
00490 }
00491 else
00492 {
00493 if(response.getId() == lastNak &&
00494 getSystemTime() - lastNakTime < 0.02)
00495 continue;
00496 lastNak = response.getId();
00497 lastNakTime = getSystemTime();
00498 FDEBUG(("Nack %d from %s:%d\n",response.getId(),fromAddress.getHost().c_str(),fromAddress.getPort()));
00499
00500
00501 for(m = ack ;
00502 m != send && dgram[m]->getId() != response.getId() ;
00503 m = (m+1) % _windowSize);
00504 send = m;
00505 }
00506 }
00507
00508
00509 if(missing[ack].empty())
00510 {
00511
00512 _lock->aquire();
00513 while(count && missing[ack].empty())
00514 {
00515 _free.put(dgram[ack]);
00516 ack = (ack+1) % _windowSize;
00517 count--;
00518 }
00519 _lock->release();
00520 }
00521 }
00522 }
00523 while(!stopAfterSend || send != end);
00524
00525 return true;
00526 }
00527
00530 void GroupMCastConnection::sendQueueThread(void *arg)
00531 {
00532 GroupMCastConnection *the = (GroupMCastConnection *)arg;
00533 try
00534 {
00535 the->sendQueue();
00536 }
00537 catch(SocketException &e)
00538 {
00539 FFATAL(( "Writer Proc crashed %s\n",e.what() ));
00540 }
00541 the->_sendQueueThreadRunning = false;
00542 }
00543
00546 void GroupMCastConnection::initialize()
00547 {
00548 std::string group = "239.33.42.32";
00549
00550
00551 int port = 15356;
00552 int pos = _destination.find(':');
00553 int clientPort;
00554 std::string clientHost;
00555 char hostname[256];
00556 UInt32 index;
00557 UInt32 len;
00558 UInt32 ackNum = (UInt32) osgsqrt(_sockets.size());
00559 UInt32 numSource;
00560 UInt32 sendTo;
00561 BinaryMessage message;
00562 char threadName[256];
00563
00564 sprintf(threadName,"GroupMCastConnection%p",this);
00565
00566 if(!getDestination().empty())
00567 group = getDestination();
00568
00569 if(_sockets.size()<=16)
00570 ackNum = 1;
00571
00572 if(pos>=0)
00573 {
00574 group = std::string(_destination,0,pos);
00575 port = atoi(std::string(_destination,pos+1,std::string::npos).c_str());
00576 }
00577 else
00578 {
00579 if(_destination.size())
00580 group = _destination;
00581 }
00582 _mcastAddress.setHost(group);
00583 _mcastAddress.setPort(port );
00584
00585
00586 if(!getInterface().empty())
00587 _mcastSocket.setMCastInterface(
00588 SocketAddress(getInterface().c_str()));
00589
00590 for(index = 0 ; index < _sockets.size() ; ++index)
00591 {
00592 message.clear();
00593
00594 message.putString(_mcastAddress.getHost());
00595 message.putUInt32(_mcastAddress.getPort());
00596
00597 message.putUInt32(_seqNumber);
00598
00599 hostname[255] = '\0';
00600 gethostname(hostname,255);
00601 message.putString(hostname);
00602 message.putUInt32(_mcastSocket.getAddress().getPort());
00603
00604 _sockets[index].send(message);
00605
00606
00607 message.clear();
00608 len = _sockets[index].recv(message);
00609 if(len == 0)
00610 throw ReadError("Channel closed\n");
00611 clientHost = message.getString();
00612 clientPort = message.getUInt32();
00613
00614 SINFO << "Server:" << clientHost
00615 << " Port:" << clientPort << std::endl;
00616 _receiver.push_back(SocketAddress(clientHost.c_str(),clientPort));
00617 }
00618 for(index = 0 ; index < _sockets.size() ; ++index)
00619 {
00620 message.clear();
00621
00622 if((index % ackNum) == 0)
00623 {
00624 _waitFor.push_back(_receiver[index]);
00625 numSource = osgMin( ackNum-1, (UInt32) _sockets.size()-index-1 );
00626 message.putUInt32(numSource);
00627 for(UInt32 r = index+1 ; r < index+1+numSource ; ++r)
00628 {
00629 message.putString(_receiver[r].getHost());
00630 message.putUInt32(_receiver[r].getPort());
00631 }
00632 message.putString(hostname);
00633 message.putUInt32(_mcastSocket.getAddress().getPort());
00634 }
00635 else
00636 {
00637 sendTo = index - (index % ackNum);
00638 message.putUInt32(0);
00639 message.putString(_receiver[sendTo].getHost());
00640 message.putUInt32(_receiver[sendTo].getPort());
00641 }
00642
00643 _sockets[index].send(message);
00644 }
00645
00646
00647 _sendQueueThread=BaseThread::get(threadName);
00648 _sendQueueThreadRunning = true;
00649 _sendQueueThreadStop = false;
00650 _sendQueueThread->runFunction( sendQueueThread, (void *) (this) );
00651 _initialized = true;
00652 }
00653
00654
00655
00656
00657 ConnectionType GroupMCastConnection::_type(
00658 &GroupMCastConnection::create,
00659 "Multicast");
00660
00661
00662
00663
00664 #ifdef __sgi
00665 #pragma set woff 1174
00666 #endif
00667
00668 #ifdef OSG_LINUX_ICC
00669 #pragma warning( disable : 177 )
00670 #endif
00671
00672 namespace
00673 {
00674 static Char8 cvsid_cpp [] = "@(#)$Id: $";
00675 static Char8 cvsid_hpp [] = OSG_GROUPMCASTCONNECTION_HEADER_CVSID;
00676 }
00677