#include <OSGGroupSockPipeline.h>
Inheritance diagram for osg::GroupSockPipeline:

Public Member Functions | |
connection | |
| virtual std::string | bind (const std::string &interf) |
channel handling | |
| *virtual Channel | selectChannel (Time timeout=-1) throw (ReadError) |
channel handling | |
| *UInt32 | getChannelCount (void) |
| void | addSelection (Channel channel) |
| void | subSelection (Channel channel) |
| void | clearSelection (void) |
| void | resetSelection (void) |
| UInt32 | getSelectionCount (void) |
group address | |
| *void | setDestination (const std::string &destination) |
| std::string | getDestination (void) |
Constructors | |
| * | GroupSockPipeline (void) |
| virtual | ~GroupSockPipeline (void) |
type info | |
| *virtual const ConnectionType * | getType (void) |
connection | |
| *virtual Channel | connectPoint (const std::string &address, Time timeout=-1) |
| virtual void | disconnect (Channel channel) |
| virtual Channel | acceptPoint (Time timeout=-1) |
connection | |
| virtual std::string | bind (const std::string &interf) |
params | |
| *virtual void | setParams (const std::string &params) |
channel handling | |
| *virtual Channel | selectChannel (Time timeout=-1) throw (ReadError) |
channel handling | |
| *UInt32 | getChannelCount (void) |
| void | addSelection (Channel channel) |
| void | subSelection (Channel channel) |
| void | clearSelection (void) |
| void | resetSelection (void) |
| UInt32 | getSelectionCount (void) |
group address | |
| *void | setDestination (const std::string &destination) |
| std::string | getDestination (void) |
Static Public Member Functions | |
create | |
| *static GroupConnection * | create (void) |
| create connection | |
Protected Types | |
| typedef std::vector< MemoryBlock > | BuffersT |
| typedef std::list< MemoryHandle > | FreeMemT |
Protected Member Functions | |
IO Implementation | |
| *virtual void | read (MemoryHandle mem, UInt32 size) |
| virtual void | readBuffer (void) |
internal channel handling | |
| *Channel | newChannelIndex (ChannelIndex index) |
| void | delChannelIndex (ChannelIndex index) |
channel index mapping | |
| *ChannelIndex | channelToIndex (Channel channel) const |
| Channel | indexToChannel (ChannelIndex index) const |
IO Implementation | |
| *virtual void | write (MemoryHandle mem, UInt32 size) |
| virtual void | writeBuffer (void) |
IO Implementation | |
| *virtual void | read (MemoryHandle mem, UInt32 size) |
| virtual void | readBuffer (void) |
synchronisation | |
| *virtual bool | wait (Time timeout) throw (ReadError) |
| virtual void | signal (void) throw (WriteError) |
internal channel handling | |
| *Channel | newChannelIndex (ChannelIndex index) |
| void | delChannelIndex (ChannelIndex index) |
channel index mapping | |
| *ChannelIndex | channelToIndex (Channel channel) const |
| Channel | indexToChannel (ChannelIndex index) const |
Protected Attributes | |
members | |
| *StreamSocket | _acceptSocket |
| std::vector< StreamSocket > | _sockets |
| ChannelIndex | _readIndex |
| std::vector< UInt8 > | _socketReadBuffer |
| std::vector< UInt8 > | _socketWriteBuffer |
protected fields | |
| *std::vector< UInt8 > | _selection |
| std::string | _destination |
| std::set< Channel > | _disconnectedChannel |
protected types | |
| *typedef Int32 | ChannelIndex |
Members | |
| *StreamSocket | _next |
| bool | _initialized |
members | |
| *StreamSocket | _acceptSocket |
| std::vector< StreamSocket > | _sockets |
| ChannelIndex | _readIndex |
| std::vector< UInt8 > | _socketReadBuffer |
| std::vector< UInt8 > | _socketWriteBuffer |
protected fields | |
| *std::vector< UInt8 > | _selection |
| std::string | _destination |
| std::set< Channel > | _disconnectedChannel |
protected types | |
| *typedef Int32 | ChannelIndex |
Private Types | |
| typedef GroupSockConnection | Inherited |
Private Member Functions | |
| GroupSockPipeline (const GroupSockPipeline &source) | |
| GroupSockPipeline & | operator= (const GroupSockPipeline &source) |
private helpers | |
| *void | initialize (void) |
Static Private Attributes | |
static members | |
| *static ConnectionType | _type |
Friends | |
| class | PointSockPipeline |
Definition at line 62 of file OSGGroupSockPipeline.h.
|
|
Reimplemented from osg::GroupSockConnection. Definition at line 141 of file OSGGroupSockPipeline.h. |
|
|
Definition at line 213 of file OSGBinaryDataHandler.h. |
|
|
Definition at line 214 of file OSGBinaryDataHandler.h. |
|
|
Constructor Definition at line 70 of file OSGGroupSockPipeline.cpp. References _next, and osg::StreamSocket::open(). Referenced by create(). 00070 : 00071 Inherited(), 00072 _initialized(false) 00073 { 00074 _next.open(); 00075 }
|
|
|
Destructor Definition at line 79 of file OSGGroupSockPipeline.cpp. References _next, and osg::StreamSocket::close().
|
|
|
|
|
|
get connection type Reimplemented from osg::GroupSockConnection. Definition at line 86 of file OSGGroupSockPipeline.cpp. References _type. 00087 { 00088 return &_type; 00089 }
|
|
||||||||||||
|
connect to the given point. If timeout is reached, -1 is returned Reimplemented from osg::GroupSockConnection. Definition at line 97 of file OSGGroupSockPipeline.cpp. References osg::Connection::Channel, and osg::GroupSockConnection::connectPoint(). 00100 { 00101 Channel channel = Inherited::connectPoint(address,timeout); 00102 return channel; 00103 }
|
|
|
disconnect the given channel Reimplemented from osg::GroupSockConnection. Definition at line 107 of file OSGGroupSockPipeline.cpp. References osg::GroupSockConnection::disconnect(). 00108 { 00109 Inherited::disconnect(channel); 00110 }
|
|
|
accept an incoming point connection. If timeout is reached, -1 is returned. If timeout is -1 then wait without timeout. Reimplemented from osg::GroupSockConnection. Definition at line 115 of file OSGGroupSockPipeline.cpp. References osg::GroupSockConnection::acceptPoint(), and osg::Connection::Channel. 00116 { 00117 Connection::Channel channel = Inherited::acceptPoint(timeout); 00118 return channel; 00119 }
|
|
|
Reimplemented from osg::GroupSockConnection. Definition at line 126 of file OSGGroupSockPipeline.cpp. References GroupSockPipeline(). 00127 { 00128 return new GroupSockPipeline(); 00129 }
|
|
||||||||||||
|
Write data to all destinations
Reimplemented from osg::GroupSockConnection. Definition at line 141 of file OSGGroupSockPipeline.cpp. References _initialized, _next, osg::GroupConnection::getChannelCount(), initialize(), and osg::Socket::send(). 00142 { 00143 if(!_initialized) 00144 initialize(); 00145 00146 try 00147 { 00148 if(getChannelCount()) 00149 _next.send(mem,size); 00150 } 00151 catch(SocketException &e) 00152 { 00153 throw WriteError(e.what()); 00154 } 00155 }
|
|
|
Write buffer Write blocksize and data. Reimplemented from osg::GroupSockConnection. Definition at line 162 of file OSGGroupSockPipeline.cpp. References _initialized, _next, osg::GroupSockConnection::_socketWriteBuffer, initialize(), osg::osghtonl(), osg::Socket::send(), and osg::BinaryDataHandler::writeBufBegin(). 00163 { 00164 Int32 index; 00165 00166 if(!_initialized) 00167 initialize(); 00168 00169 UInt32 size = writeBufBegin()->getDataSize(); 00170 // write size to header 00171 ((SocketBufferHeader*)&_socketWriteBuffer[0])->size=osghtonl(size); 00172 if(size) 00173 { 00174 _next.send(&_socketWriteBuffer[0], 00175 size+sizeof(SocketBufferHeader)); 00176 } 00177 }
|
|
|
initialize pipeline Definition at line 184 of file OSGGroupSockPipeline.cpp. References _initialized, _next, osg::GroupSockConnection::_sockets, osg::BinaryMessage::clear(), osg::Socket::connect(), osg::BinaryMessage::getString(), osg::BinaryMessage::getUInt32(), osg::BinaryMessage::putString(), and osg::BinaryMessage::putUInt32(). Referenced by write(), and writeBuffer(). 00185 { 00186 UInt32 index,len; 00187 UInt32 nextPort; 00188 std::string nextHost; 00189 BinaryMessage message; 00190 00191 for(index = 0 ; index<_sockets.size() ; ++index) 00192 { 00193 len = _sockets[index].recv(message); 00194 if(len == 0) 00195 throw ReadError("Channel closed\n"); 00196 nextHost = message.getString(); 00197 nextPort = message.getUInt32(); 00198 00199 message.clear(); 00200 if(index == 0) 00201 { 00202 message.putUInt32(true); 00203 _sockets[_sockets.size()-1].send(message); 00204 for(;;) 00205 { 00206 try 00207 { 00208 _next.connect(SocketAddress(nextHost.c_str(), 00209 nextPort)); 00210 break; 00211 } 00212 catch(...) 00213 { 00214 } 00215 } 00216 } 00217 else 00218 { 00219 message.clear(); 00220 message.putUInt32(false); 00221 message.putString(nextHost); 00222 message.putUInt32(nextPort); 00223 _sockets[index-1].send(message); 00224 } 00225 } 00226 _initialized = true; 00227 }
|
|
|
|
|
|
bind the connection to a network interface. The address on which the port can be connected is returned. The interface is determined by the connection's interface field and the address parameter. The address can be empty, which means a free port is used, or it can contain a port number. Implements osg::Connection. Definition at line 172 of file OSGGroupSockConnection.cpp. References osg::GroupSockConnection::_acceptSocket, osg::Socket::bind(), osg::Socket::getAddress(), osg::SocketAddress::getHost(), osg::Connection::getInterface(), osg::SocketAddress::getPort(), osg::Socket::listen(), osg::Socket::setReusePort(), and SINFO. 00173 { 00174 int port=0; 00175 char localhost[256]; 00176 char host[256]; 00177 char portStr[256]; 00178 std::string interf; 00179 std::string boundedAddress; 00180 00181 // get local host name 00182 gethostname(localhost,255); 00183 if(!getInterface().empty()) 00184 interf = getInterface(); 00185 else 00186 interf = localhost; 00187 // parse address 00188 if(!address.empty()) 00189 if(sscanf(address.c_str(),"%*[^:]:%d",&port) != 1) 00190 if(sscanf(address.c_str(),":%d",&port) != 1) 00191 port = 0; 00192 // bind port 00193 _acceptSocket.setReusePort(true); 00194 _acceptSocket.bind(SocketAddress(interf.c_str(),port)); 00195 SINFO << "Connection bound to " 00196 << _acceptSocket.getAddress().getHost() << ":" 00197 << _acceptSocket.getAddress().getPort() << std::endl; 00198 _acceptSocket.listen(); 00199 // create address 00200 sprintf(portStr,"%d",_acceptSocket.getAddress().getPort()); 00201 return interf + ":" + portStr; 00202 }
|
|
|
parse the params string. Reimplemented from osg::Connection. Reimplemented in osg::GroupMCastConnection. Definition at line 206 of file OSGGroupSockConnection.cpp. References osg::GroupSockConnection::_socketReadBuffer, osg::GroupSockConnection::_socketWriteBuffer, FINFO, osg::BinaryDataHandler::readBufAdd(), osg::BinaryDataHandler::readBufClear(), osg::BinaryDataHandler::writeBufAdd(), and osg::BinaryDataHandler::writeBufClear(). Referenced by osg::GroupMCastConnection::setParams(). 00207 { 00208 if(params.empty()) 00209 return; 00210 00211 std::string option = "bufferSize="; 00212 std::string::size_type i = 0; 00213 if((i=params.find(option)) != std::string::npos) 00214 { 00215 std::string str = params.substr(i + option.size()); 00216 00217 std::stringstream ss; 00218 std::string::size_type j = 0; 00219 while(j < str.length() && str[j] != ',' && isdigit(str[j])) 00220 { 00221 ss << str[j++]; 00222 } 00223 UInt32 bufferSize; 00224 ss >> bufferSize; 00225 00226 // clear old buffer. 00227 readBufClear(); 00228 writeBufClear(); 00229 00230 _socketReadBuffer.resize(bufferSize); 00231 _socketWriteBuffer.resize(_socketReadBuffer.size()); 00232 00233 // reserve first bytes for buffer size 00234 readBufAdd (&_socketReadBuffer [sizeof(SocketBufferHeader)], 00235 _socketReadBuffer.size() -sizeof(SocketBufferHeader)); 00236 writeBufAdd(&_socketWriteBuffer[sizeof(SocketBufferHeader)], 00237 _socketWriteBuffer.size()-sizeof(SocketBufferHeader)); 00238 00239 FINFO(("GroupSockConnection::setParams : setting buffer size to %u.\n", bufferSize)); 00240 } 00241 }
|
|
|
select the next channel for reading. If timeout is not -1 then -1 is returned if timeout is reached Implements osg::Connection. Definition at line 249 of file OSGGroupSockConnection.cpp. References FFATAL, osg::SocketSelection::isSetRead(), osg::SocketSelection::select(), and osg::SocketSelection::setRead(). 00251 { 00252 Int32 maxnread=0,nread; 00253 ChannelIndex index; 00254 SocketSelection selection,result; 00255 00256 // if there is data in the read buffer, return current channel 00257 if(_zeroCopyThreshold != 1 && 00258 _currentReadBuffer != readBufEnd()) 00259 { 00260 FFATAL(("Channel change ignores data in current buffer")) 00261 return indexToChannel(_readIndex); 00262 } 00263 00264 if(_selection[_readIndex] && 00265 _sockets[_readIndex].getAvailable()) 00266 { 00267 return indexToChannel(_readIndex);; 00268 } 00269 00270 // wait for first socket to deliver data 00271 for(index = 0 ; index < _sockets.size() ; ++index) 00272 { 00273 if(_selection[index]) 00274 selection.setRead(_sockets[index]); 00275 } 00276 00277 try 00278 { 00279 // select ok ? 00280 if(!selection.select(timeout,result)) 00281 return -1; 00282 00283 // use socket with most data 00284 for(index = 0 ; index < _sockets.size() ; ++index) 00285 { 00286 if(result.isSetRead(_sockets[index])) 00287 { 00288 nread=_sockets[index].getAvailable(); 00289 if(maxnread < nread) 00290 { 00291 maxnread = nread; 00292 _readIndex=index; 00293 } 00294 } 00295 } 00296 } 00297 catch(SocketException &e) 00298 { 00299 throw ReadError(e.what()); 00300 } 00301 00302 // return channel id 00303 return indexToChannel(_readIndex); 00304 }
|
|
||||||||||||
|
Read data into given memory. Read data from the current read socket. The read socket is the socket that was selected in selectChannel. Reimplemented from osg::BinaryDataHandler. Definition at line 392 of file OSGGroupSockConnection.cpp. References osg::GroupSockConnection::_readIndex, and osg::GroupSockConnection::_sockets. 00393 { 00394 int len; 00395 00396 // read data 00397 len=_sockets[_readIndex].recv(mem,size); 00398 if(len==0) 00399 { 00400 // throw ChannelClosed(indexToChannel(_readIndex)); 00401 throw ReadError("Channel closed"); 00402 } 00403 }
|
|
|
Read next data block. The stream connection uses only one BinaryDataHandler buffer. If more than one buffer is present, then this method must be changed! Reimplemented from osg::BinaryDataHandler. Definition at line 412 of file OSGGroupSockConnection.cpp. References osg::GroupSockConnection::_readIndex, osg::GroupSockConnection::_socketReadBuffer, osg::GroupSockConnection::_sockets, osg::osgntohl(), and osg::BinaryDataHandler::readBufBegin(). 00413 { 00414 int size; 00415 int len; 00416 00417 // read buffer header 00418 len=_sockets[_readIndex].recv(&_socketReadBuffer[0],sizeof(SocketBufferHeader)); 00419 if(len==0) 00420 throw ReadError("Channel closed"); 00421 // read remaining data 00422 size=osgntohl(((SocketBufferHeader*)&_socketReadBuffer[0])->size); 00423 len=_sockets[_readIndex].recv(&_socketReadBuffer[sizeof(SocketBufferHeader)], 00424 size); 00425 if(len==0) 00426 throw ReadError("Channel closed"); 00427 readBufBegin()->setDataSize(size); 00428 }
|
|
|
wait for signal Implements osg::Connection. Reimplemented in osg::GroupMCastConnection. Definition at line 311 of file OSGGroupSockConnection.cpp. References osg::SocketSelection::clearRead(), FFATAL, osg::SocketSelection::isSetRead(), osg::osgntohl(), osg::SocketSelection::select(), and osg::SocketSelection::setRead(). 00312 { 00313 UInt32 len; 00314 UInt32 index; 00315 UInt32 tag=314156; 00316 UInt32 missing = _sockets.size(); 00317 SocketSelection selection,result; 00318 00319 for(index = 0 ; index < _sockets.size() ; ++index) 00320 selection.setRead(_sockets[index]); 00321 00322 try 00323 { 00324 while(missing) 00325 { 00326 if(!selection.select(timeout,result)) 00327 return false; 00328 for(index = 0 ; index < _sockets.size() ; ++index) 00329 { 00330 if(result.isSetRead(_sockets[index])) 00331 { 00332 len = _sockets[index].recv(&tag,sizeof(tag)); 00333 tag = osgntohl(tag); 00334 if(len == 0) 00335 throw ReadError("Channel closed"); 00336 selection.clearRead(_sockets[index]); 00337 missing--; 00338 if(tag != 314156) 00339 { 00340 FFATAL(("Stream out of sync in SockConnection\n")); 00341 throw ReadError("Stream out of sync"); 00342 } 00343 } 00344 } 00345 } 00346 } 00347 catch(SocketException &e) 00348 { 00349 throw ReadError(e.what()); 00350 } 00351 return true; 00352 }
|
|
|
send signal Implements osg::Connection. Reimplemented in osg::GroupMCastConnection. Definition at line 356 of file OSGGroupSockConnection.cpp. References osg::GroupSockConnection::_sockets, and osg::osghtonl(). 00357 { 00358 UInt32 tag=osghtonl(314156); 00359 UInt32 index; 00360 00361 try 00362 { 00363 for(index = 0 ; index<_sockets.size() ; ++index) 00364 _sockets[index].send(&tag,sizeof(tag)); 00365 } 00366 catch(SocketError &e) 00367 { 00368 throw WriteError(e.what()); 00369 } 00370 }
|
|
|
number of connected channels Definition at line 78 of file OSGGroupConnection.cpp. References osg::GroupConnection::_selection. Referenced by osg::SortFirstWindow::clientInit(), osg::ClusterNetwork::connectAllGroupToPoint(), osg::ClusterNetwork::connectAllPointToPoint(), osg::ClusterViewBuffer::recv(), and write(). 00079 { 00080 return _selection.size(); 00081 }
|
|
|
mark the given channel as selectable Definition at line 85 of file OSGGroupConnection.cpp. References osg::GroupConnection::_channelToIndex, osg::GroupConnection::_selection, and osg::GroupConnection::ChannelIndex. 00086 { 00087 ChannelIndex index = _channelToIndex[channel]; 00088 _selection[index] = true; 00089 }
|
|
|
mark the given channel as not selectable Definition at line 93 of file OSGGroupConnection.cpp. References osg::GroupConnection::_channelToIndex, osg::GroupConnection::_selection, and osg::GroupConnection::ChannelIndex. Referenced by osg::SortFirstWindow::clientInit(), osg::ClusterNetwork::connectAllGroupToPoint(), osg::ClusterNetwork::connectAllPointToPoint(), osg::ClusterWindow::init(), and osg::ClusterViewBuffer::recv(). 00094 { 00095 ChannelIndex index = _channelToIndex[channel]; 00096 _selection[index] = false; 00097 }
|
|
|
mark all channels as not selectable Definition at line 101 of file OSGGroupConnection.cpp. References osg::GroupConnection::_selection. 00102 { 00103 std::fill(_selection.begin(),_selection.end(),false); 00104 }
|
|
|
mark all channels as selectable Definition at line 108 of file OSGGroupConnection.cpp. References osg::GroupConnection::_selection. Referenced by osg::SortFirstWindow::clientInit(), osg::ClusterNetwork::connectAllGroupToPoint(), osg::ClusterNetwork::connectAllPointToPoint(), osg::ClusterWindow::init(), and osg::ClusterViewBuffer::recv(). 00109 { 00110 std::fill(_selection.begin(),_selection.end(),true); 00111 }
|
|
|
get number of sockets in the selection Definition at line 115 of file OSGGroupConnection.cpp. References osg::GroupConnection::_selection. Referenced by osg::ClusterNetwork::connectAllGroupToPoint(). 00116 { 00117 UInt32 selectionCount=0; 00118 std::vector<UInt8>::iterator i; 00119 for(i=_selection.begin() ; i!=_selection.end() ; ++i) 00120 if(*i) 00121 selectionCount++; 00122 return selectionCount; 00123 }
|
|
|
set destination address used to broadcast or multicast to all connected points. On Multicast connections this could be 224.22.22.1 or something like that. Definition at line 132 of file OSGGroupConnection.cpp. References osg::GroupConnection::_destination. Referenced by osg::ClusterWindow::init(). 00133 { 00134 _destination = destination; 00135 }
|
|
|
get destination address Definition at line 139 of file OSGGroupConnection.cpp. References osg::GroupConnection::_destination. Referenced by osg::GroupMCastConnection::initialize(). 00140 { 00141 return _destination; 00142 }
|
|
|
create a new channel Definition at line 149 of file OSGGroupConnection.cpp. References osg::GroupConnection::_channelToIndex, osg::GroupConnection::_indexToChannel, osg::GroupConnection::_reuseChannel, osg::GroupConnection::_selection, and osg::Connection::Channel. Referenced by osg::GroupSockConnection::acceptPoint(), and osg::GroupSockConnection::connectPoint(). 00150 { 00151 Channel channel; 00152 if(_reuseChannel.begin() != _reuseChannel.end()) 00153 { 00154 channel = *(_reuseChannel.begin()); 00155 _reuseChannel.pop_front(); 00156 } 00157 else 00158 { 00159 channel = _channelToIndex.size(); 00160 _channelToIndex.resize(channel+1); 00161 } 00162 if(index >= _indexToChannel.size()) 00163 _indexToChannel.resize(index+1); 00164 if(index >= _selection.size()) 00165 _selection.resize(index+1); 00166 // enable selection 00167 _selection[index] = true; 00168 // set index to channel mapping 00169 _indexToChannel[index] = channel; 00170 // set channel to index mapping 00171 _channelToIndex[channel] = index; 00172 return channel; 00173 }
|
|
|
remove a channel. Definition at line 177 of file OSGGroupConnection.cpp. References osg::GroupConnection::_indexToChannel, osg::GroupConnection::_reuseChannel, osg::GroupConnection::_selection, and osg::Connection::Channel. Referenced by osg::GroupSockConnection::disconnect(). 00178 { 00179 Channel channel = _indexToChannel[index]; 00180 // erase from indexed arrays 00181 _selection .erase(_selection.begin() + index); 00182 _indexToChannel.erase(_indexToChannel.begin() + index); 00183 // move to reuse 00184 _reuseChannel.push_back(channel); 00185 }
|
|
|
Definition at line 190 of file OSGGroupConnection.cpp. References osg::GroupConnection::_channelToIndex. Referenced by osg::GroupSockConnection::disconnect(), and osg::GroupMCastConnection::disconnect(). 00191 { 00192 return _channelToIndex[channel]; 00193 }
|
|
|
Definition at line 195 of file OSGGroupConnection.cpp. References osg::GroupConnection::_indexToChannel. 00196 { 00197 return _indexToChannel[index]; 00198 }
|
|
|
get network interface Definition at line 98 of file OSGConnection.cpp. References osg::Connection::_interface. Referenced by osg::PointSockConnection::bind(), osg::GroupSockConnection::bind(), osg::PointSockPipeline::initialize(), osg::PointMCastConnection::initialize(), and osg::GroupMCastConnection::initialize(). 00099 { 00100 return _interface; 00101 }
|
|
|
set network interface Definition at line 105 of file OSGConnection.cpp. References osg::Connection::_interface. Referenced by osg::ClusterServer::acceptClient(), a |