INTRODUCTION Overview Download and Install Quick Start Documentation Publications NONFRAMEWORK CODE Driver Interfaces Drivers Libraries Utilities FRAMEWORK CODE Interfaces Components Libraries Utilities Full Software Listings DEVELOPER Tutorials Examples Dev Guide Dashboard PEOPLE Contributors Users Project Download Mailing lists
|
consumerImpl.h00001 /* 00002 * Orca-Robotics Project: Components for robotics 00003 * http://orca-robotics.sf.net/ 00004 * Copyright (c) 2004-2009 Alex Brooks, Alexei Makarenko, Tobias Kaupp 00005 * 00006 * This copy of Orca is licensed to you under the terms described in 00007 * the LICENSE file included in this distribution. 00008 * 00009 */ 00010 00011 #ifndef ORCAIFACEIMPL_CONSUMER_IMPL_H 00012 #define ORCAIFACEIMPL_CONSUMER_IMPL_H 00013 00014 #include <orcaice/context.h> 00015 #include <orcaice/multiconnectutils.h> 00016 #include <orcaice/iceutils.h> 00017 #include <orcaice/icestormutils.h> 00018 #include <orcaice/icegridutils.h> 00019 #include <gbxsickacfr/gbxiceutilacfr/store.h> 00020 // we only need the definition of Stoppable and checkedSleep() function. 00021 // (we don't need the actual Thread class). 00022 #include <gbxsickacfr/gbxiceutilacfr/threadutils.h> 00023 #include <gbxsickacfr/gbxiceutilacfr/timer.h> 00024 00025 namespace orcaifaceimpl 00026 { 00027 00031 class ConsumerSubscriber 00032 { 00033 public: 00034 ConsumerSubscriber( const orcaice::Context& context ); 00035 00036 virtual ~ConsumerSubscriber() {}; 00037 00041 virtual void subscribeWithString( const std::string& proxyString )=0; 00042 00044 void subscribeWithTag( const std::string& interfaceTag ); 00045 00048 virtual void unsubscribe()=0; 00049 00057 virtual bool subscribeWithString( const std::string& proxyString, 00058 gbxutilacfr::Stoppable* activity, const std::string& subsysName="", 00059 int retryIntervalSec=2, int retryNumber=-1 )=0; 00060 00065 bool subscribeWithTag( const std::string& interfaceTag, 00066 gbxutilacfr::Stoppable* activity, const std::string& subsysName="", 00067 int retryIntervalSec=2, int retryNumber=-1 ); 00068 00070 bool hasBeenSubscribed() 00071 { return !topic_.isEmpty(); } 00072 00073 protected: 00074 00075 // This could be used for re-subscribing. 00076 // gbxiceutilacfr::Store<std::string> proxyString_; 00077 00078 // 00079 // Can connect to Admin interfaces to subscribe their consumers (Status and Tracer) 00080 // 00081 gbxiceutilacfr::Store<IceStorm::TopicPrx> topic_; 00082 00084 orcaice::Context context_; 00085 }; 00086 00087 // have to define it as a separate abstract class 00088 // because ConsumerTypeI only needs to know about dataEvent() 00089 // (and does not know about the 2 extra template parameters in 00090 // ConsumerImpl) 00091 template<class ObjectType> 00092 class AbstractConsumer 00093 { 00094 public: 00095 virtual ~AbstractConsumer() {} 00096 00097 // Implement this callback in the derived class. 00098 virtual void dataEvent( const ObjectType& data )=0; 00099 }; 00100 00101 // implements Slice consumer interface 00102 // the end user does not need to know about it. 00103 // redirects incoming data to ConsumerImpl's derivatives. 00104 template<class ConsumerType, class ObjectType> 00105 class ConsumerTypeI : virtual public ConsumerType 00106 { 00107 public: 00108 ConsumerTypeI( AbstractConsumer<ObjectType> &impl ) : 00109 impl_(impl) 00110 {} 00111 virtual ~ConsumerTypeI() {} 00112 00113 // implementation of remote call defined in all *Consumer interfaces 00114 // this implementation redirects to the Impl class 00115 virtual void setData( const ObjectType& data, const Ice::Current& ) 00116 { 00117 // context_.tracer().debug( "Received data from provider", 8 ); 00118 impl_.dataEvent( data ); 00119 } 00120 00121 private: 00122 AbstractConsumer<ObjectType>& impl_; 00123 }; 00124 00140 template<class ProviderType, 00141 class ConsumerType, 00142 class ObjectType, 00143 class ConsumerTypeIType=ConsumerTypeI<ConsumerType,ObjectType> > 00144 class ConsumerImpl : public ConsumerSubscriber, 00145 public AbstractConsumer<ObjectType>, 00146 public IceUtil::Shared 00147 { 00148 typedef typename ProviderType::ProxyType ProviderPrxType; 00149 typedef typename ConsumerType::ProxyType ConsumerPrxType; 00150 00151 protected: 00152 // these are protected so that it's possible to re-implement initConsumer() 00153 00155 ConsumerPrxType consumerPrx_; 00156 00158 Ice::ObjectPtr consumerPtr_; 00159 00160 public: 00162 ConsumerImpl( const orcaice::Context &context ) : 00163 ConsumerSubscriber(context) 00164 { 00165 consumerPtr_ = new ConsumerTypeIType( *this ); 00166 // this function does not throw, because it never talks to the Registry 00167 // we do NOT currently convert to a one-way proxy, but maybe we should for efficiency. 00168 consumerPrx_ = orcaice::createConsumerInterface<ConsumerPrxType>(context_,consumerPtr_); 00169 } 00170 00171 virtual ~ConsumerImpl() 00172 { 00173 // unsubscribe from the info provider 00174 try { 00175 unsubscribe(); 00176 } 00177 catch ( const std::exception& e ) { 00178 std::stringstream ss; 00179 ss << "failed to unsubscribe in destructor: " << e.what(); 00180 context_.tracer().warning( ss.str() ); 00181 } 00182 catch ( ... ) { 00183 context_.tracer().warning( "failed to unsubscribe in destructor." ); 00184 } 00185 00186 // now, remove our consumer object from Adapter 00187 if ( consumerPrx_ ) 00188 { 00189 orcaice::tryRemoveInterfaceWithIdentity( context_, consumerPrx_->ice_getIdentity() ); 00190 } 00191 00192 // Make sure that the adapter has let go of its smart point -- if not, it means 00193 // that a 'setData()' call is still executing 00194 gbxiceutilacfr::Timer timer; 00195 while ( !context_.adapter()->isDeactivated() && 00196 consumerPtr_->__getRef() > 1 ) 00197 { 00198 const double maxSec = 10.0; // This is a bit arbitrary... 00199 if ( timer.elapsedSec() > maxSec ) 00200 { 00201 std::cout<<"TRACE(consumerImpl.h): been waiting " << timer.elapsedSec() << "s for setData() call to finish..." << std::endl; 00202 } 00203 usleep(100000); 00204 } 00205 } 00206 00208 ConsumerPrxType consumerPrx() const { return consumerPrx_; } 00209 00211 orcaice::Context& context() { return context_; }; 00212 00213 // This is tricky! Can't leave it pure virtual because we unsubscribe and destroy 00214 // in ConsumerImpl destructor. By that time, the derived class (e.g. StoringConsumer) 00215 // is already destroyed and we'll get "pure virtual method called". 00216 // 00217 // It's tempting to try to make this function private and declare the class which 00218 // calls it a friend. But this is complicated with templates. 00222 virtual void dataEvent( const ObjectType& data ) 00223 { 00224 } 00225 00226 // no doxytags, these functions are already documented above. 00227 00228 // 00229 // Can connect to Admin interfaces to subscribe their consumers (Status and Tracer) 00230 // 00231 virtual void subscribeWithString( const std::string& proxyString ) 00232 { 00233 ProviderPrxType providerPrx; 00234 00235 std::string staticId = ConsumerType::ice_staticId(); 00236 // Home does not have a consumer 00237 if ( staticId == "::orca::StatusConsumer" || staticId == "::orca::TracerConsumer" ) { 00238 orca::FQInterfaceName fqIfaceName = orcaice::toInterfaceName( proxyString ); 00239 orca::FQComponentName fqCompName; 00240 fqCompName.platform = fqIfaceName.platform; 00241 fqCompName.component = fqIfaceName.component; 00242 orcaice::connectToAdminInterface<ProviderType,ProviderPrxType>( context_, providerPrx, fqCompName ); 00243 } 00244 else { 00245 orcaice::connectToInterfaceWithString( context_, providerPrx, proxyString ); 00246 } 00247 00248 IceStorm::TopicPrx topicPrx = providerPrx->subscribe( consumerPrx_ ); 00249 topic_.set( topicPrx ); 00250 00251 std::stringstream ss; 00252 ss << "Subscribed to topic=" << topicPrx->ice_toString() << " consumer=" << proxyString; 00253 context_.tracer().debug( ss.str(),6 ); 00254 } 00255 00256 virtual void unsubscribe() 00257 { 00258 if ( !topic_.isEmpty() ) 00259 { 00260 IceStorm::TopicPrx topicPrx; 00261 topic_.get( topicPrx ); 00262 00263 topicPrx->unsubscribe( consumerPrx_ ); 00264 std::stringstream ss; 00265 ss << "Unsubscribed from " << topicPrx->ice_toString(); 00266 context_.tracer().debug( ss.str(),6 ); 00267 } 00268 } 00269 00270 virtual bool subscribeWithString( const std::string& proxyString, 00271 gbxutilacfr::Stoppable* activity, const std::string& subsysName="", 00272 int retryIntervalSec=2, int retryNumber=-1 ) 00273 { 00274 ProviderPrxType providerPrx; 00275 // multi-try 00276 orcaice::connectToInterfaceWithString( context_, 00277 providerPrx, 00278 proxyString, 00279 activity, 00280 subsysName, 00281 retryIntervalSec, 00282 retryNumber ); 00283 00284 int count = 0; 00285 while ( !activity->isStopping() && ( retryNumber<0 || count<retryNumber) ) 00286 { 00287 try { 00288 IceStorm::TopicPrx topicPrx = providerPrx->subscribe( consumerPrx_ ); 00289 topic_.set( topicPrx ); 00290 00291 std::stringstream ss; 00292 ss << "Subscribed to " << proxyString; 00293 context_.tracer().debug( ss.str(),6 ); 00294 return true; 00295 } 00296 catch ( const orca::OrcaException &e ) 00297 { 00298 std::stringstream ss; 00299 ss << "Failed to subscribe: " << e.what << std::endl 00300 <<"Will retry in "<<retryIntervalSec<<"s."; 00301 context_.tracer().warning( ss.str() ); 00302 } 00303 catch ( const std::exception &e ) 00304 { 00305 std::stringstream ss; 00306 ss << "Failed to subscribe: " << e.what() << std::endl 00307 <<"Will retry in "<<retryIntervalSec<<"s."; 00308 context_.tracer().warning( ss.str() ); 00309 } 00310 catch ( ... ) 00311 { 00312 std::stringstream ss; 00313 ss << "Failed to subscribe for unknown reason. " 00314 <<"Will retry in "<<retryIntervalSec<<"s."; 00315 context_.tracer().warning( ss.str() ); 00316 } 00317 ++count; 00318 if ( !subsysName.empty() ) 00319 context_.status().heartbeat( subsysName ); 00320 gbxiceutilacfr::checkedSleep( activity, retryIntervalSec*1000 ); 00321 if ( !subsysName.empty() ) 00322 context_.status().heartbeat( subsysName ); 00323 } 00324 00325 return false; 00326 } 00327 }; 00328 00329 } // namespace 00330 00331 #endif |
Webmaster: Tobias Kaupp (tobasco at users.sourceforge.net)