Main Page | Modules | Namespace List | Class Hierarchy | Alphabetical List | Class List | Directories | File List | Namespace Members | Class Members | File Members | Related Pages

OSGPointMCastConnection.cpp

Go to the documentation of this file.
00001 /*---------------------------------------------------------------------------*\
00002  *                                OpenSG                                     *
00003  *                                                                           *
00004  *                                                                           *
00005  *             Copyright (C) 2000-2002 by the OpenSG Forum                   *
00006  *                                                                           *
00007  *                            www.opensg.org                                 *
00008  *                                                                           *
00009  *   contact: dirk@opensg.org, gerrit.voss@vossg.org, jbehr@zgdv.de          *
00010  *                                                                           *
00011 \*---------------------------------------------------------------------------*/
00012 /*---------------------------------------------------------------------------*\
00013  *                                License                                    *
00014  *                                                                           *
00015  * This library is free software; you can redistribute it and/or modify it   *
00016  * under the terms of the GNU Library General Public License as published    *
00017  * by the Free Software Foundation, version 2.                               *
00018  *                                                                           *
00019  * This library is distributed in the hope that it will be useful, but       *
00020  * WITHOUT ANY WARRANTY; without even the implied warranty of                *
00021  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU         *
00022  * Library General Public License for more details.                          *
00023  *                                                                           *
00024  * You should have received a copy of the GNU Library General Public         *
00025  * License along with this library; if not, write to the Free Software       *
00026  * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.                 *
00027  *                                                                           *
00028 \*---------------------------------------------------------------------------*/
00029 /*---------------------------------------------------------------------------*\
00030  *                                Changes                                    *
00031  *                                                                           *
00032  *                                                                           *
00033  *                                                                           *
00034  *                                                                           *
00035  *                                                                           *
00036  *                                                                           *
00037 \*---------------------------------------------------------------------------*/
00038 
00039 //---------------------------------------------------------------------------
00040 //  Includes
00041 //---------------------------------------------------------------------------
00042 #include <stdlib.h>
00043 #include <stdio.h>
00044 #include <assert.h>
00045 
00046 #include "OSGConfig.h"
00047 #include "OSGLog.h"
00048 #include "OSGBaseThread.h"
00049 #include "OSGSocketSelection.h"
00050 #include "OSGBinaryMessage.h"
00051 #include "OSGPointMCastConnection.h"
00052 #include "OSGGroupMCastConnection.h"
00053 #include "OSGConnectionType.h"
00054 
00055 OSG_USING_NAMESPACE
00056 
00061 /*-------------------------------------------------------------------------*/
00062 /*                            constructor destructor                       */
00063 
00067 PointMCastConnection::PointMCastConnection():
00068     Inherited(),
00069     _lastDgram(NULL),
00070     _initialized(false)
00071 {
00072     char lockName[256];
00073     sprintf(lockName,"PointMCastConnection%p",this);
00074 
00075     // create locks
00076     _lock     = Lock::get(lockName);
00077 
00078     // fill dgramqueue
00079     for(UInt32 dI = 0 ; dI < OSG_DGRAM_QUEUE_LEN ; ++dI)
00080         _free.put(new Dgram());
00081 
00082     _acceptSocket.open();
00083     _acceptSocket.setReusePort(true);
00084 
00085 /*
00086     _socketWriteBuffer.resize(131071);
00087     // reserve first bytes for buffer size
00088     writeBufAdd(&_socketWriteBuffer[sizeof(SocketBufferHeader)],
00089                 _socketWriteBuffer.size()-sizeof(SocketBufferHeader));
00090 */
00091 }
00092 
00095 PointMCastConnection::~PointMCastConnection(void)
00096 {
00097     // indicate thread stop
00098     _recvQueueThreadStop = true;
00099     // wait for stop
00100     BaseThread::join(_recvQueueThread);    
00101     // close socket
00102     _mcastSocket.close();
00103     // free queues
00104     _lock->aquire();
00105     while(!_free.empty())
00106         delete _free.get(_lock);
00107     while(!_queue.empty())
00108         delete _queue.get(_lock);
00109     _lock->release();
00110     // close socket
00111     _acceptSocket.close();
00112 }
00113 
00116 const ConnectionType *PointMCastConnection::getType()
00117 {
00118     return &_type;
00119 }
00120 
00121 /*-------------------------------------------------------------------------*/
00122 /*                            connection                                   */
00123 
00127 Connection::Channel PointMCastConnection::connectGroup(
00128     const std::string &address,
00129     Time               timeout)
00130 {
00131     Channel channel = Inherited::connectGroup(address,timeout);
00132     return channel;
00133 }
00134 
00137 void PointMCastConnection::disconnect(void)
00138 {
00139     _socket.close();
00140 }
00141 
00145 Connection::Channel PointMCastConnection::acceptGroup(Time timeout)
00146 {
00147     Channel channel = Inherited::acceptGroup(timeout);
00148     return channel;
00149 }
00150 
00151 /*-------------------------------------------------------------------------*/
00152 /*                              channel handling                           */
00153 
00157 Connection::Channel PointMCastConnection::selectChannel(Time timeout)
00158     throw (ReadError)
00159 {
00160     if(_pointToPoint)
00161         return Inherited::selectChannel(timeout);
00162     try
00163     {
00164         if(!_initialized)
00165             initialize();
00166         // todo
00167         if(isReadBufferEmpty() && 
00168            !_lastDgram && 
00169            _queue.empty())
00170         {
00171             if(timeout == -1)
00172             {
00173                 // wait for a dgram
00174                 _lock->aquire();
00175                 _queue.wait(_lock);
00176                 _lock->release();
00177                 return 0;
00178             }
00179             if(timeout == 0)
00180                 return -1;
00181             while(_queue.empty() && timeout > 0)
00182             {
00183                 _mcastSocket.waitReadable(.1);
00184                 timeout-=.1;
00185             }
00186             if(_queue.empty())
00187                 return -1;
00188         }
00189     }
00190     catch(SocketException &e)
00191     {
00192         throw ReadError(e.what());
00193     }
00194     return 0;
00195 }
00196 
00197 /*-------------------------------------------------------------------------*/
00198 /*                            sync                                         */
00199 
00202 bool PointMCastConnection::wait(Time timeout) throw (ReadError)
00203 {
00204     UInt32 tag;
00205 
00206     if(_pointToPoint)
00207         return Inherited::wait(timeout);
00208     try
00209     {
00210         if(selectChannel(timeout) < 0)
00211             return false;
00212         getValue(tag);
00213         if(tag != 314156)
00214         {
00215             FFATAL(("Stream out of sync in PointMCastConnection\n"));
00216             throw ReadError("Stream out of sync");
00217         }
00218     }
00219     catch(SocketError &e)
00220     {
00221         throw ReadError(e.what());
00222     }
00223     return true;
00224 }
00225 
00226 /*-------------------------- create ---------------------------------------*/
00227 
00231 PointConnection *PointMCastConnection::create(void)
00232 {
00233     return new PointMCastConnection();
00234 }
00235 
00236 /*-------------------------------------------------------------------------*/
00237 /*                              read write                                 */
00238 
00246 void PointMCastConnection::read(MemoryHandle mem,UInt32 size)
00247 {
00248     if(_pointToPoint)
00249     {
00250         Inherited::read(mem,size);
00251         return;
00252     }
00253     Dgram *dgram  = NULL;
00254     char  *buffer = (char*)mem;
00255     UInt32 len;
00256     UInt32 dgramPos;
00257 
00258     if(!_initialized)
00259         initialize();
00260 
00261     while(size)
00262     {
00263         if(_lastDgram)
00264         {
00265             dgramPos = _lastDgramPos;
00266             dgram = _lastDgram;
00267         }
00268         else
00269         {
00270             // get next dgram
00271             _lock->aquire();
00272             dgram = _queue.get(_lock);
00273             _lock->release();
00274             dgramPos = 0;
00275             if(dgram->getSize() == 0)
00276                 throw ReadError("Channel closed\n");
00277         }
00278         // copy to buffer
00279         len = osgMin(size,dgram->getSize()-dgramPos);
00280         memcpy(buffer,dgram->getData()+dgramPos,len);
00281         buffer   += len;
00282         size     -= len;
00283         dgramPos += len;
00284         if(dgramPos == dgram->getSize())
00285         {
00286             // put to free queue
00287             _lock->aquire();
00288             _free.put(dgram);
00289             _lock->release();
00290             _lastDgram = NULL;
00291         }
00292         else
00293         {
00294             _lastDgram    = dgram;
00295             _lastDgramPos = dgramPos;
00296         }
00297     }
00298 }
00299 
00307 void PointMCastConnection::readBuffer()
00308 {
00309     if(_pointToPoint)
00310     {
00311         Inherited::readBuffer();
00312         return;
00313     }
00314 
00315     static int sumSize=0;
00316     Dgram       *dgram  = NULL;
00317     UInt32       size   = readBufBegin()->getSize();
00318     MemoryHandle buffer = readBufBegin()->getMem();
00319     UInt32       len;
00320     UInt32       dgramPos;
00321 
00322     if(!_initialized)
00323         initialize();
00324 
00325     do
00326     {
00327         if(_lastDgram)
00328         {
00329             dgramPos = _lastDgramPos;
00330             dgram = _lastDgram;
00331         }
00332         else
00333         {
00334             // get next dgram
00335             _lock->aquire();
00336             dgram = _queue.get(_lock);
00337             _lock->release();
00338             dgramPos = 0;
00339             if(dgram->getSize() == 0)
00340                 throw ReadError("Channel closed");
00341         }
00342         // copy to buffer
00343         len = osgMin(size,dgram->getSize()-dgramPos);
00344         memcpy(buffer,dgram->getData()+dgramPos,len);
00345         buffer   += len;
00346         size     -= len;
00347         dgramPos += len;
00348         if(dgramPos == dgram->getSize())
00349         {
00350             // put to free queue
00351             _lock->aquire();
00352             _free.put(dgram);
00353             _lock->release();
00354             _lastDgram = NULL;
00355         }
00356         else
00357         {
00358             _lastDgram    = dgram;
00359             _lastDgramPos = dgramPos;
00360         }
00361     }
00362     while(size && !_queue.empty());
00363     // set data size
00364     readBufBegin()->setDataSize(readBufBegin()->getSize()-size);
00365     sumSize += readBufBegin()->getDataSize();
00366 }    
00367 
00368 /*-------------------------------------------------------------------------*/
00369 /*                              private helpers                            */
00370 
00373 bool PointMCastConnection::recvNextDgram(Dgram *dgram)
00374 {
00375     SocketSelection selection;
00376     SocketAddress   from;
00377     UInt32          length;
00378     
00379     selection.setRead(_mcastSocket);
00380     selection.setRead(_responseSocket);
00381     if(!selection.select(0.5))
00382         return false;
00383     if(selection.isSetRead(_responseSocket))
00384     {
00385         length = _responseSocket.recvFrom(dgram->getBuffer(),
00386                                           dgram->getBufferCapacity(),
00387                                           from);
00388         dgram->setBufferSize(length);
00389 #if 0
00390 // ????
00391         // from sender
00392         if(from == _sender && !_combineAck.empty())
00393         {
00394             exit(0);
00395 
00396 
00397             if(_maxAck == dgram->getId())
00398             {
00399                 // do we have all acks ?
00400                 dgram->setId(_maxAck);
00401                 dgram->setResponseSize();
00402                 dgram->setResponseAck(true);
00403 #ifdef TEST_LOST_DGRAM_RATE
00404                 if(drand48()>TEST_LOST_DGRAM_RATE)
00405 #endif
00406                     _responseSocket.sendTo(
00407                         dgram->getBuffer(),
00408                         dgram->getBufferSize(),
00409                         _ackDestination);
00410                 return false;
00411             }
00412             else
00413             {
00414                 return true;
00415             }
00416         }
00417 #endif
00418         combineAck(dgram,from);
00419     } 
00420     if(selection.isSetRead(_mcastSocket))
00421     {
00422         length = _mcastSocket.recvFrom(dgram->getBuffer(),
00423                                        dgram->getBufferCapacity(),
00424                                        from);
00425         dgram->setBufferSize(length);
00426         // ignore packages from wrong destination
00427         if(from != _sender)
00428             return false;
00429         else
00430             return true;
00431     }
00432     else
00433     {
00434         return false;
00435     }
00436 }
00437 
00440 void PointMCastConnection::combineAck(Dgram *dgram,SocketAddress from)
00441 {
00442     UInt16 maxAck;
00443 
00444     if(dgram)
00445     {
00446         // do we expect acks from different source
00447         if(_combineAck.count(from)==0)
00448         {
00449             FFATAL(("no ack from other expected\n"));
00450             return;
00451         }
00452         // ack retransmission
00453         if( Dgram::less(dgram->getId(),_combineAck[from] ) )
00454         {
00455 //        printf("Ack restranmisson\n");
00456             return;
00457         }
00458         _combineAck[from] = dgram->getId();
00459     }
00460 
00461     maxAck = _seqNumber-1;
00462     for(std::map<SocketAddress,UInt16>::iterator aI
00463             = _combineAck.begin() ; 
00464         aI != _combineAck.end() ; ++aI)
00465     {
00466         if( Dgram::less(aI->second,maxAck) )
00467             maxAck = aI->second;
00468     }
00469     // when _max ack is now greate
00470 
00471     if( Dgram::less(_maxAck,maxAck))
00472     {
00473         Dgram response;
00474 
00475         _maxAck = maxAck;
00476         response.setResponseSize();
00477         response.setId(_maxAck);
00478         response.setResponseAck(true);
00479 
00480         _responseSocket.sendTo(
00481             response.getBuffer(),
00482             response.getBufferSize(),
00483             _ackDestination);
00484     }
00485 }
00486 
00489 bool PointMCastConnection::recvQueue(void)
00490 {
00491     SocketAddress  from;
00492     Dgram         *dgram;
00493     Dgram          response;
00494     UInt32         readCount=0;
00495     UInt32         length;
00496     bool           missing=false;
00497     Time           ignoreT=getSystemTime();
00498     UInt16         id;
00499 
00500 #ifdef TEST_LOST_DGRAM_RATE
00501     srand48((long int)(10000*getSystemTime()));
00502 #endif
00503 
00504     for(;;)
00505     {
00506         // get free dgram
00507         _lock->aquire();
00508         dgram   =_free.get(_lock);
00509         _lock->release();
00510         do
00511         {
00512             // ignore for a while
00513             if(missing)
00514                 ignoreT = getSystemTime();
00515             do
00516             {
00517                 while(!recvNextDgram(dgram))
00518                 {
00519                     if(_recvQueueThreadStop)
00520                         return true;
00521                     try
00522                     {
00523                         while(_socket.waitReadable(0))
00524                         {
00525                             char buffer;
00526                             if(_socket.recv(&buffer,1) <= 0)
00527                             {
00528                                 // put EOT to the queue
00529                                 dgram->setSize(0);
00530                                 _lock->aquire();
00531                                 _queue.put(dgram);
00532                                 _lock->release();
00533                                 FLOG(("Connection lost\n"));
00534                                 return false;
00535                             }
00536                         }
00537                     }
00538                     catch(SocketException &e)
00539                     {
00540                         // put EOT to the queue
00541                         dgram->setSize(0);
00542                         _lock->aquire();
00543                         _queue.put(dgram);
00544                         _lock->release();
00545                         FLOG(("Connection lost\n"));
00546                         return false;
00547                     }
00548                 }
00549                 id = dgram->getId();
00550             }
00551             while( missing &&
00552                    id != _seqNumber &&
00553                    (getSystemTime() - ignoreT) < 0.01);
00554 
00555             missing = false;
00556             response.setId(id);
00557 
00558             // ack request ?
00559             if(dgram->getSize() == 0)
00560             {
00561                 if( !Dgram::less(id,_seqNumber ) )
00562                 {
00563                     missing = true;
00564                     response.setId(_seqNumber);
00565                 }
00566 //                printf("ack request %d %d\n",id,missing);
00567             }
00568             else
00569             {
00570 //                printf("%d got %d\n",id,_seqNumber);
00571                 if( dgram->getId() == _seqNumber)
00572                 {
00573                     // got expected dgram.
00574                     // put to queue
00575                     _lock->aquire();
00576                     _queue.put(dgram);
00577                     _lock->release();
00578                 }
00579                 else
00580                 {
00581                     // ignore if unneccesary retransmission
00582                     if( Dgram::less(id,_seqNumber ) )
00583                     {
00584                         continue;
00585                     }
00586                     else
00587                     {
00588                         missing = true;
00589                         response.setId(_seqNumber);
00590                     }
00591                 }
00592             }
00593 
00594 //                printf("Responde %d\n",response.getId());
00595             // prepare response
00596             response.setResponseAck(!missing);
00597             response.setResponseSize();
00598 
00599             // send response if nak or no data in the queue
00600             if(!response.getResponseAck() || 
00601                !_mcastSocket.waitReadable(0))
00602             {
00603 #ifdef TEST_LOST_DGRAM_RATE
00604                 if(drand48()>TEST_LOST_DGRAM_RATE)
00605 #endif
00606                     if(response.getResponseAck())
00607                     {
00608                         // send response if no ack combination
00609                         // or ack request 
00610                         if(_combineAck.empty() || id == _maxAck)
00611                             _responseSocket.sendTo(response.getBuffer(),
00612                                                    response.getBufferSize(),
00613                                                    _ackDestination);
00614                         else
00615                             combineAck(NULL,_sender);
00616                     }
00617                     else
00618                     {
00619                         // send nak to sender
00620                         _responseSocket.sendTo(response.getBuffer(),
00621                                                response.getBufferSize(),
00622                                                _sender);
00623                     }
00624             }
00625         }
00626         while(id != _seqNumber || missing);
00627         _seqNumber++;
00628     }    
00629 
00630 //    return true;
00631 }
00632 
00633 void PointMCastConnection::recvQueueThread(void *arg)
00634 {
00635     PointMCastConnection *the=(PointMCastConnection*)arg;
00636     try 
00637     {
00638         the->recvQueue();
00639     }
00640     catch(SocketException &e)
00641     {
00642         SFATAL << "Error in dgram reader thread:" << e.what() << std::endl;
00643     }
00644     the->_recvQueueThreadRunning = false;
00645 }
00646 
00649 void PointMCastConnection::initialize()
00650 {
00651     std::string   group;
00652     Channel       channel;
00653     BinaryMessage message;
00654     char          hostname[256];
00655     std::string   fromHost;
00656     UInt32        fromPort;
00657     UInt32        combineCount;
00658     std::string   host;
00659     UInt32        port;
00660     char          threadName[256];
00661 
00662     sprintf(threadName,"PointMCastConnection%p",this);
00663 
00664     // get info about the group
00665     _socket.recv(message);
00666     // group and port
00667     group = message.getString();
00668     port  = message.getUInt32();
00669     // get seq number
00670     _seqNumber = message.getUInt32();
00671     _maxAck = _seqNumber - 1;
00672     // server port
00673     fromHost = message.getString();
00674     fromPort = message.getUInt32();
00675     _sender = SocketAddress(fromHost.c_str(),fromPort);
00676 
00677     // prepare socket to receive mcast packages
00678     _mcastSocket.open();
00679     _mcastSocket.setReusePort(true);
00680     _mcastSocket.setReadBufferSize(524288);
00681     _mcastSocket.bind(SocketAddress(SocketAddress::ANY,port));
00682     try
00683     {
00684         _mcastSocket.join(SocketAddress(group.c_str()));
00685     }
00686     catch(...) {}
00687     // set multicast interface
00688     if(!getInterface().empty())
00689     {
00690         _mcastSocket.setMCastInterface(SocketAddress(getInterface().c_str()));
00691     }
00692 
00693     _responseSocket.open();
00694     _responseSocket.bind(SocketAddress(SocketAddress::ANY,0));
00695 
00696     // tell the group from wich port requests are comming
00697     hostname[255] = '\0';
00698     gethostname(hostname,255);
00699     message.clear();
00700     message.putString(hostname);
00701     message.putUInt32(_responseSocket.getAddress().getPort());
00702     _socket.send(message);
00703 
00704     // get ack destination info
00705     _socket.recv(message);
00706 
00707     message.getUInt32(combineCount);
00708 //    printf("%d\n",combineCount);
00709     while(combineCount--)
00710     {
00711         host=message.getString();
00712         port=message.getUInt32();
00713         _combineAck[SocketAddress(host.c_str(),port)]=_seqNumber-1;
00714     }
00715 
00716     host=message.getString();
00717     port=message.getUInt32();
00718     _ackDestination = SocketAddress(host.c_str(),port);
00719 
00720     // start reader thread
00721     _recvQueueThread=BaseThread::get(threadName);
00722     _recvQueueThreadRunning = true;
00723     _recvQueueThreadStop    = false;
00724     _recvQueueThread->runFunction( recvQueueThread, (void *) (this) );
00725 
00726     _initialized = true;
00727 //    printf("init end\n");
00728 }
00729 
00730 /*-------------------------------------------------------------------------*/
00731 /*                              static type                                */
00732 
00733 ConnectionType PointMCastConnection::_type(
00734     &PointMCastConnection::create,
00735     "Multicast");
00736 
00737 /*-------------------------------------------------------------------------*/
00738 /*                              cvs id's                                   */
00739 
00740 #ifdef __sgi
00741 #pragma set woff 1174
00742 #endif
00743 
00744 #ifdef OSG_LINUX_ICC
00745 #pragma warning( disable : 177 )
00746 #endif
00747 
00748 namespace
00749 {
00750     static Char8 cvsid_cpp       [] = "@(#)$Id: $";
00751     static Char8 cvsid_hpp       [] = OSG_GROUPMCASTCONNECTION_HEADER_CVSID;
00752 }
00753 

Generated on Thu Aug 25 04:08:33 2005 for OpenSG by  doxygen 1.4.3