Main Page | Modules | Namespace List | Class Hierarchy | Alphabetical List | Class List | Directories | File List | Namespace Members | Class Members | File Members | Related Pages

osg::GroupMCastConnection Class Reference
[Network]

#include <OSGGroupMCastConnection.h>

Inheritance diagram for osg::GroupMCastConnection:

osg::GroupSockConnection osg::GroupConnection osg::Connection osg::BinaryDataHandler List of all members.

private helpers

bool checkChannels (void)
bool sendQueue (void)
void initialize (void)
static void sendQueueThread (void *arg)

Public Member Functions

Constructors
GroupMCastConnection (void)
virtual ~GroupMCastConnection (void)
type info
virtual const ConnectionType * getType (void)
connection
virtual Channel connectPoint (const std::string &address, Time timeout=-1)
virtual void disconnect (Channel channel)
virtual Channel acceptPoint (Time timeout=-1)
params
virtual void setParams (const std::string &params)

Static Public Member Functions

create
static GroupConnection * create (void)
 create connection

Protected Types

typedef std::vector< MemoryBlock > BuffersT
typedef std::list< MemoryHandle > FreeMemT

Protected Member Functions

IO Implementation
virtual void write (MemoryHandle mem, UInt32 size)
virtual void writeBuffer (void)
synchronisation
virtual bool wait (Time timeout) throw (ReadError)
virtual void signal (void) throw (WriteError)

Protected Attributes

Members
DgramSocket _mcastSocket
SocketAddress _mcastAddress
BaseThread * _sendQueueThread
bool _sendQueueThreadRunning
bool _sendQueueThreadStop
DgramQueue _queue
DgramQueue _free
Lock * _lock
UInt16 _seqNumber
UInt32 _receivers
UInt32 _windowSize
std::vector< SocketAddress > _receiver
std::vector< SocketAddress > _waitFor
bool _initialized

Private Types

typedef GroupSockConnection Inherited

Private Member Functions

 GroupMCastConnection (const GroupMCastConnection &source)
GroupMCastConnection & operator= (const GroupMCastConnection &source)

Static Private Attributes

static members
static ConnectionType _type

Friends

class PointMCastConnection

Classes

struct  SocketBufferHeader

Detailed Description

Definition at line 62 of file OSGGroupMCastConnection.h.


Member Typedef Documentation

typedef GroupSockConnection osg::GroupMCastConnection::Inherited [private]
 

Reimplemented from osg::GroupSockConnection.

Definition at line 180 of file OSGGroupMCastConnection.h.

typedef std::vector<MemoryBlock> osg::BinaryDataHandler::BuffersT [protected, inherited]
 

Definition at line 213 of file OSGBinaryDataHandler.h.

typedef std::list<MemoryHandle> osg::BinaryDataHandler::FreeMemT [protected, inherited]
 

Definition at line 214 of file OSGBinaryDataHandler.h.


Constructor & Destructor Documentation

GroupMCastConnection::GroupMCastConnection (void)
 

Constructor

Definition at line 71 of file OSGGroupMCastConnection.cpp.

References _free, _lock, _mcastSocket, _windowSize, osg::Socket::bind(), osg::Lock::get(), osg::Socket::getReadBufferSize(), osg::DgramSocket::open(), OSG_DGRAM_LEN, OSG_DGRAM_QUEUE_LEN, osg::osgMin(), osg::DgramQueue::put(), and osg::Socket::setReadBufferSize().

Referenced by create().

00071                                           :
00072     Inherited(),
00073     _sendQueueThread(NULL),
00074     _seqNumber(0),
00075     _initialized(false)
00076 {
00077     char lockName[256];
00078     sprintf(lockName,"GroupMCastConnection%p",this);
00079 
00080     // create locks
00081     _lock     = Lock::get(lockName);
00082     // fill dgramqueue
00083     for(UInt32 dI = 0 ; dI < OSG_DGRAM_QUEUE_LEN ; ++dI)
00084         _free.put(new Dgram());
00085     // prepare mcast socket
00086     _mcastSocket.open();
00087     _mcastSocket.bind();
00088     _mcastSocket.setReadBufferSize(262144);
00089     // set window size
00090     _windowSize = _mcastSocket.getReadBufferSize()/(OSG_DGRAM_LEN) - 1;
00091     _windowSize = osgMin((UInt32)13,_windowSize);
00092 }

GroupMCastConnection::~GroupMCastConnection (void) [virtual]
 

Destructor

Definition at line 96 of file OSGGroupMCastConnection.cpp.

References _free, _lock, _mcastSocket, _queue, _sendQueueThread, _sendQueueThreadStop, osg::Lock::aquire(), osg::DgramSocket::close(), osg::DgramQueue::empty(), osg::DgramQueue::get(), osg::BaseThread::join(), osg::DgramQueue::put(), osg::Lock::release(), and osg::Dgram::setSize().

00097 {
00098     Dgram *dgram;
00099     _sendQueueThreadStop = true;
00100 
00101     // get free dgram
00102     _lock->aquire();
00103     dgram = _free.get(_lock);
00104     dgram->setSize(0);
00105     _queue.put(dgram);
00106     _lock->release();
00107     // wait for stop
00108     if(_sendQueueThread)
00109         BaseThread::join(_sendQueueThread);    
00110     // close socket
00111     _mcastSocket.close();
00112     // free queues
00113     _lock->aquire();
00114     while(!_free.empty())
00115         delete _free.get(_lock);
00116     while(!_queue.empty())
00117         delete _queue.get(_lock);
00118     _lock->release();
00119 }

osg::GroupMCastConnection::GroupMCastConnection (const GroupMCastConnection & source) [private]
 


Member Function Documentation

const ConnectionType * GroupMCastConnection::getType (void) [virtual]
 

get connection type

Reimplemented from osg::GroupSockConnection.

Definition at line 123 of file OSGGroupMCastConnection.cpp.

References _type.

00124 {
00125     return &_type;
00126 }

GroupConnection::Channel GroupMCastConnection::connectPoint (const std::string & address, Time timeout = -1) [virtual]
 

Connect to the given point. If the timeout is reached, -1 is returned.

Reimplemented from osg::GroupSockConnection.

Definition at line 134 of file OSGGroupMCastConnection.cpp.

References osg::Connection::Channel, and osg::GroupSockConnection::connectPoint().

00137 {
00138     Channel channel = Inherited::connectPoint(address,timeout);
00139     return channel;
00140 }

void GroupMCastConnection::disconnect (Channel channel) [virtual]
 

disconnect the given channel

Reimplemented from osg::GroupSockConnection.

Definition at line 144 of file OSGGroupMCastConnection.cpp.

References osg::GroupConnection::_destination, _lock, osg::Lock::aquire(), osg::GroupConnection::channelToIndex(), osg::GroupSockConnection::disconnect(), and osg::Lock::release().

00145 {
00146     Inherited::disconnect(channel);
00147     _lock->aquire();
00148     _destination.erase(_destination.begin()+channelToIndex(channel));
00149     _lock->release();
00150 }

GroupConnection::Channel GroupMCastConnection::acceptPoint (Time timeout = -1) [virtual]
 

Accept an incoming point connection. If the timeout is reached, -1 is returned. If timeout is -1, wait without a timeout.

Reimplemented from osg::GroupSockConnection.

Definition at line 155 of file OSGGroupMCastConnection.cpp.

References osg::GroupSockConnection::acceptPoint(), and osg::Connection::Channel.

00156 {
00157     Connection::Channel channel = Inherited::acceptPoint(timeout);
00158     return channel;
00159 }

void GroupMCastConnection::setParams (const std::string & params) [virtual]
 

parse the params string.

Reimplemented from osg::GroupSockConnection.

Definition at line 163 of file OSGGroupMCastConnection.cpp.

References _mcastSocket, FINFO, osg::GroupSockConnection::setParams(), and osg::DgramSocket::setTTL().

00164 {
00165     if(params.empty())
00166         return;
00167 
00168     Inherited::setParams(params);
00169 
00170     std::string option = "TTL=";
00171     std::string::size_type i = 0;
00172     if((i=params.find(option)) != std::string::npos)
00173     {
00174         std::string str = params.substr(i + option.size());
00175 
00176         std::stringstream ss;
00177         std::string::size_type j = 0;
00178         while(j < str.length() && str[j] != ',' && isdigit(str[j]))
00179         {
00180             ss << str[j++];
00181         }
00182         UInt32 ttl;
00183         ss >> ttl;
00184         if(ttl > 255)
00185             ttl = 255;
00186         _mcastSocket.setTTL((unsigned char) ttl);
00187         FINFO(("GroupMCastConnection::setParams : setting ttl to %u.\n", ttl));
00188     }
00189 }

GroupConnection * GroupMCastConnection::create (void) [static]
 

Reimplemented from osg::GroupSockConnection.

Definition at line 216 of file OSGGroupMCastConnection.cpp.

References GroupMCastConnection().

00217 {
00218     return new GroupMCastConnection();
00219 }

void GroupMCastConnection::write (MemoryHandle mem, UInt32 size) [protected, virtual]
 

Write data to all destinations

Parameters:
mem Pointer to data buffer
size Number of bytes to write

Reimplemented from osg::GroupSockConnection.

Definition at line 231 of file OSGGroupMCastConnection.cpp.

References _free, _initialized, _lock, _mcastAddress, _mcastSocket, _queue, _sendQueueThread, _sendQueueThreadRunning, _seqNumber, osg::Lock::aquire(), osg::DgramQueue::get(), osg::Dgram::getBuffer(), osg::Dgram::getBufferSize(), osg::Dgram::getCapacity(), osg::Dgram::getData(), osg::Dgram::getSize(), initialize(), osg::BaseThread::join(), osg::osgMin(), osg::DgramQueue::put(), osg::Lock::release(), osg::DgramSocket::sendTo(), osg::Dgram::setEarlySend(), osg::Dgram::setId(), osg::Dgram::setSize(), and osg::DgramQueue::waiting().

Referenced by writeBuffer().

00232 {
00233     Dgram  *dgram  = NULL;;
00234     UInt32  pos    = 0;
00235     char   *buffer = (char*)mem;
00236 
00237     if(!_initialized)
00238         initialize();
00239 
00240     while(size)
00241     {
00242         // get free dgram
00243         _lock->aquire();
00244         dgram = _free.get(_lock);
00245         _lock->release();
00246         // fill with data
00247         dgram->setSize(osgMin(size,dgram->getCapacity()));
00248         memcpy(dgram->getData(),buffer+pos,dgram->getSize());
00249         // set sequence number
00250         dgram->setId( _seqNumber++ );
00251         // prepate next block
00252         size -= dgram->getSize();
00253         pos  += dgram->getSize();
00254         // put to write queue
00255         _lock->aquire();
00256         if(!_sendQueueThreadRunning)
00257         {
00258             _lock->release();
00259             BaseThread::join(_sendQueueThread);    
00260             throw WriteError("Channel closed");
00261         }
00262 #ifdef USE_EARLY_SEND
00263         if(_queue.waiting())
00264         {
00265             dgram->setEarlySend(true);
00266             _mcastSocket.sendTo(dgram->getBuffer(),
00267                                 dgram->getBufferSize(),
00268                                 _mcastAddress);
00269         }
00270 #endif
00271         _queue.put(dgram);
00272         _lock->release();
00273     }
00274 }

void GroupMCastConnection::writeBuffer (void) [protected, virtual]
 

Write buffer

Write blocksize and data.

Reimplemented from osg::GroupSockConnection.

Definition at line 281 of file OSGGroupMCastConnection.cpp.

References write(), and osg::BinaryDataHandler::writeBufBegin().

00282 {
00283     UInt32       size   = writeBufBegin()->getDataSize();
00284     MemoryHandle buffer = writeBufBegin()->getMem();
00285 
00286     write(buffer,size);
00287 }

bool GroupMCastConnection::wait (Time timeout) throw (ReadError) [protected, virtual]
 

wait for signal

Reimplemented from osg::GroupSockConnection.

Definition at line 196 of file OSGGroupMCastConnection.cpp.

00197 {
00198     // todo
00199     return Inherited::wait(timeout);
00200 }

void GroupMCastConnection::signal (void) throw (WriteError) [protected, virtual]
 

send signal

Reimplemented from osg::GroupSockConnection.

Definition at line 204 of file OSGGroupMCastConnection.cpp.

References osg::BinaryDataHandler::flush(), and osg::BinaryDataHandler::putValue().

00205 {
00206     UInt32 tag=314156;
00207     putValue(tag);
00208     flush();
00209 }

bool GroupMCastConnection::checkChannels (void) [private]
 

check if all receivers are alive

Definition at line 294 of file OSGGroupMCastConnection.cpp.

References _lock, osg::GroupSockConnection::_sockets, osg::Lock::aquire(), FWARNING, osg::SocketSelection::isSetRead(), osg::Lock::release(), osg::SocketSelection::select(), and osg::SocketSelection::setRead().

Referenced by sendQueue().

00295 {
00296     SocketSelection selection;
00297     UInt32          index;
00298     bool            valid=true;
00299                     
00300     _lock->aquire();
00301     for(index = 0 ; index < _sockets.size() ; ++index)
00302         selection.setRead(_sockets[index]);
00303     if(selection.select(0))
00304     {
00305         UInt32 len;
00306         char   buffer;
00307         for(index = 0 ; index < _sockets.size() ; ++index)
00308         {
00309             if(selection.isSetRead(_sockets[index]))
00310             {
00311                 try
00312                 {
00313                     _sockets[index].send(&buffer,1);
00314                 }
00315                 catch(SocketException &e)
00316                 {
00317                     valid = false;
00318                     FWARNING(("Socket error:%s\n",e.what()))
00319                     break;
00320                 }
00321             }
00322         }
00323     }
00324     _lock->release();
00325     return valid;
00326 }

void GroupMCastConnection::sendQueueThread (void * arg) [static, private]
 

write thread

Definition at line 530 of file OSGGroupMCastConnection.cpp.

References _sendQueueThreadRunning, FFATAL, and sendQueue().

Referenced by initialize().

00531 {
00532     GroupMCastConnection *the = (GroupMCastConnection *)arg;
00533     try
00534     {
00535         the->sendQueue();
00536     }
00537     catch(SocketException &e)
00538     {
00539         FFATAL(( "Writer Proc crashed %s\n",e.what() ));
00540     }
00541     the->_sendQueueThreadRunning = false;
00542 }

bool GroupMCastConnection::sendQueue (void) [private]
 

Send current write queue

Definition at line 330 of file OSGGroupMCastConnection.cpp.

References _free, _lock, _mcastAddress, _mcastSocket, _queue, _sendQueueThreadStop, _waitFor, _windowSize, osg::Lock::aquire(), checkChannels(), osg::DgramQueue::empty(), FDEBUG, osg::DgramQueue::get(), osg::Dgram::getBuffer(), osg::Dgram::getBufferCapacity(), osg::Dgram::getBufferSize(), osg::SocketAddress::getHost(), osg::Dgram::getId(), osg::SocketAddress::getPort(), osg::Dgram::getResponseAck(), osg::getSystemTime(), osg::Dgram::less(), osg::DgramQueue::put(), osg::DgramSocket::recvFrom(), osg::Lock::release(), osg::DgramSocket::sendTo(), osg::Dgram::setId(), osg::Dgram::setResponseSize(), osg::Dgram::setSize(), and osg::Socket::waitReadable().

Referenced by sendQueueThread().

00331 {
00332     std::vector<Dgram*>                   dgram;
00333     std::vector<std::set<SocketAddress> > missing;
00334     UInt32                  count    = 0;
00335     UInt32                  maxCount = _windowSize-1;
00336     UInt32                  ack  = 0;
00337     UInt32                  end  = 0;
00338     UInt32                  send = 0;
00339     UInt32                  channel;
00340     UInt32                  index;
00341     Dgram                   response;
00342     UInt32                  m;
00343     bool                    readable = false;
00344     Time                    waitStart = getSystemTime();
00345     Time                    lastAckTime=0;
00346     UInt32                  len;
00347     SocketAddress           fromAddress;
00348     Dgram                   ackRequest;
00349     const Time              ackTimeout = 0.01;
00350     UInt16                  sendId=0;
00351     UInt16                  lastNak=0;
00352     Time                    lastNakTime=0;
00353     bool                    stopAfterSend=false;
00354 
00355     // prepate buffers
00356     dgram.resize(_windowSize);
00357     missing.resize(_windowSize);
00358 
00359 #ifdef TEST_LOST_DGRAM_RATE
00360     srand48((int)getSystemTime());
00361 #endif
00362     do
00363     {
00364         // read new dgrams
00365         if(count < maxCount)
00366         {
00367             _lock->aquire();
00368             while(count < maxCount &&
00369                   (count == 0 || !_queue.empty()))
00370             {
00371                 dgram[end] = _queue.get(_lock);
00372                 // stop
00373                 if(!dgram[end]->getSize())
00374                 {
00375                     if(count)
00376                     {
00377                         // wait for ack of packages in window
00378                         stopAfterSend = true;
00379                         break;
00380                     }
00381                     else
00382                     {
00383                         // no packages in the window
00384                         _lock->release();
00385                         return true;
00386                     }
00387                 }
00388                 // insert to expected responses
00389                 for(index=0 ; index<_waitFor.size() ; ++index)
00390                     missing[end].insert(_waitFor[index]);
00391                 end = (end+1) % _windowSize;
00392                 if(!count)
00393                     lastAckTime = getSystemTime();
00394                 count++;
00395             }
00396             _lock->release();
00397         }
00398 
00399         // send all dgrams in current window
00400         for( ; send != end ; send = (send+1) % _windowSize)
00401         {
00402 #ifdef TEST_LOST_DGRAM_RATE
00403             if(drand48()>TEST_LOST_DGRAM_RATE)
00404 #endif
00405             {
00406                 if(!dgram[send]->getEarlySend())
00407                     _mcastSocket.sendTo(dgram[send]->getBuffer(),
00408                                         dgram[send]->getBufferSize(),
00409                                         _mcastAddress);
00410                 dgram[send]->setEarlySend(false);
00411             }
00412             sendId = dgram[send]->getId();
00413 //            printf("send dgram %d at id %d\n",send,dgram[send]->getId());
00414         }
00415 
00416         // loop while
00417         // window is full and nothing to send or
00418         // queue is empty
00419         // or there is something to read
00420         while(count &&
00421               ( ( readable = _mcastSocket.waitReadable(0) ) ||
00422                 ( count == maxCount && send == end ) || 
00423                 ( getSystemTime() - lastAckTime > ackTimeout) ) )
00424         {
00425             if(!readable && !_mcastSocket.waitReadable(ackTimeout))
00426             {
00427 #if 0
00428                 printf("count %d\n",count);
00429                 printf("missing %d\n",missing[ack].size());
00430 
00431                 printf("readable %d\n",readable);
00432                 printf("send %d end %d\n",send,end);
00433                 printf("lastack %lf\n",getSystemTime() - lastAckTime);
00434 #endif
00435                 FDEBUG(("timeout count %d %d missing %d\n",count,sendId,missing[ack].size()))
00436 //                printf("%.10f timeout count %d %d missing %d\n",getSystemTime()-t1,count,sendId,missing[ack].size());
00437                     
00438                 ackRequest.setSize(0);
00439                 ackRequest.setId(sendId);
00440 
00441                 // send request over multicast
00442                 _mcastSocket.sendTo(ackRequest.getBuffer(),
00443                                     ackRequest.getBufferSize(),
00444                                     _mcastAddress);
00445 
00446                 // wait until next ack request                
00447                 _mcastSocket.waitReadable(0.05);
00448 
00449                 // check channels
00450                 if(getSystemTime() - lastAckTime > 0.5)
00451                 {
00452                     if(!checkChannels())
00453                         return false;
00454                 }
00455                 if(_sendQueueThreadStop && 
00456                    getSystemTime() - lastAckTime > 1)
00457                     // linger max 1 sec after close
00458                     break;
00459                 else
00460                     // retry wait
00461                     continue;
00462             }
00463 
00464             // read response
00465             len = _mcastSocket.recvFrom(response.getBuffer(),
00466                                         response.getBufferCapacity(),
00467                                         fromAddress);
00468             lastAckTime = getSystemTime();
00469 
00470             // ignore response with wrong len
00471             response.setResponseSize();
00472             if(len != response.getBufferSize())
00473             {
00474                 FDEBUG(("Wrong response len %d\n",len))
00475                     continue;
00476             }
00477 
00478             // old ack ?
00479             if(!Dgram::less(response.getId(),dgram[ack]->getId()))
00480             {
00481                 // first ack for this dgram from this receiver
00482                 if(response.getResponseAck() == true)
00483                 {
00484 //                    printf("Ack %d from %s:%d\n",response.getId(),fromAddress.getHost().c_str(),fromAddress.getPort());
00485                     for(m = ack ; 
00486                         dgram[m]->getId() != response.getId() ;
00487                         m=(m+1) % _windowSize)
00488                         missing[m].erase(fromAddress);
00489                     missing[m].erase(fromAddress);
00490                 }
00491                 else
00492                 {
00493                     if(response.getId() == lastNak &&
00494                        getSystemTime() - lastNakTime < 0.02)
00495                         continue;
00496                     lastNak = response.getId();
00497                     lastNakTime = getSystemTime();
00498                     FDEBUG(("Nack %d from %s:%d\n",response.getId(),fromAddress.getHost().c_str(),fromAddress.getPort()));
00499 //                    printf("Nack %d from %s:%d\n",response.getId(),fromAddress.getHost().c_str(),fromAddress.getPort());
00500                     // retransmit
00501                     for(m = ack ; 
00502                         m != send && dgram[m]->getId() != response.getId() ; 
00503                         m = (m+1) % _windowSize);
00504                     send = m;
00505                 }
00506             }
00507 
00508             // free acknolaged packes
00509             if(missing[ack].empty())
00510             {
00511 //                printf("ack %d\n",dgram[ack]->getId());
00512                 _lock->aquire();
00513                 while(count && missing[ack].empty())
00514                 {
00515                     _free.put(dgram[ack]);
00516                     ack = (ack+1) % _windowSize;
00517                     count--;
00518                 }
00519                 _lock->release();
00520             }
00521         }
00522     }
00523     while(!stopAfterSend || send != end);
00524 
00525     return true;
00526 }

void GroupMCastConnection::initialize (void) [private]
 

initialize connection. Connect to all points

Definition at line 546 of file OSGGroupMCastConnection.cpp.

References osg::GroupConnection::_destination, _initialized, _mcastAddress, _mcastSocket, _receiver, _sendQueueThread, _sendQueueThreadRunning, _sendQueueThreadStop, _seqNumber, osg::GroupSockConnection::_sockets, _waitFor, osg::BinaryMessage::clear(), osg::BaseThread::get(), osg::Socket::getAddress(), osg::GroupConnection::getDestination(), osg::SocketAddress::getHost(), osg::Connection::getInterface(), osg::SocketAddress::getPort(), osg::BinaryMessage::getString(), osg::BinaryMessage::getUInt32(), osg::osgMin(), osg::osgsqrt(), osg::BinaryMessage::putString(), osg::BinaryMessage::putUInt32(), osg::BaseThread::runFunction(), sendQueueThread(), osg::SocketAddress::setHost(), osg::DgramSocket::setMCastInterface(), osg::SocketAddress::setPort(), and SINFO.

Referenced by write().

00547 {
00548     std::string   group = "239.33.42.32";
00549 //    std::string group = "146.140.32.7";
00550 //    std::string group = "146.140.32.255";
00551     int           port = 15356;
00552     int           pos  = _destination.find(':');
00553     int           clientPort;
00554     std::string   clientHost;
00555     char          hostname[256];
00556     UInt32        index;
00557     UInt32        len;
00558     UInt32        ackNum = (UInt32) osgsqrt(_sockets.size());
00559     UInt32        numSource;
00560     UInt32        sendTo;
00561     BinaryMessage message;
00562     char          threadName[256];
00563 
00564     sprintf(threadName,"GroupMCastConnection%p",this);
00565 
00566     if(!getDestination().empty())
00567         group = getDestination();
00568         
00569     if(_sockets.size()<=16)
00570         ackNum = 1;
00571 
00572     if(pos>=0)
00573     {
00574         group = std::string(_destination,0,pos);
00575         port  = atoi(std::string(_destination,pos+1,std::string::npos).c_str());
00576     }
00577     else
00578     {
00579         if(_destination.size())
00580             group = _destination;
00581     }
00582     _mcastAddress.setHost(group);
00583     _mcastAddress.setPort(port );
00584 
00585     // set multicast interface, if given
00586     if(!getInterface().empty())
00587         _mcastSocket.setMCastInterface(
00588             SocketAddress(getInterface().c_str()));
00589 
00590     for(index = 0 ; index < _sockets.size() ; ++index)
00591     {
00592         message.clear();
00593         // tell the point connection the multicast address
00594         message.putString(_mcastAddress.getHost());
00595         message.putUInt32(_mcastAddress.getPort());
00596         // tell the current seq number
00597         message.putUInt32(_seqNumber);
00598         // tell the point from wich port requests are comming
00599         hostname[255] = '\0';
00600         gethostname(hostname,255);
00601         message.putString(hostname);
00602         message.putUInt32(_mcastSocket.getAddress().getPort());
00603         // send the message
00604         _sockets[index].send(message);
00605         
00606         // receive destination address
00607         message.clear();
00608         len = _sockets[index].recv(message);
00609         if(len == 0)
00610             throw ReadError("Channel closed\n");
00611         clientHost = message.getString();
00612         clientPort = message.getUInt32();
00613 
00614         SINFO << "Server:" << clientHost 
00615               << " Port:" << clientPort << std::endl;
00616         _receiver.push_back(SocketAddress(clientHost.c_str(),clientPort));
00617     }
00618     for(index = 0 ; index < _sockets.size() ; ++index)
00619     {
00620         message.clear();
00621         // tell receivers, whom to report acks
00622         if((index % ackNum) == 0)
00623         {
00624             _waitFor.push_back(_receiver[index]);
00625             numSource = osgMin( ackNum-1, (UInt32) _sockets.size()-index-1 );
00626             message.putUInt32(numSource);
00627             for(UInt32 r = index+1 ; r < index+1+numSource ; ++r)
00628             {
00629                 message.putString(_receiver[r].getHost());
00630                 message.putUInt32(_receiver[r].getPort());
00631             }
00632             message.putString(hostname);
00633             message.putUInt32(_mcastSocket.getAddress().getPort());
00634         }
00635         else
00636         {
00637             sendTo = index - (index % ackNum);
00638             message.putUInt32(0);
00639             message.putString(_receiver[sendTo].getHost());
00640             message.putUInt32(_receiver[sendTo].getPort());
00641         }
00642         // send the message
00643         _sockets[index].send(message);
00644     }
00645 
00646     // start write thread
00647     _sendQueueThread=BaseThread::get(threadName);
00648     _sendQueueThreadRunning = true;
00649     _sendQueueThreadStop    = false;
00650     _sendQueueThread->runFunction( sendQueueThread, (void *) (this) );
00651     _initialized = true;
00652 }

GroupMCastConnection& osg::GroupMCastConnection::operator= (const GroupMCastConnection & source) [private]
 

std::string GroupSockConnection::bind (const std::string & address) [virtual, inherited]
 

Bind the connection to a network interface. The address on which the port can be connected is returned. The interface is determined by the connection's interface field and the address parameter. The address can be empty, which means a free port is used, or it can contain a port number.

Implements osg::Connection.

Definition at line 172 of file OSGGroupSockConnection.cpp.

References osg::GroupSockConnection::_acceptSocket, osg::Socket::bind(), osg::Socket::getAddress(), osg::SocketAddress::getHost(), osg::Connection::getInterface(), osg::SocketAddress::getPort(), osg::Socket::listen(), osg::Socket::setReusePort(), and SINFO.

00173 {
00174     int         port=0;
00175     char        localhost[256];
00176     char        host[256];
00177     char        portStr[256];
00178     std::string interf;
00179     std::string boundedAddress;
00180 
00181     // get local host name
00182     gethostname(localhost,255);
00183     if(!getInterface().empty())
00184         interf = getInterface();
00185     else
00186         interf = localhost;
00187     // parse address
00188     if(!address.empty())
00189         if(sscanf(address.c_str(),"%*[^:]:%d",&port) != 1)
00190             if(sscanf(address.c_str(),":%d",&port) != 1)
00191                 port = 0;
00192     // bind port
00193     _acceptSocket.setReusePort(true);
00194     _acceptSocket.bind(SocketAddress(interf.c_str(),port));
00195     SINFO << "Connection bound to "
00196           << _acceptSocket.getAddress().getHost() << ":"
00197           << _acceptSocket.getAddress().getPort() << std::endl;
00198     _acceptSocket.listen();
00199     // create address
00200     sprintf(portStr,"%d",_acceptSocket.getAddress().getPort());
00201     return interf + ":" + portStr;
00202 }

Connection::Channel GroupSockConnection::selectChannel (Time timeout = -1) throw (ReadError) [virtual, inherited]
 

Select the next channel for reading. If timeout is not -1 and the timeout is reached, -1 is returned.

Implements osg::Connection.

Definition at line 249 of file OSGGroupSockConnection.cpp.

References FFATAL, osg::SocketSelection::isSetRead(), osg::SocketSelection::select(), and osg::SocketSelection::setRead().

00251 {
00252     Int32 maxnread=0,nread;
00253     ChannelIndex index;
00254     SocketSelection selection,result;
00255 
00256     // if there is data in the read buffer, return current channel
00257     if(_zeroCopyThreshold != 1 &&
00258        _currentReadBuffer != readBufEnd())
00259     {
00260         FFATAL(("Channel change ignores data in current buffer"))
00261         return indexToChannel(_readIndex);
00262     }    
00263 
00264     if(_selection[_readIndex] &&
00265        _sockets[_readIndex].getAvailable())
00266     {
00267         return indexToChannel(_readIndex);;
00268     }
00269 
00270     // wait for first socket to deliver data
00271     for(index = 0 ; index < _sockets.size() ; ++index)
00272     {
00273         if(_selection[index])
00274             selection.setRead(_sockets[index]);
00275     }
00276