3131#include " rclcpp/experimental/subscription_intra_process_buffer.hpp"
3232#include " rclcpp/qos.hpp"
3333#include " rclcpp/type_support_decl.hpp"
34+ #include " rclcpp/topic_statistics/subscription_topic_statistics.hpp"
3435#include " tracetools/tracetools.h"
3536
3637namespace rclcpp
@@ -70,22 +71,26 @@ class SubscriptionIntraProcess
7071 using ConstMessageSharedPtr = typename SubscriptionIntraProcessBufferT::ConstDataSharedPtr;
7172 using MessageUniquePtr = typename SubscriptionIntraProcessBufferT::SubscribedTypeUniquePtr;
7273 using BufferUniquePtr = typename SubscriptionIntraProcessBufferT::BufferUniquePtr;
74+ using SubscriptionTopicStatisticsSharedPtr =
75+ std::shared_ptr<rclcpp::topic_statistics::SubscriptionTopicStatistics>;
7376
7477 SubscriptionIntraProcess (
7578 AnySubscriptionCallback<MessageT, Alloc> callback,
7679 std::shared_ptr<Alloc> allocator,
7780 rclcpp::Context::SharedPtr context,
7881 const std::string & topic_name,
7982 const rclcpp::QoS & qos_profile,
80- rclcpp::IntraProcessBufferType buffer_type)
83+ rclcpp::IntraProcessBufferType buffer_type,
84+ SubscriptionTopicStatisticsSharedPtr subscription_topic_statistics = nullptr )
8185 : SubscriptionIntraProcessBuffer<SubscribedType, SubscribedTypeAlloc,
8286 SubscribedTypeDeleter, ROSMessageType>(
8387 std::make_shared<SubscribedTypeAlloc>(*allocator),
8488 context,
8589 topic_name,
8690 qos_profile,
8791 buffer_type),
88- any_callback_ (callback)
92+ any_callback_ (callback),
93+ subscription_topic_statistics_ (std::move(subscription_topic_statistics))
8994 {
9095 TRACETOOLS_TRACEPOINT (
9196 rclcpp_subscription_callback_added,
@@ -174,6 +179,13 @@ class SubscriptionIntraProcess
174179 msg_info.publisher_gid = {0 , {0 }};
175180 msg_info.from_intra_process = true ;
176181
182+ std::chrono::time_point<std::chrono::system_clock> now;
183+ if (subscription_topic_statistics_) {
184+ now = std::chrono::system_clock::now ();
185+ auto nanos = std::chrono::time_point_cast<std::chrono::nanoseconds>(now);
186+ msg_info.source_timestamp = nanos.time_since_epoch ().count ();
187+ }
188+
177189 auto shared_ptr = std::static_pointer_cast<std::pair<ConstMessageSharedPtr, MessageUniquePtr>>(
178190 data);
179191
@@ -185,9 +197,16 @@ class SubscriptionIntraProcess
185197 any_callback_.dispatch_intra_process (std::move (unique_msg), msg_info);
186198 }
187199 shared_ptr.reset ();
200+
201+ if (subscription_topic_statistics_) {
202+ const auto nanos = std::chrono::time_point_cast<std::chrono::nanoseconds>(now);
203+ const auto time = rclcpp::Time (nanos.time_since_epoch ().count ());
204+ subscription_topic_statistics_->handle_message (msg_info, time);
205+ }
188206 }
189207
190208 AnySubscriptionCallback<MessageT, Alloc> any_callback_;
209+ SubscriptionTopicStatisticsSharedPtr subscription_topic_statistics_;
191210};
192211
193212} // namespace experimental
0 commit comments