Main Page | Modules | Namespace List | Class Hierarchy | Alphabetical List | Class List | Directories | File List | Namespace Members | Class Members | File Members | Related Pages

OSGGroupMCastConnection.cpp

Go to the documentation of this file.
00001 /*---------------------------------------------------------------------------*\
00002  *                                OpenSG                                     *
00003  *                                                                           *
00004  *                                                                           *
00005  *             Copyright (C) 2000-2002 by the OpenSG Forum                   *
00006  *                                                                           *
00007  *                            www.opensg.org                                 *
00008  *                                                                           *
00009  *   contact: dirk@opensg.org, gerrit.voss@vossg.org, jbehr@zgdv.de          *
00010  *                                                                           *
00011 \*---------------------------------------------------------------------------*/
00012 /*---------------------------------------------------------------------------*\
00013  *                                License                                    *
00014  *                                                                           *
00015  * This library is free software; you can redistribute it and/or modify it   *
00016  * under the terms of the GNU Library General Public License as published    *
00017  * by the Free Software Foundation, version 2.                               *
00018  *                                                                           *
00019  * This library is distributed in the hope that it will be useful, but       *
00020  * WITHOUT ANY WARRANTY; without even the implied warranty of                *
00021  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU         *
00022  * Library General Public License for more details.                          *
00023  *                                                                           *
00024  * You should have received a copy of the GNU Library General Public         *
00025  * License along with this library; if not, write to the Free Software       *
00026  * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.                 *
00027  *                                                                           *
00028 \*---------------------------------------------------------------------------*/
00029 /*---------------------------------------------------------------------------*\
00030  *                                Changes                                    *
00031  *                                                                           *
00032  *                                                                           *
00033  *                                                                           *
00034  *                                                                           *
00035  *                                                                           *
00036  *                                                                           *
00037 \*---------------------------------------------------------------------------*/
00038 
00039 //---------------------------------------------------------------------------
00040 //  Includes
00041 //---------------------------------------------------------------------------
00042 #include <stdlib.h>
00043 #include <stdio.h>
00044 #include <assert.h>
00045 
00046 #include <set>
00047 #include <sstream>
00048 
00049 #include "OSGConfig.h"
00050 #include "OSGLog.h"
00051 #include "OSGBaseThread.h"
00052 #include "OSGSocketSelection.h"
00053 #include "OSGBinaryMessage.h"
00054 #include "OSGGroupMCastConnection.h"
00055 #include "OSGConnectionType.h"
00056 
00057 #define USE_EARLY_SEND
00058 
00059 OSG_USING_NAMESPACE
00060 
00065 /*-------------------------------------------------------------------------*/
00066 /*                            constructor destructor                       */
00067 
00071 GroupMCastConnection::GroupMCastConnection():
00072     Inherited(),
00073     _sendQueueThread(NULL),
00074     _seqNumber(0),
00075     _initialized(false)
00076 {
00077     char lockName[256];
00078     sprintf(lockName,"GroupMCastConnection%p",this);
00079 
00080     // create locks
00081     _lock     = Lock::get(lockName);
00082     // fill dgramqueue
00083     for(UInt32 dI = 0 ; dI < OSG_DGRAM_QUEUE_LEN ; ++dI)
00084         _free.put(new Dgram());
00085     // prepare mcast socket
00086     _mcastSocket.open();
00087     _mcastSocket.bind();
00088     _mcastSocket.setReadBufferSize(262144);
00089     // set window size
00090     _windowSize = _mcastSocket.getReadBufferSize()/(OSG_DGRAM_LEN) - 1;
00091     _windowSize = osgMin((UInt32)13,_windowSize);
00092 }
00093 
00096 GroupMCastConnection::~GroupMCastConnection(void)
00097 {
00098     Dgram *dgram;
00099     _sendQueueThreadStop = true;
00100 
00101     // get free dgram
00102     _lock->aquire();
00103     dgram = _free.get(_lock);
00104     dgram->setSize(0);
00105     _queue.put(dgram);
00106     _lock->release();
00107     // wait for stop
00108     if(_sendQueueThread)
00109         BaseThread::join(_sendQueueThread);    
00110     // close socket
00111     _mcastSocket.close();
00112     // free queues
00113     _lock->aquire();
00114     while(!_free.empty())
00115         delete _free.get(_lock);
00116     while(!_queue.empty())
00117         delete _queue.get(_lock);
00118     _lock->release();
00119 }
00120 
00123 const ConnectionType *GroupMCastConnection::getType()
00124 {
00125     return &_type;
00126 }
00127 
00128 /*-------------------------------------------------------------------------*/
00129 /*                            connection                                   */
00130 
00134 GroupConnection::Channel GroupMCastConnection::connectPoint(
00135     const std::string &address,
00136     Time               timeout)
00137 {
00138     Channel channel = Inherited::connectPoint(address,timeout);
00139     return channel;
00140 }
00141 
00144 void GroupMCastConnection::disconnect(Channel channel)
00145 {
00146     Inherited::disconnect(channel);
00147     _lock->aquire();
00148     _destination.erase(_destination.begin()+channelToIndex(channel));
00149     _lock->release();
00150 }
00151 
00155 GroupConnection::Channel GroupMCastConnection::acceptPoint(Time timeout)
00156 {
00157     Connection::Channel channel = Inherited::acceptPoint(timeout);
00158     return channel;
00159 }
00160 
00163 void GroupMCastConnection::setParams(const std::string &params)
00164 {
00165     if(params.empty())
00166         return;
00167 
00168     Inherited::setParams(params);
00169 
00170     std::string option = "TTL=";
00171     std::string::size_type i = 0;
00172     if((i=params.find(option)) != std::string::npos)
00173     {
00174         std::string str = params.substr(i + option.size());
00175 
00176         std::stringstream ss;
00177         std::string::size_type j = 0;
00178         while(j < str.length() && str[j] != ',' && isdigit(str[j]))
00179         {
00180             ss << str[j++];
00181         }
00182         UInt32 ttl;
00183         ss >> ttl;
00184         if(ttl > 255)
00185             ttl = 255;
00186         _mcastSocket.setTTL((unsigned char) ttl);
00187         FINFO(("GroupMCastConnection::setParams : setting ttl to %u.\n", ttl));
00188     }
00189 }
00190 
00191 /*-------------------------------------------------------------------------*/
00192 /*                            sync                                         */
00193 
00196 bool GroupMCastConnection::wait(Time timeout) throw (ReadError)
00197 {
00198     // todo
00199     return Inherited::wait(timeout);
00200 }
00201 
00204 void GroupMCastConnection::signal(void) throw (WriteError)
00205 {
00206     UInt32 tag=314156;
00207     putValue(tag);
00208     flush();
00209 }
00210 
00211 /*-------------------------- helpers --------------------------------------*/
00212 
00216 GroupConnection *GroupMCastConnection::create(void)
00217 {
00218     return new GroupMCastConnection();
00219 }
00220 
00221 /*-------------------------------------------------------------------------*/
00222 /*                              read write                                 */
00223 
00231 void GroupMCastConnection::write(MemoryHandle mem,UInt32 size)
00232 {
00233     Dgram  *dgram  = NULL;;
00234     UInt32  pos    = 0;
00235     char   *buffer = (char*)mem;
00236 
00237     if(!_initialized)
00238         initialize();
00239 
00240     while(size)
00241     {
00242         // get free dgram
00243         _lock->aquire();
00244         dgram = _free.get(_lock);
00245         _lock->release();
00246         // fill with data
00247         dgram->setSize(osgMin(size,dgram->getCapacity()));
00248         memcpy(dgram->getData(),buffer+pos,dgram->getSize());
00249         // set sequence number
00250         dgram->setId( _seqNumber++ );
00251         // prepate next block
00252         size -= dgram->getSize();
00253         pos  += dgram->getSize();
00254         // put to write queue
00255         _lock->aquire();
00256         if(!_sendQueueThreadRunning)
00257         {
00258             _lock->release();
00259             BaseThread::join(_sendQueueThread);    
00260             throw WriteError("Channel closed");
00261         }
00262 #ifdef USE_EARLY_SEND
00263         if(_queue.waiting())
00264         {
00265             dgram->setEarlySend(true);
00266             _mcastSocket.sendTo(dgram->getBuffer(),
00267                                 dgram->getBufferSize(),
00268                                 _mcastAddress);
00269         }
00270 #endif
00271         _queue.put(dgram);
00272         _lock->release();
00273     }
00274 }
00275 
00281 void GroupMCastConnection::writeBuffer(void)
00282 {
00283     UInt32       size   = writeBufBegin()->getDataSize();
00284     MemoryHandle buffer = writeBufBegin()->getMem();
00285 
00286     write(buffer,size);
00287 }
00288 
00289 /*-------------------------------------------------------------------------*/
00290 /*                              private helpers                            */
00291 
00294 bool GroupMCastConnection::checkChannels(void)
00295 {
00296     SocketSelection selection;
00297     UInt32          index;
00298     bool            valid=true;
00299                     
00300     _lock->aquire();
00301     for(index = 0 ; index < _sockets.size() ; ++index)
00302         selection.setRead(_sockets[index]);
00303     if(selection.select(0))
00304     {
00305         UInt32 len;
00306         char   buffer;
00307         for(index = 0 ; index < _sockets.size() ; ++index)
00308         {
00309             if(selection.isSetRead(_sockets[index]))
00310             {
00311                 try
00312                 {
00313                     _sockets[index].send(&buffer,1);
00314                 }
00315                 catch(SocketException &e)
00316                 {
00317                     valid = false;
00318                     FWARNING(("Socket error:%s\n",e.what()))
00319                     break;
00320                 }
00321             }
00322         }
00323     }
00324     _lock->release();
00325     return valid;
00326 }
00327 
00330 bool GroupMCastConnection::sendQueue(void)
00331 {
00332     std::vector<Dgram*>                   dgram;
00333     std::vector<std::set<SocketAddress> > missing;
00334     UInt32                  count    = 0;
00335     UInt32                  maxCount = _windowSize-1;
00336     UInt32                  ack  = 0;
00337     UInt32                  end  = 0;
00338     UInt32                  send = 0;
00339     UInt32                  channel;
00340     UInt32                  index;
00341     Dgram                   response;
00342     UInt32                  m;
00343     bool                    readable = false;
00344     Time                    waitStart = getSystemTime();
00345     Time                    lastAckTime=0;
00346     UInt32                  len;
00347     SocketAddress           fromAddress;
00348     Dgram                   ackRequest;
00349     const Time              ackTimeout = 0.01;
00350     UInt16                  sendId=0;
00351     UInt16                  lastNak=0;
00352     Time                    lastNakTime=0;
00353     bool                    stopAfterSend=false;
00354 
00355     // prepate buffers
00356     dgram.resize(_windowSize);
00357     missing.resize(_windowSize);
00358 
00359 #ifdef TEST_LOST_DGRAM_RATE
00360     srand48((int)getSystemTime());
00361 #endif
00362     do
00363     {
00364         // read new dgrams
00365         if(count < maxCount)
00366         {
00367             _lock->aquire();
00368             while(count < maxCount &&
00369                   (count == 0 || !_queue.empty()))
00370             {
00371                 dgram[end] = _queue.get(_lock);
00372                 // stop
00373                 if(!dgram[end]->getSize())
00374                 {
00375                     if(count)
00376                     {
00377                         // wait for ack of packages in window
00378                         stopAfterSend = true;
00379                         break;
00380                     }
00381                     else
00382                     {
00383                         // no packages in the window
00384                         _lock->release();
00385                         return true;
00386                     }
00387                 }
00388                 // insert to expected responses
00389                 for(index=0 ; index<_waitFor.size() ; ++index)
00390                     missing[end].insert(_waitFor[index]);
00391                 end = (end+1) % _windowSize;
00392                 if(!count)
00393                     lastAckTime = getSystemTime();
00394                 count++;
00395             }
00396             _lock->release();
00397         }
00398 
00399         // send all dgrams in current window
00400         for( ; send != end ; send = (send+1) % _windowSize)
00401         {
00402 #ifdef TEST_LOST_DGRAM_RATE
00403             if(drand48()>TEST_LOST_DGRAM_RATE)
00404 #endif
00405             {
00406                 if(!dgram[send]->getEarlySend())
00407                     _mcastSocket.sendTo(dgram[send]->getBuffer(),
00408                                         dgram[send]->getBufferSize(),
00409                                         _mcastAddress);
00410                 dgram[send]->setEarlySend(false);
00411             }
00412             sendId = dgram[send]->getId();
00413 //            printf("send dgram %d at id %d\n",send,dgram[send]->getId());
00414         }
00415 
00416         // loop while
00417         // window is full and nothing to send or
00418         // queue is empty
00419         // or there is something to read
00420         while(count &&
00421               ( ( readable = _mcastSocket.waitReadable(0) ) ||
00422                 ( count == maxCount && send == end ) || 
00423                 ( getSystemTime() - lastAckTime > ackTimeout) ) )
00424         {
00425             if(!readable && !_mcastSocket.waitReadable(ackTimeout))
00426             {
00427 #if 0
00428                 printf("count %d\n",count);
00429                 printf("missing %d\n",missing[ack].size());
00430 
00431                 printf("readable %d\n",readable);
00432                 printf("send %d end %d\n",send,end);
00433                 printf("lastack %lf\n",getSystemTime() - lastAckTime);
00434 #endif
00435                 FDEBUG(("timeout count %d %d missing %d\n",count,sendId,missing[ack].size()))
00436 //                printf("%.10f timeout count %d %d missing %d\n",getSystemTime()-t1,count,sendId,missing[ack].size());
00437                     
00438                 ackRequest.setSize(0);
00439                 ackRequest.setId(sendId);
00440 
00441                 // send request over multicast
00442                 _mcastSocket.sendTo(ackRequest.getBuffer(),
00443                                     ackRequest.getBufferSize(),
00444                                     _mcastAddress);
00445 
00446                 // wait until next ack request                
00447                 _mcastSocket.waitReadable(0.05);
00448 
00449                 // check channels
00450                 if(getSystemTime() - lastAckTime > 0.5)
00451                 {
00452                     if(!checkChannels())
00453                         return false;
00454                 }
00455                 if(_sendQueueThreadStop && 
00456                    getSystemTime() - lastAckTime > 1)
00457                     // linger max 1 sec after close
00458                     break;
00459                 else
00460                     // retry wait
00461                     continue;
00462             }
00463 
00464             // read response
00465             len = _mcastSocket.recvFrom(response.getBuffer(),
00466                                         response.getBufferCapacity(),
00467                                         fromAddress);
00468             lastAckTime = getSystemTime();
00469 
00470             // ignore response with wrong len
00471             response.setResponseSize();
00472             if(len != response.getBufferSize())
00473             {
00474                 FDEBUG(("Wrong response len %d\n",len))
00475                     continue;
00476             }
00477 
00478             // old ack ?
00479             if(!Dgram::less(response.getId(),dgram[ack]->getId()))
00480             {
00481                 // first ack for this dgram from this receiver
00482                 if(response.getResponseAck() == true)
00483                 {
00484 //                    printf("Ack %d from %s:%d\n",response.getId(),fromAddress.getHost().c_str(),fromAddress.getPort());
00485                     for(m = ack ; 
00486                         dgram[m]->getId() != response.getId() ;
00487                         m=(m+1) % _windowSize)
00488                         missing[m].erase(fromAddress);
00489                     missing[m].erase(fromAddress);
00490                 }
00491                 else
00492                 {
00493                     if(response.getId() == lastNak &&
00494                        getSystemTime() - lastNakTime < 0.02)
00495                         continue;
00496                     lastNak = response.getId();
00497                     lastNakTime = getSystemTime();
00498                     FDEBUG(("Nack %d from %s:%d\n",response.getId(),fromAddress.getHost().c_str(),fromAddress.getPort()));
00499 //                    printf("Nack %d from %s:%d\n",response.getId(),fromAddress.getHost().c_str(),fromAddress.getPort());
00500                     // retransmit
00501                     for(m = ack ; 
00502                         m != send && dgram[m]->getId() != response.getId() ; 
00503                         m = (m+1) % _windowSize);
00504                     send = m;
00505                 }
00506             }
00507 
00508             // free acknolaged packes
00509             if(missing[ack].empty())
00510             {
00511 //                printf("ack %d\n",dgram[ack]->getId());
00512                 _lock->aquire();
00513                 while(count && missing[ack].empty())
00514                 {
00515                     _free.put(dgram[ack]);
00516                     ack = (ack+1) % _windowSize;
00517                     count--;
00518                 }
00519                 _lock->release();
00520             }
00521         }
00522     }
00523     while(!stopAfterSend || send != end);
00524 
00525     return true;
00526 }
00527 
00530 void GroupMCastConnection::sendQueueThread(void *arg)
00531 {
00532     GroupMCastConnection *the = (GroupMCastConnection *)arg;
00533     try
00534     {
00535         the->sendQueue();
00536     }
00537     catch(SocketException &e)
00538     {
00539         FFATAL(( "Writer Proc crashed %s\n",e.what() ));
00540     }
00541     the->_sendQueueThreadRunning = false;
00542 }
00543 
00546 void GroupMCastConnection::initialize()
00547 {
00548     std::string   group = "239.33.42.32";
00549 //    std::string group = "146.140.32.7";
00550 //    std::string group = "146.140.32.255";
00551     int           port = 15356;
00552     int           pos  = _destination.find(':');
00553     int           clientPort;
00554     std::string   clientHost;
00555     char          hostname[256];
00556     UInt32        index;
00557     UInt32        len;
00558     UInt32        ackNum = (UInt32) osgsqrt(_sockets.size());
00559     UInt32        numSource;
00560     UInt32        sendTo;
00561     BinaryMessage message;
00562     char          threadName[256];
00563 
00564     sprintf(threadName,"GroupMCastConnection%p",this);
00565 
00566     if(!getDestination().empty())
00567         group = getDestination();
00568         
00569     if(_sockets.size()<=16)
00570         ackNum = 1;
00571 
00572     if(pos>=0)
00573     {
00574         group = std::string(_destination,0,pos);
00575         port  = atoi(std::string(_destination,pos+1,std::string::npos).c_str());
00576     }
00577     else
00578     {
00579         if(_destination.size())
00580             group = _destination;
00581     }
00582     _mcastAddress.setHost(group);
00583     _mcastAddress.setPort(port );
00584 
00585     // set multicast interface, if given
00586     if(!getInterface().empty())
00587         _mcastSocket.setMCastInterface(
00588             SocketAddress(getInterface().c_str()));
00589 
00590     for(index = 0 ; index < _sockets.size() ; ++index)
00591     {
00592         message.clear();
00593         // tell the point connection the multicast address
00594         message.putString(_mcastAddress.getHost());
00595         message.putUInt32(_mcastAddress.getPort());
00596         // tell the current seq number
00597         message.putUInt32(_seqNumber);
00598         // tell the point from wich port requests are comming
00599         hostname[255] = '\0';
00600         gethostname(hostname,255);
00601         message.putString(hostname);
00602         message.putUInt32(_mcastSocket.getAddress().getPort());
00603         // send the message
00604         _sockets[index].send(message);
00605         
00606         // receive destination address
00607         message.clear();
00608         len = _sockets[index].recv(message);
00609         if(len == 0)
00610             throw ReadError("Channel closed\n");
00611         clientHost = message.getString();
00612         clientPort = message.getUInt32();
00613 
00614         SINFO << "Server:" << clientHost 
00615               << " Port:" << clientPort << std::endl;
00616         _receiver.push_back(SocketAddress(clientHost.c_str(),clientPort));
00617     }
00618     for(index = 0 ; index < _sockets.size() ; ++index)
00619     {
00620         message.clear();
00621         // tell receivers, whom to report acks
00622         if((index % ackNum) == 0)
00623         {
00624             _waitFor.push_back(_receiver[index]);
00625             numSource = osgMin( ackNum-1, (UInt32) _sockets.size()-index-1 );
00626             message.putUInt32(numSource);
00627             for(UInt32 r = index+1 ; r < index+1+numSource ; ++r)
00628             {
00629                 message.putString(_receiver[r].getHost());
00630                 message.putUInt32(_receiver[r].getPort());
00631             }
00632             message.putString(hostname);
00633             message.putUInt32(_mcastSocket.getAddress().getPort());
00634         }
00635         else
00636         {
00637             sendTo = index - (index % ackNum);
00638             message.putUInt32(0);
00639             message.putString(_receiver[sendTo].getHost());
00640             message.putUInt32(_receiver[sendTo].getPort());
00641         }
00642         // send the message
00643         _sockets[index].send(message);
00644     }
00645 
00646     // start write thread
00647     _sendQueueThread=BaseThread::get(threadName);
00648     _sendQueueThreadRunning = true;
00649     _sendQueueThreadStop    = false;
00650     _sendQueueThread->runFunction( sendQueueThread, (void *) (this) );
00651     _initialized = true;
00652 }
00653 
00654 /*-------------------------------------------------------------------------*/
00655 /*                              static type                                */
00656 
00657 ConnectionType GroupMCastConnection::_type(
00658     &GroupMCastConnection::create,
00659     "Multicast");
00660 
00661 /*-------------------------------------------------------------------------*/
00662 /*                              cvs id's                                   */
00663 
00664 #ifdef __sgi
00665 #pragma set woff 1174
00666 #endif
00667 
00668 #ifdef OSG_LINUX_ICC
00669 #pragma warning( disable : 177 )
00670 #endif
00671 
00672 namespace
00673 {
00674     static Char8 cvsid_cpp       [] = "@(#)$Id: $";
00675     static Char8 cvsid_hpp       [] = OSG_GROUPMCASTCONNECTION_HEADER_CVSID;
00676 }
00677 

Generated on Thu Aug 25 04:06:00 2005 for OpenSG by  doxygen 1.4.3