Main Page | Modules | Namespace List | Class Hierarchy | Alphabetical List | Class List | Directories | File List | Namespace Members | Class Members | File Members | Related Pages

OSGClusterServer.cpp

Go to the documentation of this file.
00001 /*---------------------------------------------------------------------------*\
00002  *                                OpenSG                                     *
00003  *                                                                           *
00004  *                                                                           *
00005  *             Copyright (C) 2000-2002 by the OpenSG Forum                   *
00006  *                                                                           *
00007  *                            www.opensg.org                                 *
00008  *                                                                           *
00009  *   contact: dirk@opensg.org, gerrit.voss@vossg.org, jbehr@zgdv.de          *
00010  *                                                                           *
00011 \*---------------------------------------------------------------------------*/
00012 /*---------------------------------------------------------------------------*\
00013  *                                License                                    *
00014  *                                                                           *
00015  * This library is free software; you can redistribute it and/or modify it   *
00016  * under the terms of the GNU Library General Public License as published    *
00017  * by the Free Software Foundation, version 2.                               *
00018  *                                                                           *
00019  * This library is distributed in the hope that it will be useful, but       *
00020  * WITHOUT ANY WARRANTY; without even the implied warranty of                *
00021  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU         *
00022  * Library General Public License for more details.                          *
00023  *                                                                           *
00024  * You should have received a copy of the GNU Library General Public         *
00025  * License along with this library; if not, write to the Free Software       *
00026  * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.                 *
00027  *                                                                           *
00028 \*---------------------------------------------------------------------------*/
00029 /*---------------------------------------------------------------------------*\
00030  *                                Changes                                    *
00031  *                                                                           *
00032  *                                                                           *
00033  *                                                                           *
00034  *                                                                           *
00035  *                                                                           *
00036  *                                                                           *
00037 \*---------------------------------------------------------------------------*/
00038 
00039 #include <stdlib.h>
00040 #include <stdio.h>
00041 
00042 #include "OSGConfig.h"
00043 #include "OSGThread.h"
00044 #include "OSGThreadManager.h"
00045 #include "OSGClusterServer.h"
00046 #include "OSGPointConnection.h"
00047 #include "OSGConnectionFactory.h"
00048 #include "OSGDgramSocket.h"
00049 #include "OSGClusterWindow.h"
00050 #include "OSGBinaryMessage.h"
00051 #include "OSGClusterNetwork.h"
00052 #include "OSGSHLChunk.h"
00053 
00054 OSG_USING_NAMESPACE
00055 
00076 /*-------------------------------------------------------------------------*/
00077 /*                            Constructors                                 */
00078 
00088 ClusterServer::ClusterServer(        WindowPtr    window,
00089                              const std::string    &serviceName,
00090                              const std::string    &connectionType,
00091                              const std::string    &address,
00092                                      UInt32       servicePort,
00093                              const std::string    &serviceGroup):
00094     _window(window),
00095     _connection(NULL),
00096     _requestAddress(address),
00097     _boundAddress(""),
00098     _clusterWindow(),
00099     _aspect(NULL),
00100     _serviceName(serviceName),
00101     _connectionType(connectionType),
00102     _servicePort(servicePort),
00103     _serviceGroup(serviceGroup),
00104     _serverId(0),
00105     _interface("")
00106 {
00107     char localhost[256];
00108 
00109     // default is hostname
00110     if(_serviceName.empty())
00111     {
00112         gethostname(localhost,255);
00113         _serviceName = localhost;
00114     }
00115     // if service contains ":" than treat as address
00116     if(_requestAddress.empty())
00117     {
00118         if(strstr(_serviceName.c_str(),":"))
00119             _requestAddress = _serviceName;
00120     }
00121 }
00122 
00123 /*-------------------------------------------------------------------------*/
00124 /*                             Destructor                                  */
00125 
00128 ClusterServer::~ClusterServer(void)
00129 {
00130     try
00131     {
00132         if(_connection)
00133             delete _connection;
00134         if(_aspect)
00135             delete _aspect;
00136     }
00137     catch(...)
00138     {
00139     }
00140 }
00141 
00142 /*-------------------------------------------------------------------------*/
00143 /*                             Class specific                              */
00144 
00145 
00151 void ClusterServer::start()
00152 {
00153     OSG::FieldContainerType *fct;
00154 
00155     // reset conneciton
00156     if(_connection)
00157         delete _connection;
00158     _connection = NULL;
00159 
00160     // create aspect
00161     _aspect = new RemoteAspect();
00162 
00163     // register interrest for all changed cluster windows
00164     for(UInt32 i = 0; i < OSG::TypeFactory::the()->getNumTypes(); ++i)
00165     {
00166         fct=OSG::FieldContainerFactory::the()->findType(i);
00167         if(fct && fct->isDerivedFrom(ClusterWindow::getClassType()))
00168         {
00169             _aspect->registerChanged(
00170                 *fct,
00171                 osgTypedMethodFunctor2ObjPtrCPtrRef
00172                 <
00173                 bool,
00174                 ClusterServer,
00175                 FieldContainerPtr,
00176                 RemoteAspect     *
00177                 >(this,&ClusterServer::windowChanged));
00178         }
00179     }
00180     // accept incomming connections
00181     try {
00182         UInt8                    forceNetworkOrder;
00183 #if BYTE_ORDER == LITTLE_ENDIAN
00184         UInt8                    littleEndian = true;
00185 #else
00186         UInt8                    littleEndian = false;
00187 #endif
00188 
00189         // accept
00190         acceptClient();
00191         // determine network order
00192         _connection->putValue(littleEndian);
00193         _connection->flush();
00194         _connection->selectChannel();
00195         _connection->getValue(forceNetworkOrder);
00196         _connection->setNetworkOrder((forceNetworkOrder != 0));
00197     } 
00198     catch(...)
00199     {
00200         throw;
00201     }
00202 }
00203 
00207 void ClusterServer::stop()
00208 {
00209     // get aspect ownership
00210     if(_clusterWindow != NullFC)
00211     {
00212         _aspect=_clusterWindow->getNetwork()->getAspect();
00213         _clusterWindow->getNetwork()->setAspect(NULL);
00214     }
00215     // destroy connection
00216     try
00217     {
00218         if(_connection)
00219             delete _connection;
00220         _connection = NULL;
00221     }
00222     catch(...)
00223     {
00224     }
00225     // destroy aspect
00226     if(_aspect)
00227         delete _aspect;
00228     // reset 
00229     _connection=NULL;
00230     _aspect=NULL;
00231     _clusterWindow=NullFC;
00232 }
00233 
00236 void ClusterServer::render(RenderActionBase *action)
00237 {
00238     doSync(false);
00239     doRender(action);
00240     doSwap();
00241 }
00242 
00252 void ClusterServer::doSync(bool applyToChangelist)
00253 {
00254     // do we have a cluster window?
00255     if(_clusterWindow==NullFC)
00256     {
00257         do
00258         {
00259             // recive 
00260             _aspect->receiveSync(*_connection,applyToChangelist);
00261         }
00262         while(_clusterWindow==NullFC);
00263         // get server id
00264         for(_serverId=0;
00265             _clusterWindow->getServers()[_serverId] != _serviceName &&
00266                 _serverId<_clusterWindow->getServers().size();
00267             _serverId++);
00268         // server connected and cluster window found
00269         SINFO << "Start server " << _serviceName 
00270               << " with id "     << _serverId 
00271               << std::endl;
00272         // now the window is responsible for connection and aspect
00273         _clusterWindow->getNetwork()->setMainConnection(_connection);
00274         _clusterWindow->getNetwork()->setAspect        (_aspect);
00275         _connection=NULL;
00276         _aspect=NULL;
00277         SHLChunk::setClusterId(Int32(_serverId));
00278         _clusterWindow->serverInit(_window,_serverId);
00279     }
00280 
00281     RemoteAspect *aspect=_clusterWindow->getNetwork()->getAspect();
00282     Connection   *connection=_clusterWindow->getNetwork()->getMainConnection();
00283 
00284     // sync with render clinet
00285     aspect->receiveSync(*connection,applyToChangelist);
00286 
00287     // sync with render client
00288     if(_clusterWindow->getInterleave())
00289     {
00290         // if the reminder of the division of interleave and 
00291         // framecount is equal to the servers id, the right
00292         // sync point for the current render frame is reached
00293         while( (_clusterWindow->getFrameCount()%
00294                 _clusterWindow->getInterleave())
00295                !=
00296                (_serverId%_clusterWindow->getInterleave()) ) 
00297         {
00298             aspect->receiveSync(*connection,applyToChangelist);
00299         }
00300     }
00301 }
00302 
00305 void ClusterServer::doRender(RenderActionBase *action)
00306 {
00307     _clusterWindow->serverRender( _window,_serverId,action );
00308 }
00309 
00312 void ClusterServer::doSwap(void)
00313 {
00314     _clusterWindow->serverSwap  ( _window,_serverId );
00315 }
00316 
00319 WindowPtr ClusterServer::getClusterWindow (void)
00320 {
00321     return _clusterWindow;
00322 }
00323 
00326 WindowPtr ClusterServer::getServerWindow  (void)
00327 {
00328     return _window;
00329 }
00330 
00334 bool ClusterServer::windowChanged(FieldContainerPtr& fcp,
00335                                   RemoteAspect *)
00336 {
00337     if(_clusterWindow != NullFC)
00338         return true;
00339 
00340     ClusterWindowPtr window=ClusterWindowPtr::dcast(fcp);
00341 
00342     if(window->getServers().size())
00343     {
00344         if(window->getServers().find(_serviceName) == 
00345            window->getServers().end())
00346         {
00347             SWARNING << "wrong window" << std::endl;
00348         }
00349         else
00350         {
00351             _clusterWindow=window;
00352         }
00353     }
00354     return true;
00355 }
00356 
00361 void ClusterServer::acceptClient()
00362 {
00363     BinaryMessage  msg;
00364     DgramSocket    serviceSock;
00365     SocketAddress  addr;
00366     std::string    service;
00367     std::string    connectionType;
00368     UInt32         readable;
00369     bool           connected=false;
00370     std::string    address;
00371     bool           bound = false;
00372 
00373     SINFO << "Waiting for request of "
00374           << _serviceName
00375           << std::endl;
00376 
00377     try
00378     {
00379         if(!_requestAddress.empty())
00380         {            
00381             // create connection
00382             _connection = ConnectionFactory::the().
00383                 createPoint(_connectionType);
00384             if(_connection)
00385             {
00386                 // set interface
00387                 _connection->setInterface(_interface);
00388                 // bind connection
00389                 try 
00390                 {
00391                     // bind to requested address
00392                     _boundAddress = _connection->bind(_requestAddress);
00393                     bound = true;
00394                 }
00395                 catch(...)
00396                 {
00397                     SINFO << "Unable to bind, use name as symbolic service name" 
00398                           << std::endl;
00399                 }
00400             }
00401         }
00402         serviceSock.open();
00403         serviceSock.setReusePort(true);
00404         // join to multicast group
00405         if(!_serviceGroup.empty())
00406         {
00407             SocketAddress groupAddress = SocketAddress(
00408                 _serviceGroup.c_str(),
00409                 _servicePort);
00410             if(groupAddress.isMulticast())
00411             {
00412                 SINFO << "wait for request on multicast:" << 
00413                     _serviceGroup << std::endl;
00414                 serviceSock.bind(SocketAddress(SocketAddress::ANY,
00415                                                _servicePort));
00416                 serviceSock.join(SocketAddress(groupAddress));
00417             }
00418             else
00419             {
00420                 SINFO << "wait for request by broadcast:" << 
00421                     _serviceGroup << std::endl;
00422                 serviceSock.bind(SocketAddress(groupAddress));
00423             }
00424         }
00425         else
00426         {
00427             SINFO << "wait for request by broadcast" << std::endl;
00428             serviceSock.bind(SocketAddress(SocketAddress::ANY,
00429                                            _servicePort));
00430         }
00431 
00432         while(!connected)
00433         {
00434             try
00435             {
00436                 if(_connection) 
00437                     readable = serviceSock.waitReadable(.01);
00438                 else
00439                     readable = true;
00440                 if(readable)
00441                 {
00442                     serviceSock.recvFrom(msg,addr);
00443 
00444                     service        = msg.getString();
00445                     connectionType = msg.getString();
00446 
00447                     SINFO << "Request for " 
00448                           << service << " " 
00449                           << connectionType 
00450                           << std::endl;
00451                     
00452                     if(service == _serviceName)
00453                     {
00454                         // remove old connection if typename missmaches
00455                         if(_connection && 
00456                            _connection->getType()->getName() != connectionType)
00457                         {
00458                             delete _connection;
00459                             _connection = NULL;
00460                         }
00461                         // try to create connection
00462                         if(!_connection)
00463                         {
00464                             // create connection
00465                             _connection = ConnectionFactory::the().
00466                                 createPoint(connectionType);
00467                             if(_connection)
00468                             {
00469                                 // set interface
00470                                 _connection->setInterface(_interface);
00471                                 // bind connection
00472                                 _boundAddress = _connection->bind(_requestAddress);
00473                                 bound = true;
00474                             } 
00475                             else
00476                             {
00477                                 SINFO << "Unknown connection type '" 
00478                                       << connectionType << "'" << std::endl;
00479                             }
00480                         }
00481                         if(_connection)
00482                         {
00483                             msg.clear    (             );
00484                             msg.putString(_serviceName );
00485                             msg.putString(_boundAddress);
00486                             serviceSock.sendTo(msg, addr);
00487                             SINFO << "Response " 
00488                                   << connectionType << ":"
00489                                   << _boundAddress 
00490                                   << std::endl;
00491                         }
00492                     }
00493                 }
00494             }
00495             catch(SocketConnReset &e)
00496             {
00497                 // ignore if there is a connection. This can happen, if
00498                 // a client has send a request. The server has send an
00499                 // answer meanwile the client has send a second request
00500                 // the client gets the answer to the first request and
00501                 // the server tries to send a second answer. The second
00502                 // answer can not be delivered because the client has
00503                 // closed its service port. This is a win-socket problem.
00504 
00505                 SWARNING << e.what() << std::endl;
00506 
00507                 // if there is no connection, then its a real problem
00508                 if(!_connection)
00509                     throw;
00510             }
00511             catch(OSG_STDEXCEPTION_NAMESPACE::exception &e)
00512             {
00513                 SWARNING << e.what() << std::endl;
00514             }
00515             try 
00516             {
00517                 // try to accept
00518                 if(bound && _connection && _connection->acceptGroup(0.2) >= 0)
00519                 {
00520                     connected = true;
00521                     SINFO << "Connection accepted " << _boundAddress << std::endl;
00522                 }
00523             }
00524             catch(OSG_STDEXCEPTION_NAMESPACE::exception &e)
00525             {
00526                 SWARNING << e.what() << std::endl;
00527             }
00528         }
00529         serviceSock.close();
00530     }
00531     catch(OSG_STDEXCEPTION_NAMESPACE::exception &e)
00532     {
00533         throw;
00534     }
00535 }
00536 
00537 
00538 /*-------------------------------------------------------------------------*/
00539 /*                              cvs id's                                   */
00540 
00541 #ifdef __sgi
00542 #pragma set woff 1174
00543 #endif
00544 
00545 #ifdef OSG_LINUX_ICC
00546 #pragma warning( disable : 177 )
00547 #endif
00548 
00549 namespace
00550 {
00551     static Char8 cvsid_cpp[] = "@(#)$Id:$";
00552     static Char8 cvsid_hpp[] = OSG_CLUSTERSERVERHEADER_CVSID;
00553 }

Generated on Thu Aug 25 04:01:54 2005 for OpenSG by  doxygen 1.4.3