INTRODUCTION
Overview
Download and Install
Quick Start
Documentation
Publications

NONFRAMEWORK CODE
Driver Interfaces
Drivers
Libraries
Utilities

FRAMEWORK CODE
Interfaces
Components
Libraries
Utilities

Full Software Listings

DEVELOPER
Tutorials
Examples
Dev Guide
Dashboard

PEOPLE
Contributors
Users

SourceForge.net Logo
Project
Download
Mailing lists

 

         

consumerImpl.h

00001 /*
00002  * Orca-Robotics Project: Components for robotics 
00003  *               http://orca-robotics.sf.net/
00004  * Copyright (c) 2004-2009 Alex Brooks, Alexei Makarenko, Tobias Kaupp
00005  *
00006  * This copy of Orca is licensed to you under the terms described in
00007  * the LICENSE file included in this distribution.
00008  *
00009  */
00010  
00011 #ifndef ORCAIFACEIMPL_CONSUMER_IMPL_H
00012 #define ORCAIFACEIMPL_CONSUMER_IMPL_H
00013 
00014 #include <orcaice/context.h>
00015 #include <orcaice/multiconnectutils.h>
00016 #include <orcaice/iceutils.h>
00017 #include <orcaice/icestormutils.h>
00018 #include <orcaice/icegridutils.h>
00019 #include <gbxsickacfr/gbxiceutilacfr/store.h>
00020 // we only need the definition of Stoppable and checkedSleep() function.
00021 // (we don't need the actual Thread class).
00022 #include <gbxsickacfr/gbxiceutilacfr/threadutils.h>
00023 #include <gbxsickacfr/gbxiceutilacfr/timer.h>
00024 
00025 namespace orcaifaceimpl
00026 {
00027 
00031 class ConsumerSubscriber
00032 {
00033 public:
00034     ConsumerSubscriber( const orcaice::Context& context );
00035 
00036     virtual ~ConsumerSubscriber() {};
00037 
00041     virtual void subscribeWithString( const std::string& proxyString )=0;
00042 
00044     void subscribeWithTag( const std::string& interfaceTag );
00045 
00048     virtual void unsubscribe()=0;
00049 
00057     virtual bool subscribeWithString( const std::string& proxyString,  
00058                           gbxutilacfr::Stoppable* activity, const std::string& subsysName="", 
00059                           int retryIntervalSec=2, int retryNumber=-1 )=0;
00060 
00065     bool subscribeWithTag( const std::string& interfaceTag, 
00066                           gbxutilacfr::Stoppable* activity, const std::string& subsysName="", 
00067                           int retryIntervalSec=2, int retryNumber=-1 );
00068 
00070     bool hasBeenSubscribed()
00071         { return !topic_.isEmpty(); }
00072 
00073 protected:
00074 
00075     // This could be used for re-subscribing.
00076 //     gbxiceutilacfr::Store<std::string> proxyString_;
00077 
00078     //
00079     // Can connect to Admin interfaces to subscribe their consumers (Status and Tracer)
00080     //
00081     gbxiceutilacfr::Store<IceStorm::TopicPrx> topic_;
00082 
00084     orcaice::Context context_;
00085 };
00086 
// Minimal callback interface through which received data objects are handed
// to the user's code.
// It exists as a separate abstract class because ConsumerTypeI only has to
// know about dataEvent() (and knows nothing about the 2 extra template
// parameters of ConsumerImpl).
template<typename ObjectType>
class AbstractConsumer
{
public:
    // Callback invoked with every incoming data object.
    // Derived classes supply the actual handling.
    virtual void dataEvent( const ObjectType& data )=0;

    // Virtual, so that deleting through a base pointer is safe.
    virtual ~AbstractConsumer() {}
};
00100 
00101 // implements Slice consumer interface
00102 // the end user does not need to know about it.
00103 // redirects incoming data to ConsumerImpl's derivatives.
00104 template<class ConsumerType, class ObjectType>
00105 class ConsumerTypeI : virtual public ConsumerType
00106 {
00107 public:
00108     ConsumerTypeI( AbstractConsumer<ObjectType> &impl ) :
00109         impl_(impl)
00110         {}
00111     virtual ~ConsumerTypeI() {}
00112 
00113     // implementation of remote call defined in all *Consumer interfaces
00114     // this implementation redirects to the Impl class
00115     virtual void setData( const ObjectType& data, const Ice::Current& )
00116     {
00117 //         context_.tracer().debug( "Received data from provider", 8 );
00118         impl_.dataEvent( data );
00119     }
00120 
00121 private:
00122     AbstractConsumer<ObjectType>& impl_;
00123 };
00124 
00140 template<class ProviderType, 
00141          class ConsumerType, 
00142          class ObjectType,
00143          class ConsumerTypeIType=ConsumerTypeI<ConsumerType,ObjectType> >
00144 class ConsumerImpl : public ConsumerSubscriber, 
00145                      public AbstractConsumer<ObjectType>,
00146                      public IceUtil::Shared
00147 {
00148 typedef typename ProviderType::ProxyType ProviderPrxType;
00149 typedef typename ConsumerType::ProxyType ConsumerPrxType;
00150 
00151 protected:
00152     // these are protected so that it's possible to re-implement initConsumer()
00153 
00155     ConsumerPrxType consumerPrx_;
00156 
00158     Ice::ObjectPtr consumerPtr_;
00159 
00160 public:
00162     ConsumerImpl( const orcaice::Context &context ) :
00163         ConsumerSubscriber(context)
00164     {
00165         consumerPtr_ = new ConsumerTypeIType( *this );
00166         // this function does not throw, because it never talks to the Registry
00167         // we do NOT currently convert to a one-way proxy, but maybe we should for efficiency.
00168         consumerPrx_ = orcaice::createConsumerInterface<ConsumerPrxType>(context_,consumerPtr_);
00169     }
00170 
00171     virtual ~ConsumerImpl() 
00172     {
00173             // unsubscribe from the info provider
00174             try {
00175                 unsubscribe();
00176             }
00177             catch ( const std::exception& e ) {
00178                 std::stringstream ss;
00179                 ss << "failed to unsubscribe in destructor: " << e.what();
00180                 context_.tracer().warning( ss.str() );
00181             }
00182             catch ( ... ) {
00183                 context_.tracer().warning( "failed to unsubscribe in destructor." );
00184             }
00185 
00186             // now, remove our consumer object from Adapter
00187             if ( consumerPrx_ )
00188             {
00189                 orcaice::tryRemoveInterfaceWithIdentity( context_, consumerPrx_->ice_getIdentity() );
00190             }
00191 
00192             // Make sure that the adapter has let go of its smart point -- if not, it means
00193             // that a 'setData()' call is still executing
00194             gbxiceutilacfr::Timer timer;
00195             while ( !context_.adapter()->isDeactivated() &&
00196                     consumerPtr_->__getRef() > 1 )
00197             {
00198                 const double maxSec = 10.0; // This is a bit arbitrary...
00199                 if ( timer.elapsedSec() > maxSec )
00200                 {
00201                     std::cout<<"TRACE(consumerImpl.h): been waiting " << timer.elapsedSec() << "s for setData() call to finish..." << std::endl;                    
00202                 }
00203                 usleep(100000);
00204             }
00205         }
00206 
00208     ConsumerPrxType consumerPrx() const { return consumerPrx_; }
00209 
00211     orcaice::Context& context() { return context_; };
00212 
00213     // This is tricky! Can't leave it pure virtual because we unsubscribe and destroy
00214     // in ConsumerImpl destructor. By that time, the derived class (e.g. StoringConsumer)
00215     // is already destroyed and we'll get "pure virtual method called".
00216     //
00217     // It's tempting to try to make this function private and declare the class which
00218     // calls it a friend. But this is complicated with templates.
00222     virtual void dataEvent( const ObjectType& data )
00223     {
00224     }
00225 
00226     // no doxytags, these functions are already documented above.
00227 
00228     //
00229     // Can connect to Admin interfaces to subscribe their consumers (Status and Tracer)
00230     //
00231     virtual void subscribeWithString( const std::string& proxyString )
00232     {
00233         ProviderPrxType providerPrx;
00234 
00235         std::string staticId = ConsumerType::ice_staticId();
00236         // Home does not have a consumer
00237         if ( staticId == "::orca::StatusConsumer" || staticId == "::orca::TracerConsumer" ) {
00238             orca::FQInterfaceName fqIfaceName = orcaice::toInterfaceName( proxyString );
00239             orca::FQComponentName fqCompName;
00240             fqCompName.platform = fqIfaceName.platform;
00241             fqCompName.component = fqIfaceName.component;
00242             orcaice::connectToAdminInterface<ProviderType,ProviderPrxType>( context_, providerPrx, fqCompName );
00243         }
00244         else {
00245             orcaice::connectToInterfaceWithString( context_, providerPrx, proxyString );
00246         }
00247 
00248         IceStorm::TopicPrx topicPrx = providerPrx->subscribe( consumerPrx_ );
00249         topic_.set( topicPrx );
00250 
00251         std::stringstream ss;
00252         ss << "Subscribed to topic=" << topicPrx->ice_toString() << " consumer=" << proxyString;
00253         context_.tracer().debug( ss.str(),6 );
00254     }
00255 
00256     virtual void unsubscribe()
00257     {
00258         if ( !topic_.isEmpty() )
00259         {
00260             IceStorm::TopicPrx topicPrx;
00261             topic_.get( topicPrx );
00262 
00263             topicPrx->unsubscribe( consumerPrx_ );
00264             std::stringstream ss;
00265             ss << "Unsubscribed from " << topicPrx->ice_toString();
00266             context_.tracer().debug( ss.str(),6 );
00267         }
00268     }
00269 
00270     virtual bool subscribeWithString( const std::string& proxyString, 
00271                           gbxutilacfr::Stoppable* activity, const std::string& subsysName="", 
00272                           int retryIntervalSec=2, int retryNumber=-1 )
00273     {
00274         ProviderPrxType providerPrx;
00275         // multi-try
00276         orcaice::connectToInterfaceWithString( context_,
00277                                                providerPrx,
00278                                                proxyString,
00279                                                activity,
00280                                                subsysName,
00281                                                retryIntervalSec,
00282                                                retryNumber );
00283 
00284         int count = 0;
00285         while ( !activity->isStopping() && ( retryNumber<0 || count<retryNumber) )
00286         {
00287             try {
00288                 IceStorm::TopicPrx topicPrx = providerPrx->subscribe( consumerPrx_ );
00289                 topic_.set( topicPrx );
00290 
00291                 std::stringstream ss;
00292                 ss << "Subscribed to " << proxyString;
00293                 context_.tracer().debug( ss.str(),6 );
00294                 return true;
00295             }
00296             catch ( const orca::OrcaException &e )
00297             {
00298                 std::stringstream ss;
00299                 ss << "Failed to subscribe: " << e.what << std::endl
00300                    <<"Will retry in "<<retryIntervalSec<<"s.";
00301                 context_.tracer().warning( ss.str() );                
00302             }
00303             catch ( const std::exception &e )
00304             {
00305                 std::stringstream ss;
00306                 ss << "Failed to subscribe: " << e.what() << std::endl
00307                    <<"Will retry in "<<retryIntervalSec<<"s.";
00308                 context_.tracer().warning( ss.str() );
00309             }
00310             catch ( ... )
00311             {
00312                 std::stringstream ss;
00313                 ss << "Failed to subscribe for unknown reason. "
00314                    <<"Will retry in "<<retryIntervalSec<<"s.";
00315                 context_.tracer().warning( ss.str() );
00316             }
00317             ++count;
00318             if ( !subsysName.empty() )
00319                 context_.status().heartbeat( subsysName );
00320             gbxiceutilacfr::checkedSleep( activity, retryIntervalSec*1000 );
00321             if ( !subsysName.empty() )
00322                 context_.status().heartbeat( subsysName );
00323         }
00324 
00325         return false;
00326     }
00327 };
00328 
00329 } // namespace
00330 
00331 #endif
 

Webmaster: Tobias Kaupp (tobasco at users.sourceforge.net)


Generated for Orca Robotics by  doxygen 1.4.5