INTRODUCTION
Overview
Download and Install
Quick Start
Documentation
Publications

NONFRAMEWORK CODE
Driver Interfaces
Drivers
Libraries
Utilities

FRAMEWORK CODE
Interfaces
Components
Libraries
Utilities

Full Software Listings

DEVELOPER
Tutorials
Examples
Dev Guide
Dashboard

PEOPLE
Contributors
Users

SourceForge.net Logo
Project
Download
Mailing lists

 

         

topichandler.h

00001 /*
00002  * Orca-Robotics Project: Components for robotics 
00003  *               http://orca-robotics.sf.net/
00004  * Copyright (c) 2004-2009 Alex Brooks, Alexei Makarenko, Tobias Kaupp
00005  *
00006  * This copy of Orca is licensed to you under the terms described in
00007  * the LICENSE file included in this distribution.
00008  *
00009  */
00010 
00011 #ifndef ORCAICE_TOPIC_HANDLER_H
00012 #define ORCAICE_TOPIC_HANDLER_H
00013 
00014 #include <IceStorm/IceStorm.h>
00015 #include <orca/exceptions.h>
00016 #include <orcaice/context.h>
00017 #include <orcaice/icestormutils.h>
00018 #include <orcaice/multiicestormutils.h>
00019 
00020 namespace gbxiceutilacfr { class Thread; }
00021 
00022 namespace orcaice {
00023 
00024 // this is a utility class which wraps up the functionality
00025 // of interacting with IceStorm Topic.
00026 template<class ConsumerProxyType, class DataType>
00027 class TopicHandler
00028 {
00029 
00030 private: 
00031 
00032     ConsumerProxyType publisherPrx_;
00033     IceStorm::TopicPrx topicPrx_;
00034 
00035     std::string topicName_;
00036 
00037     orcaice::Context context_;
00038 
00039     // utility function: subscribes and returns individual publisher
00040     // (the pointer may be empty if the subscriber is already subscribed)
00041     ConsumerProxyType internalSubscribe( const ConsumerProxyType& subscriber, 
00042                                          const IceStorm::QoS& qos )
00043     {    
00044         if ( !topicPrx_ )
00045             throw orca::SubscriptionFailedException( "Not connected to topic yet" );
00046 
00047         context_.tracer().debug( "TopicHandler::internalSubscribe(): subscriber='"+subscriber->ice_toString()+"'", 4 );
00048         // see Ice Manual sec.45.6 "Publishing to a specific subscriber"
00049         ConsumerProxyType individualPublisher;
00050         try {
00051             // this talks to IceStorm
00052             Ice::ObjectPrx pub = topicPrx_->subscribeAndGetPublisher( qos, subscriber->ice_twoway());
00053             individualPublisher = ConsumerProxyType::uncheckedCast(pub);
00054         }
00055         catch ( const IceStorm::AlreadySubscribed& e ) {
00056             std::stringstream ss;
00057             ss <<"Request for subscribe but this proxy has already been subscribed, so I do nothing: "<< e.what();
00058             context_.tracer().debug( ss.str(), 2 );
00059             // will return an empty poiinter!
00060         }
00061         catch ( const Ice::Exception& e ) {
00062             std::stringstream ss;
00063             ss <<"TopicHandler::internalSubscribe: failed to subscribe: "<< e.what();
00064             context_.tracer().warning( ss.str() );
00065             // throws exception back to the subscriber
00066             throw orca::SubscriptionFailedException( ss.str() );
00067         }
00068         // this pointer may be empty
00069         return individualPublisher;
00070     }
00071 
00072 public: 
00073 
00074     TopicHandler( const std::string &topicName, const orcaice::Context &context ) :
00075         topicName_(topicName),
00076         context_(context)
00077     {
00078     }
00079 
00080     IceStorm::TopicPrx topic()
00081     {
00082         return topicPrx_;
00083     }
00084 
00086     ConsumerProxyType publisherPrx() const 
00087     { 
00088         return publisherPrx_; 
00089     }
00090 
00091     // Returns TRUE on success, FALSE otherwise.
00092     // Catches only the exceptions expected in the case when IceStorm is not unavailable.
00093     //
00094     // Set localReportingOnly to true when calling this function from classes which deal with
00095     // error reporting, such as Tracer and Status. Otherwise, potential connection problems will
00096     // lead to infinite multiplication of fault messages.
00097     bool connectToTopic( bool localReportingOnly=false )
00098     {
00099         context_.tracer().debug( std::string("TopicHandler: connecting to topic ")+topicName_, 2 );
00100         // Find IceStorm Topic to which we'll publish
00101         try
00102         {
00103             topicPrx_ = orcaice::connectToTopicWithString<ConsumerProxyType>
00104                 ( context_, publisherPrx_, topicName_, localReportingOnly );
00105         }
00106         // we only catch the exception which would be thrown if IceStorm is not there.
00107         catch ( const orcaice::NetworkException& e )
00108         {
00109             return false;
00110         }
00111 
00112         // fix the order of endpoints
00113 //         topicPrx_ = topicPrx_->ice_endpointSelection( Ice::Ordered );
00114         return true;
00115     }
00116 
00117     bool connectToTopic( gbxutilacfr::Stoppable* activity, const std::string& subsysName, int retryInterval,
00118                          bool localReportingOnly=false )
00119     {
00120         context_.tracer().debug( subsysName, std::string("TopicHandler: connecting to topic ")+topicName_, 2 );
00121         // Find IceStorm Topic to which we'll publish
00122         try
00123         {
00124             topicPrx_ = orcaice::connectToTopicWithString<ConsumerProxyType>
00125                 ( context_, publisherPrx_, topicName_, activity, subsysName, retryInterval, -1, localReportingOnly );
00126         }
00127         // we only catch the exception which would be thrown if IceStorm is not there.
00128         catch ( const orcaice::NetworkException& e )
00129         {
00130             return false;
00131         }
00132 
00133         // fix the order of endpoints
00134 //         topicPrx_ = topicPrx_->ice_endpointSelection( Ice::Ordered );
00135         return true;
00136     }
00137 
00138     // sub subscribers to the topic we're publishing to
00139     IceStorm::TopicPrx subscribe( const ConsumerProxyType& subscriber, 
00140                                   const IceStorm::QoS& qos=IceStorm::QoS() )
00141     {
00142         internalSubscribe( subscriber, qos );
00143         return topicPrx_;
00144     }
00145 
00146     IceStorm::TopicPrx subscribe( const ConsumerProxyType& subscriber, 
00147                                    const DataType& initData, 
00148                                    const IceStorm::QoS& qos=IceStorm::QoS() )
00149     {    
00150         ConsumerProxyType individualPublisher = internalSubscribe( subscriber, qos );
00151     
00152         // the individualPublisher may be NULL.
00153         // this normally happens when the subscriber is already subscribed.
00154         // one example when this happens routinely: when logplayer is paused the subscriber
00155         // freaks out and tries to resubscribe but, as far as logplayer is concerned, it is
00156         // already subscribed.
00157         if ( individualPublisher ) 
00158         {
00159             // send all the information we have to the new subscriber (and to no one else)
00160             try
00161             {
00162                 // this talks to IceStorm who will forward this data to this subscriber only.
00163                 individualPublisher->setData( initData );   
00164             }
00165             catch ( const Ice::Exception&  e ) {
00166                 std::stringstream ss;
00167                 ss <<"TopicHandler::subscribe: failed to send information to the new subscriber: "<< e.what();
00168                 // show this warning locally
00169                 context_.tracer().warning( ss.str() );
00170                 // throws exception back to the subscriber
00171                 throw orca::SubscriptionPushFailedException( ss.str() );
00172             }
00173             context_.tracer().info( std::string("TopicHandler::subscribe(): sent status info to new subscriber: ")+individualPublisher->ice_toString() );
00174         }
00175 
00176         return topicPrx_;
00177     }
00178 
00179     void unsubscribe( const ConsumerProxyType& subscriber )
00180     {
00181         if ( !topicPrx_ )
00182             return;
00183     
00184         context_.tracer().debug( "TopicHandler::internalUnsubscribe(): subscriber='"+subscriber->ice_toString()+"'", 4 );
00185         topicPrx_->unsubscribe( subscriber );
00186     }
00187 
00188     void publish( const DataType& data )
00189     {    
00190         // Try to push to IceStorm.
00191         orcaice::tryPushToIceStormWithReconnect<ConsumerProxyType,DataType> ( 
00192             context_,
00193             publisherPrx_,
00194             data,
00195             topicPrx_,
00196             topicName_ );
00197     };
00198 };
00199 
00200 }
00201 
00202 #endif
 

Webmaster: Tobias Kaupp (tobasco at users.sourceforge.net)


Generated for Orca Robotics by  doxygen 1.4.5