#include <OSGGroupMCastConnection.h>
Inheritance diagram for osg::GroupMCastConnection:

private helpers | |
| *bool | checkChannels (void) |
| bool | sendQueue (void) |
| void | initialize (void) |
| static void | sendQueueThread (void *arg) |
Public Member Functions | |
Constructors | |
| * | GroupMCastConnection (void) |
| virtual | ~GroupMCastConnection (void) |
type info | |
| *virtual const ConnectionType * | getType (void) |
connection | |
| *virtual Channel | connectPoint (const std::string &address, Time timeout=-1) |
| virtual void | disconnect (Channel channel) |
| virtual Channel | acceptPoint (Time timeout=-1) |
params | |
| *virtual void | setParams (const std::string &params) |
Static Public Member Functions | |
create | |
| *static GroupConnection * | create (void) |
| create connection | |
Protected Types | |
| typedef std::vector< MemoryBlock > | BuffersT |
| typedef std::list< MemoryHandle > | FreeMemT |
Protected Member Functions | |
IO Implementation | |
| *virtual void | write (MemoryHandle mem, UInt32 size) |
| virtual void | writeBuffer (void) |
synchronisation | |
| *virtual bool | wait (Time timeout) throw (ReadError) |
| virtual void | signal (void) throw (WriteError) |
Protected Attributes | |
Members | |
| *DgramSocket | _mcastSocket |
| SocketAddress | _mcastAddress |
| BaseThread * | _sendQueueThread |
| bool | _sendQueueThreadRunning |
| bool | _sendQueueThreadStop |
| DgramQueue | _queue |
| DgramQueue | _free |
| Lock * | _lock |
| UInt16 | _seqNumber |
| UInt32 | _receivers |
| UInt32 | _windowSize |
| std::vector< SocketAddress > | _receiver |
| std::vector< SocketAddress > | _waitFor |
| bool | _initialized |
Private Types | |
| typedef GroupSockConnection | Inherited |
Private Member Functions | |
| GroupMCastConnection (const GroupMCastConnection &source) | |
| GroupMCastConnection & | operator= (const GroupMCastConnection &source) |
Static Private Attributes | |
static members | |
| *static ConnectionType | _type |
Friends | |
| class | PointMCastConnection |
Classes | |
| struct | SocketBufferHeader |
Definition at line 62 of file OSGGroupMCastConnection.h.
|
|
Reimplemented from osg::GroupSockConnection. Definition at line 180 of file OSGGroupMCastConnection.h. |
|
|
Definition at line 213 of file OSGBinaryDataHandler.h. |
|
|
Definition at line 214 of file OSGBinaryDataHandler.h. |
|
|
Constructor Definition at line 71 of file OSGGroupMCastConnection.cpp. References _free, _lock, _mcastSocket, _windowSize, osg::Socket::bind(), osg::Lock::get(), osg::Socket::getReadBufferSize(), osg::DgramSocket::open(), OSG_DGRAM_LEN, OSG_DGRAM_QUEUE_LEN, osg::osgMin(), osg::DgramQueue::put(), and osg::Socket::setReadBufferSize(). Referenced by create(). 00071 : 00072 Inherited(), 00073 _sendQueueThread(NULL), 00074 _seqNumber(0), 00075 _initialized(false) 00076 { 00077 char lockName[256]; 00078 sprintf(lockName,"GroupMCastConnection%p",this); 00079 00080 // create locks 00081 _lock = Lock::get(lockName); 00082 // fill dgramqueue 00083 for(UInt32 dI = 0 ; dI < OSG_DGRAM_QUEUE_LEN ; ++dI) 00084 _free.put(new Dgram()); 00085 // prepare mcast socket 00086 _mcastSocket.open(); 00087 _mcastSocket.bind(); 00088 _mcastSocket.setReadBufferSize(262144); 00089 // set window size 00090 _windowSize = _mcastSocket.getReadBufferSize()/(OSG_DGRAM_LEN) - 1; 00091 _windowSize = osgMin((UInt32)13,_windowSize); 00092 }
|
|
|
Destructor Definition at line 96 of file OSGGroupMCastConnection.cpp. References _free, _lock, _mcastSocket, _queue, _sendQueueThread, _sendQueueThreadStop, osg::Lock::aquire(), osg::DgramSocket::close(), osg::DgramQueue::empty(), osg::DgramQueue::get(), osg::BaseThread::join(), osg::DgramQueue::put(), osg::Lock::release(), and osg::Dgram::setSize(). 00097 { 00098 Dgram *dgram; 00099 _sendQueueThreadStop = true; 00100 00101 // get free dgram 00102 _lock->aquire(); 00103 dgram = _free.get(_lock); 00104 dgram->setSize(0); 00105 _queue.put(dgram); 00106 _lock->release(); 00107 // wait for stop 00108 if(_sendQueueThread) 00109 BaseThread::join(_sendQueueThread); 00110 // close socket 00111 _mcastSocket.close(); 00112 // free queues 00113 _lock->aquire(); 00114 while(!_free.empty()) 00115 delete _free.get(_lock); 00116 while(!_queue.empty()) 00117 delete _queue.get(_lock); 00118 _lock->release(); 00119 }
|
|
|
|
|
|
get connection type Reimplemented from osg::GroupSockConnection. Definition at line 123 of file OSGGroupMCastConnection.cpp. References _type. 00124 { 00125 return &_type; 00126 }
|
|
||||||||||||
|
connect to the given point. If timeout is reached, -1 is returned Reimplemented from osg::GroupSockConnection. Definition at line 134 of file OSGGroupMCastConnection.cpp. References osg::Connection::Channel, and osg::GroupSockConnection::connectPoint(). 00137 { 00138 Channel channel = Inherited::connectPoint(address,timeout); 00139 return channel; 00140 }
|
|
|
disconnect the given channel Reimplemented from osg::GroupSockConnection. Definition at line 144 of file OSGGroupMCastConnection.cpp. References osg::GroupConnection::_destination, _lock, osg::Lock::aquire(), osg::GroupConnection::channelToIndex(), osg::GroupSockConnection::disconnect(), and osg::Lock::release(). 00145 { 00146 Inherited::disconnect(channel); 00147 _lock->aquire(); 00148 _destination.erase(_destination.begin()+channelToIndex(channel)); 00149 _lock->release(); 00150 }
|
|
|
accept an incoming point connection. If timeout is reached, -1 is returned. If timeout is -1 then wait without timeout. Reimplemented from osg::GroupSockConnection. Definition at line 155 of file OSGGroupMCastConnection.cpp. References osg::GroupSockConnection::acceptPoint(), and osg::Connection::Channel. 00156 { 00157 Connection::Channel channel = Inherited::acceptPoint(timeout); 00158 return channel; 00159 }
|
|
|
parse the params string. Reimplemented from osg::GroupSockConnection. Definition at line 163 of file OSGGroupMCastConnection.cpp. References _mcastSocket, FINFO, osg::GroupSockConnection::setParams(), and osg::DgramSocket::setTTL(). 00164 { 00165 if(params.empty()) 00166 return; 00167 00168 Inherited::setParams(params); 00169 00170 std::string option = "TTL="; 00171 std::string::size_type i = 0; 00172 if((i=params.find(option)) != std::string::npos) 00173 { 00174 std::string str = params.substr(i + option.size()); 00175 00176 std::stringstream ss; 00177 std::string::size_type j = 0; 00178 while(j < str.length() && str[j] != ',' && isdigit(str[j])) 00179 { 00180 ss << str[j++]; 00181 } 00182 UInt32 ttl; 00183 ss >> ttl; 00184 if(ttl > 255) 00185 ttl = 255; 00186 _mcastSocket.setTTL((unsigned char) ttl); 00187 FINFO(("GroupMCastConnection::setParams : setting ttl to %u.\n", ttl)); 00188 } 00189 }
|
|
|
Reimplemented from osg::GroupSockConnection. Definition at line 216 of file OSGGroupMCastConnection.cpp. References GroupMCastConnection(). 00217 { 00218 return new GroupMCastConnection(); 00219 }
|
|
||||||||||||
|
Write data to all destinations
Reimplemented from osg::GroupSockConnection. Definition at line 231 of file OSGGroupMCastConnection.cpp. References _free, _initialized, _lock, _mcastAddress, _mcastSocket, _queue, _sendQueueThread, _sendQueueThreadRunning, _seqNumber, osg::Lock::aquire(), osg::DgramQueue::get(), osg::Dgram::getBuffer(), osg::Dgram::getBufferSize(), osg::Dgram::getCapacity(), osg::Dgram::getData(), osg::Dgram::getSize(), initialize(), osg::BaseThread::join(), osg::osgMin(), osg::DgramQueue::put(), osg::Lock::release(), osg::DgramSocket::sendTo(), osg::Dgram::setEarlySend(), osg::Dgram::setId(), osg::Dgram::setSize(), and osg::DgramQueue::waiting(). Referenced by writeBuffer(). 00232 { 00233 Dgram *dgram = NULL;; 00234 UInt32 pos = 0; 00235 char *buffer = (char*)mem; 00236 00237 if(!_initialized) 00238 initialize(); 00239 00240 while(size) 00241 { 00242 // get free dgram 00243 _lock->aquire(); 00244 dgram = _free.get(_lock); 00245 _lock->release(); 00246 // fill with data 00247 dgram->setSize(osgMin(size,dgram->getCapacity())); 00248 memcpy(dgram->getData(),buffer+pos,dgram->getSize()); 00249 // set sequence number 00250 dgram->setId( _seqNumber++ ); 00251 // prepate next block 00252 size -= dgram->getSize(); 00253 pos += dgram->getSize(); 00254 // put to write queue 00255 _lock->aquire(); 00256 if(!_sendQueueThreadRunning) 00257 { 00258 _lock->release(); 00259 BaseThread::join(_sendQueueThread); 00260 throw WriteError("Channel closed"); 00261 } 00262 #ifdef USE_EARLY_SEND 00263 if(_queue.waiting()) 00264 { 00265 dgram->setEarlySend(true); 00266 _mcastSocket.sendTo(dgram->getBuffer(), 00267 dgram->getBufferSize(), 00268 _mcastAddress); 00269 } 00270 #endif 00271 _queue.put(dgram); 00272 _lock->release(); 00273 } 00274 }
|
|
|
Write buffer Write blocksize and data. Reimplemented from osg::GroupSockConnection. Definition at line 281 of file OSGGroupMCastConnection.cpp. References write(), and osg::BinaryDataHandler::writeBufBegin(). 00282 { 00283 UInt32 size = writeBufBegin()->getDataSize(); 00284 MemoryHandle buffer = writeBufBegin()->getMem(); 00285 00286 write(buffer,size); 00287 }
|
|
|
wait for signal Reimplemented from osg::GroupSockConnection. Definition at line 196 of file OSGGroupMCastConnection.cpp. 00197 { 00198 // todo 00199 return Inherited::wait(timeout); 00200 }
|
|
|
send signal Reimplemented from osg::GroupSockConnection. Definition at line 204 of file OSGGroupMCastConnection.cpp. References osg::BinaryDataHandler::flush(), and osg::BinaryDataHandler::putValue().
|
|
|
check if all receivers are alive Definition at line 294 of file OSGGroupMCastConnection.cpp. References _lock, osg::GroupSockConnection::_sockets, osg::Lock::aquire(), FWARNING, osg::SocketSelection::isSetRead(), osg::Lock::release(), osg::SocketSelection::select(), and osg::SocketSelection::setRead(). Referenced by sendQueue(). 00295 { 00296 SocketSelection selection; 00297 UInt32 index; 00298 bool valid=true; 00299 00300 _lock->aquire(); 00301 for(index = 0 ; index < _sockets.size() ; ++index) 00302 selection.setRead(_sockets[index]); 00303 if(selection.select(0)) 00304 { 00305 UInt32 len; 00306 char buffer; 00307 for(index = 0 ; index < _sockets.size() ; ++index) 00308 { 00309 if(selection.isSetRead(_sockets[index])) 00310 { 00311 try 00312 { 00313 _sockets[index].send(&buffer,1); 00314 } 00315 catch(SocketException &e) 00316 { 00317 valid = false; 00318 FWARNING(("Socket error:%s\n",e.what())) 00319 break; 00320 } 00321 } 00322 } 00323 } 00324 _lock->release(); 00325 return valid; 00326 }
|
|
|
write thread Definition at line 530 of file OSGGroupMCastConnection.cpp. References _sendQueueThreadRunning, FFATAL, and sendQueue(). Referenced by initialize(). 00531 { 00532 GroupMCastConnection *the = (GroupMCastConnection *)arg; 00533 try 00534 { 00535 the->sendQueue(); 00536 } 00537 catch(SocketException &e) 00538 { 00539 FFATAL(( "Writer Proc crashed %s\n",e.what() )); 00540 } 00541 the->_sendQueueThreadRunning = false; 00542 }
|
|
|
Send current write queue Definition at line 330 of file OSGGroupMCastConnection.cpp. References _free, _lock, _mcastAddress, _mcastSocket, _queue, _sendQueueThreadStop, _waitFor, _windowSize, osg::Lock::aquire(), checkChannels(), osg::DgramQueue::empty(), FDEBUG, osg::DgramQueue::get(), osg::Dgram::getBuffer(), osg::Dgram::getBufferCapacity(), osg::Dgram::getBufferSize(), osg::SocketAddress::getHost(), osg::Dgram::getId(), osg::SocketAddress::getPort(), osg::Dgram::getResponseAck(), osg::getSystemTime(), osg::Dgram::less(), osg::DgramQueue::put(), osg::DgramSocket::recvFrom(), osg::Lock::release(), osg::DgramSocket::sendTo(), osg::Dgram::setId(), osg::Dgram::setResponseSize(), osg::Dgram::setSize(), and osg::Socket::waitReadable(). Referenced by sendQueueThread(). 00331 { 00332 std::vector<Dgram*> dgram; 00333 std::vector<std::set<SocketAddress> > missing; 00334 UInt32 count = 0; 00335 UInt32 maxCount = _windowSize-1; 00336 UInt32 ack = 0; 00337 UInt32 end = 0; 00338 UInt32 send = 0; 00339 UInt32 channel; 00340 UInt32 index; 00341 Dgram response; 00342 UInt32 m; 00343 bool readable = false; 00344 Time waitStart = getSystemTime(); 00345 Time lastAckTime=0; 00346 UInt32 len; 00347 SocketAddress fromAddress; 00348 Dgram ackRequest; 00349 const Time ackTimeout = 0.01; 00350 UInt16 sendId=0; 00351 UInt16 lastNak=0; 00352 Time lastNakTime=0; 00353 bool stopAfterSend=false; 00354 00355 // prepate buffers 00356 dgram.resize(_windowSize); 00357 missing.resize(_windowSize); 00358 00359 #ifdef TEST_LOST_DGRAM_RATE 00360 srand48((int)getSystemTime()); 00361 #endif 00362 do 00363 { 00364 // read new dgrams 00365 if(count < maxCount) 00366 { 00367 _lock->aquire(); 00368 while(count < maxCount && 00369 (count == 0 || !_queue.empty())) 00370 { 00371 dgram[end] = _queue.get(_lock); 00372 // stop 00373 if(!dgram[end]->getSize()) 00374 { 00375 if(count) 00376 { 00377 // wait for ack of packages in window 00378 stopAfterSend = true; 00379 break; 00380 } 00381 else 00382 { 00383 // no 
packages in the window 00384 _lock->release(); 00385 return true; 00386 } 00387 } 00388 // insert to expected responses 00389 for(index=0 ; index<_waitFor.size() ; ++index) 00390 missing[end].insert(_waitFor[index]); 00391 end = (end+1) % _windowSize; 00392 if(!count) 00393 lastAckTime = getSystemTime(); 00394 count++; 00395 } 00396 _lock->release(); 00397 } 00398 00399 // send all dgrams in current window 00400 for( ; send != end ; send = (send+1) % _windowSize) 00401 { 00402 #ifdef TEST_LOST_DGRAM_RATE 00403 if(drand48()>TEST_LOST_DGRAM_RATE) 00404 #endif 00405 { 00406 if(!dgram[send]->getEarlySend()) 00407 _mcastSocket.sendTo(dgram[send]->getBuffer(), 00408 dgram[send]->getBufferSize(), 00409 _mcastAddress); 00410 dgram[send]->setEarlySend(false); 00411 } 00412 sendId = dgram[send]->getId(); 00413 // printf("send dgram %d at id %d\n",send,dgram[send]->getId()); 00414 } 00415 00416 // loop while 00417 // window is full and nothing to send or 00418 // queue is empty 00419 // or there is something to read 00420 while(count && 00421 ( ( readable = _mcastSocket.waitReadable(0) ) || 00422 ( count == maxCount && send == end ) || 00423 ( getSystemTime() - lastAckTime > ackTimeout) ) ) 00424 { 00425 if(!readable && !_mcastSocket.waitReadable(ackTimeout)) 00426 { 00427 #if 0 00428 printf("count %d\n",count); 00429 printf("missing %d\n",missing[ack].size()); 00430 00431 printf("readable %d\n",readable); 00432 printf("send %d end %d\n",send,end); 00433 printf("lastack %lf\n",getSystemTime() - lastAckTime); 00434 #endif 00435 FDEBUG(("timeout count %d %d missing %d\n",count,sendId,missing[ack].size())) 00436 // printf("%.10f timeout count %d %d missing %d\n",getSystemTime()-t1,count,sendId,missing[ack].size()); 00437 00438 ackRequest.setSize(0); 00439 ackRequest.setId(sendId); 00440 00441 // send request over multicast 00442 _mcastSocket.sendTo(ackRequest.getBuffer(), 00443 ackRequest.getBufferSize(), 00444 _mcastAddress); 00445 00446 // wait until next ack request 00447 
_mcastSocket.waitReadable(0.05); 00448 00449 // check channels 00450 if(getSystemTime() - lastAckTime > 0.5) 00451 { 00452 if(!checkChannels()) 00453 return false; 00454 } 00455 if(_sendQueueThreadStop && 00456 getSystemTime() - lastAckTime > 1) 00457 // linger max 1 sec after close 00458 break; 00459 else 00460 // retry wait 00461 continue; 00462 } 00463 00464 // read response 00465 len = _mcastSocket.recvFrom(response.getBuffer(), 00466 response.getBufferCapacity(), 00467 fromAddress); 00468 lastAckTime = getSystemTime(); 00469 00470 // ignore response with wrong len 00471 response.setResponseSize(); 00472 if(len != response.getBufferSize()) 00473 { 00474 FDEBUG(("Wrong response len %d\n",len)) 00475 continue; 00476 } 00477 00478 // old ack ? 00479 if(!Dgram::less(response.getId(),dgram[ack]->getId())) 00480 { 00481 // first ack for this dgram from this receiver 00482 if(response.getResponseAck() == true) 00483 { 00484 // printf("Ack %d from %s:%d\n",response.getId(),fromAddress.getHost().c_str(),fromAddress.getPort()); 00485 for(m = ack ; 00486 dgram[m]->getId() != response.getId() ; 00487 m=(m+1) % _windowSize) 00488 missing[m].erase(fromAddress); 00489 missing[m].erase(fromAddress); 00490 } 00491 else 00492 { 00493 if(response.getId() == lastNak && 00494 getSystemTime() - lastNakTime < 0.02) 00495 continue; 00496 lastNak = response.getId(); 00497 lastNakTime = getSystemTime(); 00498 FDEBUG(("Nack %d from %s:%d\n",response.getId(),fromAddress.getHost().c_str(),fromAddress.getPort())); 00499 // printf("Nack %d from %s:%d\n",response.getId(),fromAddress.getHost().c_str(),fromAddress.getPort()); 00500 // retransmit 00501 for(m = ack ; 00502 m != send && dgram[m]->getId() != response.getId() ; 00503 m = (m+1) % _windowSize); 00504 send = m; 00505 } 00506 } 00507 00508 // free acknolaged packes 00509 if(missing[ack].empty()) 00510 { 00511 // printf("ack %d\n",dgram[ack]->getId()); 00512 _lock->aquire(); 00513 while(count && missing[ack].empty()) 00514 { 00515 
_free.put(dgram[ack]); 00516 ack = (ack+1) % _windowSize; 00517 count--; 00518 } 00519 _lock->release(); 00520 } 00521 } 00522 } 00523 while(!stopAfterSend || send != end); 00524 00525 return true; 00526 }
|
|
|
initialize connection. Connect to all points Definition at line 546 of file OSGGroupMCastConnection.cpp. References osg::GroupConnection::_destination, _initialized, _mcastAddress, _mcastSocket, _receiver, _sendQueueThread, _sendQueueThreadRunning, _sendQueueThreadStop, _seqNumber, osg::GroupSockConnection::_sockets, _waitFor, osg::BinaryMessage::clear(), osg::BaseThread::get(), osg::Socket::getAddress(), osg::GroupConnection::getDestination(), osg::SocketAddress::getHost(), osg::Connection::getInterface(), osg::SocketAddress::getPort(), osg::BinaryMessage::getString(), osg::BinaryMessage::getUInt32(), osg::osgMin(), osg::osgsqrt(), osg::BinaryMessage::putString(), osg::BinaryMessage::putUInt32(), osg::BaseThread::runFunction(), sendQueueThread(), osg::SocketAddress::setHost(), osg::DgramSocket::setMCastInterface(), osg::SocketAddress::setPort(), and SINFO. Referenced by write(). 00547 { 00548 std::string group = "239.33.42.32"; 00549 // std::string group = "146.140.32.7"; 00550 // std::string group = "146.140.32.255"; 00551 int port = 15356; 00552 int pos = _destination.find(':'); 00553 int clientPort; 00554 std::string clientHost; 00555 char hostname[256]; 00556 UInt32 index; 00557 UInt32 len; 00558 UInt32 ackNum = (UInt32) osgsqrt(_sockets.size()); 00559 UInt32 numSource; 00560 UInt32 sendTo; 00561 BinaryMessage message; 00562 char threadName[256]; 00563 00564 sprintf(threadName,"GroupMCastConnection%p",this); 00565 00566 if(!getDestination().empty()) 00567 group = getDestination(); 00568 00569 if(_sockets.size()<=16) 00570 ackNum = 1; 00571 00572 if(pos>=0) 00573 { 00574 group = std::string(_destination,0,pos); 00575 port = atoi(std::string(_destination,pos+1,std::string::npos).c_str()); 00576 } 00577 else 00578 { 00579 if(_destination.size()) 00580 group = _destination; 00581 } 00582 _mcastAddress.setHost(group); 00583 _mcastAddress.setPort(port ); 00584 00585 // set multicast interface, if given 00586 if(!getInterface().empty()) 00587 
_mcastSocket.setMCastInterface( 00588 SocketAddress(getInterface().c_str())); 00589 00590 for(index = 0 ; index < _sockets.size() ; ++index) 00591 { 00592 message.clear(); 00593 // tell the point connection the multicast address 00594 message.putString(_mcastAddress.getHost()); 00595 message.putUInt32(_mcastAddress.getPort()); 00596 // tell the current seq number 00597 message.putUInt32(_seqNumber); 00598 // tell the point from wich port requests are comming 00599 hostname[255] = '\0'; 00600 gethostname(hostname,255); 00601 message.putString(hostname); 00602 message.putUInt32(_mcastSocket.getAddress().getPort()); 00603 // send the message 00604 _sockets[index].send(message); 00605 00606 // receive destination address 00607 message.clear(); 00608 len = _sockets[index].recv(message); 00609 if(len == 0) 00610 throw ReadError("Channel closed\n"); 00611 clientHost = message.getString(); 00612 clientPort = message.getUInt32(); 00613 00614 SINFO << "Server:" << clientHost 00615 << " Port:" << clientPort << std::endl; 00616 _receiver.push_back(SocketAddress(clientHost.c_str(),clientPort)); 00617 } 00618 for(index = 0 ; index < _sockets.size() ; ++index) 00619 { 00620 message.clear(); 00621 // tell receivers, whom to report acks 00622 if((index % ackNum) == 0) 00623 { 00624 _waitFor.push_back(_receiver[index]); 00625 numSource = osgMin( ackNum-1, (UInt32) _sockets.size()-index-1 ); 00626 message.putUInt32(numSource); 00627 for(UInt32 r = index+1 ; r < index+1+numSource ; ++r) 00628 { 00629 message.putString(_receiver[r].getHost()); 00630 message.putUInt32(_receiver[r].getPort()); 00631 } 00632 message.putString(hostname); 00633 message.putUInt32(_mcastSocket.getAddress().getPort()); 00634 } 00635 else 00636 { 00637 sendTo = index - (index % ackNum); 00638 message.putUInt32(0); 00639 message.putString(_receiver[sendTo].getHost()); 00640 message.putUInt32(_receiver[sendTo].getPort()); 00641 } 00642 // send the message 00643 _sockets[index].send(message); 00644 } 00645 00646 
// start write thread 00647 _sendQueueThread=BaseThread::get(threadName); 00648 _sendQueueThreadRunning = true; 00649 _sendQueueThreadStop = false; 00650 _sendQueueThread->runFunction( sendQueueThread, (void *) (this) ); 00651 _initialized = true; 00652 }
|
|
|
|
|
|
bind the connection to a network interface. The address on which the port can be connected is returned. The interface is determined by the connection interface field and the address parameter. Address can be empty, which means to use a free port, or address can contain a port number. Implements osg::Connection. Definition at line 172 of file OSGGroupSockConnection.cpp. References osg::GroupSockConnection::_acceptSocket, osg::Socket::bind(), osg::Socket::getAddress(), osg::SocketAddress::getHost(), osg::Connection::getInterface(), osg::SocketAddress::getPort(), osg::Socket::listen(), osg::Socket::setReusePort(), and SINFO. 00173 { 00174 int port=0; 00175 char localhost[256]; 00176 char host[256]; 00177 char portStr[256]; 00178 std::string interf; 00179 std::string boundedAddress; 00180 00181 // get local host name 00182 gethostname(localhost,255); 00183 if(!getInterface().empty()) 00184 interf = getInterface(); 00185 else 00186 interf = localhost; 00187 // parse address 00188 if(!address.empty()) 00189 if(sscanf(address.c_str(),"%*[^:]:%d",&port) != 1) 00190 if(sscanf(address.c_str(),":%d",&port) != 1) 00191 port = 0; 00192 // bind port 00193 _acceptSocket.setReusePort(true); 00194 _acceptSocket.bind(SocketAddress(interf.c_str(),port)); 00195 SINFO << "Connection bound to " 00196 << _acceptSocket.getAddress().getHost() << ":" 00197 << _acceptSocket.getAddress().getPort() << std::endl; 00198 _acceptSocket.listen(); 00199 // create address 00200 sprintf(portStr,"%d",_acceptSocket.getAddress().getPort()); 00201 return interf + ":" + portStr; 00202 }
|
|
|
select the next channel for reading. If timeout is not -1 then -1 is returned if timeout is reached Implements osg::Connection. Definition at line 249 of file OSGGroupSockConnection.cpp. References FFATAL, osg::SocketSelection::isSetRead(), osg::SocketSelection::select(), and osg::SocketSelection::setRead(). 00251 { 00252 Int32 maxnread=0,nread; 00253 ChannelIndex index; 00254 SocketSelection selection,result; 00255 00256 // if there is data in the read buffer, return current channel 00257 if(_zeroCopyThreshold != 1 && 00258 _currentReadBuffer != readBufEnd()) 00259 { 00260 FFATAL(("Channel change ignores data in current buffer")) 00261 return indexToChannel(_readIndex); 00262 } 00263 00264 if(_selection[_readIndex] && 00265 _sockets[_readIndex].getAvailable()) 00266 { 00267 return indexToChannel(_readIndex);; 00268 } 00269 00270 // wait for first socket to deliver data 00271 for(index = 0 ; index < _sockets.size() ; ++index) 00272 { 00273 if(_selection[index]) 00274 selection.setRead(_sockets[index]); 00275 } 00276 |