#include <OSGPointMCastConnection.h>
Inheritance diagram for osg::PointMCastConnection:

private helpers | |
| *bool | recvNextDgram (Dgram *dgram) |
| void | combineAck (Dgram *dgram, SocketAddress from) |
| bool | recvQueue (void) |
| void | initialize (void) |
| static void | recvQueueThread (void *arg) |
Public Member Functions | |
Constructors | |
| * | PointMCastConnection (void) |
| virtual | ~PointMCastConnection (void) |
type info | |
| *virtual const ConnectionType * | getType (void) |
connection | |
| *virtual Channel | connectGroup (const std::string &address, Time timeout=-1) |
| virtual void | disconnect (void) |
| virtual Channel | acceptGroup (Time timeout=-1) |
synchronisation | |
| *virtual bool | wait (Time timeout) throw (ReadError) |
channel handling | |
| *virtual Channel | selectChannel (Time timeout=-1) throw (ReadError) |
Static Public Member Functions | |
create | |
| *static PointConnection * | create (void) |
| create connection | |
Protected Types | |
| typedef std::vector< MemoryBlock > | BuffersT |
| typedef std::list< MemoryHandle > | FreeMemT |
Protected Member Functions | |
IO Implementation | |
| *virtual void | read (MemoryHandle mem, UInt32 size) |
| virtual void | readBuffer (void) |
Protected Attributes | |
members | |
| *DgramSocket | _mcastSocket |
| DgramSocket | _responseSocket |
| BaseThread * | _recvQueueThread |
| bool | _recvQueueThreadRunning |
| bool | _recvQueueThreadStop |
| UInt16 | _seqNumber |
| SocketAddress | _mcastAddress |
| DgramQueue | _queue |
| DgramQueue | _free |
| Lock * | _lock |
| SocketAddress | _sender |
| SocketAddress | _ackDestination |
| Dgram * | _lastDgram |
| UInt32 | _lastDgramPos |
| bool | _initialized |
| std::map< SocketAddress, UInt16 > | _combineAck |
| UInt16 | _maxAck |
Private Types | |
| typedef PointSockConnection | Inherited |
Private Member Functions | |
| PointMCastConnection (const PointMCastConnection &source) | |
| PointMCastConnection & | operator= (const PointMCastConnection &source) |
Static Private Attributes | |
static type | |
| *static ConnectionType | _type |
Classes | |
| struct | SocketBufferHeader |
Definition at line 62 of file OSGPointMCastConnection.h.
|
|
Reimplemented from osg::PointSockConnection. Definition at line 180 of file OSGPointMCastConnection.h. |
|
|
Definition at line 213 of file OSGBinaryDataHandler.h. |
|
|
Definition at line 214 of file OSGBinaryDataHandler.h. |
|
|
Constructor Definition at line 67 of file OSGPointMCastConnection.cpp. References osg::PointSockConnection::_acceptSocket, _free, _lock, osg::Lock::get(), osg::StreamSocket::open(), OSG_DGRAM_QUEUE_LEN, osg::DgramQueue::put(), and osg::Socket::setReusePort(). Referenced by create(). 00067 : 00068 Inherited(), 00069 _lastDgram(NULL), 00070 _initialized(false) 00071 { 00072 char lockName[256]; 00073 sprintf(lockName,"PointMCastConnection%p",this); 00074 00075 // create locks 00076 _lock = Lock::get(lockName); 00077 00078 // fill dgramqueue 00079 for(UInt32 dI = 0 ; dI < OSG_DGRAM_QUEUE_LEN ; ++dI) 00080 _free.put(new Dgram()); 00081 00082 _acceptSocket.open(); 00083 _acceptSocket.setReusePort(true); 00084 00085 /* 00086 _socketWriteBuffer.resize(131071); 00087 // reserve first bytes for buffer size 00088 writeBufAdd(&_socketWriteBuffer[sizeof(SocketBufferHeader)], 00089 _socketWriteBuffer.size()-sizeof(SocketBufferHeader)); 00090 */ 00091 }
|
|
|
Destructor Definition at line 95 of file OSGPointMCastConnection.cpp. References osg::PointSockConnection::_acceptSocket, _free, _lock, _mcastSocket, _queue, _recvQueueThread, _recvQueueThreadStop, osg::Lock::aquire(), osg::StreamSocket::close(), osg::DgramSocket::close(), osg::DgramQueue::empty(), osg::DgramQueue::get(), osg::BaseThread::join(), and osg::Lock::release(). 00096 { 00097 // indicate thread stop 00098 _recvQueueThreadStop = true; 00099 // wait for stop 00100 BaseThread::join(_recvQueueThread); 00101 // close socket 00102 _mcastSocket.close(); 00103 // free queues 00104 _lock->aquire(); 00105 while(!_free.empty()) 00106 delete _free.get(_lock); 00107 while(!_queue.empty()) 00108 delete _queue.get(_lock); 00109 _lock->release(); 00110 // close socket 00111 _acceptSocket.close(); 00112 }
|
|
|
|
|
|
Get the connection type. Reimplemented from osg::PointSockConnection. Definition at line 116 of file OSGPointMCastConnection.cpp. References _type. 00117 { 00118 return &_type; 00119 }
|
|
||||||||||||
|
Connect to the given group. If timeout is reached, -1 is returned. Reimplemented from osg::PointSockConnection. Definition at line 127 of file OSGPointMCastConnection.cpp. References osg::Connection::Channel, and osg::PointSockConnection::connectGroup(). 00130 { 00131 Channel channel = Inherited::connectGroup(address,timeout); 00132 return channel; 00133 }
|
|
|
Disconnect the given channel. Reimplemented from osg::PointSockConnection. Definition at line 137 of file OSGPointMCastConnection.cpp. References osg::PointSockConnection::_socket, and osg::StreamSocket::close().
|
|
|
Accept an incoming group connection. If timeout is reached, -1 is returned. If timeout is -1, then wait without timeout. Reimplemented from osg::PointSockConnection. Definition at line 145 of file OSGPointMCastConnection.cpp. References osg::PointSockConnection::acceptGroup(), and osg::Connection::Channel. 00146 { 00147 Channel channel = Inherited::acceptGroup(timeout); 00148 return channel; 00149 }
|
|
|
wait for signal Reimplemented from osg::PointSockConnection. Definition at line 202 of file OSGPointMCastConnection.cpp. References FFATAL. 00203 { 00204 UInt32 tag; 00205 00206 if(_pointToPoint) 00207 return Inherited::wait(timeout); 00208 try 00209 { 00210 if(selectChannel(timeout) < 0) 00211 return false; 00212 getValue(tag); 00213 if(tag != 314156) 00214 { 00215 FFATAL(("Stream out of sync in PointMCastConnection\n")); 00216 throw ReadError("Stream out of sync"); 00217 } 00218 } 00219 catch(SocketError &e) 00220 { 00221 throw ReadError(e.what()); 00222 } 00223 return true; 00224 }
|
|
|
select the next channel for reading. If timeout is not -1 then -1 is returned if timeout is reached Reimplemented from osg::PointSockConnection. Definition at line 157 of file OSGPointMCastConnection.cpp. 00159 { 00160 if(_pointToPoint) 00161 return Inherited::selectChannel(timeout); 00162 try 00163 { 00164 if(!_initialized) 00165 initialize(); 00166 // todo 00167 if(isReadBufferEmpty() && 00168 !_lastDgram && 00169 _queue.empty()) 00170 { 00171 if(timeout == -1) 00172 { 00173 // wait for a dgram 00174 _lock->aquire(); 00175 _queue.wait(_lock); 00176 _lock->release(); 00177 return 0; 00178 } 00179 if(timeout == 0) 00180 return -1; 00181 while(_queue.empty() && timeout > 0) 00182 { 00183 _mcastSocket.waitReadable(.1); 00184 timeout-=.1; 00185 } 00186 if(_queue.empty()) 00187 return -1; 00188 } 00189 } 00190 catch(SocketException &e) 00191 { 00192 throw ReadError(e.what()); 00193 } 00194 return 0; 00195 }
|
|
|
Reimplemented from osg::PointSockConnection. Definition at line 231 of file OSGPointMCastConnection.cpp. References PointMCastConnection(). 00232 { 00233 return new PointMCastConnection(); 00234 }
|
|
||||||||||||
|
Read data into the given memory. Read data from the current read socket. The read socket is the socket that was selected in selectChannel. Reimplemented from osg::PointSockConnection. Definition at line 246 of file OSGPointMCastConnection.cpp. References _free, _initialized, _lastDgram, _lastDgramPos, _lock, osg::PointConnection::_pointToPoint, _queue, osg::Lock::aquire(), osg::DgramQueue::get(), osg::Dgram::getData(), osg::Dgram::getSize(), initialize(), osg::osgMin(), osg::DgramQueue::put(), osg::PointSockConnection::read(), and osg::Lock::release(). 00247 { 00248 if(_pointToPoint) 00249 { 00250 Inherited::read(mem,size); 00251 return; 00252 } 00253 Dgram *dgram = NULL; 00254 char *buffer = (char*)mem; 00255 UInt32 len; 00256 UInt32 dgramPos; 00257 00258 if(!_initialized) 00259 initialize(); 00260 00261 while(size) 00262 { 00263 if(_lastDgram) 00264 { 00265 dgramPos = _lastDgramPos; 00266 dgram = _lastDgram; 00267 } 00268 else 00269 { 00270 // get next dgram 00271 _lock->aquire(); 00272 dgram = _queue.get(_lock); 00273 _lock->release(); 00274 dgramPos = 0; 00275 if(dgram->getSize() == 0) 00276 throw ReadError("Channel closed\n"); 00277 } 00278 // copy to buffer 00279 len = osgMin(size,dgram->getSize()-dgramPos); 00280 memcpy(buffer,dgram->getData()+dgramPos,len); 00281 buffer += len; 00282 size -= len; 00283 dgramPos += len; 00284 if(dgramPos == dgram->getSize()) 00285 { 00286 // put to free queue 00287 _lock->aquire(); 00288 _free.put(dgram); 00289 _lock->release(); 00290 _lastDgram = NULL; 00291 } 00292 else 00293 { 00294 _lastDgram = dgram; 00295 _lastDgramPos = dgramPos; 00296 } 00297 } 00298 }
|
|
|
Read the next data block. The stream connection uses only one BinaryDataHandler buffer. If more than one buffer is present, then this method must be changed! Reimplemented from osg::PointSockConnection. Definition at line 307 of file OSGPointMCastConnection.cpp. References _free, _initialized, _lastDgram, _lastDgramPos, _lock, osg::PointConnection::_pointToPoint, _queue, osg::Lock::aquire(), osg::DgramQueue::empty(), osg::DgramQueue::get(), osg::Dgram::getData(), osg::Dgram::getSize(), initialize(), osg::osgMin(), osg::DgramQueue::put(), osg::BinaryDataHandler::readBufBegin(), osg::PointSockConnection::readBuffer(), and osg::Lock::release(). 00308 { 00309 if(_pointToPoint) 00310 { 00311 Inherited::readBuffer(); 00312 return; 00313 } 00314 00315 static int sumSize=0; 00316 Dgram *dgram = NULL; 00317 UInt32 size = readBufBegin()->getSize(); 00318 MemoryHandle buffer = readBufBegin()->getMem(); 00319 UInt32 len; 00320 UInt32 dgramPos; 00321 00322 if(!_initialized) 00323 initialize(); 00324 00325 do 00326 { 00327 if(_lastDgram) 00328 { 00329 dgramPos = _lastDgramPos; 00330 dgram = _lastDgram; 00331 } 00332 else 00333 { 00334 // get next dgram 00335 _lock->aquire(); 00336 dgram = _queue.get(_lock); 00337 _lock->release(); 00338 dgramPos = 0; 00339 if(dgram->getSize() == 0) 00340 throw ReadError("Channel closed"); 00341 } 00342 // copy to buffer 00343 len = osgMin(size,dgram->getSize()-dgramPos); 00344 memcpy(buffer,dgram->getData()+dgramPos,len); 00345 buffer += len; 00346 size -= len; 00347 dgramPos += len; 00348 if(dgramPos == dgram->getSize()) 00349 { 00350 // put to free queue 00351 _lock->aquire(); 00352 _free.put(dgram); 00353 _lock->release(); 00354 _lastDgram = NULL; 00355 } 00356 else 00357 { 00358 _lastDgram = dgram; 00359 _lastDgramPos = dgramPos; 00360 } 00361 } 00362 while(size && !_queue.empty()); 00363 // set data size 00364 readBufBegin()->setDataSize(readBufBegin()->getSize()-size); 00365 sumSize += readBufBegin()->getDataSize(); 00366 }
|
|
|
read next dgram from mcast or private socket Definition at line 373 of file OSGPointMCastConnection.cpp. References _ackDestination, _combineAck, _maxAck, _mcastSocket, _responseSocket, _sender, combineAck(), osg::Dgram::getBuffer(), osg::Dgram::getBufferCapacity(), osg::Dgram::getBufferSize(), osg::Dgram::getId(), osg::SocketSelection::isSetRead(), osg::DgramSocket::recvFrom(), osg::SocketSelection::select(), osg::DgramSocket::sendTo(), osg::Dgram::setBufferSize(), osg::Dgram::setId(), osg::SocketSelection::setRead(), osg::Dgram::setResponseAck(), and osg::Dgram::setResponseSize(). Referenced by recvQueue(). 00374 { 00375 SocketSelection selection; 00376 SocketAddress from; 00377 UInt32 length; 00378 00379 selection.setRead(_mcastSocket); 00380 selection.setRead(_responseSocket); 00381 if(!selection.select(0.5)) 00382 return false; 00383 if(selection.isSetRead(_responseSocket)) 00384 { 00385 length = _responseSocket.recvFrom(dgram->getBuffer(), 00386 dgram->getBufferCapacity(), 00387 from); 00388 dgram->setBufferSize(length); 00389 #if 0 00390 // ???? 00391 // from sender 00392 if(from == _sender && !_combineAck.empty()) 00393 { 00394 exit(0); 00395 00396 00397 if(_maxAck == dgram->getId()) 00398 { 00399 // do we have all acks ? 
00400 dgram->setId(_maxAck); 00401 dgram->setResponseSize(); 00402 dgram->setResponseAck(true); 00403 #ifdef TEST_LOST_DGRAM_RATE 00404 if(drand48()>TEST_LOST_DGRAM_RATE) 00405 #endif 00406 _responseSocket.sendTo( 00407 dgram->getBuffer(), 00408 dgram->getBufferSize(), 00409 _ackDestination); 00410 return false; 00411 } 00412 else 00413 { 00414 return true; 00415 } 00416 } 00417 #endif 00418 combineAck(dgram,from); 00419 } 00420 if(selection.isSetRead(_mcastSocket)) 00421 { 00422 length = _mcastSocket.recvFrom(dgram->getBuffer(), 00423 dgram->getBufferCapacity(), 00424 from); 00425 dgram->setBufferSize(length); 00426 // ignore packages from wrong destination 00427 if(from != _sender) 00428 return false; 00429 else 00430 return true; 00431 } 00432 else 00433 { 00434 return false; 00435 } 00436 }
|
|
||||||||||||
|
combine several acks to 1 ack stream Definition at line 440 of file OSGPointMCastConnection.cpp. References _ackDestination, _combineAck, _maxAck, _responseSocket, _seqNumber, FFATAL, osg::Dgram::getBuffer(), osg::Dgram::getBufferSize(), osg::Dgram::getId(), osg::Dgram::less(), osg::DgramSocket::sendTo(), osg::Dgram::setId(), osg::Dgram::setResponseAck(), and osg::Dgram::setResponseSize(). Referenced by recvNextDgram(), and recvQueue(). 00441 { 00442 UInt16 maxAck; 00443 00444 if(dgram) 00445 { 00446 // do we expect acks from different source 00447 if(_combineAck.count(from)==0) 00448 { 00449 FFATAL(("no ack from other expected\n")); 00450 return; 00451 } 00452 // ack retransmission 00453 if( Dgram::less(dgram->getId(),_combineAck[from] ) ) 00454 { 00455 // printf("Ack restranmisson\n"); 00456 return; 00457 } 00458 _combineAck[from] = dgram->getId(); 00459 } 00460 00461 maxAck = _seqNumber-1; 00462 for(std::map<SocketAddress,UInt16>::iterator aI 00463 = _combineAck.begin() ; 00464 aI != _combineAck.end() ; ++aI) 00465 { 00466 if( Dgram::less(aI->second,maxAck) ) 00467 maxAck = aI->second; 00468 } 00469 // when _max ack is now greate 00470 00471 if( Dgram::less(_maxAck,maxAck)) 00472 { 00473 Dgram response; 00474 00475 _maxAck = maxAck; 00476 response.setResponseSize(); 00477 response.setId(_maxAck); 00478 response.setResponseAck(true); 00479 00480 _responseSocket.sendTo( 00481 response.getBuffer(), 00482 response.getBufferSize(), 00483 _ackDestination); 00484 } 00485 }
|
|
|
read next dgram from mcast or private socket Definition at line 633 of file OSGPointMCastConnection.cpp. References _recvQueueThreadRunning, recvQueue(), and SFATAL. Referenced by initialize(). 00634 { 00635 PointMCastConnection *the=(PointMCastConnection*)arg; 00636 try 00637 { 00638 the->recvQueue(); 00639 } 00640 catch(SocketException &e) 00641 { 00642 SFATAL << "Error in dgram reader thread:" << e.what() << std::endl; 00643 } 00644 the->_recvQueueThreadRunning = false; 00645 }
|
|
|
recv queue Definition at line 489 of file OSGPointMCastConnection.cpp. References _ackDestination, _combineAck, _free, _lock, _maxAck, _mcastSocket, _queue, _recvQueueThreadStop, _responseSocket, _sender, _seqNumber, osg::PointSockConnection::_socket, osg::Lock::aquire(), combineAck(), FLOG, osg::DgramQueue::get(), osg::Dgram::getBuffer(), osg::Dgram::getBufferSize(), osg::Dgram::getId(), osg::Dgram::getResponseAck(), osg::Dgram::getSize(), osg::getSystemTime(), osg::Dgram::less(), osg::DgramQueue::put(), osg::Socket::recv(), recvNextDgram(), osg::Lock::release(), osg::DgramSocket::sendTo(), osg::Dgram::setId(), osg::Dgram::setResponseAck(), osg::Dgram::setResponseSize(), osg::Dgram::setSize(), and osg::Socket::waitReadable(). Referenced by recvQueueThread(). 00490 { 00491 SocketAddress from; 00492 Dgram *dgram; 00493 Dgram response; 00494 UInt32 readCount=0; 00495 UInt32 length; 00496 bool missing=false; 00497 Time ignoreT=getSystemTime(); 00498 UInt16 id; 00499 00500 #ifdef TEST_LOST_DGRAM_RATE 00501 srand48((long int)(10000*getSystemTime())); 00502 #endif 00503 00504 for(;;) 00505 { 00506 // get free dgram 00507 _lock->aquire(); 00508 dgram =_free.get(_lock); 00509 _lock->release(); 00510 do 00511 { 00512 // ignore for a while 00513 if(missing) 00514 ignoreT = getSystemTime(); 00515 do 00516 { 00517 while(!recvNextDgram(dgram)) 00518 { 00519 if(_recvQueueThreadStop) 00520 return true; 00521 try 00522 { 00523 while(_socket.waitReadable(0)) 00524 { 00525 char buffer; 00526 if(_socket.recv(&buffer,1) <= 0) 00527 { 00528 // put EOT to the queue 00529 dgram->setSize(0); 00530 _lock->aquire(); 00531 _queue.put(dgram); 00532 _lock->release(); 00533 FLOG(("Connection lost\n")); 00534 return false; 00535 } 00536 } 00537 } 00538 catch(SocketException &e) 00539 { 00540 // put EOT to the queue 00541 dgram->setSize(0); 00542 _lock->aquire(); 00543 _queue.put(dgram); 00544 _lock->release(); 00545 FLOG(("Connection lost\n")); 00546 return false; 00547 } 00548 } 00549 id = 
dgram->getId(); 00550 } 00551 while( missing && 00552 id != _seqNumber && 00553 (getSystemTime() - ignoreT) < 0.01); 00554 00555 missing = false; 00556 response.setId(id); 00557 00558 // ack request ? 00559 if(dgram->getSize() == 0) 00560 { 00561 if( !Dgram::less(id,_seqNumber ) ) 00562 { 00563 missing = true; 00564 response.setId(_seqNumber); 00565 } 00566 // printf("ack request %d %d\n",id,missing); 00567 } 00568 else 00569 { 00570 // printf("%d got %d\n",id,_seqNumber); 00571 if( dgram->getId() == _seqNumber) 00572 { 00573 // got expected dgram. 00574 // put to queue 00575 _lock->aquire(); 00576 _queue.put(dgram); 00577 _lock->release(); 00578 } 00579 else 00580 { 00581 // ignore if unneccesary retransmission 00582 if( Dgram::less(id,_seqNumber ) ) 00583 { 00584 continue; 00585 } 00586 else 00587 { 00588 missing = true; 00589 response.setId(_seqNumber); 00590 } 00591 } 00592 } 00593 00594 // printf("Responde %d\n",response.getId()); 00595 // prepare response 00596 response.setResponseAck(!missing); 00597 response.setResponseSize(); 00598 00599 // send response if nak or no data in the queue 00600 if(!response.getResponseAck() || 00601 !_mcastSocket.waitReadable(0)) 00602 { 00603 #ifdef TEST_LOST_DGRAM_RATE 00604 if(drand48()>TEST_LOST_DGRAM_RATE) 00605 #endif 00606 if(response.getResponseAck()) 00607 { 00608 // send response if no ack combination 00609 // or ack request 00610 if(_combineAck.empty() || id == _maxAck) 00611 _responseSocket.sendTo(response.getBuffer(), 00612 response.getBufferSize(), 00613 _ackDestination); 00614 else 00615 combineAck(NULL,_sender); 00616 } 00617 else 00618 { 00619 // send nak to sender 00620 _responseSocket.sendTo(response.getBuffer(), 00621 response.getBufferSize(), 00622 _sender); 00623 } 00624 } 00625 } 00626 while(id != _seqNumber || missing); 00627 _seqNumber++; 00628 } 00629 00630 // return true; 00631 }
|
|
|
initialize connection Definition at line 649 of file OSGPointMCastConnection.cpp. References _ackDestination, _combineAck, _initialized, _maxAck, _mcastSocket, _recvQueueThread, _recvQueueThreadRunning, _recvQueueThreadStop, _responseSocket, _sender, _seqNumber, osg::PointSockConnection::_socket, osg::SocketAddress::ANY, osg::Socket::bind(), osg::Connection::Channel, osg::BinaryMessage::clear(), osg::BaseThread::get(), osg::Socket::getAddress(), osg::Connection::getInterface(), osg::SocketAddress::getPort(), osg::BinaryMessage::getString(), osg::BinaryMessage::getUInt32(), osg::DgramSocket::join(), osg::DgramSocket::open(), osg::BinaryMessage::putString(), osg::BinaryMessage::putUInt32(), osg::Socket::recv(), recvQueueThread(), osg::BaseThread::runFunction(), osg::Socket::send(), osg::DgramSocket::setMCastInterface(), osg::Socket::setReadBufferSize(), and osg::Socket::setReusePort(). Referenced by read(), and readBuffer(). 00650 { 00651 std::string group; 00652 Channel channel; 00653 BinaryMessage message; 00654 char hostname[256]; 00655 std::string fromHost; 00656 UInt32 fromPort; 00657 UInt32 combineCount; 00658 std::string host; 00659 UInt32 port; 00660 char threadName[256]; 00661 00662 sprintf(threadName,"PointMCastConnection%p",this); 00663 00664 // get info about the group 00665 _socket.recv(message); 00666 // group and port 00667 group = message.getString(); 00668 port = message.getUInt32(); 00669 // get seq number 00670 _seqNumber = message.getUInt32(); 00671 _maxAck = _seqNumber - 1; 00672 // server port 00673 fromHost = message.getString(); 00674 fromPort = message.getUInt32(); 00675 _sender = SocketAddress(fromHost.c_str(),fromPort); 00676 00677 // prepare socket to receive mcast packages 00678 _mcastSocket.open(); 00679 _mcastSocket.setReusePort(true); 00680 _mcastSocket.setReadBufferSize(524288); 00681 _mcastSocket.bind(SocketAddress(SocketAddress::ANY,port)); 00682 try 00683 { 00684 _mcastSocket.join(SocketAddress(group.c_str())); 00685 } 00686 
catch(...) {} 00687 // set multicast interface 00688 if(!getInterface().empty()) 00689 { 00690 _mcastSocket.setMCastInterface(SocketAddress(getInterface().c_str())); 00691 } 00692 00693 _responseSocket.open(); 00694 _responseSocket.bind(SocketAddress(SocketAddress::ANY,0)); 00695 00696 // tell the group from wich port requests are comming 00697 hostname[255] = '\0'; 00698 gethostname(hostname,255); 00699 message.clear(); 00700 message.putString(hostname); 00701 message.putUInt32(_responseSocket.getAddress().getPort()); 00702 _socket.send(message); 00703 00704 // get ack destination info 00705 _socket.recv(message); 00706 00707 message.getUInt32(combineCount); 00708 // printf("%d\n",combineCount); 00709 while(combineCount--) 00710 { 00711 host=message.getString(); 00712 port=message.getUInt32(); 00713 _combineAck[SocketAddress(host.c_str(),port)]=_seqNumber-1; 00714 } 00715 00716 host=message.getString(); 00717 port=message.getUInt32(); 00718 _ackDestination = SocketAddress(host.c_str(),port); 00719 00720 // start reader thread 00721 _recvQueueThread=BaseThread::get(threadName); 00722 _recvQueueThreadRunning = true; 00723 _recvQueueThreadStop = false; 00724 _recvQueueThread->runFunction( recvQueueThread, (void *) (this) ); 00725 00726 _initialized = true; 00727 // printf("init end\n"); 00728 }
|
|
|
|
|