00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042 #include <stdlib.h>
00043 #include <stdio.h>
00044 #include <assert.h>
00045
00046 #include <sstream>
00047
00048 #include "OSGConfig.h"
00049 #include "OSGLog.h"
00050 #include "OSGBaseFunctions.h"
00051 #include "OSGSocketSelection.h"
00052 #include "OSGSocketException.h"
00053 #include "OSGGroupSockConnection.h"
00054 #include "OSGConnectionType.h"
00055
00056 OSG_USING_NAMESPACE
00057
00062
00063
00064
00068 GroupSockConnection::GroupSockConnection():
00069 GroupConnection(0)
00070 {
00071 _acceptSocket.open();
00072 _acceptSocket.setReusePort(true);
00073
00074 _socketReadBuffer.resize(131071);
00075 _socketWriteBuffer.resize( _socketReadBuffer.size() );
00076
00077 readBufAdd (&_socketReadBuffer [sizeof(SocketBufferHeader)],
00078 _socketReadBuffer.size() -sizeof(SocketBufferHeader));
00079 writeBufAdd(&_socketWriteBuffer[sizeof(SocketBufferHeader)],
00080 _socketWriteBuffer.size()-sizeof(SocketBufferHeader));
00081 }
00082
00085 GroupSockConnection::~GroupSockConnection(void)
00086 {
00087
00088 while(_sockets.size())
00089 {
00090 try
00091 {
00092 _sockets.begin()->close();
00093 _sockets.erase(_sockets.begin());
00094 }
00095 catch(...)
00096 {
00097 }
00098 }
00099 _acceptSocket.close();
00100 }
00101
00104 const ConnectionType *GroupSockConnection::getType()
00105 {
00106 return &_type;
00107 }
00108
00109
00110
00111
00115 GroupConnection::Channel GroupSockConnection::connectPoint(
00116 const std::string &address,
00117 Time timeout)
00118 {
00119 Channel channel = -1;
00120 StreamSocket socket;
00121 if(connectSocket(socket,address,timeout))
00122 {
00123 channel = newChannelIndex(_sockets.size());
00124 _sockets.push_back(socket);
00125 _readIndex = 0;
00126 }
00127 return channel;
00128 }
00129
00132 void GroupSockConnection::disconnect(Channel channel)
00133 {
00134 ChannelIndex index = channelToIndex(channel);
00135 try
00136 {
00137 _sockets[index].close();
00138 }
00139 catch(...)
00140 {
00141 }
00142 _sockets.erase(_sockets.begin() + index);
00143 delChannelIndex(index);
00144 _readIndex = 0;
00145 }
00146
00150 GroupConnection::Channel GroupSockConnection::acceptPoint(Time timeout)
00151 {
00152 StreamSocket from;
00153 if(GroupSockConnection::acceptSocket(_acceptSocket,from,timeout))
00154 {
00155 Channel channel = newChannelIndex(_sockets.size());
00156 _sockets.push_back(from);
00157 _readIndex = 0;
00158 return channel;
00159 }
00160 else
00161 {
00162 return -1;
00163 }
00164 }
00165
00172 std::string GroupSockConnection::bind(const std::string &address)
00173 {
00174 int port=0;
00175 char localhost[256];
00176 char host[256];
00177 char portStr[256];
00178 std::string interf;
00179 std::string boundedAddress;
00180
00181
00182 gethostname(localhost,255);
00183 if(!getInterface().empty())
00184 interf = getInterface();
00185 else
00186 interf = localhost;
00187
00188 if(!address.empty())
00189 if(sscanf(address.c_str(),"%*[^:]:%d",&port) != 1)
00190 if(sscanf(address.c_str(),":%d",&port) != 1)
00191 port = 0;
00192
00193 _acceptSocket.setReusePort(true);
00194 _acceptSocket.bind(SocketAddress(interf.c_str(),port));
00195 SINFO << "Connection bound to "
00196 << _acceptSocket.getAddress().getHost() << ":"
00197 << _acceptSocket.getAddress().getPort() << std::endl;
00198 _acceptSocket.listen();
00199
00200 sprintf(portStr,"%d",_acceptSocket.getAddress().getPort());
00201 return interf + ":" + portStr;
00202 }
00203
00206 void GroupSockConnection::setParams(const std::string ¶ms)
00207 {
00208 if(params.empty())
00209 return;
00210
00211 std::string option = "bufferSize=";
00212 std::string::size_type i = 0;
00213 if((i=params.find(option)) != std::string::npos)
00214 {
00215 std::string str = params.substr(i + option.size());
00216
00217 std::stringstream ss;
00218 std::string::size_type j = 0;
00219 while(j < str.length() && str[j] != ',' && isdigit(str[j]))
00220 {
00221 ss << str[j++];
00222 }
00223 UInt32 bufferSize;
00224 ss >> bufferSize;
00225
00226
00227 readBufClear();
00228 writeBufClear();
00229
00230 _socketReadBuffer.resize(bufferSize);
00231 _socketWriteBuffer.resize(_socketReadBuffer.size());
00232
00233
00234 readBufAdd (&_socketReadBuffer [sizeof(SocketBufferHeader)],
00235 _socketReadBuffer.size() -sizeof(SocketBufferHeader));
00236 writeBufAdd(&_socketWriteBuffer[sizeof(SocketBufferHeader)],
00237 _socketWriteBuffer.size()-sizeof(SocketBufferHeader));
00238
00239 FINFO(("GroupSockConnection::setParams : setting buffer size to %u.\n", bufferSize));
00240 }
00241 }
00242
00243
00244
00245
00249 Connection::Channel GroupSockConnection::selectChannel(Time timeout)
00250 throw (ReadError)
00251 {
00252 Int32 maxnread=0,nread;
00253 ChannelIndex index;
00254 SocketSelection selection,result;
00255
00256
00257 if(_zeroCopyThreshold != 1 &&
00258 _currentReadBuffer != readBufEnd())
00259 {
00260 FFATAL(("Channel change ignores data in current buffer"))
00261 return indexToChannel(_readIndex);
00262 }
00263
00264 if(_selection[_readIndex] &&
00265 _sockets[_readIndex].getAvailable())
00266 {
00267 return indexToChannel(_readIndex);;
00268 }
00269
00270
00271 for(index = 0 ; index < _sockets.size() ; ++index)
00272 {
00273 if(_selection[index])
00274 selection.setRead(_sockets[index]);
00275 }
00276
00277 try
00278 {
00279
00280 if(!selection.select(timeout,result))
00281 return -1;
00282
00283
00284 for(index = 0 ; index < _sockets.size() ; ++index)
00285 {
00286 if(result.isSetRead(_sockets[index]))
00287 {
00288 nread=_sockets[index].getAvailable();
00289 if(maxnread < nread)
00290 {
00291 maxnread = nread;
00292 _readIndex=index;
00293 }
00294 }
00295 }
00296 }
00297 catch(SocketException &e)
00298 {
00299 throw ReadError(e.what());
00300 }
00301
00302
00303 return indexToChannel(_readIndex);
00304 }
00305
00306
00307
00308
00311 bool GroupSockConnection::wait(Time timeout) throw (ReadError)
00312 {
00313 UInt32 len;
00314 UInt32 index;
00315 UInt32 tag=314156;
00316 UInt32 missing = _sockets.size();
00317 SocketSelection selection,result;
00318
00319 for(index = 0 ; index < _sockets.size() ; ++index)
00320 selection.setRead(_sockets[index]);
00321
00322 try
00323 {
00324 while(missing)
00325 {
00326 if(!selection.select(timeout,result))
00327 return false;
00328 for(index = 0 ; index < _sockets.size() ; ++index)
00329 {
00330 if(result.isSetRead(_sockets[index]))
00331 {
00332 len = _sockets[index].recv(&tag,sizeof(tag));
00333 tag = osgntohl(tag);
00334 if(len == 0)
00335 throw ReadError("Channel closed");
00336 selection.clearRead(_sockets[index]);
00337 missing--;
00338 if(tag != 314156)
00339 {
00340 FFATAL(("Stream out of sync in SockConnection\n"));
00341 throw ReadError("Stream out of sync");
00342 }
00343 }
00344 }
00345 }
00346 }
00347 catch(SocketException &e)
00348 {
00349 throw ReadError(e.what());
00350 }
00351 return true;
00352 }
00353
00356 void GroupSockConnection::signal(void) throw (WriteError)
00357 {
00358 UInt32 tag=osghtonl(314156);
00359 UInt32 index;
00360
00361 try
00362 {
00363 for(index = 0 ; index<_sockets.size() ; ++index)
00364 _sockets[index].send(&tag,sizeof(tag));
00365 }
00366 catch(SocketError &e)
00367 {
00368 throw WriteError(e.what());
00369 }
00370 }
00371
00372
00373
00377 GroupConnection *GroupSockConnection::create(void)
00378 {
00379 return new GroupSockConnection();
00380 }
00381
00382
00383
00384
00392 void GroupSockConnection::read(MemoryHandle mem,UInt32 size)
00393 {
00394 int len;
00395
00396
00397 len=_sockets[_readIndex].recv(mem,size);
00398 if(len==0)
00399 {
00400
00401 throw ReadError("Channel closed");
00402 }
00403 }
00404
00412 void GroupSockConnection::readBuffer()
00413 {
00414 int size;
00415 int len;
00416
00417
00418 len=_sockets[_readIndex].recv(&_socketReadBuffer[0],sizeof(SocketBufferHeader));
00419 if(len==0)
00420 throw ReadError("Channel closed");
00421
00422 size=osgntohl(((SocketBufferHeader*)&_socketReadBuffer[0])->size);
00423 len=_sockets[_readIndex].recv(&_socketReadBuffer[sizeof(SocketBufferHeader)],
00424 size);
00425 if(len==0)
00426 throw ReadError("Channel closed");
00427 readBufBegin()->setDataSize(size);
00428 }
00429
00437 void GroupSockConnection::write(MemoryHandle mem,UInt32 size)
00438 {
00439 Int32 index;
00440
00441 try
00442 {
00443
00444 for(index = 0 ; index < _sockets.size() ; ++index)
00445 _sockets[index].send(mem,size);
00446 }
00447 catch(SocketException &e)
00448 {
00449 throw WriteError(e.what());
00450 }
00451 }
00452
00458 void GroupSockConnection::writeBuffer(void)
00459 {
00460 Int32 index;
00461 UInt32 size = writeBufBegin()->getDataSize();
00462
00463 ((SocketBufferHeader*)&_socketWriteBuffer[0])->size=osghtonl(size);
00464 if(size)
00465 {
00466
00467 for(index = 0 ; index < _sockets.size() ; ++index)
00468 {
00469
00470 _sockets[index].send(&_socketWriteBuffer[0],
00471 size+sizeof(SocketBufferHeader));
00472 }
00473 }
00474 }
00475
00476
00477
00478
00481 bool GroupSockConnection::connectSocket(StreamSocket &socket,
00482 std::string address,
00483 Time timeout)
00484 {
00485 std::string host="unknown";
00486 int port=0;
00487 Time startTime = getSystemTime();
00488 bool connected=false;
00489
00490 int pos = address.find(':');
00491 if(pos>=0)
00492 {
00493 host = std::string(address,0,pos);
00494 port = atoi(std::string(address,pos+1,std::string::npos).c_str());
00495 }
00496 else
00497 {
00498 host = address;
00499 }
00500
00501 socket.open();
00502 socket.setDelay(false);
00503 socket.setReadBufferSize(1048576);
00504 socket.setWriteBufferSize(1048576);
00505 while(!connected &&
00506 (timeout == -1 || (getSystemTime()-startTime) < timeout))
00507 {
00508 try
00509 {
00510 socket.connect(SocketAddress(host.c_str(),port));
00511 connected = true;
00512 }
00513 catch(...)
00514 {
00515 }
00516 }
00517 if(connected)
00518 return true;
00519 else
00520 return false;
00521 }
00522
00525 bool GroupSockConnection::acceptSocket(StreamSocket &accept,
00526 StreamSocket &from,
00527 Time timeout)
00528 {
00529 if(!accept.waitReadable(timeout))
00530 return false;
00531 from=accept.accept();
00532 from.setDelay(false);
00533 from.setReadBufferSize(1048576);
00534 from.setWriteBufferSize(1048576);
00535 return true;
00536 }
00537
00538
00539
00540
00541 ConnectionType GroupSockConnection::_type(
00542 &GroupSockConnection::create,
00543 "StreamSock");
00544
00545
00546
00547
00548 #ifdef __sgi
00549 #pragma set woff 1174
00550 #endif
00551
00552 #ifdef OSG_LINUX_ICC
00553 #pragma warning( disable : 177 )
00554 #endif
00555
00556 namespace
00557 {
00558 static Char8 cvsid_cpp [] = "@(#)$Id: $";
00559 static Char8 cvsid_hpp [] = OSG_GROUPSOCKCONNECTION_HEADER_CVSID;
00560 }
00561