Main Page | Modules | Namespace List | Class Hierarchy | Alphabetical List | Class List | Directories | File List | Namespace Members | Class Members | File Members | Related Pages

osg::PointMCastConnection Class Reference
[Network]

#include <OSGPointMCastConnection.h>

Inheritance diagram for osg::PointMCastConnection:

osg::PointSockConnection osg::PointConnection osg::Connection osg::BinaryDataHandler List of all members.

private helpers

*bool recvNextDgram (Dgram *dgram)
void combineAck (Dgram *dgram, SocketAddress from)
bool recvQueue (void)
void initialize (void)
static void recvQueueThread (void *arg)

Public Member Functions

Constructors
PointMCastConnection (void)
virtual ~PointMCastConnection (void)
type info
*virtual const ConnectionTypegetType (void)
connection
*virtual Channel connectGroup (const std::string &address, Time timeout=-1)
virtual void disconnect (void)
virtual Channel acceptGroup (Time timeout=-1)
synchronisation
*virtual bool wait (Time timeout) throw (ReadError)
channel handling
*virtual Channel selectChannel (Time timeout=-1) throw (ReadError)

Static Public Member Functions

create
*static PointConnectioncreate (void)
 create conneciton

Protected Types

typedef std::vector< MemoryBlock > BuffersT
typedef std::list< MemoryHandleFreeMemT

Protected Member Functions

IO Implementation
*virtual void read (MemoryHandle mem, UInt32 size)
virtual void readBuffer (void)

Protected Attributes

members
*DgramSocket _mcastSocket
DgramSocket _responseSocket
BaseThread_recvQueueThread
bool _recvQueueThreadRunning
bool _recvQueueThreadStop
UInt16 _seqNumber
SocketAddress _mcastAddress
DgramQueue _queue
DgramQueue _free
Lock_lock
SocketAddress _sender
SocketAddress _ackDestination
Dgram_lastDgram
UInt32 _lastDgramPos
bool _initialized
std::map< SocketAddress, UInt16_combineAck
UInt16 _maxAck

Private Types

typedef PointSockConnection Inherited

Private Member Functions

 PointMCastConnection (const PointMCastConnection &source)
PointMCastConnectionoperator= (const PointMCastConnection &source)

Static Private Attributes

static type
*static ConnectionType _type

Classes

struct  SocketBufferHeader

Detailed Description

Definition at line 62 of file OSGPointMCastConnection.h.


Member Typedef Documentation

typedef PointSockConnection osg::PointMCastConnection::Inherited [private]
 

Reimplemented from osg::PointSockConnection.

Definition at line 180 of file OSGPointMCastConnection.h.

typedef std::vector<MemoryBlock> osg::BinaryDataHandler::BuffersT [protected, inherited]
 

Definition at line 213 of file OSGBinaryDataHandler.h.

typedef std::list<MemoryHandle> osg::BinaryDataHandler::FreeMemT [protected, inherited]
 

Definition at line 214 of file OSGBinaryDataHandler.h.


Constructor & Destructor Documentation

PointMCastConnection::PointMCastConnection void   ) 
 

Constructor

Definition at line 67 of file OSGPointMCastConnection.cpp.

References osg::PointSockConnection::_acceptSocket, _free, _lock, osg::Lock::get(), osg::StreamSocket::open(), OSG_DGRAM_QUEUE_LEN, osg::DgramQueue::put(), and osg::Socket::setReusePort().

Referenced by create().

00067                                           :
00068     Inherited(),
00069     _lastDgram(NULL),
00070     _initialized(false)
00071 {
00072     char lockName[256];
00073     sprintf(lockName,"PointMCastConnection%p",this);
00074 
00075     // create locks
00076     _lock     = Lock::get(lockName);
00077 
00078     // fill dgramqueue
00079     for(UInt32 dI = 0 ; dI < OSG_DGRAM_QUEUE_LEN ; ++dI)
00080         _free.put(new Dgram());
00081 
00082     _acceptSocket.open();
00083     _acceptSocket.setReusePort(true);
00084 
00085 /*
00086     _socketWriteBuffer.resize(131071);
00087     // reserve first bytes for buffer size
00088     writeBufAdd(&_socketWriteBuffer[sizeof(SocketBufferHeader)],
00089                 _socketWriteBuffer.size()-sizeof(SocketBufferHeader));
00090 */
00091 }

PointMCastConnection::~PointMCastConnection void   )  [virtual]
 

Destructor

Definition at line 95 of file OSGPointMCastConnection.cpp.

References osg::PointSockConnection::_acceptSocket, _free, _lock, _mcastSocket, _queue, _recvQueueThread, _recvQueueThreadStop, osg::Lock::aquire(), osg::StreamSocket::close(), osg::DgramSocket::close(), osg::DgramQueue::empty(), osg::DgramQueue::get(), osg::BaseThread::join(), and osg::Lock::release().

00096 {
00097     // indicate thread stop
00098     _recvQueueThreadStop = true;
00099     // wait for stop
00100     BaseThread::join(_recvQueueThread);    
00101     // close socket
00102     _mcastSocket.close();
00103     // free queues
00104     _lock->aquire();
00105     while(!_free.empty())
00106         delete _free.get(_lock);
00107     while(!_queue.empty())
00108         delete _queue.get(_lock);
00109     _lock->release();
00110     // close socket
00111     _acceptSocket.close();
00112 }

osg::PointMCastConnection::PointMCastConnection const PointMCastConnection source  )  [private]
 


Member Function Documentation

const ConnectionType * PointMCastConnection::getType void   )  [virtual]
 

get connection type

Reimplemented from osg::PointSockConnection.

Definition at line 116 of file OSGPointMCastConnection.cpp.

References _type.

00117 {
00118     return &_type;
00119 }

Connection::Channel PointMCastConnection::connectGroup const std::string &  address,
Time  timeout = -1
[virtual]
 

connect to the given group. If timeout is reached, -1 is returned

Reimplemented from osg::PointSockConnection.

Definition at line 127 of file OSGPointMCastConnection.cpp.

References osg::Connection::Channel, and osg::PointSockConnection::connectGroup().

00130 {
00131     Channel channel = Inherited::connectGroup(address,timeout);
00132     return channel;
00133 }

void PointMCastConnection::disconnect void   )  [virtual]
 

disconnect the given channel

Reimplemented from osg::PointSockConnection.

Definition at line 137 of file OSGPointMCastConnection.cpp.

References osg::PointSockConnection::_socket, and osg::StreamSocket::close().

00138 {
00139     _socket.close();
00140 }

Connection::Channel PointMCastConnection::acceptGroup Time  timeout = -1  )  [virtual]
 

accept an icomming grop connection. If timeout is reached, -1 is returned. If timeout is -1 then wait without timeout

Reimplemented from osg::PointSockConnection.

Definition at line 145 of file OSGPointMCastConnection.cpp.

References osg::PointSockConnection::acceptGroup(), and osg::Connection::Channel.

00146 {
00147     Channel channel = Inherited::acceptGroup(timeout);
00148     return channel;
00149 }

bool PointMCastConnection::wait Time  timeout  )  throw (ReadError) [virtual]
 

wait for signal

Reimplemented from osg::PointSockConnection.

Definition at line 202 of file OSGPointMCastConnection.cpp.

References FFATAL.

00203 {
00204     UInt32 tag;
00205 
00206     if(_pointToPoint)
00207         return Inherited::wait(timeout);
00208     try
00209     {
00210         if(selectChannel(timeout) < 0)
00211             return false;
00212         getValue(tag);
00213         if(tag != 314156)
00214         {
00215             FFATAL(("Stream out of sync in PointMCastConnection\n"));
00216             throw ReadError("Stream out of sync");
00217         }
00218     }
00219     catch(SocketError &e)
00220     {
00221         throw ReadError(e.what());
00222     }
00223     return true;
00224 }

Connection::Channel PointMCastConnection::selectChannel Time  timeout = -1  )  throw (ReadError) [virtual]
 

select the next channel for reading. If timeout is not -1 then -1 is returned if timeout is reached

Reimplemented from osg::PointSockConnection.

Definition at line 157 of file OSGPointMCastConnection.cpp.

00159 {
00160     if(_pointToPoint)
00161         return Inherited::selectChannel(timeout);
00162     try
00163     {
00164         if(!_initialized)
00165             initialize();
00166         // todo
00167         if(isReadBufferEmpty() && 
00168            !_lastDgram && 
00169            _queue.empty())
00170         {
00171             if(timeout == -1)
00172             {
00173                 // wait for a dgram
00174                 _lock->aquire();
00175                 _queue.wait(_lock);
00176                 _lock->release();
00177                 return 0;
00178             }
00179             if(timeout == 0)
00180                 return -1;
00181             while(_queue.empty() && timeout > 0)
00182             {
00183                 _mcastSocket.waitReadable(.1);
00184                 timeout-=.1;
00185             }
00186             if(_queue.empty())
00187                 return -1;
00188         }
00189     }
00190     catch(SocketException &e)
00191     {
00192         throw ReadError(e.what());
00193     }
00194     return 0;
00195 }

PointConnection * PointMCastConnection::create void   )  [static]
 

Reimplemented from osg::PointSockConnection.

Definition at line 231 of file OSGPointMCastConnection.cpp.

References PointMCastConnection().

00232 {
00233     return new PointMCastConnection();
00234 }

void PointMCastConnection::read MemoryHandle  mem,
UInt32  size
[protected, virtual]
 

Read data into given memory

Read data form the current read socket. The read socket is that socket, that was selectet in selectChannel.

Reimplemented from osg::PointSockConnection.

Definition at line 246 of file OSGPointMCastConnection.cpp.

References _free, _initialized, _lastDgram, _lastDgramPos, _lock, osg::PointConnection::_pointToPoint, _queue, osg::Lock::aquire(), osg::DgramQueue::get(), osg::Dgram::getData(), osg::Dgram::getSize(), initialize(), osg::osgMin(), osg::DgramQueue::put(), osg::PointSockConnection::read(), and osg::Lock::release().

00247 {
00248     if(_pointToPoint)
00249     {
00250         Inherited::read(mem,size);
00251         return;
00252     }
00253     Dgram *dgram  = NULL;
00254     char  *buffer = (char*)mem;
00255     UInt32 len;
00256     UInt32 dgramPos;
00257 
00258     if(!_initialized)
00259         initialize();
00260 
00261     while(size)
00262     {
00263         if(_lastDgram)
00264         {
00265             dgramPos = _lastDgramPos;
00266             dgram = _lastDgram;
00267         }
00268         else
00269         {
00270             // get next dgram
00271             _lock->aquire();
00272             dgram = _queue.get(_lock);
00273             _lock->release();
00274             dgramPos = 0;
00275             if(dgram->getSize() == 0)
00276                 throw ReadError("Channel closed\n");
00277         }
00278         // copy to buffer
00279         len = osgMin(size,dgram->getSize()-dgramPos);
00280         memcpy(buffer,dgram->getData()+dgramPos,len);
00281         buffer   += len;
00282         size     -= len;
00283         dgramPos += len;
00284         if(dgramPos == dgram->getSize())
00285         {
00286             // put to free queue
00287             _lock->aquire();
00288             _free.put(dgram);
00289             _lock->release();
00290             _lastDgram = NULL;
00291         }
00292         else
00293         {
00294             _lastDgram    = dgram;
00295             _lastDgramPos = dgramPos;
00296         }
00297     }
00298 }

void PointMCastConnection::readBuffer void   )  [protected, virtual]
 

Read next data block

The stream connection uses only BinaryDataHandler buffer. If more then one buffer is present, then this methode must be changed!

Reimplemented from osg::PointSockConnection.

Definition at line 307 of file OSGPointMCastConnection.cpp.

References _free, _initialized, _lastDgram, _lastDgramPos, _lock, osg::PointConnection::_pointToPoint, _queue, osg::Lock::aquire(), osg::DgramQueue::empty(), osg::DgramQueue::get(), osg::Dgram::getData(), osg::Dgram::getSize(), initialize(), osg::osgMin(), osg::DgramQueue::put(), osg::BinaryDataHandler::readBufBegin(), osg::PointSockConnection::readBuffer(), and osg::Lock::release().

00308 {
00309     if(_pointToPoint)
00310     {
00311         Inherited::readBuffer();
00312         return;
00313     }
00314 
00315     static int sumSize=0;
00316     Dgram       *dgram  = NULL;
00317     UInt32       size   = readBufBegin()->getSize();
00318     MemoryHandle buffer = readBufBegin()->getMem();
00319     UInt32       len;
00320     UInt32       dgramPos;
00321 
00322     if(!_initialized)
00323         initialize();
00324 
00325     do
00326     {
00327         if(_lastDgram)
00328         {
00329             dgramPos = _lastDgramPos;
00330             dgram = _lastDgram;
00331         }
00332         else
00333         {
00334             // get next dgram
00335             _lock->aquire();
00336             dgram = _queue.get(_lock);
00337             _lock->release();
00338             dgramPos = 0;
00339             if(dgram->getSize() == 0)
00340                 throw ReadError("Channel closed");
00341         }
00342         // copy to buffer
00343         len = osgMin(size,dgram->getSize()-dgramPos);
00344         memcpy(buffer,dgram->getData()+dgramPos,len);
00345         buffer   += len;
00346         size     -= len;
00347         dgramPos += len;
00348         if(dgramPos == dgram->getSize())
00349         {
00350             // put to free queue
00351             _lock->aquire();
00352             _free.put(dgram);
00353             _lock->release();
00354             _lastDgram = NULL;
00355         }
00356         else
00357         {
00358             _lastDgram    = dgram;
00359             _lastDgramPos = dgramPos;
00360         }
00361     }
00362     while(size && !_queue.empty());
00363     // set data size
00364     readBufBegin()->setDataSize(readBufBegin()->getSize()-size);
00365     sumSize += readBufBegin()->getDataSize();
00366 }    

bool PointMCastConnection::recvNextDgram Dgram dgram  )  [private]
 

read next dgram from mcast or private socket

Definition at line 373 of file OSGPointMCastConnection.cpp.

References _ackDestination, _combineAck, _maxAck, _mcastSocket, _responseSocket, _sender, combineAck(), osg::Dgram::getBuffer(), osg::Dgram::getBufferCapacity(), osg::Dgram::getBufferSize(), osg::Dgram::getId(), osg::SocketSelection::isSetRead(), osg::DgramSocket::recvFrom(), osg::SocketSelection::select(), osg::DgramSocket::sendTo(), osg::Dgram::setBufferSize(), osg::Dgram::setId(), osg::SocketSelection::setRead(), osg::Dgram::setResponseAck(), and osg::Dgram::setResponseSize().

Referenced by recvQueue().

00374 {
00375     SocketSelection selection;
00376     SocketAddress   from;
00377     UInt32          length;
00378     
00379     selection.setRead(_mcastSocket);
00380     selection.setRead(_responseSocket);
00381     if(!selection.select(0.5))
00382         return false;
00383     if(selection.isSetRead(_responseSocket))
00384     {
00385         length = _responseSocket.recvFrom(dgram->getBuffer(),
00386                                           dgram->getBufferCapacity(),
00387                                           from);
00388         dgram->setBufferSize(length);
00389 #if 0
00390 // ????
00391         // from sender
00392         if(from == _sender && !_combineAck.empty())
00393         {
00394             exit(0);
00395 
00396 
00397             if(_maxAck == dgram->getId())
00398             {
00399                 // do we have all acks ?
00400                 dgram->setId(_maxAck);
00401                 dgram->setResponseSize();
00402                 dgram->setResponseAck(true);
00403 #ifdef TEST_LOST_DGRAM_RATE
00404                 if(drand48()>TEST_LOST_DGRAM_RATE)
00405 #endif
00406                     _responseSocket.sendTo(
00407                         dgram->getBuffer(),
00408                         dgram->getBufferSize(),
00409                         _ackDestination);
00410                 return false;
00411             }
00412             else
00413             {
00414                 return true;
00415             }
00416         }
00417 #endif
00418         combineAck(dgram,from);
00419     } 
00420     if(selection.isSetRead(_mcastSocket))
00421     {
00422         length = _mcastSocket.recvFrom(dgram->getBuffer(),
00423                                        dgram->getBufferCapacity(),
00424                                        from);
00425         dgram->setBufferSize(length);
00426         // ignore packages from wrong destination
00427         if(from != _sender)
00428             return false;
00429         else
00430             return true;
00431     }
00432     else
00433     {
00434         return false;
00435     }
00436 }

void PointMCastConnection::combineAck Dgram dgram,
SocketAddress  from
[private]
 

combine several acks to 1 ack stream

Definition at line 440 of file OSGPointMCastConnection.cpp.

References _ackDestination, _combineAck, _maxAck, _responseSocket, _seqNumber, FFATAL, osg::Dgram::getBuffer(), osg::Dgram::getBufferSize(), osg::Dgram::getId(), osg::Dgram::less(), osg::DgramSocket::sendTo(), osg::Dgram::setId(), osg::Dgram::setResponseAck(), and osg::Dgram::setResponseSize().

Referenced by recvNextDgram(), and recvQueue().

00441 {
00442     UInt16 maxAck;
00443 
00444     if(dgram)
00445     {
00446         // do we expect acks from different source
00447         if(_combineAck.count(from)==0)
00448         {
00449             FFATAL(("no ack from other expected\n"));
00450             return;
00451         }
00452         // ack retransmission
00453         if( Dgram::less(dgram->getId(),_combineAck[from] ) )
00454         {
00455 //        printf("Ack restranmisson\n");
00456             return;
00457         }
00458         _combineAck[from] = dgram->getId();
00459     }
00460 
00461     maxAck = _seqNumber-1;
00462     for(std::map<SocketAddress,UInt16>::iterator aI
00463             = _combineAck.begin() ; 
00464         aI != _combineAck.end() ; ++aI)
00465     {
00466         if( Dgram::less(aI->second,maxAck) )
00467             maxAck = aI->second;
00468     }
00469     // when _max ack is now greate
00470 
00471     if( Dgram::less(_maxAck,maxAck))
00472     {
00473         Dgram response;
00474 
00475         _maxAck = maxAck;
00476         response.setResponseSize();
00477         response.setId(_maxAck);
00478         response.setResponseAck(true);
00479 
00480         _responseSocket.sendTo(
00481             response.getBuffer(),
00482             response.getBufferSize(),
00483             _ackDestination);
00484     }
00485 }

void PointMCastConnection::recvQueueThread void *  arg  )  [static, private]
 

read next dgram from mcast or private socket

Definition at line 633 of file OSGPointMCastConnection.cpp.

References _recvQueueThreadRunning, recvQueue(), and SFATAL.

Referenced by initialize().

00634 {
00635     PointMCastConnection *the=(PointMCastConnection*)arg;
00636     try 
00637     {
00638         the->recvQueue();
00639     }
00640     catch(SocketException &e)
00641     {
00642         SFATAL << "Error in dgram reader thread:" << e.what() << std::endl;
00643     }
00644     the->_recvQueueThreadRunning = false;
00645 }

bool PointMCastConnection::recvQueue void   )  [private]
 

recv queue

Definition at line 489 of file OSGPointMCastConnection.cpp.

References _ackDestination, _combineAck, _free, _lock, _maxAck, _mcastSocket, _queue, _recvQueueThreadStop, _responseSocket, _sender, _seqNumber, osg::PointSockConnection::_socket, osg::Lock::aquire(), combineAck(), FLOG, osg::DgramQueue::get(), osg::Dgram::getBuffer(), osg::Dgram::getBufferSize(), osg::Dgram::getId(), osg::Dgram::getResponseAck(), osg::Dgram::getSize(), osg::getSystemTime(), osg::Dgram::less(), osg::DgramQueue::put(), osg::Socket::recv(), recvNextDgram(), osg::Lock::release(), osg::DgramSocket::sendTo(), osg::Dgram::setId(), osg::Dgram::setResponseAck(), osg::Dgram::setResponseSize(), osg::Dgram::setSize(), and osg::Socket::waitReadable().

Referenced by recvQueueThread().

00490 {
00491     SocketAddress  from;
00492     Dgram         *dgram;
00493     Dgram          response;
00494     UInt32         readCount=0;
00495     UInt32         length;
00496     bool           missing=false;
00497     Time           ignoreT=getSystemTime();
00498     UInt16         id;
00499 
00500 #ifdef TEST_LOST_DGRAM_RATE
00501     srand48((long int)(10000*getSystemTime()));
00502 #endif
00503 
00504     for(;;)
00505     {
00506         // get free dgram
00507         _lock->aquire();
00508         dgram   =_free.get(_lock);
00509         _lock->release();
00510         do
00511         {
00512             // ignore for a while
00513             if(missing)
00514                 ignoreT = getSystemTime();
00515             do
00516             {
00517                 while(!recvNextDgram(dgram))
00518                 {
00519                     if(_recvQueueThreadStop)
00520                         return true;
00521                     try
00522                     {
00523                         while(_socket.waitReadable(0))
00524                         {
00525                             char buffer;
00526                             if(_socket.recv(&buffer,1) <= 0)
00527                             {
00528                                 // put EOT to the queue
00529                                 dgram->setSize(0);
00530                                 _lock->aquire();
00531                                 _queue.put(dgram);
00532                                 _lock->release();
00533                                 FLOG(("Connection lost\n"));
00534                                 return false;
00535                             }
00536                         }
00537                     }
00538                     catch(SocketException &e)
00539                     {
00540                         // put EOT to the queue
00541                         dgram->setSize(0);
00542                         _lock->aquire();
00543                         _queue.put(dgram);
00544                         _lock->release();
00545                         FLOG(("Connection lost\n"));
00546                         return false;
00547                     }
00548                 }
00549                 id = dgram->getId();
00550             }
00551             while( missing &&
00552                    id != _seqNumber &&
00553                    (getSystemTime() - ignoreT) < 0.01);
00554 
00555             missing = false;
00556             response.setId(id);
00557 
00558             // ack request ?
00559             if(dgram->getSize() == 0)
00560             {
00561                 if( !Dgram::less(id,_seqNumber ) )
00562                 {
00563                     missing = true;
00564                     response.setId(_seqNumber);
00565                 }
00566 //                printf("ack request %d %d\n",id,missing);
00567             }
00568             else
00569             {
00570 //                printf("%d got %d\n",id,_seqNumber);
00571                 if( dgram->getId() == _seqNumber)
00572                 {
00573                     // got expected dgram.
00574                     // put to queue
00575                     _lock->aquire();
00576                     _queue.put(dgram);
00577                     _lock->release();
00578                 }
00579                 else
00580                 {
00581                     // ignore if unneccesary retransmission
00582                     if( Dgram::less(id,_seqNumber ) )
00583                     {
00584                         continue;
00585                     }
00586                     else
00587                     {
00588                         missing = true;
00589                         response.setId(_seqNumber);
00590                     }
00591                 }
00592             }
00593 
00594 //                printf("Responde %d\n",response.getId());
00595             // prepare response
00596             response.setResponseAck(!missing);
00597             response.setResponseSize();
00598 
00599             // send response if nak or no data in the queue
00600             if(!response.getResponseAck() || 
00601                !_mcastSocket.waitReadable(0))
00602             {
00603 #ifdef TEST_LOST_DGRAM_RATE
00604                 if(drand48()>TEST_LOST_DGRAM_RATE)
00605 #endif
00606                     if(response.getResponseAck())
00607                     {
00608                         // send response if no ack combination
00609                         // or ack request 
00610                         if(_combineAck.empty() || id == _maxAck)
00611                             _responseSocket.sendTo(response.getBuffer(),
00612                                                    response.getBufferSize(),
00613                                                    _ackDestination);
00614                         else
00615                             combineAck(NULL,_sender);
00616                     }
00617                     else
00618                     {
00619                         // send nak to sender
00620                         _responseSocket.sendTo(response.getBuffer(),
00621                                                response.getBufferSize(),
00622                                                _sender);
00623                     }
00624             }
00625         }
00626         while(id != _seqNumber || missing);
00627         _seqNumber++;
00628     }    
00629 
00630 //    return true;
00631 }

void PointMCastConnection::initialize void   )  [private]
 

initialize connection

Definition at line 649 of file OSGPointMCastConnection.cpp.

References _ackDestination, _combineAck, _initialized, _maxAck, _mcastSocket, _recvQueueThread, _recvQueueThreadRunning, _recvQueueThreadStop, _responseSocket, _sender, _seqNumber, osg::PointSockConnection::_socket, osg::SocketAddress::ANY, osg::Socket::bind(), osg::Connection::Channel, osg::BinaryMessage::clear(), osg::BaseThread::get(), osg::Socket::getAddress(), osg::Connection::getInterface(), osg::SocketAddress::getPort(), osg::BinaryMessage::getString(), osg::BinaryMessage::getUInt32(), osg::DgramSocket::join(), osg::DgramSocket::open(), osg::BinaryMessage::putString(), osg::BinaryMessage::putUInt32(), osg::Socket::recv(), recvQueueThread(), osg::BaseThread::runFunction(), osg::Socket::send(), osg::DgramSocket::setMCastInterface(), osg::Socket::setReadBufferSize(), and osg::Socket::setReusePort().

Referenced by read(), and readBuffer().

00650 {
00651     std::string   group;
00652     Channel       channel;
00653     BinaryMessage message;
00654     char          hostname[256];
00655     std::string   fromHost;
00656     UInt32        fromPort;
00657     UInt32        combineCount;
00658     std::string   host;
00659     UInt32        port;
00660     char          threadName[256];
00661 
00662     sprintf(threadName,"PointMCastConnection%p",this);
00663 
00664     // get info about the group
00665     _socket.recv(message);
00666     // group and port
00667     group = message.getString();
00668     port  = message.getUInt32();
00669     // get seq number
00670     _seqNumber = message.getUInt32();
00671     _maxAck = _seqNumber - 1;
00672     // server port
00673     fromHost = message.getString();
00674     fromPort = message.getUInt32();
00675     _sender = SocketAddress(fromHost.c_str(),fromPort);
00676 
00677     // prepare socket to receive mcast packages
00678     _mcastSocket.open();
00679     _mcastSocket.setReusePort(true);
00680     _mcastSocket.setReadBufferSize(524288);
00681     _mcastSocket.bind(SocketAddress(SocketAddress::ANY,port));
00682     try
00683     {
00684         _mcastSocket.join(SocketAddress(group.c_str()));
00685     }
00686     catch(...) {}
00687     // set multicast interface
00688     if(!getInterface().empty())
00689     {
00690         _mcastSocket.setMCastInterface(SocketAddress(getInterface().c_str()));
00691     }
00692 
00693     _responseSocket.open();
00694     _responseSocket.bind(SocketAddress(SocketAddress::ANY,0));
00695 
00696     // tell the group from wich port requests are comming
00697     hostname[255] = '\0';
00698     gethostname(hostname,255);
00699     message.clear();
00700     message.putString(hostname);
00701     message.putUInt32(_responseSocket.getAddress().getPort());
00702     _socket.send(message);
00703 
00704     // get ack destination info
00705     _socket.recv(message);
00706 
00707     message.getUInt32(combineCount);
00708 //    printf("%d\n",combineCount);
00709     while(combineCount--)
00710     {
00711         host=message.getString();
00712         port=message.getUInt32();
00713         _combineAck[SocketAddress(host.c_str(),port)]=_seqNumber-1;
00714     }
00715 
00716     host=message.getString();
00717     port=message.getUInt32();
00718     _ackDestination = SocketAddress(host.c_str(),port);
00719 
00720     // start reader thread
00721     _recvQueueThread=BaseThread::get(threadName);
00722     _recvQueueThreadRunning = true;
00723     _recvQueueThreadStop    = false;
00724     _recvQueueThread->runFunction( recvQueueThread, (void *) (this) );
00725 
00726     _initialized = true;
00727 //    printf("init end\n");
00728 }

PointMCastConnection& osg::PointMCastConnection::operator= const PointMCastConnection source  )  [private]
 

Connection::Channel PointSockConnection::connectPoint const std::string &  address,