Skip to content
Draft
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion rclcpp/include/rclcpp/generic_subscription.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ class GenericSubscription : public rclcpp::SubscriptionBase
/// This function is currently not implemented.
RCLCPP_PUBLIC
void handle_loaned_message(
void * loaned_message, const rclcpp::MessageInfo & message_info) override;
std::shared_ptr<void> loaned_message, const rclcpp::MessageInfo & message_info) override;

// Same as return_serialized_message() as the subscription is to serialized_messages only
RCLCPP_PUBLIC
Expand Down
7 changes: 2 additions & 5 deletions rclcpp/include/rclcpp/subscription.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ class Subscription : public SubscriptionBase

void
handle_loaned_message(
void * loaned_message,
std::shared_ptr<void> loaned_message,
const rclcpp::MessageInfo & message_info) override
{
if (matches_any_intra_process_publishers(&message_info.get_rmw_message_info().publisher_gid)) {
Expand All @@ -339,10 +339,7 @@ class Subscription : public SubscriptionBase
return;
}

auto typed_message = static_cast<ROSMessageType *>(loaned_message);
// message is loaned, so we have to make sure that the deleter does not deallocate the message
auto sptr = std::shared_ptr<ROSMessageType>(
typed_message, [](ROSMessageType * msg) {(void) msg;});
auto sptr = std::static_pointer_cast<ROSMessageType>(loaned_message);

std::chrono::time_point<std::chrono::system_clock> now;
if (subscription_topic_statistics_) {
Expand Down
4 changes: 3 additions & 1 deletion rclcpp/include/rclcpp/subscription_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,9 @@ class SubscriptionBase : public std::enable_shared_from_this<SubscriptionBase>
RCLCPP_PUBLIC
virtual
void
handle_loaned_message(void * loaned_message, const rclcpp::MessageInfo & message_info) = 0;
handle_loaned_message(
std::shared_ptr<void> loaned_message,
const rclcpp::MessageInfo & message_info) = 0;

/// Return the message borrowed in create_message.
/** \param[in] message Shared pointer to the returned message. */
Expand Down
36 changes: 23 additions & 13 deletions rclcpp/src/rclcpp/executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -558,19 +558,29 @@ Executor::execute_subscription(rclcpp::SubscriptionBase::SharedPtr subscription)
}
return true;
},
[&]() {subscription->handle_loaned_message(loaned_msg, message_info);});
if (nullptr != loaned_msg) {
rcl_ret_t ret = rcl_return_loaned_message_from_subscription(
subscription->get_subscription_handle().get(), loaned_msg);
if (RCL_RET_OK != ret) {
RCLCPP_ERROR(
rclcpp::get_logger("rclcpp"),
"rcl_return_loaned_message_from_subscription() failed for subscription on topic "
"'%s': %s",
subscription->get_topic_name(), rcl_get_error_string().str);
}
loaned_msg = nullptr;
}
[&]()
{
// Create a shared_ptr with a custom deleter to return the loaned
// to the RMW when last reference goes out of scope. This allows the user
// to store copies of the message outside the subscription callback.
auto loaned_message_ptr = std::shared_ptr<void>(
loaned_msg,
[subscription](void* loaned_msg_ptr)
{
rcl_ret_t ret = rcl_return_loaned_message_from_subscription(
subscription->get_subscription_handle().get(),
loaned_msg_ptr);
if (RCL_RET_OK != ret) {
RCLCPP_ERROR(
rclcpp::get_logger("rclcpp"),
"rcl_return_loaned_message_from_subscription() failed for subscription on topic '%s': %s",
subscription->get_topic_name(), rcl_get_error_string().str);
}
});

// Pass the shared_ptr to the user's callback
subscription->handle_loaned_message(loaned_message_ptr, message_info);
});
} else {
// This case is taking a copy of the message data from the middleware via
// inter-process communication.
Expand Down
5 changes: 2 additions & 3 deletions rclcpp/src/rclcpp/generic_subscription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,8 @@ GenericSubscription::handle_serialized_message(
any_callback_.dispatch(message, message_info);
}

void
GenericSubscription::handle_loaned_message(
void * message, const rclcpp::MessageInfo & message_info)
void GenericSubscription::handle_loaned_message(
std::shared_ptr<void> message, const rclcpp::MessageInfo & message_info)
{
(void) message;
(void) message_info;
Expand Down
4 changes: 4 additions & 0 deletions rclcpp/test/rclcpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,10 @@ ament_add_gtest(test_loaned_message test_loaned_message.cpp)
ament_add_test_label(test_loaned_message mimick)
target_link_libraries(test_loaned_message ${PROJECT_NAME} mimick ${test_msgs_TARGETS})

ament_add_gtest(test_shared_memory test_shared_memory.cpp)
ament_add_test_label(test_shared_memory mimick)
target_link_libraries(test_shared_memory ${PROJECT_NAME} mimick ${test_msgs_TARGETS})

ament_add_gtest(test_memory_strategy test_memory_strategy.cpp)
target_link_libraries(test_memory_strategy ${PROJECT_NAME} ${test_msgs_TARGETS})

Expand Down
2 changes: 1 addition & 1 deletion rclcpp/test/rclcpp/node_interfaces/test_node_topics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ class TestSubscription : public rclcpp::SubscriptionBase
create_serialized_message() override {return nullptr;}

void handle_message(std::shared_ptr<void> &, const rclcpp::MessageInfo &) override {}
void handle_loaned_message(void *, const rclcpp::MessageInfo &) override {}
void handle_loaned_message(std::shared_ptr<void>, const rclcpp::MessageInfo &) override {}
void handle_serialized_message(
const std::shared_ptr<rclcpp::SerializedMessage> &, const rclcpp::MessageInfo &) override {}
void return_message(std::shared_ptr<void> &) override {}
Expand Down
Loading