Main Page | Modules | Namespace List | Class Hierarchy | Alphabetical List | Class List | Directories | File List | Namespace Members | Class Members | File Members | Related Pages

OSGGroupSockConnection.cpp

Go to the documentation of this file.
00001 /*---------------------------------------------------------------------------*\
00002  *                                OpenSG                                     *
00003  *                                                                           *
00004  *                                                                           *
00005  *             Copyright (C) 2000-2002 by the OpenSG Forum                   *
00006  *                                                                           *
00007  *                            www.opensg.org                                 *
00008  *                                                                           *
00009  *   contact: dirk@opensg.org, gerrit.voss@vossg.org, jbehr@zgdv.de          *
00010  *                                                                           *
00011 \*---------------------------------------------------------------------------*/
00012 /*---------------------------------------------------------------------------*\
00013  *                                License                                    *
00014  *                                                                           *
00015  * This library is free software; you can redistribute it and/or modify it   *
00016  * under the terms of the GNU Library General Public License as published    *
00017  * by the Free Software Foundation, version 2.                               *
00018  *                                                                           *
00019  * This library is distributed in the hope that it will be useful, but       *
00020  * WITHOUT ANY WARRANTY; without even the implied warranty of                *
00021  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU         *
00022  * Library General Public License for more details.                          *
00023  *                                                                           *
00024  * You should have received a copy of the GNU Library General Public         *
00025  * License along with this library; if not, write to the Free Software       *
00026  * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.                 *
00027  *                                                                           *
00028 \*---------------------------------------------------------------------------*/
00029 /*---------------------------------------------------------------------------*\
00030  *                                Changes                                    *
00031  *                                                                           *
00032  *                                                                           *
00033  *                                                                           *
00034  *                                                                           *
00035  *                                                                           *
00036  *                                                                           *
00037 \*---------------------------------------------------------------------------*/
00038 
00039 //---------------------------------------------------------------------------
00040 //  Includes
00041 //---------------------------------------------------------------------------
00042 #include <stdlib.h>
00043 #include <stdio.h>
00044 #include <assert.h>
00045 
00046 #include <sstream>
00047 
00048 #include "OSGConfig.h"
00049 #include "OSGLog.h"
00050 #include "OSGBaseFunctions.h"
00051 #include "OSGSocketSelection.h"
00052 #include "OSGSocketException.h"
00053 #include "OSGGroupSockConnection.h"
00054 #include "OSGConnectionType.h"
00055 
00056 OSG_USING_NAMESPACE
00057 
00062 /*-------------------------------------------------------------------------*/
00063 /*                            constructor destructor                       */
00064 
00068 GroupSockConnection::GroupSockConnection():
00069     GroupConnection(0)
00070 {
00071     _acceptSocket.open();
00072     _acceptSocket.setReusePort(true);
00073 
00074     _socketReadBuffer.resize(131071);
00075     _socketWriteBuffer.resize( _socketReadBuffer.size() );
00076     // reserve first bytes for buffer size
00077     readBufAdd (&_socketReadBuffer [sizeof(SocketBufferHeader)],
00078                 _socketReadBuffer.size() -sizeof(SocketBufferHeader));
00079     writeBufAdd(&_socketWriteBuffer[sizeof(SocketBufferHeader)],
00080                 _socketWriteBuffer.size()-sizeof(SocketBufferHeader));
00081 }
00082 
00085 GroupSockConnection::~GroupSockConnection(void)
00086 {
00087     // close and remove sockets
00088     while(_sockets.size())
00089     {
00090         try
00091         {
00092             _sockets.begin()->close();
00093             _sockets.erase(_sockets.begin());
00094         }
00095         catch(...)
00096         {
00097         }
00098     }
00099     _acceptSocket.close();
00100 }
00101 
00104 const ConnectionType *GroupSockConnection::getType()
00105 {
00106     return &_type;
00107 }
00108 
00109 /*-------------------------------------------------------------------------*/
00110 /*                            connection                                   */
00111 
00115 GroupConnection::Channel GroupSockConnection::connectPoint(
00116     const std::string &address,
00117     Time               timeout)
00118 {
00119     Channel channel = -1;
00120     StreamSocket socket;
00121     if(connectSocket(socket,address,timeout))
00122     {
00123         channel = newChannelIndex(_sockets.size());
00124         _sockets.push_back(socket);
00125         _readIndex = 0;
00126     }
00127     return channel;
00128 }
00129 
00132 void GroupSockConnection::disconnect(Channel channel)
00133 {
00134     ChannelIndex index = channelToIndex(channel);
00135     try
00136     {
00137         _sockets[index].close();
00138     }
00139     catch(...)
00140     {
00141     }
00142     _sockets.erase(_sockets.begin() + index);
00143     delChannelIndex(index);
00144     _readIndex = 0;
00145 }
00146 
00150 GroupConnection::Channel GroupSockConnection::acceptPoint(Time timeout)
00151 {
00152     StreamSocket from;
00153     if(GroupSockConnection::acceptSocket(_acceptSocket,from,timeout))
00154     {
00155         Channel channel = newChannelIndex(_sockets.size());
00156         _sockets.push_back(from);
00157         _readIndex = 0;
00158         return channel;
00159     }
00160     else
00161     {
00162         return -1;
00163     }
00164 }
00165 
00172 std::string GroupSockConnection::bind(const std::string &address)
00173 {
00174     int         port=0;
00175     char        localhost[256];
00176     char        host[256];
00177     char        portStr[256];
00178     std::string interf;
00179     std::string boundedAddress;
00180 
00181     // get local host name
00182     gethostname(localhost,255);
00183     if(!getInterface().empty())
00184         interf = getInterface();
00185     else
00186         interf = localhost;
00187     // parse address
00188     if(!address.empty())
00189         if(sscanf(address.c_str(),"%*[^:]:%d",&port) != 1)
00190             if(sscanf(address.c_str(),":%d",&port) != 1)
00191                 port = 0;
00192     // bind port
00193     _acceptSocket.setReusePort(true);
00194     _acceptSocket.bind(SocketAddress(interf.c_str(),port));
00195     SINFO << "Connection bound to "
00196           << _acceptSocket.getAddress().getHost() << ":"
00197           << _acceptSocket.getAddress().getPort() << std::endl;
00198     _acceptSocket.listen();
00199     // create address
00200     sprintf(portStr,"%d",_acceptSocket.getAddress().getPort());
00201     return interf + ":" + portStr;
00202 }
00203 
00206 void GroupSockConnection::setParams(const std::string &params)
00207 {
00208     if(params.empty())
00209         return;
00210 
00211     std::string option = "bufferSize=";
00212     std::string::size_type i = 0;
00213     if((i=params.find(option)) != std::string::npos)
00214     {
00215         std::string str = params.substr(i + option.size());
00216 
00217         std::stringstream ss;
00218         std::string::size_type j = 0;
00219         while(j < str.length() && str[j] != ',' && isdigit(str[j]))
00220         {
00221             ss << str[j++];
00222         }
00223         UInt32 bufferSize;
00224         ss >> bufferSize;
00225 
00226         // clear old buffer.
00227         readBufClear();
00228         writeBufClear();
00229 
00230         _socketReadBuffer.resize(bufferSize);
00231         _socketWriteBuffer.resize(_socketReadBuffer.size());
00232         
00233         // reserve first bytes for buffer size
00234         readBufAdd (&_socketReadBuffer [sizeof(SocketBufferHeader)],
00235                     _socketReadBuffer.size() -sizeof(SocketBufferHeader));
00236         writeBufAdd(&_socketWriteBuffer[sizeof(SocketBufferHeader)],
00237                     _socketWriteBuffer.size()-sizeof(SocketBufferHeader));
00238 
00239         FINFO(("GroupSockConnection::setParams : setting buffer size to %u.\n", bufferSize));
00240     }
00241 }
00242 
00243 /*-------------------------------------------------------------------------*/
00244 /*                              channel handling                           */
00245 
00249 Connection::Channel GroupSockConnection::selectChannel(Time timeout)
00250     throw (ReadError)
00251 {
00252     Int32 maxnread=0,nread;
00253     ChannelIndex index;
00254     SocketSelection selection,result;
00255 
00256     // if there is data in the read buffer, return current channel
00257     if(_zeroCopyThreshold != 1 &&
00258        _currentReadBuffer != readBufEnd())
00259     {
00260         FFATAL(("Channel change ignores data in current buffer"))
00261         return indexToChannel(_readIndex);
00262     }    
00263 
00264     if(_selection[_readIndex] &&
00265        _sockets[_readIndex].getAvailable())
00266     {
00267         return indexToChannel(_readIndex);;
00268     }
00269 
00270     // wait for first socket to deliver data
00271     for(index = 0 ; index < _sockets.size() ; ++index)
00272     {
00273         if(_selection[index])
00274             selection.setRead(_sockets[index]);
00275     }
00276     
00277     try 
00278     {
00279         // select ok ?
00280         if(!selection.select(timeout,result))
00281             return -1;
00282 
00283         // use socket with most data
00284         for(index = 0 ; index < _sockets.size() ; ++index)
00285         {
00286             if(result.isSetRead(_sockets[index]))
00287             {
00288                 nread=_sockets[index].getAvailable();
00289                 if(maxnread < nread)
00290                 {
00291                     maxnread = nread;
00292                     _readIndex=index;
00293                 }
00294             }
00295         }
00296     }
00297     catch(SocketException &e)
00298     {
00299         throw ReadError(e.what());
00300     }
00301 
00302     // return channel id
00303     return indexToChannel(_readIndex);
00304 }
00305 
00306 /*-------------------------------------------------------------------------*/
00307 /*                            sync                                         */
00308 
00311 bool GroupSockConnection::wait(Time timeout) throw (ReadError)
00312 {
00313     UInt32 len;
00314     UInt32 index;
00315     UInt32 tag=314156;
00316     UInt32 missing = _sockets.size();
00317     SocketSelection selection,result;
00318 
00319     for(index = 0 ; index < _sockets.size() ; ++index)
00320         selection.setRead(_sockets[index]);
00321 
00322     try
00323     {
00324         while(missing)
00325         {
00326             if(!selection.select(timeout,result))
00327                 return false;
00328             for(index = 0 ; index < _sockets.size() ; ++index)
00329             {
00330                 if(result.isSetRead(_sockets[index]))
00331                 {
00332                     len = _sockets[index].recv(&tag,sizeof(tag));
00333                     tag = osgntohl(tag);
00334                     if(len == 0)
00335                         throw ReadError("Channel closed");
00336                     selection.clearRead(_sockets[index]);
00337                     missing--;
00338                     if(tag != 314156)
00339                     {
00340                         FFATAL(("Stream out of sync in SockConnection\n"));
00341                         throw ReadError("Stream out of sync");
00342                     }
00343                 }
00344             }
00345         }
00346     }
00347     catch(SocketException &e)
00348     {
00349         throw ReadError(e.what());
00350     }
00351     return true;
00352 }
00353 
00356 void GroupSockConnection::signal(void) throw (WriteError)
00357 {
00358     UInt32 tag=osghtonl(314156);
00359     UInt32 index;
00360 
00361     try
00362     {
00363         for(index = 0 ; index<_sockets.size() ; ++index)
00364             _sockets[index].send(&tag,sizeof(tag));
00365     }
00366     catch(SocketError &e)
00367     {
00368         throw WriteError(e.what());
00369     }
00370 }
00371 
00372 /*-------------------------- create ---------------------------------------*/
00373 
00377 GroupConnection *GroupSockConnection::create(void)
00378 {
00379     return new GroupSockConnection();
00380 }
00381 
00382 /*-------------------------------------------------------------------------*/
00383 /*                              read write                                 */
00384 
00392 void GroupSockConnection::read(MemoryHandle mem,UInt32 size)
00393 {
00394     int len;
00395 
00396     // read data
00397     len=_sockets[_readIndex].recv(mem,size);
00398     if(len==0)
00399     {
00400 //        throw ChannelClosed(indexToChannel(_readIndex));
00401         throw ReadError("Channel closed");
00402     }
00403 } 
00404 
00412 void GroupSockConnection::readBuffer()
00413 {
00414     int size;
00415     int len;
00416 
00417     // read buffer header
00418     len=_sockets[_readIndex].recv(&_socketReadBuffer[0],sizeof(SocketBufferHeader));
00419     if(len==0)
00420         throw ReadError("Channel closed");
00421     // read remaining data
00422     size=osgntohl(((SocketBufferHeader*)&_socketReadBuffer[0])->size);
00423     len=_sockets[_readIndex].recv(&_socketReadBuffer[sizeof(SocketBufferHeader)],
00424                          size);
00425     if(len==0)
00426         throw ReadError("Channel closed");
00427     readBufBegin()->setDataSize(size);
00428 }    
00429 
00437 void GroupSockConnection::write(MemoryHandle mem,UInt32 size)
00438 {
00439     Int32 index;
00440 
00441     try
00442     {
00443         // write to all connected sockets
00444         for(index = 0 ; index < _sockets.size() ; ++index)
00445             _sockets[index].send(mem,size);
00446     }
00447     catch(SocketException &e)
00448     {
00449         throw WriteError(e.what());
00450     }
00451 }
00452 
00458 void GroupSockConnection::writeBuffer(void)
00459 {
00460     Int32 index;
00461     UInt32 size = writeBufBegin()->getDataSize();
00462     // write size to header
00463     ((SocketBufferHeader*)&_socketWriteBuffer[0])->size=osghtonl(size);
00464     if(size)
00465     {
00466         // write data to all sockets
00467         for(index = 0 ; index < _sockets.size() ; ++index)
00468         {
00469             // write whole block
00470             _sockets[index].send(&_socketWriteBuffer[0],
00471                                  size+sizeof(SocketBufferHeader));
00472         }
00473     }
00474 }
00475 
00476 /*-------------------------------------------------------------------------*/
00477 /*                              internal helper functions                  */
00478 
00481 bool GroupSockConnection::connectSocket(StreamSocket &socket,
00482                                         std::string   address,
00483                                         Time          timeout)
00484 {
00485     std::string  host="unknown";
00486     int          port=0;
00487     Time         startTime = getSystemTime();
00488     bool         connected=false;
00489     
00490     int pos = address.find(':');
00491     if(pos>=0)
00492     {
00493         host = std::string(address,0,pos);
00494         port = atoi(std::string(address,pos+1,std::string::npos).c_str());
00495     }
00496     else
00497     {
00498         host = address;
00499     }
00500     
00501     socket.open();
00502     socket.setDelay(false);
00503     socket.setReadBufferSize(1048576);
00504     socket.setWriteBufferSize(1048576);
00505     while(!connected && 
00506           (timeout == -1 || (getSystemTime()-startTime) < timeout))
00507     {
00508         try
00509         {
00510             socket.connect(SocketAddress(host.c_str(),port));
00511             connected = true;
00512         }
00513         catch(...)
00514         {
00515         }
00516     }
00517     if(connected)
00518         return true;
00519     else
00520         return false;
00521 }
00522 
00525 bool GroupSockConnection::acceptSocket(StreamSocket &accept,
00526                                        StreamSocket &from,
00527                                        Time          timeout)
00528 {
00529     if(!accept.waitReadable(timeout))
00530         return false;
00531     from=accept.accept();
00532     from.setDelay(false);
00533     from.setReadBufferSize(1048576);
00534     from.setWriteBufferSize(1048576);
00535     return true;
00536 }
00537 
00538 /*-------------------------------------------------------------------------*/
00539 /*                              static type                                */
00540 
00541 ConnectionType GroupSockConnection::_type(
00542     &GroupSockConnection::create,
00543     "StreamSock");
00544 
00545 /*-------------------------------------------------------------------------*/
00546 /*                              cvs id's                                   */
00547 
00548 #ifdef __sgi
00549 #pragma set woff 1174
00550 #endif
00551 
00552 #ifdef OSG_LINUX_ICC
00553 #pragma warning( disable : 177 )
00554 #endif
00555 
00556 namespace
00557 {
00558     static Char8 cvsid_cpp       [] = "@(#)$Id: $";
00559     static Char8 cvsid_hpp       [] = OSG_GROUPSOCKCONNECTION_HEADER_CVSID;
00560 }
00561 

Generated on Thu Aug 25 04:06:00 2005 for OpenSG by  doxygen 1.4.3