00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042 #include <stdlib.h>
00043 #include <stdio.h>
00044 #include <assert.h>
00045
00046 #include "OSGConfig.h"
00047 #include "OSGLog.h"
00048 #include "OSGBaseThread.h"
00049 #include "OSGSocketSelection.h"
00050 #include "OSGBinaryMessage.h"
00051 #include "OSGPointMCastConnection.h"
00052 #include "OSGGroupMCastConnection.h"
00053 #include "OSGConnectionType.h"
00054
00055 OSG_USING_NAMESPACE
00056
00061
00062
00063
00067 PointMCastConnection::PointMCastConnection():
00068 Inherited(),
00069 _lastDgram(NULL),
00070 _initialized(false)
00071 {
00072 char lockName[256];
00073 sprintf(lockName,"PointMCastConnection%p",this);
00074
00075
00076 _lock = Lock::get(lockName);
00077
00078
00079 for(UInt32 dI = 0 ; dI < OSG_DGRAM_QUEUE_LEN ; ++dI)
00080 _free.put(new Dgram());
00081
00082 _acceptSocket.open();
00083 _acceptSocket.setReusePort(true);
00084
00085
00086
00087
00088
00089
00090
00091 }
00092
00095 PointMCastConnection::~PointMCastConnection(void)
00096 {
00097
00098 _recvQueueThreadStop = true;
00099
00100 BaseThread::join(_recvQueueThread);
00101
00102 _mcastSocket.close();
00103
00104 _lock->aquire();
00105 while(!_free.empty())
00106 delete _free.get(_lock);
00107 while(!_queue.empty())
00108 delete _queue.get(_lock);
00109 _lock->release();
00110
00111 _acceptSocket.close();
00112 }
00113
00116 const ConnectionType *PointMCastConnection::getType()
00117 {
00118 return &_type;
00119 }
00120
00121
00122
00123
00127 Connection::Channel PointMCastConnection::connectGroup(
00128 const std::string &address,
00129 Time timeout)
00130 {
00131 Channel channel = Inherited::connectGroup(address,timeout);
00132 return channel;
00133 }
00134
00137 void PointMCastConnection::disconnect(void)
00138 {
00139 _socket.close();
00140 }
00141
00145 Connection::Channel PointMCastConnection::acceptGroup(Time timeout)
00146 {
00147 Channel channel = Inherited::acceptGroup(timeout);
00148 return channel;
00149 }
00150
00151
00152
00153
00157 Connection::Channel PointMCastConnection::selectChannel(Time timeout)
00158 throw (ReadError)
00159 {
00160 if(_pointToPoint)
00161 return Inherited::selectChannel(timeout);
00162 try
00163 {
00164 if(!_initialized)
00165 initialize();
00166
00167 if(isReadBufferEmpty() &&
00168 !_lastDgram &&
00169 _queue.empty())
00170 {
00171 if(timeout == -1)
00172 {
00173
00174 _lock->aquire();
00175 _queue.wait(_lock);
00176 _lock->release();
00177 return 0;
00178 }
00179 if(timeout == 0)
00180 return -1;
00181 while(_queue.empty() && timeout > 0)
00182 {
00183 _mcastSocket.waitReadable(.1);
00184 timeout-=.1;
00185 }
00186 if(_queue.empty())
00187 return -1;
00188 }
00189 }
00190 catch(SocketException &e)
00191 {
00192 throw ReadError(e.what());
00193 }
00194 return 0;
00195 }
00196
00197
00198
00199
00202 bool PointMCastConnection::wait(Time timeout) throw (ReadError)
00203 {
00204 UInt32 tag;
00205
00206 if(_pointToPoint)
00207 return Inherited::wait(timeout);
00208 try
00209 {
00210 if(selectChannel(timeout) < 0)
00211 return false;
00212 getValue(tag);
00213 if(tag != 314156)
00214 {
00215 FFATAL(("Stream out of sync in PointMCastConnection\n"));
00216 throw ReadError("Stream out of sync");
00217 }
00218 }
00219 catch(SocketError &e)
00220 {
00221 throw ReadError(e.what());
00222 }
00223 return true;
00224 }
00225
00226
00227
00231 PointConnection *PointMCastConnection::create(void)
00232 {
00233 return new PointMCastConnection();
00234 }
00235
00236
00237
00238
00246 void PointMCastConnection::read(MemoryHandle mem,UInt32 size)
00247 {
00248 if(_pointToPoint)
00249 {
00250 Inherited::read(mem,size);
00251 return;
00252 }
00253 Dgram *dgram = NULL;
00254 char *buffer = (char*)mem;
00255 UInt32 len;
00256 UInt32 dgramPos;
00257
00258 if(!_initialized)
00259 initialize();
00260
00261 while(size)
00262 {
00263 if(_lastDgram)
00264 {
00265 dgramPos = _lastDgramPos;
00266 dgram = _lastDgram;
00267 }
00268 else
00269 {
00270
00271 _lock->aquire();
00272 dgram = _queue.get(_lock);
00273 _lock->release();
00274 dgramPos = 0;
00275 if(dgram->getSize() == 0)
00276 throw ReadError("Channel closed\n");
00277 }
00278
00279 len = osgMin(size,dgram->getSize()-dgramPos);
00280 memcpy(buffer,dgram->getData()+dgramPos,len);
00281 buffer += len;
00282 size -= len;
00283 dgramPos += len;
00284 if(dgramPos == dgram->getSize())
00285 {
00286
00287 _lock->aquire();
00288 _free.put(dgram);
00289 _lock->release();
00290 _lastDgram = NULL;
00291 }
00292 else
00293 {
00294 _lastDgram = dgram;
00295 _lastDgramPos = dgramPos;
00296 }
00297 }
00298 }
00299
00307 void PointMCastConnection::readBuffer()
00308 {
00309 if(_pointToPoint)
00310 {
00311 Inherited::readBuffer();
00312 return;
00313 }
00314
00315 static int sumSize=0;
00316 Dgram *dgram = NULL;
00317 UInt32 size = readBufBegin()->getSize();
00318 MemoryHandle buffer = readBufBegin()->getMem();
00319 UInt32 len;
00320 UInt32 dgramPos;
00321
00322 if(!_initialized)
00323 initialize();
00324
00325 do
00326 {
00327 if(_lastDgram)
00328 {
00329 dgramPos = _lastDgramPos;
00330 dgram = _lastDgram;
00331 }
00332 else
00333 {
00334
00335 _lock->aquire();
00336 dgram = _queue.get(_lock);
00337 _lock->release();
00338 dgramPos = 0;
00339 if(dgram->getSize() == 0)
00340 throw ReadError("Channel closed");
00341 }
00342
00343 len = osgMin(size,dgram->getSize()-dgramPos);
00344 memcpy(buffer,dgram->getData()+dgramPos,len);
00345 buffer += len;
00346 size -= len;
00347 dgramPos += len;
00348 if(dgramPos == dgram->getSize())
00349 {
00350
00351 _lock->aquire();
00352 _free.put(dgram);
00353 _lock->release();
00354 _lastDgram = NULL;
00355 }
00356 else
00357 {
00358 _lastDgram = dgram;
00359 _lastDgramPos = dgramPos;
00360 }
00361 }
00362 while(size && !_queue.empty());
00363
00364 readBufBegin()->setDataSize(readBufBegin()->getSize()-size);
00365 sumSize += readBufBegin()->getDataSize();
00366 }
00367
00368
00369
00370
00373 bool PointMCastConnection::recvNextDgram(Dgram *dgram)
00374 {
00375 SocketSelection selection;
00376 SocketAddress from;
00377 UInt32 length;
00378
00379 selection.setRead(_mcastSocket);
00380 selection.setRead(_responseSocket);
00381 if(!selection.select(0.5))
00382 return false;
00383 if(selection.isSetRead(_responseSocket))
00384 {
00385 length = _responseSocket.recvFrom(dgram->getBuffer(),
00386 dgram->getBufferCapacity(),
00387 from);
00388 dgram->setBufferSize(length);
00389 #if 0
00390
00391
00392 if(from == _sender && !_combineAck.empty())
00393 {
00394 exit(0);
00395
00396
00397 if(_maxAck == dgram->getId())
00398 {
00399
00400 dgram->setId(_maxAck);
00401 dgram->setResponseSize();
00402 dgram->setResponseAck(true);
00403 #ifdef TEST_LOST_DGRAM_RATE
00404 if(drand48()>TEST_LOST_DGRAM_RATE)
00405 #endif
00406 _responseSocket.sendTo(
00407 dgram->getBuffer(),
00408 dgram->getBufferSize(),
00409 _ackDestination);
00410 return false;
00411 }
00412 else
00413 {
00414 return true;
00415 }
00416 }
00417 #endif
00418 combineAck(dgram,from);
00419 }
00420 if(selection.isSetRead(_mcastSocket))
00421 {
00422 length = _mcastSocket.recvFrom(dgram->getBuffer(),
00423 dgram->getBufferCapacity(),
00424 from);
00425 dgram->setBufferSize(length);
00426
00427 if(from != _sender)
00428 return false;
00429 else
00430 return true;
00431 }
00432 else
00433 {
00434 return false;
00435 }
00436 }
00437
00440 void PointMCastConnection::combineAck(Dgram *dgram,SocketAddress from)
00441 {
00442 UInt16 maxAck;
00443
00444 if(dgram)
00445 {
00446
00447 if(_combineAck.count(from)==0)
00448 {
00449 FFATAL(("no ack from other expected\n"));
00450 return;
00451 }
00452
00453 if( Dgram::less(dgram->getId(),_combineAck[from] ) )
00454 {
00455
00456 return;
00457 }
00458 _combineAck[from] = dgram->getId();
00459 }
00460
00461 maxAck = _seqNumber-1;
00462 for(std::map<SocketAddress,UInt16>::iterator aI
00463 = _combineAck.begin() ;
00464 aI != _combineAck.end() ; ++aI)
00465 {
00466 if( Dgram::less(aI->second,maxAck) )
00467 maxAck = aI->second;
00468 }
00469
00470
00471 if( Dgram::less(_maxAck,maxAck))
00472 {
00473 Dgram response;
00474
00475 _maxAck = maxAck;
00476 response.setResponseSize();
00477 response.setId(_maxAck);
00478 response.setResponseAck(true);
00479
00480 _responseSocket.sendTo(
00481 response.getBuffer(),
00482 response.getBufferSize(),
00483 _ackDestination);
00484 }
00485 }
00486
00489 bool PointMCastConnection::recvQueue(void)
00490 {
00491 SocketAddress from;
00492 Dgram *dgram;
00493 Dgram response;
00494 UInt32 readCount=0;
00495 UInt32 length;
00496 bool missing=false;
00497 Time ignoreT=getSystemTime();
00498 UInt16 id;
00499
00500 #ifdef TEST_LOST_DGRAM_RATE
00501 srand48((long int)(10000*getSystemTime()));
00502 #endif
00503
00504 for(;;)
00505 {
00506
00507 _lock->aquire();
00508 dgram =_free.get(_lock);
00509 _lock->release();
00510 do
00511 {
00512
00513 if(missing)
00514 ignoreT = getSystemTime();
00515 do
00516 {
00517 while(!recvNextDgram(dgram))
00518 {
00519 if(_recvQueueThreadStop)
00520 return true;
00521 try
00522 {
00523 while(_socket.waitReadable(0))
00524 {
00525 char buffer;
00526 if(_socket.recv(&buffer,1) <= 0)
00527 {
00528
00529 dgram->setSize(0);
00530 _lock->aquire();
00531 _queue.put(dgram);
00532 _lock->release();
00533 FLOG(("Connection lost\n"));
00534 return false;
00535 }
00536 }
00537 }
00538 catch(SocketException &e)
00539 {
00540
00541 dgram->setSize(0);
00542 _lock->aquire();
00543 _queue.put(dgram);
00544 _lock->release();
00545 FLOG(("Connection lost\n"));
00546 return false;
00547 }
00548 }
00549 id = dgram->getId();
00550 }
00551 while( missing &&
00552 id != _seqNumber &&
00553 (getSystemTime() - ignoreT) < 0.01);
00554
00555 missing = false;
00556 response.setId(id);
00557
00558
00559 if(dgram->getSize() == 0)
00560 {
00561 if( !Dgram::less(id,_seqNumber ) )
00562 {
00563 missing = true;
00564 response.setId(_seqNumber);
00565 }
00566
00567 }
00568 else
00569 {
00570
00571 if( dgram->getId() == _seqNumber)
00572 {
00573
00574
00575 _lock->aquire();
00576 _queue.put(dgram);
00577 _lock->release();
00578 }
00579 else
00580 {
00581
00582 if( Dgram::less(id,_seqNumber ) )
00583 {
00584 continue;
00585 }
00586 else
00587 {
00588 missing = true;
00589 response.setId(_seqNumber);
00590 }
00591 }
00592 }
00593
00594
00595
00596 response.setResponseAck(!missing);
00597 response.setResponseSize();
00598
00599
00600 if(!response.getResponseAck() ||
00601 !_mcastSocket.waitReadable(0))
00602 {
00603 #ifdef TEST_LOST_DGRAM_RATE
00604 if(drand48()>TEST_LOST_DGRAM_RATE)
00605 #endif
00606 if(response.getResponseAck())
00607 {
00608
00609
00610 if(_combineAck.empty() || id == _maxAck)
00611 _responseSocket.sendTo(response.getBuffer(),
00612 response.getBufferSize(),
00613 _ackDestination);
00614 else
00615 combineAck(NULL,_sender);
00616 }
00617 else
00618 {
00619
00620 _responseSocket.sendTo(response.getBuffer(),
00621 response.getBufferSize(),
00622 _sender);
00623 }
00624 }
00625 }
00626 while(id != _seqNumber || missing);
00627 _seqNumber++;
00628 }
00629
00630
00631 }
00632
00633 void PointMCastConnection::recvQueueThread(void *arg)
00634 {
00635 PointMCastConnection *the=(PointMCastConnection*)arg;
00636 try
00637 {
00638 the->recvQueue();
00639 }
00640 catch(SocketException &e)
00641 {
00642 SFATAL << "Error in dgram reader thread:" << e.what() << std::endl;
00643 }
00644 the->_recvQueueThreadRunning = false;
00645 }
00646
00649 void PointMCastConnection::initialize()
00650 {
00651 std::string group;
00652 Channel channel;
00653 BinaryMessage message;
00654 char hostname[256];
00655 std::string fromHost;
00656 UInt32 fromPort;
00657 UInt32 combineCount;
00658 std::string host;
00659 UInt32 port;
00660 char threadName[256];
00661
00662 sprintf(threadName,"PointMCastConnection%p",this);
00663
00664
00665 _socket.recv(message);
00666
00667 group = message.getString();
00668 port = message.getUInt32();
00669
00670 _seqNumber = message.getUInt32();
00671 _maxAck = _seqNumber - 1;
00672
00673 fromHost = message.getString();
00674 fromPort = message.getUInt32();
00675 _sender = SocketAddress(fromHost.c_str(),fromPort);
00676
00677
00678 _mcastSocket.open();
00679 _mcastSocket.setReusePort(true);
00680 _mcastSocket.setReadBufferSize(524288);
00681 _mcastSocket.bind(SocketAddress(SocketAddress::ANY,port));
00682 try
00683 {
00684 _mcastSocket.join(SocketAddress(group.c_str()));
00685 }
00686 catch(...) {}
00687
00688 if(!getInterface().empty())
00689 {
00690 _mcastSocket.setMCastInterface(SocketAddress(getInterface().c_str()));
00691 }
00692
00693 _responseSocket.open();
00694 _responseSocket.bind(SocketAddress(SocketAddress::ANY,0));
00695
00696
00697 hostname[255] = '\0';
00698 gethostname(hostname,255);
00699 message.clear();
00700 message.putString(hostname);
00701 message.putUInt32(_responseSocket.getAddress().getPort());
00702 _socket.send(message);
00703
00704
00705 _socket.recv(message);
00706
00707 message.getUInt32(combineCount);
00708
00709 while(combineCount--)
00710 {
00711 host=message.getString();
00712 port=message.getUInt32();
00713 _combineAck[SocketAddress(host.c_str(),port)]=_seqNumber-1;
00714 }
00715
00716 host=message.getString();
00717 port=message.getUInt32();
00718 _ackDestination = SocketAddress(host.c_str(),port);
00719
00720
00721 _recvQueueThread=BaseThread::get(threadName);
00722 _recvQueueThreadRunning = true;
00723 _recvQueueThreadStop = false;
00724 _recvQueueThread->runFunction( recvQueueThread, (void *) (this) );
00725
00726 _initialized = true;
00727
00728 }
00729
00730
00731
00732
00733 ConnectionType PointMCastConnection::_type(
00734 &PointMCastConnection::create,
00735 "Multicast");
00736
00737
00738
00739
00740 #ifdef __sgi
00741 #pragma set woff 1174
00742 #endif
00743
00744 #ifdef OSG_LINUX_ICC
00745 #pragma warning( disable : 177 )
00746 #endif
00747
00748 namespace
00749 {
00750 static Char8 cvsid_cpp [] = "@(#)$Id: $";
00751 static Char8 cvsid_hpp [] = OSG_GROUPMCASTCONNECTION_HEADER_CVSID;
00752 }
00753