00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039 #include <stdlib.h>
00040 #include <stdio.h>
00041
00042 #include "OSGConfig.h"
00043 #include "OSGThread.h"
00044 #include "OSGThreadManager.h"
00045 #include "OSGClusterServer.h"
00046 #include "OSGPointConnection.h"
00047 #include "OSGConnectionFactory.h"
00048 #include "OSGDgramSocket.h"
00049 #include "OSGClusterWindow.h"
00050 #include "OSGBinaryMessage.h"
00051 #include "OSGClusterNetwork.h"
00052 #include "OSGSHLChunk.h"
00053
00054 OSG_USING_NAMESPACE
00055
00076
00077
00078
00088 ClusterServer::ClusterServer( WindowPtr window,
00089 const std::string &serviceName,
00090 const std::string &connectionType,
00091 const std::string &address,
00092 UInt32 servicePort,
00093 const std::string &serviceGroup):
00094 _window(window),
00095 _connection(NULL),
00096 _requestAddress(address),
00097 _boundAddress(""),
00098 _clusterWindow(),
00099 _aspect(NULL),
00100 _serviceName(serviceName),
00101 _connectionType(connectionType),
00102 _servicePort(servicePort),
00103 _serviceGroup(serviceGroup),
00104 _serverId(0),
00105 _interface("")
00106 {
00107 char localhost[256];
00108
00109
00110 if(_serviceName.empty())
00111 {
00112 gethostname(localhost,255);
00113 _serviceName = localhost;
00114 }
00115
00116 if(_requestAddress.empty())
00117 {
00118 if(strstr(_serviceName.c_str(),":"))
00119 _requestAddress = _serviceName;
00120 }
00121 }
00122
00123
00124
00125
00128 ClusterServer::~ClusterServer(void)
00129 {
00130 try
00131 {
00132 if(_connection)
00133 delete _connection;
00134 if(_aspect)
00135 delete _aspect;
00136 }
00137 catch(...)
00138 {
00139 }
00140 }
00141
00142
00143
00144
00145
00151 void ClusterServer::start()
00152 {
00153 OSG::FieldContainerType *fct;
00154
00155
00156 if(_connection)
00157 delete _connection;
00158 _connection = NULL;
00159
00160
00161 _aspect = new RemoteAspect();
00162
00163
00164 for(UInt32 i = 0; i < OSG::TypeFactory::the()->getNumTypes(); ++i)
00165 {
00166 fct=OSG::FieldContainerFactory::the()->findType(i);
00167 if(fct && fct->isDerivedFrom(ClusterWindow::getClassType()))
00168 {
00169 _aspect->registerChanged(
00170 *fct,
00171 osgTypedMethodFunctor2ObjPtrCPtrRef
00172 <
00173 bool,
00174 ClusterServer,
00175 FieldContainerPtr,
00176 RemoteAspect *
00177 >(this,&ClusterServer::windowChanged));
00178 }
00179 }
00180
00181 try {
00182 UInt8 forceNetworkOrder;
00183 #if BYTE_ORDER == LITTLE_ENDIAN
00184 UInt8 littleEndian = true;
00185 #else
00186 UInt8 littleEndian = false;
00187 #endif
00188
00189
00190 acceptClient();
00191
00192 _connection->putValue(littleEndian);
00193 _connection->flush();
00194 _connection->selectChannel();
00195 _connection->getValue(forceNetworkOrder);
00196 _connection->setNetworkOrder((forceNetworkOrder != 0));
00197 }
00198 catch(...)
00199 {
00200 throw;
00201 }
00202 }
00203
00207 void ClusterServer::stop()
00208 {
00209
00210 if(_clusterWindow != NullFC)
00211 {
00212 _aspect=_clusterWindow->getNetwork()->getAspect();
00213 _clusterWindow->getNetwork()->setAspect(NULL);
00214 }
00215
00216 try
00217 {
00218 if(_connection)
00219 delete _connection;
00220 _connection = NULL;
00221 }
00222 catch(...)
00223 {
00224 }
00225
00226 if(_aspect)
00227 delete _aspect;
00228
00229 _connection=NULL;
00230 _aspect=NULL;
00231 _clusterWindow=NullFC;
00232 }
00233
00236 void ClusterServer::render(RenderActionBase *action)
00237 {
00238 doSync(false);
00239 doRender(action);
00240 doSwap();
00241 }
00242
00252 void ClusterServer::doSync(bool applyToChangelist)
00253 {
00254
00255 if(_clusterWindow==NullFC)
00256 {
00257 do
00258 {
00259
00260 _aspect->receiveSync(*_connection,applyToChangelist);
00261 }
00262 while(_clusterWindow==NullFC);
00263
00264 for(_serverId=0;
00265 _clusterWindow->getServers()[_serverId] != _serviceName &&
00266 _serverId<_clusterWindow->getServers().size();
00267 _serverId++);
00268
00269 SINFO << "Start server " << _serviceName
00270 << " with id " << _serverId
00271 << std::endl;
00272
00273 _clusterWindow->getNetwork()->setMainConnection(_connection);
00274 _clusterWindow->getNetwork()->setAspect (_aspect);
00275 _connection=NULL;
00276 _aspect=NULL;
00277 SHLChunk::setClusterId(Int32(_serverId));
00278 _clusterWindow->serverInit(_window,_serverId);
00279 }
00280
00281 RemoteAspect *aspect=_clusterWindow->getNetwork()->getAspect();
00282 Connection *connection=_clusterWindow->getNetwork()->getMainConnection();
00283
00284
00285 aspect->receiveSync(*connection,applyToChangelist);
00286
00287
00288 if(_clusterWindow->getInterleave())
00289 {
00290
00291
00292
00293 while( (_clusterWindow->getFrameCount()%
00294 _clusterWindow->getInterleave())
00295 !=
00296 (_serverId%_clusterWindow->getInterleave()) )
00297 {
00298 aspect->receiveSync(*connection,applyToChangelist);
00299 }
00300 }
00301 }
00302
00305 void ClusterServer::doRender(RenderActionBase *action)
00306 {
00307 _clusterWindow->serverRender( _window,_serverId,action );
00308 }
00309
00312 void ClusterServer::doSwap(void)
00313 {
00314 _clusterWindow->serverSwap ( _window,_serverId );
00315 }
00316
00319 WindowPtr ClusterServer::getClusterWindow (void)
00320 {
00321 return _clusterWindow;
00322 }
00323
00326 WindowPtr ClusterServer::getServerWindow (void)
00327 {
00328 return _window;
00329 }
00330
00334 bool ClusterServer::windowChanged(FieldContainerPtr& fcp,
00335 RemoteAspect *)
00336 {
00337 if(_clusterWindow != NullFC)
00338 return true;
00339
00340 ClusterWindowPtr window=ClusterWindowPtr::dcast(fcp);
00341
00342 if(window->getServers().size())
00343 {
00344 if(window->getServers().find(_serviceName) ==
00345 window->getServers().end())
00346 {
00347 SWARNING << "wrong window" << std::endl;
00348 }
00349 else
00350 {
00351 _clusterWindow=window;
00352 }
00353 }
00354 return true;
00355 }
00356
00361 void ClusterServer::acceptClient()
00362 {
00363 BinaryMessage msg;
00364 DgramSocket serviceSock;
00365 SocketAddress addr;
00366 std::string service;
00367 std::string connectionType;
00368 UInt32 readable;
00369 bool connected=false;
00370 std::string address;
00371 bool bound = false;
00372
00373 SINFO << "Waiting for request of "
00374 << _serviceName
00375 << std::endl;
00376
00377 try
00378 {
00379 if(!_requestAddress.empty())
00380 {
00381
00382 _connection = ConnectionFactory::the().
00383 createPoint(_connectionType);
00384 if(_connection)
00385 {
00386
00387 _connection->setInterface(_interface);
00388
00389 try
00390 {
00391
00392 _boundAddress = _connection->bind(_requestAddress);
00393 bound = true;
00394 }
00395 catch(...)
00396 {
00397 SINFO << "Unable to bind, use name as symbolic service name"
00398 << std::endl;
00399 }
00400 }
00401 }
00402 serviceSock.open();
00403 serviceSock.setReusePort(true);
00404
00405 if(!_serviceGroup.empty())
00406 {
00407 SocketAddress groupAddress = SocketAddress(
00408 _serviceGroup.c_str(),
00409 _servicePort);
00410 if(groupAddress.isMulticast())
00411 {
00412 SINFO << "wait for request on multicast:" <<
00413 _serviceGroup << std::endl;
00414 serviceSock.bind(SocketAddress(SocketAddress::ANY,
00415 _servicePort));
00416 serviceSock.join(SocketAddress(groupAddress));
00417 }
00418 else
00419 {
00420 SINFO << "wait for request by broadcast:" <<
00421 _serviceGroup << std::endl;
00422 serviceSock.bind(SocketAddress(groupAddress));
00423 }
00424 }
00425 else
00426 {
00427 SINFO << "wait for request by broadcast" << std::endl;
00428 serviceSock.bind(SocketAddress(SocketAddress::ANY,
00429 _servicePort));
00430 }
00431
00432 while(!connected)
00433 {
00434 try
00435 {
00436 if(_connection)
00437 readable = serviceSock.waitReadable(.01);
00438 else
00439 readable = true;
00440 if(readable)
00441 {
00442 serviceSock.recvFrom(msg,addr);
00443
00444 service = msg.getString();
00445 connectionType = msg.getString();
00446
00447 SINFO << "Request for "
00448 << service << " "
00449 << connectionType
00450 << std::endl;
00451
00452 if(service == _serviceName)
00453 {
00454
00455 if(_connection &&
00456 _connection->getType()->getName() != connectionType)
00457 {
00458 delete _connection;
00459 _connection = NULL;
00460 }
00461
00462 if(!_connection)
00463 {
00464
00465 _connection = ConnectionFactory::the().
00466 createPoint(connectionType);
00467 if(_connection)
00468 {
00469
00470 _connection->setInterface(_interface);
00471
00472 _boundAddress = _connection->bind(_requestAddress);
00473 bound = true;
00474 }
00475 else
00476 {
00477 SINFO << "Unknown connection type '"
00478 << connectionType << "'" << std::endl;
00479 }
00480 }
00481 if(_connection)
00482 {
00483 msg.clear ( );
00484 msg.putString(_serviceName );
00485 msg.putString(_boundAddress);
00486 serviceSock.sendTo(msg, addr);
00487 SINFO << "Response "
00488 << connectionType << ":"
00489 << _boundAddress
00490 << std::endl;
00491 }
00492 }
00493 }
00494 }
00495 catch(SocketConnReset &e)
00496 {
00497
00498
00499
00500
00501
00502
00503
00504
00505 SWARNING << e.what() << std::endl;
00506
00507
00508 if(!_connection)
00509 throw;
00510 }
00511 catch(OSG_STDEXCEPTION_NAMESPACE::exception &e)
00512 {
00513 SWARNING << e.what() << std::endl;
00514 }
00515 try
00516 {
00517
00518 if(bound && _connection && _connection->acceptGroup(0.2) >= 0)
00519 {
00520 connected = true;
00521 SINFO << "Connection accepted " << _boundAddress << std::endl;
00522 }
00523 }
00524 catch(OSG_STDEXCEPTION_NAMESPACE::exception &e)
00525 {
00526 SWARNING << e.what() << std::endl;
00527 }
00528 }
00529 serviceSock.close();
00530 }
00531 catch(OSG_STDEXCEPTION_NAMESPACE::exception &e)
00532 {
00533 throw;
00534 }
00535 }
00536
00537
00538
00539
00540
00541 #ifdef __sgi
00542 #pragma set woff 1174
00543 #endif
00544
00545 #ifdef OSG_LINUX_ICC
00546 #pragma warning( disable : 177 )
00547 #endif
00548
00549 namespace
00550 {
00551 static Char8 cvsid_cpp[] = "@(#)$Id:$";
00552 static Char8 cvsid_hpp[] = OSG_CLUSTERSERVERHEADER_CVSID;
00553 }