Skip to content

Commit b1a9c6b

Browse files
committed
feat: add intra-process communication support for topic statistics
1 parent dd92601 commit b1a9c6b

File tree

2 files changed

+59
-8
lines changed

2 files changed

+59
-8
lines changed

rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
#include "rclcpp/qos.hpp"
3333
#include "rclcpp/type_support_decl.hpp"
3434
#include "rclcpp/topic_statistics/subscription_topic_statistics.hpp"
35+
#include "rclcpp/logging.hpp"
3536
#include "tracetools/tracetools.h"
3637

3738
namespace rclcpp
@@ -181,9 +182,10 @@ class SubscriptionIntraProcess
181182

182183
std::chrono::time_point<std::chrono::system_clock> now;
183184
if (subscription_topic_statistics_) {
185+
RCLCPP_WARN_ONCE(
186+
rclcpp::get_logger("rclcpp"),
187+
"Intra-process communication does not support accurate message age statistics");
184188
now = std::chrono::system_clock::now();
185-
auto nanos = std::chrono::time_point_cast<std::chrono::nanoseconds>(now);
186-
msg_info.source_timestamp = nanos.time_since_epoch().count();
187189
}
188190

189191
auto shared_ptr = std::static_pointer_cast<std::pair<ConstMessageSharedPtr, MessageUniquePtr>>(

rclcpp/test/rclcpp/topic_statistics/test_subscription_topic_statistics.cpp

Lines changed: 55 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -181,8 +181,9 @@ class SubscriberWithTopicStatistics : public rclcpp::Node
181181
public:
182182
SubscriberWithTopicStatistics(
183183
const std::string & name, const std::string & topic,
184-
std::chrono::milliseconds publish_period = defaultStatisticsPublishPeriod)
185-
: Node(name)
184+
std::chrono::milliseconds publish_period = defaultStatisticsPublishPeriod,
185+
bool use_intra_process = false)
186+
: Node(name, rclcpp::NodeOptions().use_intra_process_comms(use_intra_process))
186187
{
187188
// Manually enable topic statistics via options
188189
auto options = rclcpp::SubscriptionOptions();
@@ -265,7 +266,7 @@ TEST_F(TestSubscriptionTopicStatisticsFixture, test_invalid_publish_period)
265266
{
266267
ASSERT_THROW(
267268
SubscriberWithTopicStatistics<Empty>(
268-
"test_period_node", "should_throw_invalid_arg", std::chrono::milliseconds(0)
269+
"test_period_node", "should_throw_invalid_arg", std::chrono::milliseconds(0), false
269270
),
270271
std::invalid_argument);
271272
}
@@ -278,7 +279,9 @@ TEST_F(TestSubscriptionTopicStatisticsFixture, test_manual_construction)
278279
{
279280
auto empty_subscriber = std::make_shared<SubscriberWithTopicStatistics<Empty>>(
280281
kTestSubNodeName,
281-
kTestSubStatsEmptyTopic);
282+
kTestSubStatsEmptyTopic,
283+
defaultStatisticsPublishPeriod,
284+
false);
282285

283286
// Manually create publisher tied to the node
284287
auto topic_stats_publisher =
@@ -322,7 +325,9 @@ TEST_F(TestSubscriptionTopicStatisticsFixture, test_receive_stats_for_message_no
322325

323326
auto empty_subscriber = std::make_shared<SubscriberWithTopicStatistics<Empty>>(
324327
kTestSubNodeName,
325-
kTestSubStatsEmptyTopic);
328+
kTestSubStatsEmptyTopic,
329+
defaultStatisticsPublishPeriod,
330+
false);
326331

327332
rclcpp::executors::SingleThreadedExecutor ex;
328333
ex.add_node(empty_publisher);
@@ -367,7 +372,9 @@ TEST_F(TestSubscriptionTopicStatisticsFixture, test_receive_stats_include_window
367372
auto msg_subscriber_with_topic_statistics =
368373
std::make_shared<SubscriberWithTopicStatistics<test_msgs::msg::Strings>>(
369374
kTestSubNodeName,
370-
kTestSubStatsTopic);
375+
kTestSubStatsTopic,
376+
defaultStatisticsPublishPeriod,
377+
false);
371378

372379
// Create a message publisher
373380
auto msg_publisher =
@@ -421,3 +428,45 @@ TEST_F(TestSubscriptionTopicStatisticsFixture, test_receive_stats_include_window
421428
}
422429
}
423430
}
431+
432+
void run_intra_process_test(bool use_intra_process)
433+
{
434+
auto empty_publisher = std::make_shared<PublisherNode<Empty>>(
435+
kTestPubNodeName,
436+
kTestSubStatsEmptyTopic);
437+
auto statistics_listener = std::make_shared<rclcpp::topic_statistics::MetricsMessageSubscriber>(
438+
use_intra_process ? "stats_listener_intra" : "stats_listener_inter",
439+
"/statistics",
440+
kNumExpectedWindows);
441+
auto empty_subscriber = std::make_shared<SubscriberWithTopicStatistics<Empty>>(
442+
kTestSubNodeName,
443+
kTestSubStatsEmptyTopic,
444+
defaultStatisticsPublishPeriod,
445+
use_intra_process);
446+
447+
rclcpp::executors::SingleThreadedExecutor ex;
448+
ex.add_node(empty_publisher);
449+
ex.add_node(statistics_listener);
450+
ex.add_node(empty_subscriber);
451+
452+
ex.spin_until_future_complete(statistics_listener->GetFuture(), kTestTimeout);
453+
454+
const auto received_messages = statistics_listener->GetReceivedMessages();
455+
EXPECT_EQ(kNumExpectedWindows, received_messages.size());
456+
457+
for (const auto & msg : received_messages) {
458+
if (msg.metrics_source == kMessagePeriodSourceLabel) {
459+
check_if_statistic_message_is_populated(msg);
460+
}
461+
}
462+
}
463+
464+
TEST_F(TestSubscriptionTopicStatisticsFixture, test_stats_period_inter_process)
465+
{
466+
run_intra_process_test(false);
467+
}
468+
469+
TEST_F(TestSubscriptionTopicStatisticsFixture, test_stats_period_intra_process)
470+
{
471+
run_intra_process_test(true);
472+
}

0 commit comments

Comments
 (0)