INTRODUCTION Overview Download and Install Quick Start Documentation Publications NONFRAMEWORK CODE Driver Interfaces Drivers Libraries Utilities FRAMEWORK CODE Interfaces Components Libraries Utilities Full Software Listings DEVELOPER Tutorials Examples Dev Guide Dashboard PEOPLE Contributors Users Project Download Mailing lists
|
topichandler.h00001 /* 00002 * Orca-Robotics Project: Components for robotics 00003 * http://orca-robotics.sf.net/ 00004 * Copyright (c) 2004-2009 Alex Brooks, Alexei Makarenko, Tobias Kaupp 00005 * 00006 * This copy of Orca is licensed to you under the terms described in 00007 * the LICENSE file included in this distribution. 00008 * 00009 */ 00010 00011 #ifndef ORCAICE_TOPIC_HANDLER_H 00012 #define ORCAICE_TOPIC_HANDLER_H 00013 00014 #include <IceStorm/IceStorm.h> 00015 #include <orca/exceptions.h> 00016 #include <orcaice/context.h> 00017 #include <orcaice/icestormutils.h> 00018 #include <orcaice/multiicestormutils.h> 00019 00020 namespace gbxiceutilacfr { class Thread; } 00021 00022 namespace orcaice { 00023 00024 // this is a utility class which wraps up the functionality 00025 // of interacting with IceStorm Topic. 00026 template<class ConsumerProxyType, class DataType> 00027 class TopicHandler 00028 { 00029 00030 private: 00031 00032 ConsumerProxyType publisherPrx_; 00033 IceStorm::TopicPrx topicPrx_; 00034 00035 std::string topicName_; 00036 00037 orcaice::Context context_; 00038 00039 // utility function: subscribes and returns individual publisher 00040 // (the pointer may be empty if the subscriber is already subscribed) 00041 ConsumerProxyType internalSubscribe( const ConsumerProxyType& subscriber, 00042 const IceStorm::QoS& qos ) 00043 { 00044 if ( !topicPrx_ ) 00045 throw orca::SubscriptionFailedException( "Not connected to topic yet" ); 00046 00047 context_.tracer().debug( "TopicHandler::internalSubscribe(): subscriber='"+subscriber->ice_toString()+"'", 4 ); 00048 // see Ice Manual sec.45.6 "Publishing to a specific subscriber" 00049 ConsumerProxyType individualPublisher; 00050 try { 00051 // this talks to IceStorm 00052 Ice::ObjectPrx pub = topicPrx_->subscribeAndGetPublisher( qos, subscriber->ice_twoway()); 00053 individualPublisher = ConsumerProxyType::uncheckedCast(pub); 00054 } 00055 catch ( const IceStorm::AlreadySubscribed& e ) { 00056 std::stringstream ss; 00057 ss <<"Request for subscribe but this proxy has already been subscribed, so I do nothing: "<< e.what(); 00058 context_.tracer().debug( ss.str(), 2 ); 00059 // will return an empty poiinter! 00060 } 00061 catch ( const Ice::Exception& e ) { 00062 std::stringstream ss; 00063 ss <<"TopicHandler::internalSubscribe: failed to subscribe: "<< e.what(); 00064 context_.tracer().warning( ss.str() ); 00065 // throws exception back to the subscriber 00066 throw orca::SubscriptionFailedException( ss.str() ); 00067 } 00068 // this pointer may be empty 00069 return individualPublisher; 00070 } 00071 00072 public: 00073 00074 TopicHandler( const std::string &topicName, const orcaice::Context &context ) : 00075 topicName_(topicName), 00076 context_(context) 00077 { 00078 } 00079 00080 IceStorm::TopicPrx topic() 00081 { 00082 return topicPrx_; 00083 } 00084 00086 ConsumerProxyType publisherPrx() const 00087 { 00088 return publisherPrx_; 00089 } 00090 00091 // Returns TRUE on success, FALSE otherwise. 00092 // Catches only the exceptions expected in the case when IceStorm is not unavailable. 00093 // 00094 // Set localReportingOnly to true when calling this function from classes which deal with 00095 // error reporting, such as Tracer and Status. Otherwise, potential connection problems will 00096 // lead to infinite multiplication of fault messages. 00097 bool connectToTopic( bool localReportingOnly=false ) 00098 { 00099 context_.tracer().debug( std::string("TopicHandler: connecting to topic ")+topicName_, 2 ); 00100 // Find IceStorm Topic to which we'll publish 00101 try 00102 { 00103 topicPrx_ = orcaice::connectToTopicWithString<ConsumerProxyType> 00104 ( context_, publisherPrx_, topicName_, localReportingOnly ); 00105 } 00106 // we only catch the exception which would be thrown if IceStorm is not there. 00107 catch ( const orcaice::NetworkException& e ) 00108 { 00109 return false; 00110 } 00111 00112 // fix the order of endpoints 00113 // topicPrx_ = topicPrx_->ice_endpointSelection( Ice::Ordered ); 00114 return true; 00115 } 00116 00117 bool connectToTopic( gbxutilacfr::Stoppable* activity, const std::string& subsysName, int retryInterval, 00118 bool localReportingOnly=false ) 00119 { 00120 context_.tracer().debug( subsysName, std::string("TopicHandler: connecting to topic ")+topicName_, 2 ); 00121 // Find IceStorm Topic to which we'll publish 00122 try 00123 { 00124 topicPrx_ = orcaice::connectToTopicWithString<ConsumerProxyType> 00125 ( context_, publisherPrx_, topicName_, activity, subsysName, retryInterval, -1, localReportingOnly ); 00126 } 00127 // we only catch the exception which would be thrown if IceStorm is not there. 00128 catch ( const orcaice::NetworkException& e ) 00129 { 00130 return false; 00131 } 00132 00133 // fix the order of endpoints 00134 // topicPrx_ = topicPrx_->ice_endpointSelection( Ice::Ordered ); 00135 return true; 00136 } 00137 00138 // sub subscribers to the topic we're publishing to 00139 IceStorm::TopicPrx subscribe( const ConsumerProxyType& subscriber, 00140 const IceStorm::QoS& qos=IceStorm::QoS() ) 00141 { 00142 internalSubscribe( subscriber, qos ); 00143 return topicPrx_; 00144 } 00145 00146 IceStorm::TopicPrx subscribe( const ConsumerProxyType& subscriber, 00147 const DataType& initData, 00148 const IceStorm::QoS& qos=IceStorm::QoS() ) 00149 { 00150 ConsumerProxyType individualPublisher = internalSubscribe( subscriber, qos ); 00151 00152 // the individualPublisher may be NULL. 00153 // this normally happens when the subscriber is already subscribed. 00154 // one example when this happens routinely: when logplayer is paused the subscriber 00155 // freaks out and tries to resubscribe but, as far as logplayer is concerned, it is 00156 // already subscribed. 00157 if ( individualPublisher ) 00158 { 00159 // send all the information we have to the new subscriber (and to no one else) 00160 try 00161 { 00162 // this talks to IceStorm who will forward this data to this subscriber only. 00163 individualPublisher->setData( initData ); 00164 } 00165 catch ( const Ice::Exception& e ) { 00166 std::stringstream ss; 00167 ss <<"TopicHandler::subscribe: failed to send information to the new subscriber: "<< e.what(); 00168 // show this warning locally 00169 context_.tracer().warning( ss.str() ); 00170 // throws exception back to the subscriber 00171 throw orca::SubscriptionPushFailedException( ss.str() ); 00172 } 00173 context_.tracer().info( std::string("TopicHandler::subscribe(): sent status info to new subscriber: ")+individualPublisher->ice_toString() ); 00174 } 00175 00176 return topicPrx_; 00177 } 00178 00179 void unsubscribe( const ConsumerProxyType& subscriber ) 00180 { 00181 if ( !topicPrx_ ) 00182 return; 00183 00184 context_.tracer().debug( "TopicHandler::internalUnsubscribe(): subscriber='"+subscriber->ice_toString()+"'", 4 ); 00185 topicPrx_->unsubscribe( subscriber ); 00186 } 00187 00188 void publish( const DataType& data ) 00189 { 00190 // Try to push to IceStorm. 00191 orcaice::tryPushToIceStormWithReconnect<ConsumerProxyType,DataType> ( 00192 context_, 00193 publisherPrx_, 00194 data, 00195 topicPrx_, 00196 topicName_ ); 00197 }; 00198 }; 00199 00200 } 00201 00202 #endif |
Webmaster: Tobias Kaupp (tobasco at users.sourceforge.net)