Go to the documentation of this file.
18 #ifndef GAZEBO_TRANSPORT_NODE_HH_
19 #define GAZEBO_TRANSPORT_NODE_HH_
22 #include <boost/bind.hpp>
23 #include <boost/enable_shared_from_this.hpp>
39 class GZ_TRANSPORT_VISIBLE PublishTask :
public tbb::task
45 const google::protobuf::Message &_message)
48 this->msg = _message.New();
49 this->msg->CopyFrom(_message);
54 public: tbb::task *execute()
56 this->pub->WaitForConnection();
57 this->pub->Publish(*this->msg,
true);
58 this->pub->SendMessage();
68 private: google::protobuf::Message *msg;
78 class GZ_TRANSPORT_VISIBLE
Node :
79 public boost::enable_shared_from_this<Node>
95 public:
void Init(
const std::string &_space =
"");
157 public:
template<
typename M>
159 const google::protobuf::Message &_message)
162 PublishTask *task =
new(tbb::task::allocate_root())
163 PublishTask(pub, _message);
165 tbb::task::enqueue(*task);
176 public:
template<
typename M>
178 unsigned int _queueLimit = 1000,
181 std::string decodedTopic = this->DecodeTopicName(_topic);
184 decodedTopic, _queueLimit, _hzRate);
186 boost::mutex::scoped_lock lock(this->publisherMutex);
187 publisher->SetNode(shared_from_this());
188 this->publishers.push_back(publisher);
199 public:
void Publish(
const std::string &_topic,
200 const google::protobuf::Message &_message)
203 _message.GetTypeName());
204 pub->WaitForConnection();
206 pub->Publish(_message,
true);
217 const std::string &_msgTypeName,
218 unsigned int _queueLimit = 1000,
221 std::string decodedTopic = this->DecodeTopicName(_topic);
224 decodedTopic, _msgTypeName, _queueLimit, _hzRate);
226 boost::mutex::scoped_lock lock(this->publisherMutex);
227 publisher->SetNode(shared_from_this());
228 this->publishers.push_back(publisher);
240 public:
template<
typename M,
typename T>
242 void(T::*_fp)(
const boost::shared_ptr<M const> &), T *_obj,
243 bool _latching =
false)
246 std::string decodedTopic = this->DecodeTopicName(_topic);
247 ops.template Init<M>(decodedTopic, shared_from_this(), _latching);
250 boost::recursive_mutex::scoped_lock lock(this->incomingMutex);
258 result->SetCallbackId(this->callbacks[decodedTopic].back()->GetId());
269 public:
template<
typename M>
271 void(*_fp)(
const boost::shared_ptr<M const> &),
272 bool _latching =
false)
275 std::string decodedTopic = this->DecodeTopicName(_topic);
276 ops.template Init<M>(decodedTopic, shared_from_this(), _latching);
279 boost::recursive_mutex::scoped_lock lock(this->incomingMutex);
280 this->callbacks[decodedTopic].push_back(
287 result->SetCallbackId(this->callbacks[decodedTopic].back()->GetId());
301 void(T::*_fp)(
const std::string &), T *_obj,
302 bool _latching =
false)
305 std::string decodedTopic = this->DecodeTopicName(_topic);
306 ops.
Init(decodedTopic, shared_from_this(), _latching);
309 boost::recursive_mutex::scoped_lock lock(this->incomingMutex);
317 result->SetCallbackId(this->callbacks[decodedTopic].back()->GetId());
330 void(*_fp)(
const std::string &),
bool _latching =
false)
333 std::string decodedTopic = this->DecodeTopicName(_topic);
334 ops.
Init(decodedTopic, shared_from_this(), _latching);
337 boost::recursive_mutex::scoped_lock lock(this->incomingMutex);
338 this->callbacks[decodedTopic].push_back(
345 result->SetCallbackId(this->callbacks[decodedTopic].back()->GetId());
355 const std::string &_msg);
370 const std::string &_msg);
384 public: std::string
GetMsgType(
const std::string &_topic)
const;
404 private:
bool PrivateInit(
const std::string &_space,
406 const bool _fallbackToDefault);
408 private: std::string topicNamespace;
409 private: std::vector<PublisherPtr> publishers;
410 private: std::vector<PublisherPtr>::iterator publishersIter;
411 private:
static unsigned int idCounter;
412 private:
unsigned int id;
414 private:
typedef std::list<CallbackHelperPtr> Callback_L;
415 private:
typedef std::map<std::string, Callback_L> Callback_M;
416 private: Callback_M callbacks;
417 private: std::map<std::string, std::list<std::string> > incomingMsgs;
420 private: std::map<std::string, std::list<MessagePtr> > incomingMsgsLocal;
422 private: boost::mutex publisherMutex;
423 private: boost::mutex publisherDeleteMutex;
424 private: boost::recursive_mutex incomingMutex;
428 private: boost::recursive_mutex processIncomingMutex;
430 private:
bool initialized;
void ProcessIncoming()
Process incoming messages.
transport::PublisherPtr Advertise(const std::string &_topic, const std::string &_msgTypeName, unsigned int _queueLimit=1000, double _hzRate=0)
Advertise a topic.
Definition: Node.hh:216
SubscriberPtr Subscribe(const std::string &_topic, void(*_fp)(const boost::shared_ptr< M const > &), bool _latching=false)
Subscribe to a topic using a bare function as the callback.
Definition: Node.hh:270
void Publish(const std::string &_topic, const google::protobuf::Message &_message)
A convenience function for a one-time publication of a message.
Definition: Node.hh:199
Options for a subscription.
Definition: SubscribeOptions.hh:36
virtual ~Node()
Destructor.
Forward declarations for the common classes.
Definition: Animation.hh:27
boost::shared_ptr< google::protobuf::Message > MessagePtr
Definition: TransportTypes.hh:45
A Time class, can be used to hold wall- or sim-time. stored as sec and nano-sec.
Definition: Time.hh:48
#define NULL
Definition: CommonTypes.hh:31
boost::shared_ptr< Publisher > PublisherPtr
Definition: TransportTypes.hh:49
bool HandleMessage(const std::string &_topic, MessagePtr _msg)
Handle incoming msg.
std::string GetMsgType(const std::string &_topic) const
Get the message type for a topic.
Forward declarations for transport.
bool HandleData(const std::string &_topic, const std::string &_msg)
Handle incoming data.
std::string EncodeTopicName(const std::string &_topic)
Encode a topic name.
void InsertLatchedMsg(const std::string &_topic, const std::string &_msg)
Add a latched message to the node for publication.
std::string DecodeTopicName(const std::string &_topic)
Decode a topic name.
boost::shared_ptr< Subscriber > SubscriberPtr
Definition: TransportTypes.hh:53
void Publish(const std::string &_topic, const google::protobuf::Message &_message)
A convenience function for a one-time publication of a message.
Definition: Node.hh:158
SubscriberPtr Subscribe(const std::string &_topic, void(*_fp)(const std::string &), bool _latching=false)
Subscribe to a topic using a bare function as the callback.
Definition: Node.hh:329
A node can advertise and subscribe topics, publish on advertised topics and listen to subscribed topi...
Definition: Node.hh:80
unsigned int GetId() const
Get the unique ID of the node.
bool TryInit(const common::Time &_maxWait=common::Time(1, 0))
Try to initialize the node to use the global namespace, and specify the maximum wait time.
Callback helper Template.
Definition: CallbackHelper.hh:112
void InsertLatchedMsg(const std::string &_topic, MessagePtr _msg)
Add a latched message to the node for publication.
static TopicManager * Instance()
Get an instance of the singleton.
Definition: SingletonT.hh:36
void RemoveCallback(const std::string &_topic, unsigned int _id)
void Init(const std::string &_space="")
Init the node.
void Init(const std::string &_topic, NodePtr _node, bool _latching)
Initialize the options.
Definition: SubscribeOptions.hh:48
transport
Definition: ConnectionManager.hh:35
void ProcessPublishers()
Process all publishers, which has each publisher send it's most recent message over the wire.
transport::PublisherPtr Advertise(const std::string &_topic, unsigned int _queueLimit=1000, double _hzRate=0)
Advertise a topic.
Definition: Node.hh:177
SubscriberPtr Subscribe(const std::string &_topic, void(T::*_fp)(const std::string &), T *_obj, bool _latching=false)
Subscribe to a topic using a class method as the callback.
Definition: Node.hh:300
boost::shared_ptr< CallbackHelper > CallbackHelperPtr
boost shared pointer to transport::CallbackHelper
Definition: CallbackHelper.hh:105
std::string GetTopicNamespace() const
Get the topic namespace for this node.
bool HasLatchedSubscriber(const std::string &_topic) const
Return true if a subscriber on a specific topic is latched.
SubscriberPtr Subscribe(const std::string &_topic, void(T::*_fp)(const boost::shared_ptr< M const > &), T *_obj, bool _latching=false)
Subscribe to a topic using a class method as the callback.
Definition: Node.hh:241
void Fini()
Finalize the node.
bool IsInitialized() const
Check if this Node has been initialized.
Used to connect publishers to subscribers, where the subscriber wants the raw data from the publisher...
Definition: CallbackHelper.hh:178