|
orca-robotics INTRODUCTION Overview Download and Install Quick Start Documentation Publications REPOSITORY Interfaces Components Libraries Utilities Software Map DEVELOPER Tutorials Examples Dev Guide Dashboard Wiki login/pass: orca/orca PEOPLE Contributors Users Project Download Mailing lists
|
consumerImpl.h00001 /* 00002 * Orca-Robotics Project: Components for robotics 00003 * http://orca-robotics.sf.net/ 00004 * Copyright (c) 2004-2008 Alex Brooks, Alexei Makarenko, Tobias Kaupp 00005 * 00006 * This copy of Orca is licensed to you under the terms described in 00007 * the LICENSE file included in this distribution. 00008 * 00009 */ 00010 00011 #ifndef ORCAIFACEIMPL_CONSUMER_IMPL_H 00012 #define ORCAIFACEIMPL_CONSUMER_IMPL_H 00013 00014 #include <orcaice/context.h> 00015 #include <orcaice/multiconnectutils.h> 00016 #include <orcaice/iceutils.h> 00017 #include <orcaice/icestormutils.h> 00018 #include <orcaice/icegridutils.h> 00019 #include <gbxsickacfr/gbxiceutilacfr/store.h> 00020 #include <gbxsickacfr/gbxiceutilacfr/thread.h> 00021 00022 namespace orcaifaceimpl 00023 { 00024 00028 class ConsumerSubscriber 00029 { 00030 public: 00031 ConsumerSubscriber( const orcaice::Context& context ); 00032 00033 virtual ~ConsumerSubscriber() {}; 00034 00038 virtual void subscribeWithString( const std::string& proxyString )=0; 00039 00041 void subscribeWithTag( const std::string& interfaceTag ); 00042 00045 virtual void unsubscribe()=0; 00046 00054 virtual bool subscribeWithString( const std::string& proxyString, 00055 gbxiceutilacfr::Thread* thread, const std::string& subsysName="", 00056 int retryInterval=2, int retryNumber=-1 )=0; 00057 00062 bool subscribeWithTag( const std::string& interfaceTag, 00063 gbxiceutilacfr::Thread* thread, const std::string& subsysName="", 00064 int retryInterval=2, int retryNumber=-1 ); 00065 protected: 00066 00067 // This could be used for re-subscribing. 00068 // gbxiceutilacfr::Store<std::string> proxyString_; 00069 00071 gbxiceutilacfr::Store<IceStorm::TopicPrx> topic_; 00072 00074 orcaice::Context context_; 00075 }; 00076 00077 // have to define it as a separate abstract class 00078 // because ConsumerTypeI only needs to know about dataEvent() 00079 // (and does not know about the 2 extra template parameters in 00080 // ConsumerImpl) 00081 template<class ObjectType> 00082 class AbstractConsumer 00083 { 00084 public: 00085 // Implement this callback in the derived class. 00086 virtual void dataEvent( const ObjectType& data )=0; 00087 }; 00088 00089 // implements Slice consumer interface 00090 // the end user does not need to know about it. 00091 // redirects incoming data to ConsumerImpl's derivatives. 00092 template<class ConsumerType, class ObjectType> 00093 class ConsumerTypeI : virtual public ConsumerType 00094 { 00095 public: 00096 ConsumerTypeI( AbstractConsumer<ObjectType> &impl ) : 00097 impl_(impl) {} 00098 virtual ~ConsumerTypeI() {}; 00099 00100 // implementation of remote call defined in all *Consumer interfaces 00101 // this implementation redirects to the Impl class 00102 virtual void setData( const ObjectType& data, const Ice::Current& ) 00103 { 00104 // context_.tracer().debug( "Received data from provider", 8 ); 00105 impl_.dataEvent( data ); 00106 } 00107 00108 private: 00109 AbstractConsumer<ObjectType>& impl_; 00110 }; 00111 00131 template<class ProviderType, class ProviderPrxType, class ConsumerType, class ConsumerPrxType, class ObjectType> 00132 class ConsumerImpl : public ConsumerSubscriber, 00133 public AbstractConsumer<ObjectType>, 00134 public IceUtil::Shared 00135 { 00136 00137 protected: 00138 // these are protected so that it's possible to re-implement initConsumer() 00139 00141 ConsumerPrxType consumerPrx_; 00142 00144 Ice::ObjectPtr consumerPtr_; 00145 00146 public: 00148 ConsumerImpl( const orcaice::Context &context ) : 00149 ConsumerSubscriber(context) 00150 { 00151 consumerPtr_ = new ConsumerTypeI<ConsumerType,ObjectType>( *this ); 00152 // this function does not throw, because it never talks to the Registry 00153 consumerPrx_ = orcaice::createConsumerInterface<ConsumerPrxType>(context_,consumerPtr_); 00154 } 00155 00156 virtual ~ConsumerImpl() 00157 { 00158 // context_.tracer().debug( "ConsumerImpl::~ConsumerImpl()" ); 00159 // unsubscribe from the info provider 00160 try { 00161 unsubscribe(); 00162 } 00163 catch ( const std::exception& e ) { 00164 std::stringstream ss; 00165 ss << "failed to unsubscribe in destructor: " << e.what(); 00166 context_.tracer().warning( ss.str() ); 00167 } 00168 catch ( ... ) { 00169 context_.tracer().warning( "failed to unsubscribe in destructor." ); 00170 } 00171 00172 // now destroy our consumer object 00173 if ( !consumerPrx_ ) 00174 return; 00175 00176 orcaice::tryRemoveInterfaceWithIdentity( context_, consumerPrx_->ice_getIdentity() ); 00177 } 00178 00180 ConsumerPrxType consumerPrx() const { return consumerPrx_; } 00181 00182 // This is tricky! Can't leave it pure virtual because we unsubscribe and detsroy 00183 // in ConsumerImpl destructor. By that time, the derived class (e.g. StoringConsumer) 00184 // is already destroyed and we'll get "pure virtual method called". 00188 virtual void dataEvent( const ObjectType& data ) 00189 { 00190 } 00191 00192 // no doxytags, these functions are already documented above. 00193 00194 virtual void subscribeWithString( const std::string& proxyString ) 00195 { 00196 ProviderPrxType providerPrx; 00197 00198 std::string staticId = ConsumerType::ice_staticId(); 00199 // Home does not have a consumer 00200 if ( staticId == "::orca::StatusConsumer" || staticId == "::orca::TracerConsumer" ) { 00201 orca::FQInterfaceName fqIfaceName = orcaice::toInterfaceName( proxyString ); 00202 orca::FQComponentName fqCompName; 00203 fqCompName.platform = fqIfaceName.platform; 00204 fqCompName.component = fqIfaceName.component; 00205 orcaice::connectToAdminInterface<ProviderType,ProviderPrxType>( context_, providerPrx, fqCompName ); 00206 } 00207 else { 00208 orcaice::connectToInterfaceWithString( context_, providerPrx, proxyString ); 00209 } 00210 00211 IceStorm::TopicPrx topicPrx = providerPrx->subscribe( consumerPrx_ ); 00212 topic_.set( topicPrx ); 00213 00214 std::stringstream ss; 00215 ss << "Subscribed to " << proxyString; 00216 context_.tracer().debug( ss.str() ); 00217 } 00218 00219 virtual void unsubscribe() 00220 { 00221 if ( !topic_.isEmpty() ) 00222 { 00223 IceStorm::TopicPrx topicPrx; 00224 topic_.get( topicPrx ); 00225 00226 topicPrx->unsubscribe( consumerPrx_ ); 00227 std::stringstream ss; 00228 ss << "Unsubscribed from " << topicPrx->ice_toString(); 00229 context_.tracer().debug( ss.str() ); 00230 } 00231 } 00232 00233 virtual bool subscribeWithString( const std::string& proxyString, 00234 gbxiceutilacfr::Thread* thread, const std::string& subsysName="", 00235 int retryInterval=2, int retryNumber=-1 ) 00236 { 00237 ProviderPrxType providerPrx; 00238 // multi-try 00239 orcaice::connectToInterfaceWithString( context_, 00240 providerPrx, 00241 proxyString, 00242 thread, 00243 subsysName, 00244 retryInterval, 00245 retryNumber ); 00246 00247 int count = 0; 00248 while ( !thread->isStopping() && ( retryNumber<0 || count<retryNumber) ) 00249 { 00250 try { 00251 IceStorm::TopicPrx topicPrx = providerPrx->subscribe( consumerPrx_ ); 00252 topic_.set( topicPrx ); 00253 00254 std::stringstream ss; 00255 ss << "Subscribed to " << proxyString; 00256 context_.tracer().debug( ss.str() ); 00257 return true; 00258 } 00259 catch ( const orca::OrcaException &e ) 00260 { 00261 std::stringstream ss; 00262 ss << "Failed to subscribe: " << e.what << std::endl 00263 <<"Will retry in "<<retryInterval<<"s."; 00264 context_.tracer().warning( ss.str() ); 00265 } 00266 catch ( const std::exception &e ) 00267 { 00268 std::stringstream ss; 00269 ss << "Failed to subscribe: " << e.what() << std::endl 00270 <<"Will retry in "<<retryInterval<<"s."; 00271 context_.tracer().warning( ss.str() ); 00272 } 00273 catch ( ... ) 00274 { 00275 std::stringstream ss; 00276 ss << "Failed to subscribe for unknown reason. " 00277 <<"Will retry in "<<retryInterval<<"s."; 00278 context_.tracer().warning( ss.str() ); 00279 } 00280 ++count; 00281 if ( !subsysName.empty() ) 00282 context_.status().heartbeat( subsysName ); 00283 IceUtil::ThreadControl::sleep(IceUtil::Time::seconds(retryInterval)); 00284 if ( !subsysName.empty() ) 00285 context_.status().heartbeat( subsysName ); 00286 } 00287 00288 return false; 00289 } 00290 }; 00291 00292 } // namespace 00293 00294 #endif |
Webmaster: Tobias Kaupp (tobasco at users.sourceforge.net)
1.4.5