00001 /*---------------------------------------------------------------------------*\ 00002 * OpenSG * 00003 * * 00004 * * 00005 * Copyright (C) 2000-2002 by the OpenSG Forum * 00006 * * 00007 * www.opensg.org * 00008 * * 00009 * contact: dirk@opensg.org, gerrit.voss@vossg.org, jbehr@zgdv.de * 00010 * * 00011 \*---------------------------------------------------------------------------*/ 00012 /*---------------------------------------------------------------------------*\ 00013 * License * 00014 * * 00015 * This library is free software; you can redistribute it and/or modify it * 00016 * under the terms of the GNU Library General Public License as published * 00017 * by the Free Software Foundation, version 2. * 00018 * * 00019 * This library is distributed in the hope that it will be useful, but * 00020 * WITHOUT ANY WARRANTY; without even the implied warranty of * 00021 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * 00022 * Library General Public License for more details. * 00023 * * 00024 * You should have received a copy of the GNU Library General Public * 00025 * License along with this library; if not, write to the Free Software * 00026 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. * 00027 * * 00028 \*---------------------------------------------------------------------------*/ 00029 /*---------------------------------------------------------------------------*\ 00030 * Changes * 00031 * * 00032 * * 00033 * * 00034 * * 00035 * * 00036 * * 00037 \*---------------------------------------------------------------------------*/ 00038 00039 //--------------------------------------------------------------------------- 00040 // Includes 00041 //--------------------------------------------------------------------------- 00042 #include <stdlib.h> 00043 #include <stdio.h> 00044 #include <assert.h> 00045 00046 #include <set> 00047 00048 #include "OSGConfig.h" 00049 #include "OSGLog.h" 00050 #include "OSGBaseFunctions.h" 00051 #include "OSGBaseThread.h" 00052 #include "OSGSocketSelection.h" 00053 #include "OSGBinaryMessage.h" 00054 #include "OSGGroupSockPipeline.h" 00055 #include "OSGConnectionType.h" 00056 #include "OSGBinaryMessage.h" 00057 00058 OSG_USING_NAMESPACE 00059 00064 /*-------------------------------------------------------------------------*/ 00065 /* constructor destructor */ 00066 00070 GroupSockPipeline::GroupSockPipeline(): 00071 Inherited(), 00072 _initialized(false) 00073 { 00074 _next.open(); 00075 } 00076 00079 GroupSockPipeline::~GroupSockPipeline(void) 00080 { 00081 _next.close(); 00082 } 00083 00086 const ConnectionType *GroupSockPipeline::getType() 00087 { 00088 return &_type; 00089 } 00090 00091 /*-------------------------------------------------------------------------*/ 00092 /* connection */ 00093 00097 GroupConnection::Channel GroupSockPipeline::connectPoint( 00098 const std::string &address, 00099 Time timeout) 00100 { 00101 Channel channel = Inherited::connectPoint(address,timeout); 00102 return channel; 00103 } 00104 00107 void GroupSockPipeline::disconnect(Channel channel) 00108 { 00109 Inherited::disconnect(channel); 00110 } 00111 00115 GroupConnection::Channel GroupSockPipeline::acceptPoint(Time timeout) 00116 { 00117 Connection::Channel channel = Inherited::acceptPoint(timeout); 00118 return channel; 00119 } 00120 00121 /*-------------------------- helpers --------------------------------------*/ 00122 00126 GroupConnection *GroupSockPipeline::create(void) 00127 { 00128 return new GroupSockPipeline(); 00129 } 00130 00131 /*-------------------------------------------------------------------------*/ 00132 /* read write */ 00133 00141 void GroupSockPipeline::write(MemoryHandle mem,UInt32 size) 00142 { 00143 if(!_initialized) 00144 initialize(); 00145 00146 try 00147 { 00148 if(getChannelCount()) 00149 _next.send(mem,size); 00150 } 00151 catch(SocketException &e) 00152 { 00153 throw WriteError(e.what()); 00154 } 00155 } 00156 00162 void GroupSockPipeline::writeBuffer(void) 00163 { 00164 Int32 index; 00165 00166 if(!_initialized) 00167 initialize(); 00168 00169 UInt32 size = writeBufBegin()->getDataSize(); 00170 // write size to header 00171 ((SocketBufferHeader*)&_socketWriteBuffer[0])->size=osghtonl(size); 00172 if(size) 00173 { 00174 _next.send(&_socketWriteBuffer[0], 00175 size+sizeof(SocketBufferHeader)); 00176 } 00177 } 00178 00179 /*-------------------------------------------------------------------------*/ 00180 /* private helpers */ 00181 00184 void GroupSockPipeline::initialize(void) 00185 { 00186 UInt32 index,len; 00187 UInt32 nextPort; 00188 std::string nextHost; 00189 BinaryMessage message; 00190 00191 for(index = 0 ; index<_sockets.size() ; ++index) 00192 { 00193 len = _sockets[index].recv(message); 00194 if(len == 0) 00195 throw ReadError("Channel closed\n"); 00196 nextHost = message.getString(); 00197 nextPort = message.getUInt32(); 00198 00199 message.clear(); 00200 if(index == 0) 00201 { 00202 message.putUInt32(true); 00203 _sockets[_sockets.size()-1].send(message); 00204 for(;;) 00205 { 00206 try 00207 { 00208 _next.connect(SocketAddress(nextHost.c_str(), 00209 nextPort)); 00210 break; 00211 } 00212 catch(...) 00213 { 00214 } 00215 } 00216 } 00217 else 00218 { 00219 message.clear(); 00220 message.putUInt32(false); 00221 message.putString(nextHost); 00222 message.putUInt32(nextPort); 00223 _sockets[index-1].send(message); 00224 } 00225 } 00226 _initialized = true; 00227 } 00228 00229 /*-------------------------------------------------------------------------*/ 00230 /* static type */ 00231 00232 ConnectionType GroupSockPipeline::_type( 00233 &GroupSockPipeline::create, 00234 "SockPipeline"); 00235 00236 /*-------------------------------------------------------------------------*/ 00237 /* cvs id's */ 00238 00239 #ifdef __sgi 00240 #pragma set woff 1174 00241 #endif 00242 00243 #ifdef OSG_LINUX_ICC 00244 #pragma warning( disable : 177 ) 00245 #endif 00246 00247 namespace 00248 { 00249 static Char8 cvsid_cpp [] = "@(#)$Id: $"; 00250 static Char8 cvsid_hpp [] = OSG_GROUPSOCKPIPELINE_HEADER_CVSID; 00251 } 00252
1.4.3