Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 24 additions & 2 deletions rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
#include "rclcpp/experimental/subscription_intra_process_buffer.hpp"
#include "rclcpp/qos.hpp"
#include "rclcpp/type_support_decl.hpp"
#include "rclcpp/topic_statistics/subscription_topic_statistics.hpp"
#include "rclcpp/logging.hpp"
#include "tracetools/tracetools.h"

namespace rclcpp
Expand Down Expand Up @@ -70,22 +72,26 @@ class SubscriptionIntraProcess
using ConstMessageSharedPtr = typename SubscriptionIntraProcessBufferT::ConstDataSharedPtr;
using MessageUniquePtr = typename SubscriptionIntraProcessBufferT::SubscribedTypeUniquePtr;
using BufferUniquePtr = typename SubscriptionIntraProcessBufferT::BufferUniquePtr;
using SubscriptionTopicStatisticsSharedPtr =
std::shared_ptr<rclcpp::topic_statistics::SubscriptionTopicStatistics>;

SubscriptionIntraProcess(
AnySubscriptionCallback<MessageT, Alloc> callback,
std::shared_ptr<Alloc> allocator,
rclcpp::Context::SharedPtr context,
const std::string & topic_name,
const rclcpp::QoS & qos_profile,
rclcpp::IntraProcessBufferType buffer_type)
rclcpp::IntraProcessBufferType buffer_type,
SubscriptionTopicStatisticsSharedPtr subscription_topic_statistics = nullptr)
: SubscriptionIntraProcessBuffer<SubscribedType, SubscribedTypeAlloc,
SubscribedTypeDeleter, ROSMessageType>(
std::make_shared<SubscribedTypeAlloc>(*allocator),
context,
topic_name,
qos_profile,
buffer_type),
any_callback_(callback)
any_callback_(callback),
subscription_topic_statistics_(std::move(subscription_topic_statistics))
{
TRACETOOLS_TRACEPOINT(
rclcpp_subscription_callback_added,
Expand Down Expand Up @@ -174,6 +180,14 @@ class SubscriptionIntraProcess
msg_info.publisher_gid = {0, {0}};
msg_info.from_intra_process = true;

std::chrono::time_point<std::chrono::system_clock> now;
if (subscription_topic_statistics_) {
RCLCPP_WARN_ONCE(
rclcpp::get_logger("rclcpp"),
"Intra-process communication does not support accurate message age statistics");
now = std::chrono::system_clock::now();
}

auto shared_ptr = std::static_pointer_cast<std::pair<ConstMessageSharedPtr, MessageUniquePtr>>(
data);

Expand All @@ -185,9 +199,17 @@ class SubscriptionIntraProcess
any_callback_.dispatch_intra_process(std::move(unique_msg), msg_info);
}
shared_ptr.reset();

if (subscription_topic_statistics_) {
const auto nanos = std::chrono::time_point_cast<std::chrono::nanoseconds>(now);
const auto time = rclcpp::Time(nanos.time_since_epoch().count());
subscription_topic_statistics_->handle_message(msg_info, time);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

at least for period statistics, this can enables it to calculate intra and inter-process communication messages all together. i think that is the right thing to do.

}
}

AnySubscriptionCallback<MessageT, Alloc> any_callback_;
/// Optional statistics calculator for intra-process deliveries.
SubscriptionTopicStatisticsSharedPtr subscription_topic_statistics_;
};

} // namespace experimental
Expand Down
3 changes: 2 additions & 1 deletion rclcpp/include/rclcpp/subscription.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,8 @@ class Subscription : public SubscriptionBase
context,
this->get_topic_name(), // important to get like this, as it has the fully-qualified name
qos_profile,
resolve_intra_process_buffer_type(options_.intra_process_buffer_type, callback));
resolve_intra_process_buffer_type(options_.intra_process_buffer_type, callback),
subscription_topic_statistics);
TRACETOOLS_TRACEPOINT(
rclcpp_subscription_init,
static_cast<const void *>(get_subscription_handle().get()),
Expand Down
6 changes: 5 additions & 1 deletion rclcpp/test/rclcpp/test_intra_process_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "rclcpp/context.hpp"
#include "rclcpp/macros.hpp"
#include "rclcpp/qos.hpp"
#include "rclcpp/topic_statistics/subscription_topic_statistics.hpp"
#include "rmw/types.h"
#include "rmw/qos_profiles.h"

Expand Down Expand Up @@ -376,7 +377,10 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBuffer<
public:
RCLCPP_SMART_PTR_DEFINITIONS(SubscriptionIntraProcess)

explicit SubscriptionIntraProcess(const std::string & topic, const rclcpp::QoS & qos)
explicit SubscriptionIntraProcess(
const std::string & topic,
const rclcpp::QoS & qos,
std::shared_ptr<rclcpp::topic_statistics::SubscriptionTopicStatistics> = nullptr)
: SubscriptionIntraProcessBuffer<MessageT, Alloc, Deleter>(topic, qos)
{
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,9 @@ class SubscriberWithTopicStatistics : public rclcpp::Node
public:
SubscriberWithTopicStatistics(
const std::string & name, const std::string & topic,
std::chrono::milliseconds publish_period = defaultStatisticsPublishPeriod)
: Node(name)
std::chrono::milliseconds publish_period = defaultStatisticsPublishPeriod,
bool use_intra_process = false)
: Node(name, rclcpp::NodeOptions().use_intra_process_comms(use_intra_process))
{
// Manually enable topic statistics via options
auto options = rclcpp::SubscriptionOptions();
Expand Down Expand Up @@ -265,7 +266,7 @@ TEST_F(TestSubscriptionTopicStatisticsFixture, test_invalid_publish_period)
{
ASSERT_THROW(
SubscriberWithTopicStatistics<Empty>(
"test_period_node", "should_throw_invalid_arg", std::chrono::milliseconds(0)
"test_period_node", "should_throw_invalid_arg", std::chrono::milliseconds(0), false
),
std::invalid_argument);
}
Expand All @@ -278,7 +279,9 @@ TEST_F(TestSubscriptionTopicStatisticsFixture, test_manual_construction)
{
auto empty_subscriber = std::make_shared<SubscriberWithTopicStatistics<Empty>>(
kTestSubNodeName,
kTestSubStatsEmptyTopic);
kTestSubStatsEmptyTopic,
defaultStatisticsPublishPeriod,
false);

// Manually create publisher tied to the node
auto topic_stats_publisher =
Expand Down Expand Up @@ -322,7 +325,9 @@ TEST_F(TestSubscriptionTopicStatisticsFixture, test_receive_stats_for_message_no

auto empty_subscriber = std::make_shared<SubscriberWithTopicStatistics<Empty>>(
kTestSubNodeName,
kTestSubStatsEmptyTopic);
kTestSubStatsEmptyTopic,
defaultStatisticsPublishPeriod,
false);

rclcpp::executors::SingleThreadedExecutor ex;
ex.add_node(empty_publisher);
Expand Down Expand Up @@ -367,7 +372,9 @@ TEST_F(TestSubscriptionTopicStatisticsFixture, test_receive_stats_include_window
auto msg_subscriber_with_topic_statistics =
std::make_shared<SubscriberWithTopicStatistics<test_msgs::msg::Strings>>(
kTestSubNodeName,
kTestSubStatsTopic);
kTestSubStatsTopic,
defaultStatisticsPublishPeriod,
false);

// Create a message publisher
auto msg_publisher =
Expand Down Expand Up @@ -421,3 +428,45 @@ TEST_F(TestSubscriptionTopicStatisticsFixture, test_receive_stats_include_window
}
}
}

void run_intra_process_test(bool use_intra_process)
{
auto empty_publisher = std::make_shared<PublisherNode<Empty>>(
kTestPubNodeName,
kTestSubStatsEmptyTopic);
auto statistics_listener = std::make_shared<rclcpp::topic_statistics::MetricsMessageSubscriber>(
use_intra_process ? "stats_listener_intra" : "stats_listener_inter",
"/statistics",
kNumExpectedWindows);
auto empty_subscriber = std::make_shared<SubscriberWithTopicStatistics<Empty>>(
kTestSubNodeName,
kTestSubStatsEmptyTopic,
defaultStatisticsPublishPeriod,
use_intra_process);

rclcpp::executors::SingleThreadedExecutor ex;
ex.add_node(empty_publisher);
ex.add_node(statistics_listener);
ex.add_node(empty_subscriber);

ex.spin_until_future_complete(statistics_listener->GetFuture(), kTestTimeout);

const auto received_messages = statistics_listener->GetReceivedMessages();
EXPECT_EQ(kNumExpectedWindows, received_messages.size());

for (const auto & msg : received_messages) {
if (msg.metrics_source == kMessagePeriodSourceLabel) {
check_if_statistic_message_is_populated(msg);
}
}
}

TEST_F(TestSubscriptionTopicStatisticsFixture, test_stats_period_inter_process)
{
run_intra_process_test(false);
}

TEST_F(TestSubscriptionTopicStatisticsFixture, test_stats_period_intra_process)
{
run_intra_process_test(true);
}