#include <OSGGroupSockConnection.h>
Inheritance diagram for osg::GroupSockConnection:

Public Member Functions | |
Constructors | |
| * | GroupSockConnection (void) |
| virtual | ~GroupSockConnection (void) |
type info | |
| *virtual const ConnectionType * | getType (void) |
connection | |
| *virtual Channel | connectPoint (const std::string &address, Time timeout=-1) |
| virtual void | disconnect (Channel channel) |
| virtual Channel | acceptPoint (Time timeout=-1) |
| virtual std::string | bind (const std::string &interf) |
params | |
| *virtual void | setParams (const std::string &params) |
channel handling | |
| *virtual Channel | selectChannel (Time timeout=-1) throw (ReadError) |
channel handling | |
| *UInt32 | getChannelCount (void) |
| void | addSelection (Channel channel) |
| void | subSelection (Channel channel) |
| void | clearSelection (void) |
| void | resetSelection (void) |
| UInt32 | getSelectionCount (void) |
group address | |
| *void | setDestination (const std::string &destination) |
| std::string | getDestination (void) |
interface | |
| *const std::string & | getInterface (void) |
| void | setInterface (const std::string &interf) |
Put | |
| *void | put (void const *src, UInt32 size) |
| void | putAndFree (MemoryHandle src, UInt32 size) |
| void | putValue (const bool &value) |
| void | putValue (const UInt8 &value) |
| void | putValue (const UInt16 &value) |
| void | putValue (const UInt32 &value) |
| void | putValue (const UInt64 &value) |
| void | putValue (const Int8 &value) |
| void | putValue (const Int16 &value) |
| void | putValue (const Int32 &value) |
| void | putValue (const Int64 &value) |
| void | putValue (const Real16 &value) |
| void | putValue (const Real32 &value) |
| void | putValue (const Real64 &value) |
| void | putValue (const Real128 &value) |
| void | putValue (const std::string &value) |
| void | putValues (const bool *value, UInt32 size) |
| void | putValues (const UInt8 *value, UInt32 size) |
| void | putValues (const UInt16 *value, UInt32 size) |
| void | putValues (const UInt32 *value, UInt32 size) |
| void | putValues (const UInt64 *value, UInt32 size) |
| void | putValues (const Int8 *value, UInt32 size) |
| void | putValues (const Int16 *value, UInt32 size) |
| void | putValues (const Int32 *value, UInt32 size) |
| void | putValues (const Int64 *value, UInt32 size) |
| void | putValues (const Real16 *value, UInt32 size) |
| void | putValues (const Real32 *value, UInt32 size) |
| void | putValues (const Real64 *value, UInt32 size) |
| void | putValues (const Real128 *value, UInt32 size) |
| void | putValues (const std::string *value, UInt32 size) |
Get | |
| *void | get (void *dst, UInt32 size) |
| void | getAndAlloc (MemoryHandle &src, UInt32 size) |
| void | getValue (bool &value) |
| void | getValue (UInt8 &value) |
| void | getValue (UInt16 &value) |
| void | getValue (UInt32 &value) |
| void | getValue (UInt64 &value) |
| void | getValue (Int8 &value) |
| void | getValue (Int16 &value) |
| void | getValue (Int32 &value) |
| void | getValue (Int64 &value) |
| void | getValue (Real16 &value) |
| void | getValue (Real32 &value) |
| void | getValue (Real64 &value) |
| void | getValue (Real128 &value) |
| void | getValue (std::string &value) |
| void | getValues (bool *value, UInt32 size) |
| void | getValues (UInt8 *value, UInt32 size) |
| void | getValues (UInt16 *value, UInt32 size) |
| void | getValues (UInt32 *value, UInt32 size) |
| void | getValues (UInt64 *value, UInt32 size) |
| void | getValues (Int8 *value, UInt32 size) |
| void | getValues (Int16 *value, UInt32 size) |
| void | getValues (Int32 *value, UInt32 size) |
| void | getValues (Int64 *value, UInt32 size) |
| void | getValues (Real16 *value, UInt32 size) |
| void | getValues (Real32 *value, UInt32 size) |
| void | getValues (Real64 *value, UInt32 size) |
| void | getValues (Real128 *value, UInt32 size) |
| void | getValues (std::string *value, UInt32 size) |
Helper | |
| *virtual void | forceCopy (void) |
| virtual void | forceDirectIO (void) |
| void | flush (void) |
| write data not yet written | |
| void | setNetworkOrder (bool value) |
| bool | getNetworkOrder (void) |
Static Public Member Functions | |
create | |
| *static GroupConnection * | create (void) |
| create connection | |
Public Attributes | |
public types | |
| *typedef Int32 | Channel |
Protected Types | |
| typedef std::vector< MemoryBlock > | BuffersT |
| typedef std::list< MemoryHandle > | FreeMemT |
Protected Member Functions | |
IO Implementation | |
| *virtual void | read (MemoryHandle mem, UInt32 size) |
| virtual void | readBuffer (void) |
| virtual void | write (MemoryHandle mem, UInt32 size) |
| virtual void | writeBuffer (void) |
synchronisation | |
| *virtual bool | wait (Time timeout) throw (ReadError) |
| virtual void | signal (void) throw (WriteError) |
internal channel handling | |
| *Channel | newChannelIndex (ChannelIndex index) |
| void | delChannelIndex (ChannelIndex index) |
channel index mapping | |
| *ChannelIndex | channelToIndex (Channel channel) const |
| Channel | indexToChannel (ChannelIndex index) const |
Read | |
| *BuffersT::iterator | readBufBegin (void) |
| BuffersT::iterator | readBufEnd (void) |
| void | readBufAdd (MemoryHandle mem, UInt32 size, UInt32 dataSize=0) |
| void | readBufClear (void) |
Write | |
| *BuffersT::iterator | writeBufBegin (void) |
| BuffersT::iterator | writeBufEnd (void) |
| void | writeBufAdd (MemoryHandle mem, UInt32 size, UInt32 dataSize=0) |
| void | writeBufClear (void) |
Helper | |
| *bool | isReadBufferEmpty (void) |
Protected Attributes | |
members | |
| *StreamSocket | _acceptSocket |
| std::vector< StreamSocket > | _sockets |
| ChannelIndex | _readIndex |
| std::vector< UInt8 > | _socketReadBuffer |
| std::vector< UInt8 > | _socketWriteBuffer |
protected fields | |
| *std::vector< UInt8 > | _selection |
| std::string | _destination |
| std::set< Channel > | _disconnectedChannel |
protected types | |
| *typedef Int32 | ChannelIndex |
protected members | |
| *std::string | _interface |
Member | |
| *BuffersT | _readBuffers |
| BuffersT | _writeBuffers |
| BuffersT | _zeroCopyBuffers |
| UInt32 | _zeroCopyThreshold |
| FreeMemT | _freeMem |
| BuffersT::iterator | _currentReadBuffer |
| UInt32 | _currentReadBufferPos |
| BuffersT::iterator | _currentWriteBuffer |
| UInt32 | _currentWriteBufferPos |
| bool | _networkOrder |
Private Types | |
| typedef GroupConnection | Inherited |
Private Member Functions | |
| GroupSockConnection (const GroupSockConnection &source) | |
| GroupSockConnection & | operator= (const GroupSockConnection &source) |
Static Private Member Functions | |
internal methods | |
| *static bool | connectSocket (StreamSocket &socket, std::string address, Time timeout) |
| static bool | acceptSocket (StreamSocket &accept, StreamSocket &from, Time timeout) |
Static Private Attributes | |
static type | |
| *static ConnectionType | _type |
Friends | |
| class | PointSockConnection |
Classes | |
| struct | SocketBufferHeader |
Definition at line 61 of file OSGGroupSockConnection.h.
|
|
Reimplemented from osg::GroupConnection. Reimplemented in osg::GroupMCastConnection, and osg::GroupSockPipeline. Definition at line 181 of file OSGGroupSockConnection.h. |
|
|
Definition at line 213 of file OSGBinaryDataHandler.h. |
|
|
Definition at line 214 of file OSGBinaryDataHandler.h. |
|
|
Constructor Definition at line 68 of file OSGGroupSockConnection.cpp. References _acceptSocket, _socketReadBuffer, _socketWriteBuffer, osg::StreamSocket::open(), osg::BinaryDataHandler::readBufAdd(), osg::Socket::setReusePort(), and osg::BinaryDataHandler::writeBufAdd(). Referenced by create(). 00068 : 00069 GroupConnection(0) 00070 { 00071 _acceptSocket.open(); 00072 _acceptSocket.setReusePort(true); 00073 00074 _socketReadBuffer.resize(131071); 00075 _socketWriteBuffer.resize( _socketReadBuffer.size() ); 00076 // reserve first bytes for buffer size 00077 readBufAdd (&_socketReadBuffer [sizeof(SocketBufferHeader)], 00078 _socketReadBuffer.size() -sizeof(SocketBufferHeader)); 00079 writeBufAdd(&_socketWriteBuffer[sizeof(SocketBufferHeader)], 00080 _socketWriteBuffer.size()-sizeof(SocketBufferHeader)); 00081 }
|
|
|
Destructor Definition at line 85 of file OSGGroupSockConnection.cpp. References _acceptSocket, _sockets, and osg::StreamSocket::close(). 00086 { 00087 // close and remove sockets 00088 while(_sockets.size()) 00089 { 00090 try 00091 { 00092 _sockets.begin()->close(); 00093 _sockets.erase(_sockets.begin()); 00094 } 00095 catch(...) 00096 { 00097 } 00098 } 00099 _acceptSocket.close(); 00100 }
|
|
|
|
|
|
get connection type Implements osg::GroupConnection. Reimplemented in osg::GroupMCastConnection, and osg::GroupSockPipeline. Definition at line 104 of file OSGGroupSockConnection.cpp. References _type. 00105 { 00106 return &_type; 00107 }
|
|
||||||||||||
|
connect to the given point. If timeout is reached, -1 is returned Implements osg::Connection. Reimplemented in osg::GroupMCastConnection, and osg::GroupSockPipeline. Definition at line 115 of file OSGGroupSockConnection.cpp. References _readIndex, _sockets, osg::Connection::Channel, connectSocket(), and osg::GroupConnection::newChannelIndex(). Referenced by osg::GroupSockPipeline::connectPoint(), and osg::GroupMCastConnection::connectPoint(). 00118 { 00119 Channel channel = -1; 00120 StreamSocket socket; 00121 if(connectSocket(socket,address,timeout)) 00122 { 00123 channel = newChannelIndex(_sockets.size()); 00124 _sockets.push_back(socket); 00125 _readIndex = 0; 00126 } 00127 return channel; 00128 }
|
|
|
disconnect the given channel Implements osg::GroupConnection. Reimplemented in osg::GroupMCastConnection, and osg::GroupSockPipeline. Definition at line 132 of file OSGGroupSockConnection.cpp. References _readIndex, _sockets, osg::GroupConnection::ChannelIndex, osg::GroupConnection::channelToIndex(), and osg::GroupConnection::delChannelIndex(). Referenced by osg::GroupSockPipeline::disconnect(), and osg::GroupMCastConnection::disconnect(). 00133 { 00134 ChannelIndex index = channelToIndex(channel); 00135 try 00136 { 00137 _sockets[index].close(); 00138 } 00139 catch(...) 00140 { 00141 } 00142 _sockets.erase(_sockets.begin() + index); 00143 delChannelIndex(index); 00144 _readIndex = 0; 00145 }
|
|
|
Accept an incoming point connection. If timeout is reached, -1 is returned. If timeout is -1 then wait without timeout. Implements osg::Connection. Reimplemented in osg::GroupMCastConnection, and osg::GroupSockPipeline. Definition at line 150 of file OSGGroupSockConnection.cpp. References _acceptSocket, _readIndex, _sockets, acceptSocket(), osg::Connection::Channel, and osg::GroupConnection::newChannelIndex(). Referenced by osg::GroupSockPipeline::acceptPoint(), and osg::GroupMCastConnection::acceptPoint(). 00151 { 00152 StreamSocket from; 00153 if(GroupSockConnection::acceptSocket(_acceptSocket,from,timeout)) 00154 { 00155 Channel channel = newChannelIndex(_sockets.size()); 00156 _sockets.push_back(from); 00157 _readIndex = 0; 00158 return channel; 00159 } 00160 else 00161 { 00162 return -1; 00163 } 00164 }
|
|
|
Bind the connection to a network interface. The address on which the port can be connected is returned. The interface is determined by the connection interface field and the address parameter. The address can be empty, which means a free port is used, or it can contain a port number. Implements osg::Connection. Definition at line 172 of file OSGGroupSockConnection.cpp. References _acceptSocket, osg::Socket::bind(), osg::Socket::getAddress(), osg::SocketAddress::getHost(), osg::Connection::getInterface(), osg::SocketAddress::getPort(), osg::Socket::listen(), osg::Socket::setReusePort(), and SINFO. 00173 { 00174 int port=0; 00175 char localhost[256]; 00176 char host[256]; 00177 char portStr[256]; 00178 std::string interf; 00179 std::string boundedAddress; 00180 00181 // get local host name 00182 gethostname(localhost,255); 00183 if(!getInterface().empty()) 00184 interf = getInterface(); 00185 else 00186 interf = localhost; 00187 // parse address 00188 if(!address.empty()) 00189 if(sscanf(address.c_str(),"%*[^:]:%d",&port) != 1) 00190 if(sscanf(address.c_str(),":%d",&port) != 1) 00191 port = 0; 00192 // bind port 00193 _acceptSocket.setReusePort(true); 00194 _acceptSocket.bind(SocketAddress(interf.c_str(),port)); 00195 SINFO << "Connection bound to " 00196 << _acceptSocket.getAddress().getHost() << ":" 00197 << _acceptSocket.getAddress().getPort() << std::endl; 00198 _acceptSocket.listen(); 00199 // create address 00200 sprintf(portStr,"%d",_acceptSocket.getAddress().getPort()); 00201 return interf + ":" + portStr; 00202 }
|
|
|
parse the params string. Reimplemented from osg::Connection. Reimplemented in osg::GroupMCastConnection. Definition at line 206 of file OSGGroupSockConnection.cpp. References _socketReadBuffer, _socketWriteBuffer, FINFO, osg::BinaryDataHandler::readBufAdd(), osg::BinaryDataHandler::readBufClear(), osg::BinaryDataHandler::writeBufAdd(), and osg::BinaryDataHandler::writeBufClear(). Referenced by osg::GroupMCastConnection::setParams(). 00207 { 00208 if(params.empty()) 00209 return; 00210 00211 std::string option = "bufferSize="; 00212 std::string::size_type i = 0; 00213 if((i=params.find(option)) != std::string::npos) 00214 { 00215 std::string str = params.substr(i + option.size()); 00216 00217 std::stringstream ss; 00218 std::string::size_type j = 0; 00219 while(j < str.length() && str[j] != ',' && isdigit(str[j])) 00220 { 00221 ss << str[j++]; 00222 } 00223 UInt32 bufferSize; 00224 ss >> bufferSize; 00225 00226 // clear old buffer. 00227 readBufClear(); 00228 writeBufClear(); 00229 00230 _socketReadBuffer.resize(bufferSize); 00231 _socketWriteBuffer.resize(_socketReadBuffer.size()); 00232 00233 // reserve first bytes for buffer size 00234 readBufAdd (&_socketReadBuffer [sizeof(SocketBufferHeader)], 00235 _socketReadBuffer.size() -sizeof(SocketBufferHeader)); 00236 writeBufAdd(&_socketWriteBuffer[sizeof(SocketBufferHeader)], 00237 _socketWriteBuffer.size()-sizeof(SocketBufferHeader)); 00238 00239 FINFO(("GroupSockConnection::setParams : setting buffer size to %u.\n", bufferSize)); 00240 } 00241 }
|
|
|
select the next channel for reading. If timeout is not -1 then -1 is returned if timeout is reached Implements osg::Connection. Definition at line 249 of file OSGGroupSockConnection.cpp. References FFATAL, osg::SocketSelection::isSetRead(), osg::SocketSelection::select(), and osg::SocketSelection::setRead(). 00251 { 00252 Int32 maxnread=0,nread; 00253 ChannelIndex index; 00254 SocketSelection selection,result; 00255 00256 // if there is data in the read buffer, return current channel 00257 if(_zeroCopyThreshold != 1 && 00258 _currentReadBuffer != readBufEnd()) 00259 { 00260 FFATAL(("Channel change ignores data in current buffer")) 00261 return indexToChannel(_readIndex); 00262 } 00263 00264 if(_selection[_readIndex] && 00265 _sockets[_readIndex].getAvailable()) 00266 { 00267 return indexToChannel(_readIndex);; 00268 } 00269 00270 // wait for first socket to deliver data 00271 for(index = 0 ; index < _sockets.size() ; ++index) 00272 { 00273 if(_selection[index]) 00274 selection.setRead(_sockets[index]); 00275 } 00276 00277 try 00278 { 00279 // select ok ? 00280 if(!selection.select(timeout,result)) 00281 return -1; 00282 00283 // use socket with most data 00284 for(index = 0 ; index < _sockets.size() ; ++index) 00285 { 00286 if(result.isSetRead(_sockets[index])) 00287 { 00288 nread=_sockets[index].getAvailable(); 00289 if(maxnread < nread) 00290 { 00291 maxnread = nread; 00292 _readIndex=index; 00293 } 00294 } 00295 } 00296 } 00297 catch(SocketException &e) 00298 { 00299 throw ReadError(e.what()); 00300 } 00301 00302 // return channel id 00303 return indexToChannel(_readIndex); 00304 }
|
|
|
Reimplemented in osg::GroupMCastConnection, and osg::GroupSockPipeline. Definition at line 377 of file OSGGroupSockConnection.cpp. References GroupSockConnection(). 00378 { 00379 return new GroupSockConnection(); 00380 }
|
|
||||||||||||
|
Read data into given memory. Read data from the current read socket. The read socket is the socket that was selected in selectChannel. Reimplemented from osg::BinaryDataHandler. Definition at line 392 of file OSGGroupSockConnection.cpp. References _readIndex, and _sockets. 00393 { 00394 int len; 00395 00396 // read data 00397 len=_sockets[_readIndex].recv(mem,size); 00398 if(len==0) 00399 { 00400 // throw ChannelClosed(indexToChannel(_readIndex)); 00401 throw ReadError("Channel closed"); 00402 } 00403 }
|
|
|
Read next data block. The stream connection uses only one BinaryDataHandler buffer. If more than one buffer is present, then this method must be changed! Reimplemented from osg::BinaryDataHandler. Definition at line 412 of file OSGGroupSockConnection.cpp. References _readIndex, _socketReadBuffer, _sockets, osg::osgntohl(), and osg::BinaryDataHandler::readBufBegin(). 00413 { 00414 int size; 00415 int len; 00416 00417 // read buffer header 00418 len=_sockets[_readIndex].recv(&_socketReadBuffer[0],sizeof(SocketBufferHeader)); 00419 if(len==0) 00420 throw ReadError("Channel closed"); 00421 // read remaining data 00422 size=osgntohl(((SocketBufferHeader*)&_socketReadBuffer[0])->size); 00423 len=_sockets[_readIndex].recv(&_socketReadBuffer[sizeof(SocketBufferHeader)], 00424 size); 00425 if(len==0) 00426 throw ReadError("Channel closed"); 00427 readBufBegin()->setDataSize(size); 00428 }
|
|
||||||||||||
|
Write data to all destinations
Reimplemented from osg::BinaryDataHandler. Reimplemented in osg::GroupMCastConnection, and osg::GroupSockPipeline. Definition at line 437 of file OSGGroupSockConnection.cpp. References _sockets. 00438 { 00439 Int32 index; 00440 00441 try 00442 { 00443 // write to all connected sockets 00444 for(index = 0 ; index < _sockets.size() ; ++index) 00445 _sockets[index].send(mem,size); 00446 } 00447 catch(SocketException &e) 00448 { 00449 throw WriteError(e.what()); 00450 } 00451 }
|
|
|
Write buffer Write blocksize and data. Reimplemented from osg::BinaryDataHandler. Reimplemented in osg::GroupMCastConnection, and osg::GroupSockPipeline. Definition at line 458 of file OSGGroupSockConnection.cpp. References _sockets, _socketWriteBuffer, osg::osghtonl(), and osg::BinaryDataHandler::writeBufBegin(). 00459 { 00460 Int32 index; 00461 UInt32 size = writeBufBegin()->getDataSize(); 00462 // write size to header 00463 ((SocketBufferHeader*)&_socketWriteBuffer[0])->size=osghtonl(size); 00464 if(size) 00465 { 00466 // write data to all sockets 00467 for(index = 0 ; index < _sockets.size() ; ++index) 00468 { 00469 // write whole block 00470 _sockets[index].send(&_socketWriteBuffer[0], 00471 size+sizeof(SocketBufferHeader)); 00472 } 00473 } 00474 }
|
|
|
wait for signal Implements osg::Connection. Reimplemented in osg::GroupMCastConnection. Definition at line 311 of file OSGGroupSockConnection.cpp. References osg::SocketSelection::clearRead(), FFATAL, osg::SocketSelection::isSetRead(), osg::osgntohl(), osg::SocketSelection::select(), and osg::SocketSelection::setRead(). 00312 { 00313 UInt32 len; 00314 UInt32 index; 00315 UInt32 tag=314156; 00316 UInt32 missing = _sockets.size(); 00317 SocketSelection selection,result; 00318 00319 for(index = 0 ; index < _sockets.size() ; ++index) 00320 selection.setRead(_sockets[index]); 00321 00322 try 00323 { 00324 while(missing) 00325 { 00326 if(!selection.select(timeout,result)) 00327 return false; 00328 for(index = 0 ; index < _sockets.size() ; ++index) 00329 { 00330 if(result.isSetRead(_sockets[index])) 00331 { 00332 len = _sockets[index].recv(&tag,sizeof(tag)); 00333 tag = osgntohl(tag); 00334 if(len == 0) 00335 throw ReadError("Channel closed"); 00336 selection.clearRead(_sockets[index]); 00337 missing--; 00338 if(tag != 314156) 00339 { 00340 FFATAL(("Stream out of sync in SockConnection\n")); 00341 throw ReadError("Stream out of sync"); 00342 } 00343 } 00344 } 00345 } 00346 } 00347 catch(SocketException &e) 00348 { 00349 throw ReadError(e.what()); 00350 } 00351 return true; 00352 }
|
|
|
send signal Implements osg::Connection. Reimplemented in osg::GroupMCastConnection. Definition at line 356 of file OSGGroupSockConnection.cpp. References _sockets, and osg::osghtonl(). 00357 { 00358 UInt32 tag=osghtonl(314156); 00359 UInt32 index; 00360 00361 try 00362 { 00363 for(index = 0 ; index<_sockets.size() ; ++index) 00364 _sockets[index].send(&tag,sizeof(tag)); 00365 } 00366 catch(SocketError &e) 00367 { 00368 throw WriteError(e.what()); 00369 } 00370 }
|
|
||||||||||||||||
|
connect two sockets until success or timeout Definition at line 481 of file OSGGroupSockConnection.cpp. References osg::Socket::connect(), osg::getSystemTime(), osg::StreamSocket::open(), osg::StreamSocket::setDelay(), osg::Socket::setReadBufferSize(), osg::Socket::setWriteBufferSize(), and startTime. Referenced by osg::PointSockConnection::connectGroup(), osg::PointSockConnection::connectPoint(), and connectPoint(). 00484 { 00485 std::string host="unknown"; 00486 int port=0; 00487 Time startTime = getSystemTime(); 00488 bool connected=false; 00489 00490 int pos = address.find(':'); 00491 if(pos>=0) 00492 { 00493 host = std::string(address,0,pos); 00494 port = atoi(std::string(address,pos+1,std::string::npos).c_str()); 00495 } 00496 else 00497 { 00498 host = address; 00499 } 00500 00501 socket.open(); 00502 socket.setDelay(false); 00503 socket.setReadBufferSize(1048576); 00504 socket.setWriteBufferSize(1048576); 00505 while(!connected && 00506 (timeout == -1 || (getSystemTime()-startTime) < timeout)) 00507 { 00508 try 00509 { 00510 socket.connect(SocketAddress(host.c_str(),port)); 00511 connected = true; 00512 } 00513 catch(...) 00514 { 00515 } 00516 } 00517 if(connected) 00518 return true; 00519 else 00520 return false; 00521 }
|
|