|
orca-robotics INTRODUCTION Overview Download and Install Quick Start Documentation Publications REPOSITORY Interfaces Components Libraries Utilities Software Map DEVELOPER Tutorials Examples Dev Guide Dashboard Wiki login/pass: orca/orca PEOPLE Contributors Users Project Download Mailing lists
|
topichandler.h00001 /* 00002 * Orca-Robotics Project: Components for robotics 00003 * http://orca-robotics.sf.net/ 00004 * Copyright (c) 2004-2008 Alex Brooks, Alexei Makarenko, Tobias Kaupp 00005 * 00006 * This copy of Orca is licensed to you under the terms described in 00007 * the LICENSE file included in this distribution. 00008 * 00009 */ 00010 00011 #ifndef ORCAICE_TOPIC_HANDLER_H 00012 #define ORCAICE_TOPIC_HANDLER_H 00013 00014 #include <IceStorm/IceStorm.h> 00015 #include <orca/exceptions.h> 00016 #include <orcaice/context.h> 00017 #include <orcaice/icestormutils.h> 00018 #include <orcaice/multiicestormutils.h> 00019 00020 namespace gbxiceutilacfr { class Thread; } 00021 00022 namespace orcaice { 00023 00024 // this is a utility class which wraps up the functionality 00025 // of interacting with IceStorm Topic. 00026 template<class ConsumerProxyType, class DataType> 00027 class TopicHandler 00028 { 00029 00030 private: 00031 00032 ConsumerProxyType publisherPrx_; 00033 IceStorm::TopicPrx topicPrx_; 00034 00035 std::string topicName_; 00036 00037 orcaice::Context context_; 00038 00039 // utility function: subscribes and returns individual publisher 00040 // (the pointer may be empty if the subscriber is already subscribed) 00041 ConsumerProxyType internalSubscribe( const ConsumerProxyType& subscriber ) 00042 { 00043 if ( !topicPrx_ ) 00044 throw orca::SubscriptionFailedException( "Not connected to topic yet" ); 00045 00046 context_.tracer().debug( "TopicHandler::internalSubscribe(): subscriber='"+subscriber->ice_toString()+"'", 4 ); 00047 // see Ice Manual sec.45.6 "Publishing to a specific subscriber" 00048 ConsumerProxyType individualPublisher; 00049 try { 00050 // this talks to IceStorm 00051 Ice::ObjectPrx pub = topicPrx_->subscribeAndGetPublisher( IceStorm::QoS(), subscriber->ice_twoway()); 00052 individualPublisher = ConsumerProxyType::uncheckedCast(pub); 00053 } 00054 catch ( const IceStorm::AlreadySubscribed& e ) { 00055 std::stringstream ss; 00056 ss <<"Request for subscribe but this proxy has already been subscribed, so I do nothing: "<< e.what(); 00057 context_.tracer().debug( ss.str(), 2 ); 00058 // will return an empty poiinter! 00059 } 00060 catch ( const Ice::Exception& e ) { 00061 std::stringstream ss; 00062 ss <<"TopicHandler::internalSubscribe: failed to subscribe: "<< e.what(); 00063 context_.tracer().warning( ss.str() ); 00064 // throws exception back to the subscriber 00065 throw orca::SubscriptionFailedException( ss.str() ); 00066 } 00067 // this pointer may be empty 00068 return individualPublisher; 00069 } 00070 00071 public: 00072 00073 TopicHandler( const std::string &topicName, const orcaice::Context &context ) : 00074 topicName_(topicName), 00075 context_(context) 00076 { 00077 } 00078 00079 IceStorm::TopicPrx topic() 00080 { 00081 return topicPrx_; 00082 } 00083 00085 ConsumerProxyType publisherPrx() const 00086 { 00087 return publisherPrx_; 00088 } 00089 00090 // Returns TRUE on success, FALSE otherwise. 00091 // Catches only the exceptions expected in the case when IceStorm is not unavailable. 00092 bool connectToTopic() 00093 { 00094 context_.tracer().debug( std::string("TopicHandler: connecting to topic ")+topicName_, 2 ); 00095 // Find IceStorm Topic to which we'll publish 00096 try 00097 { 00098 topicPrx_ = orcaice::connectToTopicWithString<ConsumerProxyType> 00099 ( context_, publisherPrx_, topicName_ ); 00100 } 00101 // we only catch the exception which would be thrown if IceStorm is not there. 00102 catch ( const orcaice::NetworkException& e ) 00103 { 00104 return false; 00105 } 00106 return true; 00107 } 00108 00109 bool connectToTopic( gbxiceutilacfr::Thread* thread, const std::string& subsysName, int retryInterval ) 00110 { 00111 context_.tracer().debug( std::string("TopicHandler: connecting to topic ")+topicName_, 2 ); 00112 // Find IceStorm Topic to which we'll publish 00113 try 00114 { 00115 topicPrx_ = orcaice::connectToTopicWithString<ConsumerProxyType> 00116 ( context_, publisherPrx_, topicName_, thread, subsysName, retryInterval ); 00117 } 00118 // we only catch the exception which would be thrown if IceStorm is not there. 00119 catch ( const orcaice::NetworkException& e ) 00120 { 00121 return false; 00122 } 00123 return true; 00124 } 00125 00126 // sub subscribers to the topic we're publishing to 00127 IceStorm::TopicPrx subscribe( const ConsumerProxyType& subscriber ) 00128 { 00129 internalSubscribe( subscriber ); 00130 return topicPrx_; 00131 } 00132 00133 IceStorm::TopicPrx subscribe( const ConsumerProxyType& subscriber, const DataType& initData ) 00134 { 00135 ConsumerProxyType individualPublisher = internalSubscribe( subscriber ); 00136 00137 // the individualPublisher may be NULL. 00138 // this normally happens when the subscriber is already subscribed. 00139 // one example when this happens routinely: when logplayer is paused the subscriber 00140 // freaks out and tries to resubscribe but, as far as logplayer is concerned, it is 00141 // already subscribed. 00142 if ( individualPublisher ) 00143 { 00144 // send all the information we have to the new subscriber (and to no one else) 00145 try 00146 { 00147 // this talks to IceStorm who will forward this data to this subscriber only. 00148 individualPublisher->setData( initData ); 00149 } 00150 catch ( const Ice::Exception& e ) { 00151 std::stringstream ss; 00152 ss <<"TopicHandler::subscribe: failed to send information to the new subscriber: "<< e.what(); 00153 // show this warning locally 00154 context_.tracer().warning( ss.str() ); 00155 // throws exception back to the subscriber 00156 throw orca::OrcaException( ss.str() ); 00157 } 00158 context_.tracer().info( std::string("TopicHandler::subscribe(): sent status info to new subscriber: ")+individualPublisher->ice_toString() ); 00159 } 00160 00161 return topicPrx_; 00162 } 00163 00164 void unsubscribe( const ConsumerProxyType& subscriber ) 00165 { 00166 if ( !topicPrx_ ) 00167 return; 00168 00169 context_.tracer().debug( "TopicHandler::internalUnsubscribe(): subscriber='"+subscriber->ice_toString()+"'", 4 ); 00170 topicPrx_->unsubscribe( subscriber ); 00171 } 00172 00173 void publish( const DataType& data ) 00174 { 00175 // Try to push to IceStorm. 00176 orcaice::tryPushToIceStormWithReconnect<ConsumerProxyType,DataType> ( 00177 context_, 00178 publisherPrx_, 00179 data, 00180 topicPrx_, 00181 topicName_ ); 00182 }; 00183 }; 00184 00185 } 00186 00187 #endif |
Webmaster: Tobias Kaupp (tobasco at users.sourceforge.net)
1.4.5