00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042 #include <stdlib.h>
00043 #include <stdio.h>
00044 #include <assert.h>
00045
00046 #include "OSGConfig.h"
00047 #include "OSGLog.h"
00048 #include "OSGBaseFunctions.h"
00049 #include "OSGBaseThread.h"
00050 #include "OSGSocketSelection.h"
00051 #include "OSGBinaryMessage.h"
00052 #include "OSGPointSockPipeline.h"
00053 #include "OSGGroupSockPipeline.h"
00054 #include "OSGConnectionType.h"
00055 #include "OSGBinaryMessage.h"
00056
00057 OSG_USING_NAMESPACE
00058
00063
00064
00065
00069 PointSockPipeline::PointSockPipeline():
00070 Inherited(),
00071 _initialized(false)
00072 {
00073 _prev.open();
00074 _next.open();
00075 }
00076
00079 PointSockPipeline::~PointSockPipeline(void)
00080 {
00081 _prev.close();
00082 _next.close();
00083 }
00084
00087 const ConnectionType *PointSockPipeline::getType()
00088 {
00089 return &_type;
00090 }
00091
00092
00093
00094
00098 Connection::Channel PointSockPipeline::connectGroup(
00099 const std::string &address,
00100 Time timeout)
00101 {
00102 Channel channel = Inherited::connectGroup(address,timeout);
00103 return channel;
00104 }
00105
00108 void PointSockPipeline::disconnect(void)
00109 {
00110 _socket.close();
00111 }
00112
00116 Connection::Channel PointSockPipeline::acceptGroup(Time timeout)
00117 {
00118 Channel channel = Inherited::acceptGroup(timeout);
00119 return channel;
00120 }
00121
00122
00123
00124
00128 Connection::Channel PointSockPipeline::selectChannel(Time timeout)
00129 throw (ReadError)
00130 {
00131 if(!_initialized)
00132 initialize();
00133 try
00134 {
00135 if(_prev.waitReadable(timeout))
00136 return 0;
00137 }
00138 catch(SocketError &e)
00139 {
00140 throw ReadError(e.what());
00141 }
00142 return -1;
00143 }
00144
00145
00146
00150 PointConnection *PointSockPipeline::create(void)
00151 {
00152 return new PointSockPipeline();
00153 }
00154
00155
00156
00157
00165 void PointSockPipeline::read(MemoryHandle mem,UInt32 size)
00166 {
00167 int len;
00168
00169 if(!_initialized)
00170 initialize();
00171
00172 len=_prev.recv(mem,size);
00173 if(len==0)
00174 {
00175 throw ReadError("read got 0 bytes!");
00176 }
00177
00178 if(!_last)
00179 _next.send(mem,size);
00180 }
00181
00189 void PointSockPipeline::readBuffer()
00190 {
00191 int size;
00192 int len;
00193
00194 if(!_initialized)
00195 initialize();
00196
00197
00198 len=_prev.recv(&_socketReadBuffer[0],sizeof(SocketBufferHeader));
00199 if(len==0)
00200 throw ReadError("peek got 0 bytes!");
00201
00202 size=osgntohl(((SocketBufferHeader*)&_socketReadBuffer[0])->size);
00203 len=_prev.recv(&_socketReadBuffer[sizeof(SocketBufferHeader)],
00204 size);
00205 if(len==0)
00206 throw ReadError("read got 0 bytes!");
00207 readBufBegin()->setDataSize(size);
00208
00209 if(!_last)
00210 _next.send(&_socketReadBuffer[0],
00211 sizeof(SocketBufferHeader)+size);
00212 }
00213
00214
00215
00216
00219 void PointSockPipeline::initialize(void)
00220 {
00221 BinaryMessage message;
00222 StreamSocket sock;
00223 UInt32 nextPort;
00224 std::string nextHost;
00225 UInt32 len;
00226 char localhost[256];
00227 std::string interf;
00228
00229
00230 gethostname(localhost,255);
00231 if(!getInterface().empty())
00232 interf = getInterface();
00233 else
00234 interf = localhost;
00235
00236 sock.open();
00237 sock.bind(SocketAddress(interf.c_str(),0));
00238 sock.listen();
00239
00240
00241 message.putString(interf);
00242 message.putUInt32(sock.getAddress().getPort());
00243 _socket.send(message);
00244
00245 _prev = sock.accept();
00246 sock.close();
00247
00248 len = _socket.recv(message);
00249 if(len == 0)
00250 throw ReadError("Channel closed\n");
00251 _last = message.getUInt32();
00252 if(!_last)
00253 {
00254 nextHost = message.getString();
00255 nextPort = message.getUInt32();
00256 for(;;)
00257 {
00258 try
00259 {
00260 _next.connect(SocketAddress(nextHost.c_str(),
00261 nextPort));
00262 break;
00263 }
00264 catch(...)
00265 {
00266 }
00267 }
00268 }
00269
00270 _initialized = true;
00271 }
00272
00273
00274
00275
00276 ConnectionType PointSockPipeline::_type(
00277 &PointSockPipeline::create,
00278 "SockPipeline");
00279
00280
00281
00282
00283 #ifdef __sgi
00284 #pragma set woff 1174
00285 #endif
00286
00287 #ifdef OSG_LINUX_ICC
00288 #pragma warning( disable : 177 )
00289 #endif
00290
00291 namespace
00292 {
00293 static Char8 cvsid_cpp [] = "@(#)$Id: $";
00294 static Char8 cvsid_hpp [] = OSG_GROUPSOCKPIPELINE_HEADER_CVSID;
00295 }
00296