00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042 #include <stdlib.h>
00043 #include <stdio.h>
00044 #include <assert.h>
00045
00046 #include "OSGConfig.h"
00047 #include "OSGLog.h"
00048 #include "OSGBaseFunctions.h"
00049 #include "OSGBaseThread.h"
00050 #include "OSGSocketSelection.h"
00051 #include "OSGBinaryMessage.h"
00052 #include "OSGPointSockPipeline.h"
00053 #include "OSGGroupSockPipeline.h"
00054 #include "OSGConnectionType.h"
00055 #include "OSGBinaryMessage.h"
00056
00057 OSG_USING_NAMESPACE
00058
00063
00064
00065
00069 PointSockPipeline::PointSockPipeline():
00070 Inherited(),
00071 _initialized(false)
00072 {
00073 _prev.open();
00074 _next.open();
00075 }
00076
00079 PointSockPipeline::~PointSockPipeline(void)
00080 {
00081 _prev.close();
00082 _next.close();
00083 }
00084
00087 const ConnectionType *PointSockPipeline::getType()
00088 {
00089 return &_type;
00090 }
00091
00092
00093
00094
00098 Connection::Channel PointSockPipeline::connectGroup(
00099 const std::string &address,
00100 Time timeout)
00101 {
00102 Channel channel = Inherited::connectGroup(address,timeout);
00103 return channel;
00104 }
00105
00108 void PointSockPipeline::disconnect(void)
00109 {
00110 _socket.close();
00111 }
00112
00116 Connection::Channel PointSockPipeline::acceptGroup(Time timeout)
00117 {
00118 Channel channel = Inherited::acceptGroup(timeout);
00119 return channel;
00120 }
00121
00122
00123
00124
00128 Connection::Channel PointSockPipeline::selectChannel(Time timeout)
00129 throw (ReadError)
00130 {
00131 if(!_initialized)
00132 initialize();
00133 try
00134 {
00135 if(_prev.waitReadable(timeout))
00136 return 0;
00137 }
00138 catch(SocketError &e)
00139 {
00140 throw ReadError(e.what());
00141 }
00142 return -1;
00143 }
00144
00145
00146
00150 PointConnection *PointSockPipeline::create(void)
00151 {
00152 return new PointSockPipeline();
00153 }
00154
00155
00156
00157
00165 void PointSockPipeline::read(MemoryHandle mem,UInt32 size)
00166 {
00167 int len;
00168
00169 if(!_initialized)
00170 initialize();
00171
00172 len=_prev.recv(mem,size);
00173 if(len==0)
00174 {
00175 throw ReadError("read got 0 bytes!");
00176 }
00177
00178 if(!_last)
00179 _next.send(mem,size);
00180 }
00181
00189 void PointSockPipeline::readBuffer()
00190 {
00191 int size;
00192 int len;
00193
00194 if(!_initialized)
00195 initialize();
00196
00197
00198 len=_prev.recv(&_socketReadBuffer[0],sizeof(SocketBufferHeader));
00199 if(len==0)
00200 throw ReadError("peek got 0 bytes!");
00201
00202 size = osgntohl(
00203 reinterpret_cast<SocketBufferHeader *>(&_socketReadBuffer[0])->size);
00204
00205 len=_prev.recv(&_socketReadBuffer[sizeof(SocketBufferHeader)],
00206 size);
00207 if(len==0)
00208 throw ReadError("read got 0 bytes!");
00209 readBufBegin()->setDataSize(size);
00210
00211 if(!_last)
00212 _next.send(&_socketReadBuffer[0],
00213 sizeof(SocketBufferHeader)+size);
00214 }
00215
00216
00217
00218
00221 void PointSockPipeline::initialize(void)
00222 {
00223 BinaryMessage message;
00224 StreamSocket sock;
00225 UInt32 nextPort;
00226 std::string nextHost;
00227 UInt32 len;
00228 char localhost[256];
00229 std::string interf;
00230
00231
00232 osgGetHostname(localhost,255);
00233 if(!getInterface().empty())
00234 interf = getInterface();
00235 else
00236 interf = localhost;
00237
00238 sock.open();
00239 sock.bind(SocketAddress(interf.c_str(),0));
00240 sock.listen();
00241
00242
00243 message.putString(interf);
00244 message.putUInt32(sock.getAddress().getPort());
00245 _socket.send(message);
00246
00247 _prev = sock.accept();
00248 sock.close();
00249
00250 len = _socket.recv(message);
00251 if(len == 0)
00252 throw ReadError("Channel closed\n");
00253 _last = message.getUInt32();
00254 if(!_last)
00255 {
00256 nextHost = message.getString();
00257 nextPort = message.getUInt32();
00258 for(;;)
00259 {
00260 try
00261 {
00262 _next.connect(SocketAddress(nextHost.c_str(),
00263 nextPort));
00264 break;
00265 }
00266 catch(...)
00267 {
00268 }
00269 }
00270 }
00271
00272 _initialized = true;
00273 }
00274
00275
00276
00277
00278 ConnectionType PointSockPipeline::_type(
00279 &PointSockPipeline::create,
00280 "SockPipeline");
00281
00282
00283
00284
00285 #ifdef __sgi
00286 #pragma set woff 1174
00287 #endif
00288
00289 #ifdef OSG_LINUX_ICC
00290 #pragma warning( disable : 177 )
00291 #endif
00292
00293 namespace
00294 {
00295 static Char8 cvsid_cpp [] = "@(#)$Id: $";
00296 static Char8 cvsid_hpp [] = OSG_GROUPSOCKPIPELINE_HEADER_CVSID;
00297 }
00298