00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042
00043 #include <stdlib.h>
00044 #include <stdio.h>
00045
00046 #include <sstream>
00047
00048 #include <OSGConfig.h>
00049 #include <OSGSystemDef.h>
00050 #include "OSGClusterWindow.h"
00051 #include "OSGDgramSocket.h"
00052 #include "OSGStreamSocket.h"
00053 #include "OSGRemoteAspect.h"
00054 #include "OSGConnection.h"
00055 #include "OSGBinaryMessage.h"
00056 #include "OSGRemoteAspect.h"
00057 #include "OSGConnectionFactory.h"
00058 #include "OSGClusterNetwork.h"
00059 #include "OSGXmlpp.h"
00060 #include "OSGDisplayCalibration.h"
00061
00062 OSG_USING_NAMESPACE
00063
00076
00077
00078
00080
00081 void ClusterWindow::changed(BitVector whichField, UInt32 origin)
00082 {
00083 Inherited::changed(whichField, origin);
00084 }
00085
00087
00088 void ClusterWindow::dump( UInt32 ,
00089 const BitVector ) const
00090 {
00091 SLOG << "Dump ClusterWindow NI" << std::endl;
00092 }
00093
00094 void (*ClusterWindow::getFunctionByName ( const Char8 * ))()
00095 {
00096 return NULL;
00097 }
00098
00102 void ClusterWindow::init( void )
00103 {
00104 GroupConnection *connection;
00105 RemoteAspect *remoteAspect;
00106 int c,i,id;
00107 MFString::iterator s;
00108 Connection::Channel channel;
00109 bool directConnect=false;
00110
00111 if(getNetwork()->getMainConnection())
00112 {
00113 SWARNING << "init called twice" << std::endl;
00114 return;
00115 }
00116
00117 if(getConnectionType().empty())
00118 {
00119 setConnectionType("StreamSock");
00120 }
00121
00122 connection = ConnectionFactory::the().createGroup(getConnectionType());
00123 if(connection == NULL)
00124 {
00125 SFATAL << "Unknown connection type "
00126 << getConnectionType()
00127 << std::endl;
00128 return;
00129 }
00130
00131 connection->setDestination(getConnectionDestination());
00132 connection->setInterface(getConnectionInterface());
00133 connection->setParams(getConnectionParams());
00134
00135 getNetwork()->setMainConnection(connection);
00136
00137 remoteAspect = new RemoteAspect();
00138 getNetwork()->setAspect(remoteAspect);
00139 if(_statistics)
00140 remoteAspect->setStatistics(_statistics);
00141
00142
00143 std::string server;
00144 std::string autostart;
00145 std::string env;
00146
00147 Real32 progress = 0.0f;
00148 Real32 progressStep = 1.0f / Real32(getServers().size());
00149
00150 if(getAutostart().size())
00151 {
00152 progressStep /= 2;
00153 std::vector<FILE*> pipes;
00154
00155 for(id=0 ; id<getServers().size() ; ++id)
00156 {
00157 std::ostringstream command;
00158
00159 server = getServers()[id];
00160 int pos=server.find(":");
00161 if(pos>=0)
00162 server.erase(pos);
00163
00164 autostart = getAutostart()[id % getAutostart().size()];
00165
00166 for(c = 0 ; c < autostart.length() ; ++c)
00167 {
00168 if(autostart[c] == '%' && c+1 < autostart.length())
00169 switch(autostart[++c])
00170 {
00171 case 's':
00172 command << server;
00173 break;
00174 case 'n':
00175 command << getServers()[id];
00176 break;
00177 case 'i':
00178 command << id;
00179 break;
00180 case '{':
00181 env = "" ;
00182 while(++c < autostart.length() &&
00183 autostart[c] != '}')
00184 env += autostart[c];
00185 if(getenv(env.c_str()))
00186 command << getenv(env.c_str());
00187 break;
00188 case '%':
00189 command << '%';
00190 break;
00191 default:
00192 command << '%' << autostart[c];
00193 }
00194 else
00195 command << autostart[c];
00196 }
00197 SINFO << command.str() << std::endl;
00198 #ifdef WIN32
00199 FILE *pipe = _popen(command.str().c_str(),"r");
00200 #else
00201 FILE *pipe = popen(command.str().c_str(),"r");
00202 #endif
00203 if(!pipe)
00204 SFATAL << "Error starting: " << command << std::endl;
00205 pipes.push_back(pipe);
00206 }
00207 for(id=0 ; id<getServers().size() ; ++id)
00208 {
00209 if(pipes[id])
00210 {
00211
00212 if(_connectionFP != NULL)
00213 {
00214 std::string message;
00215 message += "Starting:" + getServers()[id];
00216 if(!_connectionFP(message, progress))
00217 {
00218
00219 for( ; id<getServers().size() ; ++id)
00220 {
00221 if(pipes[id])
00222 {
00223 #ifdef WIN32
00224 _pclose(pipes[id]);
00225 #else
00226 pclose(pipes[id]);
00227 #endif
00228 }
00229 throw AsyncCancel();
00230 }
00231 }
00232 }
00233 SINFO << "Waiting for " << getServers()[id] << " to start." << std::endl;
00234 char result;
00235 std::string line="";
00236 while((result=fgetc(pipes[id])) != EOF)
00237 {
00238 line += result;
00239 if(result == '\n')
00240 {
00241 SINFO << line;
00242 line = "";
00243 }
00244 }
00245 if(!line.empty())
00246 SINFO << line << std::endl;
00247 #ifdef WIN32
00248 _pclose(pipes[id]);
00249 #else
00250 pclose(pipes[id]);
00251 #endif
00252 SINFO << getServers()[id] << " started." << std::endl;
00253 progress += progressStep;
00254 }
00255 }
00256 }
00257
00258
00259 for(s =getServers().begin();
00260 s!=getServers().end();
00261 s++)
00262 {
00263 DgramSocket serviceSock;
00264 BinaryMessage msg;
00265 std::string respServer;
00266 std::string respAddress;
00267 bool retry=true;
00268
00269 if(strstr((*s).c_str(),":"))
00270 directConnect = true;
00271 else
00272 directConnect = false;
00273
00274 SINFO << "Connect to " << (*s) << std::endl;
00275 serviceSock.open();
00276 serviceSock.setTTL(8);
00277 while(retry)
00278 {
00279 try
00280 {
00281
00282 if(_connectionFP != NULL)
00283 {
00284 std::string message;
00285 message += "Connecting:" + *s;
00286 if(!_connectionFP(message, progress))
00287 {
00288 serviceSock.close();
00289 throw AsyncCancel();
00290 }
00291 }
00292
00293
00294 try
00295 {
00296 if(directConnect)
00297 {
00298 channel = connection->connectPoint(*s,0.5);
00299 if(channel >= 0) {
00300 retry=false;
00301 SINFO << "Connected with address:" << *s << std::endl;
00302 break;
00303 }
00304 }
00305 }
00306 catch(...)
00307 {
00308 }
00309
00310 msg.clear();
00311 msg.putString(*s);
00312 msg.putString(getConnectionType());
00313
00314 if(_sfServiceAddress.getValue().size() != 0)
00315 {
00316 SINFO << "send request to:" <<
00317 _sfServiceAddress.getValue()
00318 << std::endl;
00319 serviceSock.sendTo(
00320 msg,SocketAddress(
00321 _sfServiceAddress.getValue().c_str(),
00322 getServicePort()));
00323 }
00324 SINFO << "send request to:"
00325 << SocketAddress(SocketAddress::BROADCAST,
00326 getServicePort()).getHost().c_str()
00327 << std::endl;
00328 serviceSock.sendTo(
00329 msg,SocketAddress(SocketAddress::BROADCAST,
00330 getServicePort()));
00331 if(serviceSock.waitReadable(0.1))
00332 {
00333 serviceSock.recv(msg);
00334 msg.getString(respServer);
00335 msg.getString(respAddress);
00336 if(respServer == *s)
00337 {
00338 SINFO << "Found at address " << respAddress << std::endl;
00339
00340 channel = connection->connectPoint(respAddress);
00341 if(channel >= 0)
00342 retry=false;
00343 }
00344 }
00345 }
00346 catch(AsyncCancel &)
00347 {
00348 throw;
00349 }
00350 catch(OSG_STDEXCEPTION_NAMESPACE::exception &e)
00351 {
00352 SINFO << e.what() << std::endl;
00353 }
00354 }
00355 serviceSock.close();
00356 progress += progressStep;
00357 }
00358
00359 UInt8 serverLittleEndian;
00360 UInt8 forceNetworkOrder=false;
00361 #if BYTE_ORDER == LITTLE_ENDIAN
00362 UInt8 littleEndian = true;
00363 #else
00364 UInt8 littleEndian = false;
00365 #endif
00366 for(UInt32 i=0;i<getServers().size();++i)
00367 {
00368 channel = connection->selectChannel();
00369 connection->subSelection(channel);
00370 connection->getValue(serverLittleEndian);
00371 if(serverLittleEndian != littleEndian)
00372 {
00373 forceNetworkOrder=true;
00374 }
00375 }
00376 connection->resetSelection();
00377
00378 connection->putValue(forceNetworkOrder);
00379 connection->flush();
00380 connection->setNetworkOrder((forceNetworkOrder != 0));
00381 if(forceNetworkOrder)
00382 {
00383 SINFO << "Run clustering in network order mode" << std::endl;
00384 }
00385
00386 if(_connectionFP != NULL)
00387 _connectionFP("ok", 1.0);
00388 }
00389
00390 bool ClusterWindow::initAsync(connectioncbfp fp)
00391 {
00392 bool result;
00393 connectioncbfp saveFP = _connectionFP;
00394
00395 _connectionFP = fp;
00396 try
00397 {
00398 init();
00399 result = true;
00400 }
00401 catch(AsyncCancel &e)
00402 {
00403 result = false;
00404 }
00405 _connectionFP = saveFP;
00406
00407 return result;
00408 }
00409
00410 void ClusterWindow::setConnectionCB(connectioncbfp fp)
00411 {
00412 _connectionFP = fp;
00413 }
00414
00415 void ClusterWindow::render(RenderActionBase *action)
00416 {
00417 activate();
00418 frameInit();
00419 renderAllViewports(action);
00420 swap();
00421 frameExit();
00422 }
00423
00424 void ClusterWindow::activate( void )
00425 {
00426 }
00427
00428 void ClusterWindow::deactivate( void )
00429 {
00430 }
00431
00432 void ClusterWindow::swap( void )
00433 {
00434 if(getNetwork()->getMainConnection() && getNetwork()->getAspect())
00435 {
00436 clientSwap();
00437 }
00438 }
00439
00440 void ClusterWindow::renderAllViewports(RenderActionBase *action)
00441 {
00442 if(getNetwork()->getMainConnection() && getNetwork()->getAspect())
00443 {
00444 clientRender(action);
00445 }
00446 }
00447
00448 void ClusterWindow::frameInit(void)
00449 {
00450 ClusterWindowPtr ptr(this);
00451 Connection *connection =getNetwork()->getMainConnection();
00452 RemoteAspect *remoteAspect=getNetwork()->getAspect();
00453
00454 if(remoteAspect && connection)
00455 {
00456 if(_firstFrame)
00457 {
00458 beginEditCP(ptr,ClusterWindow::FrameCountFieldMask);
00459 setFrameCount(0);
00460 endEditCP(ptr,ClusterWindow::FrameCountFieldMask);
00461
00462 remoteAspect->sendSync(*connection);
00463 ChangeList cl;
00464 cl.clearAll();
00465 cl.merge(*Thread::getCurrentChangeList());
00466 Thread::getCurrentChangeList()->clearAll();
00467
00468 clientInit();
00469
00470 clientPreSync();
00471
00472 remoteAspect->sendSync(*connection);
00473 cl.merge(*Thread::getCurrentChangeList());
00474 Thread::getCurrentChangeList()->clearAll();
00475 Thread::getCurrentChangeList()->merge(cl);
00476 _firstFrame=false;
00477 }
00478 else
00479 {
00480 beginEditCP(ptr,ClusterWindow::FrameCountFieldMask);
00481 getFrameCount()++;
00482 endEditCP(ptr,ClusterWindow::FrameCountFieldMask);
00483 clientPreSync();
00484 remoteAspect->sendSync(*connection);
00485 }
00486 }
00487 }
00488
00489 void ClusterWindow::frameExit(void)
00490 {
00491 }
00492
00493
00494
00495
00496 void ClusterWindow::setStatistics(StatCollector *statistics)
00497 {
00498 _statistics = statistics;
00499 if(getNetwork()->getAspect())
00500 getNetwork()->getAspect()->setStatistics(statistics);
00501 }
00502
00503
00504
00505
00508 bool ClusterWindow::loadCalibration(std::istream &in)
00509 {
00510 ClusterWindowPtr ptr(this);
00511 DisplayCalibrationPtr calibPtr;
00512 xmlpp::xmlcontextptr ctxptr(new xmlpp::xmlcontext());
00513 xmlpp::xmldocument doc(ctxptr);
00514 xmlpp::xmlnodelist servers;
00515 xmlpp::xmlnodelist colors;
00516 xmlpp::xmlnodelist points;
00517 xmlpp::xmlnodelist rows;
00518 xmlpp::xmlnodelist::const_iterator sI;
00519 xmlpp::xmlnodelist::const_iterator nI;
00520 xmlpp::xmlnodelist::const_iterator cI;
00521 xmlpp::xmlnodelist::const_iterator rI;
00522 xmlpp::xmlnodelist::const_iterator pI;
00523 xmlpp::xmlstring serverTag("server");
00524 xmlpp::xmlstring colorTag("color");
00525 xmlpp::xmlstring rowTag("row");
00526 xmlpp::xmlstring pointTag("point");
00527 Matrix colorMatrix;
00528 Real32 gamma;
00529 xmlpp::xmlnodeptr nP;
00530
00531 getCalibration().clear();
00532 try
00533 {
00534 doc.load(in,ctxptr);
00535 servers = doc.select_nodes(serverTag);
00536
00537 for(sI = servers.begin() ; sI != servers.end(); ++sI)
00538 {
00539
00540 calibPtr = DisplayCalibration::create();
00541 beginEditCP(calibPtr);
00542 addRefCP(calibPtr);
00543 beginEditCP(ptr,CalibrationFieldMask);
00544 getCalibration().push_back(calibPtr);
00545 endEditCP(ptr,CalibrationFieldMask);
00546
00547
00548 if((*sI)->get_attrmap().count("name"))
00549 calibPtr->setServer((*sI)->get_attrmap()["name"]);
00550
00551
00552 for(nI = (*sI)->get_nodelist().begin();
00553 nI != (*sI)->get_nodelist().end();
00554 ++nI)
00555 {
00556 if((*nI)->get_name() == "colormatrix")
00557 {
00558 nP = (*nI);
00559 do
00560 nP = nP->get_nodelist().front();
00561 while (nP->get_nodelist().size() == 1);
00562 if(nP->get_type() == xmlpp::xml_nt_cdata)
00563 calibPtr->getColorMatrix().setValue(nP->get_cdata().c_str());
00564 }
00565
00566 if((*nI)->get_name() == "scaledown")
00567 {
00568 nP = (*nI);
00569 do
00570 nP = nP->get_nodelist().front();
00571 while (nP->get_nodelist().size() == 1);
00572 if(nP->get_type() == xmlpp::xml_nt_cdata)
00573 sscanf(nP->get_cdata().c_str(),"%f",&calibPtr->getScaleDown());
00574 }
00575
00576 if((*nI)->get_name() == "gamma")
00577 {
00578 nP = (*nI);
00579 do
00580 nP = nP->get_nodelist().front();
00581 while (nP->get_nodelist().size() == 1);
00582 if(nP->get_type() == xmlpp::xml_nt_cdata)
00583 sscanf(nP->get_cdata().c_str(),"%f",&calibPtr->getGamma());
00584 }
00585
00586 if((*nI)->get_name() == "gammaramp")
00587 {
00588 colors = (*nI)->select_nodes(colorTag);
00589 for(cI = colors.begin() ; cI != colors.end(); ++cI)
00590 {
00591 nP = (*cI);
00592 do
00593 nP = nP->get_nodelist().front();
00594 while (nP->get_nodelist().size() == 1);
00595 if(nP->get_type() == xmlpp::xml_nt_cdata)
00596 {
00597 Color3f col;
00598 col.setValue(nP->get_cdata().c_str());
00599 calibPtr->getGammaRamp().push_back(col);
00600 }
00601 }
00602 }
00603 if((*nI)->get_name() == "grid")
00604 {
00605 rows = (*nI)->select_nodes(rowTag);
00606 calibPtr->getGridHeight() = 0;
00607 for(rI = rows.begin() ; rI != rows.end(); ++rI)
00608 {
00609 calibPtr->getGridHeight()++;
00610 calibPtr->getGridWidth() = 0;
00611 points = (*rI)->select_nodes(pointTag);
00612 for(pI = points.begin() ; pI != points.end(); ++pI)
00613 {
00614 nP = (*pI);
00615 do
00616 nP = nP->get_nodelist().front();
00617 while (nP->get_nodelist().size() == 1);
00618 if(nP->get_type() == xmlpp::xml_nt_cdata)
00619 {
00620 Vec2f pos;
00621 calibPtr->getGridWidth()++;
00622 pos.setValueFromCString(nP->get_cdata().c_str());
00623 calibPtr->getGrid().push_back(pos);
00624 }
00625 }
00626 }
00627 }
00628 }
00629 endEditCP(calibPtr);
00630 }
00631 }
00632 catch (xmlpp::xmlerror e)
00633 {
00634
00635 xmlpp::xmllocation where( ctxptr->get_location() );
00636 xmlpp::xmlstring errmsg( e.get_strerror() );
00637 SFATAL << "XML error line " << where.get_line() << " "
00638 << "at position " << where.get_pos()
00639 << ": error: " << errmsg.c_str()
00640 << std::endl;
00641 return false;
00642 }
00643 return true;
00644 }
00645
00648 bool ClusterWindow::saveCalibration(std::ostream &out)
00649 {
00650 DisplayCalibrationPtr calibPtr;
00651 UInt32 c;
00652 UInt32 color,row,col,pos;
00653
00654 out << "<?xml version=\"1.0\"?>\n"
00655 << "<displaycalibration>\n";
00656 for(c=0 ; c<getCalibration().size() ; ++c)
00657 {
00658 calibPtr = getCalibration()[0];
00659 out << "<server name=\"" << calibPtr->getServer() << "\">\n";
00660 out << "<gamma>" << calibPtr->getGamma() << "</gamma>\n";
00661 out << "<scaledown>" << calibPtr->getScaleDown() << "</scaledown>\n";
00662 out << "<colormatrix>\n"
00663 << calibPtr->getColorMatrix()
00664 << "</colormatrix>\n";
00665 out << "<gammaramp>\n";
00666 for(color=0 ; color< calibPtr->getGammaRamp().size() ; ++color)
00667 out << "<color>"
00668 << calibPtr->getGammaRamp()[color]
00669 << "</color>\n";
00670 out << "</gammaramp>\n";
00671 out << "<grid>\n";
00672 for(row=0 ; row< calibPtr->getGridHeight() ; ++row)
00673 {
00674 out << "<row>\n";
00675 for(col=0 ; col< calibPtr->getGridWidth() ; ++col)
00676 {
00677 pos = row*calibPtr->getGridHeight()+col;
00678 out << "<point>";
00679 if(pos < calibPtr->getGrid().size())
00680 out << calibPtr->getGrid()[pos][0] << " "
00681 << calibPtr->getGrid()[pos][1];
00682 else
00683 out << col << " " << row;
00684 out << "</point>\n";
00685 }
00686 out << "</row>\n";
00687 }
00688 out << "</grid>\n";
00689 out << "</server>\n";
00690 }
00691 out << "</displaycalibration>\n";
00692 return true;
00693 }
00694
00695
00696
00697
00698 ClusterWindow::AsyncCancel::AsyncCancel()
00699 {
00700 }
00701
00702
00703
00704
00709 void ClusterWindow::clientInit( void )
00710 {
00711 }
00712
00720 void ClusterWindow::clientPreSync( void )
00721 {
00722 if(getClientWindow() != NullFC)
00723 {
00724 getClientWindow()->activate();
00725 getClientWindow()->frameInit();
00726 }
00727 }
00728
00736 void ClusterWindow::clientRender(RenderActionBase *action)
00737 {
00738 if(getClientWindow() != NullFC)
00739 {
00740 getClientWindow()->renderAllViewports( action );
00741 }
00742 }
00743
00750 void ClusterWindow::clientSwap( void )
00751 {
00752 if(getClientWindow() != NullFC)
00753 {
00754 getClientWindow()->swap( );
00755 getClientWindow()->frameExit();
00756 }
00757 }
00758
00759
00760
00761
00770 void ClusterWindow::serverInit( WindowPtr ,
00771 UInt32 )
00772 {
00773 }
00774
00786 void ClusterWindow::serverRender( WindowPtr window,
00787 UInt32 id,
00788 RenderActionBase *action )
00789 {
00790 window->activate();
00791 window->frameInit();
00792 window->renderAllViewports( action );
00793
00794 UInt32 c;
00795 DisplayCalibrationPtr calibPtr=NullFC;
00796 for(c=0 ; c<getCalibration().size() ; ++c)
00797 {
00798 if(getCalibration()[c]->getServer() == getServers()[id])
00799 {
00800 calibPtr = getCalibration()[c];
00801 break;
00802 }
00803 }
00804 if(calibPtr != NullFC)
00805 calibPtr->calibrate(window,action);
00806 }
00807
00817 void ClusterWindow::serverSwap( WindowPtr window,
00818 UInt32)
00819 {
00820 window->swap();
00821 window->frameExit();
00822 }
00823
00824
00825
00826
00828
00829 ClusterWindow::ClusterWindow(void) :
00830 Inherited(),
00831 _firstFrame(true),
00832 _statistics(NULL),
00833 _connectionFP(NULL),
00834 _network(NULL)
00835 {
00836 }
00837
00839
00840 ClusterWindow::ClusterWindow(const ClusterWindow &source) :
00841 Inherited(source),
00842 _firstFrame(true),
00843 _statistics(NULL),
00844 _connectionFP(source._connectionFP),
00845 _network(NULL)
00846 {
00847 }
00848
00850
00851 ClusterWindow::~ClusterWindow(void)
00852 {
00853 if(_network)
00854 subRefP(_network);
00855 _network=NULL;
00856 }
00857
00858
00859
00860
00863 ClusterNetwork *ClusterWindow::getNetwork(void)
00864 {
00865 if(!_network)
00866 {
00867 ClusterWindowPtr ptr(this);
00868 _network=ClusterNetwork::getInstance(ptr.getFieldContainerId());
00869 addRefP(_network);
00870 }
00871 return _network;
00872 }
00873
00876 void ClusterWindow::initMethod (void)
00877 {
00878 }
00879
00880
00881
00882
00883 #ifdef __sgi
00884 #pragma set woff 1174
00885 #endif
00886
00887 #ifdef OSG_LINUX_ICC
00888 #pragma warning( disable : 177 )
00889 #endif
00890
00891 namespace
00892 {
00893 static char cvsid_cpp[] = "@(#)$Id: $";
00894 static char cvsid_hpp[] = OSGCLUSTERWINDOW_HEADER_CVSID;
00895 static char cvsid_inl[] = OSGCLUSTERWINDOW_INLINE_CVSID;
00896 }