Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions rclcpp/include/rclcpp/context.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@

namespace rclcpp
{
namespace graph_listener
{
class GraphListener;
} // namespace graph_listener

/// Thrown when init is called on an already initialized context.
class ContextAlreadyInitialized : public std::runtime_error
Expand Down Expand Up @@ -309,6 +313,10 @@ class Context : public std::enable_shared_from_this<Context>
std::shared_ptr<rcl_context_t>
get_rcl_context();

RCLCPP_PUBLIC
std::shared_ptr<rclcpp::graph_listener::GraphListener>
get_graph_listener();

/// Sleep for a given period of time or until shutdown() is called.
/**
* This function can be interrupted early if:
Expand Down Expand Up @@ -391,6 +399,9 @@ class Context : public std::enable_shared_from_this<Context>
/// Mutex for protecting the global condition variable.
std::mutex interrupt_mutex_;

/// Graph Listener which waits on graph changes for the node and is shared across nodes.
std::shared_ptr<rclcpp::graph_listener::GraphListener> graph_listener_;

/// Keep shared ownership of global vector of weak contexts
std::shared_ptr<WeakContextsWrapper> weak_contexts_;

Expand Down
7 changes: 6 additions & 1 deletion rclcpp/include/rclcpp/graph_listener.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,12 @@ class GraphListener : public std::enable_shared_from_this<GraphListener>
bool
is_shutdown();

/// Return true if the graph listener was started.
RCLCPP_PUBLIC
virtual
bool
is_started();

protected:
/// Main function for the listening thread.
RCLCPP_PUBLIC
Expand Down Expand Up @@ -181,7 +187,6 @@ class GraphListener : public std::enable_shared_from_this<GraphListener>
void
__shutdown();

std::weak_ptr<rclcpp::Context> weak_parent_context_;
std::shared_ptr<rcl_context_t> rcl_parent_context_;

std::thread listener_thread_;
Expand Down
2 changes: 0 additions & 2 deletions rclcpp/include/rclcpp/node_interfaces/node_graph.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,6 @@ class NodeGraph : public NodeGraphInterface
/// Handle to the NodeBaseInterface given in the constructor.
rclcpp::node_interfaces::NodeBaseInterface * node_base_;

/// Graph Listener which waits on graph changes for the node and is shared across nodes.
std::shared_ptr<rclcpp::graph_listener::GraphListener> graph_listener_;
/// Whether or not this node needs to be added to the graph listener.
std::atomic_bool should_add_to_graph_listener_;

Expand Down
30 changes: 28 additions & 2 deletions rclcpp/src/rclcpp/context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
#include "rclcpp/detail/utilities.hpp"
#include "rclcpp/exceptions.hpp"
#include "rclcpp/logging.hpp"

#include "rclcpp/graph_listener.hpp"
#include "rcutils/error_handling.h"
#include "rcutils/macros.h"

Expand Down Expand Up @@ -184,7 +184,8 @@ MutexLookup mutexStorage;
Context::Context()
: rcl_context_(nullptr),
shutdown_reason_(""),
logging_mutex_(nullptr)
logging_mutex_(nullptr),
graph_listener_(nullptr)
{
// allocate mutexes
mutexStorage.getMutexes(this);
Expand Down Expand Up @@ -289,6 +290,25 @@ Context::init(

weak_contexts_ = get_weak_contexts();
weak_contexts_->add_context(this->shared_from_this());


std::lock_guard<std::recursive_mutex> lock(mutexStorage.getMutexes(
this).on_shutdown_callbacks_mutex_);

graph_listener_ = std::make_shared<graph_listener::GraphListener>(shared_from_this());

if (!graph_listener_->is_started()) {
// Register an on_shutdown hook to shutdown the graph listener.
// This is important to ensure that the wait set is finalized before
// destruction of static objects occurs.
std::weak_ptr<rclcpp::graph_listener::GraphListener> weak_graph_listener = graph_listener_;
on_shutdown ([weak_graph_listener]() {
auto shared_graph_listener = weak_graph_listener.lock();
if(shared_graph_listener) {
shared_graph_listener->shutdown(std::nothrow);
}
});
}
} catch (const std::exception & e) {
ret = rcl_shutdown(rcl_context_.get());
rcl_context_.reset();
Expand Down Expand Up @@ -555,6 +575,12 @@ Context::get_rcl_context()
return rcl_context_;
}

std::shared_ptr<rclcpp::graph_listener::GraphListener>
Context::get_graph_listener()
{
return graph_listener_;
}

bool
Context::sleep_for(const std::chrono::nanoseconds & nanoseconds)
{
Expand Down
38 changes: 15 additions & 23 deletions rclcpp/src/rclcpp/graph_listener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
#include <memory>
#include <string>
#include <vector>

#include "rcl/error_handling.h"
#include "rcl/types.h"
#include "rclcpp/detail/add_guard_condition_to_rcl_wait_set.hpp"
Expand All @@ -38,8 +37,7 @@ namespace graph_listener
{

GraphListener::GraphListener(const std::shared_ptr<Context> & parent_context)
: weak_parent_context_(parent_context),
rcl_parent_context_(parent_context->get_rcl_context()),
: rcl_parent_context_(parent_context->get_rcl_context()),
is_started_(false),
is_shutdown_(false),
interrupt_guard_condition_(parent_context)
Expand Down Expand Up @@ -72,23 +70,11 @@ void
GraphListener::start_if_not_started()
{
std::lock_guard<std::mutex> shutdown_lock(shutdown_mutex_);
if (is_shutdown_.load()) {
if (is_shutdown()) {
throw GraphListenerShutdownError();
}
auto parent_context = weak_parent_context_.lock();
if (!is_started_ && parent_context) {
// Register an on_shutdown hook to shtudown the graph listener.
// This is important to ensure that the wait set is finalized before
// destruction of static objects occurs.
std::weak_ptr<GraphListener> weak_this = shared_from_this();
parent_context->on_shutdown(
[weak_this]() {
auto shared_this = weak_this.lock();
if (shared_this) {
// should not throw from on_shutdown if it can be avoided
shared_this->shutdown(std::nothrow);
}
});

if (!is_started()) {
// Initialize the wait set before starting.
init_wait_set();
// Start the listener thread.
Expand Down Expand Up @@ -122,7 +108,7 @@ GraphListener::run_loop()
{
while (true) {
// If shutdown() was called, exit.
if (is_shutdown_.load()) {
if (is_shutdown()) {
return;
}
rcl_ret_t ret;
Expand Down Expand Up @@ -190,7 +176,7 @@ GraphListener::run_loop()
if (graph_gc == wait_set_.guard_conditions[graph_gc_indexes[i]]) {
node_ptr->notify_graph_change();
}
if (is_shutdown_) {
if (is_shutdown()) {
// If shutdown, then notify the node of this as well.
node_ptr->notify_shutdown();
}
Expand Down Expand Up @@ -257,7 +243,7 @@ GraphListener::add_node(rclcpp::node_interfaces::NodeGraphInterface * node_graph
throw std::invalid_argument("node is nullptr");
}
std::lock_guard<std::mutex> shutdown_lock(shutdown_mutex_);
if (is_shutdown_.load()) {
if (is_shutdown()) {
throw GraphListenerShutdownError();
}

Expand Down Expand Up @@ -332,11 +318,11 @@ GraphListener::__shutdown()
{
std::lock_guard<std::mutex> shutdown_lock(shutdown_mutex_);
if (!is_shutdown_.exchange(true)) {
if (is_started_) {
if (is_started()) {
interrupt_(&interrupt_guard_condition_);
listener_thread_.join();
}
if (is_started_) {
if (is_started()) {
cleanup_wait_set();
}
}
Expand Down Expand Up @@ -365,6 +351,12 @@ GraphListener::shutdown(const std::nothrow_t &) noexcept
}
}

bool
GraphListener::is_started()
{
return is_started_;
}

bool
GraphListener::is_shutdown()
{
Expand Down
9 changes: 3 additions & 6 deletions rclcpp/src/rclcpp/node_interfaces/node_graph.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,6 @@ using rclcpp::graph_listener::GraphListener;

NodeGraph::NodeGraph(rclcpp::node_interfaces::NodeBaseInterface * node_base)
: node_base_(node_base),
graph_listener_(
node_base->get_context()->get_sub_context<GraphListener>(node_base->get_context())
),
should_add_to_graph_listener_(true),
graph_users_count_(0)
{}
Expand All @@ -50,7 +47,7 @@ NodeGraph::~NodeGraph()
// graph listener after checking that it was not here.
if (!should_add_to_graph_listener_.exchange(false)) {
// If it was already false, then it needs to now be removed.
graph_listener_->remove_node(this);
node_base_->get_context()->get_graph_listener()->remove_node(this);
}
}

Expand Down Expand Up @@ -603,8 +600,8 @@ NodeGraph::get_graph_event()
}
// on first call, add node to graph_listener_
if (should_add_to_graph_listener_.exchange(false)) {
graph_listener_->add_node(this);
graph_listener_->start_if_not_started();
node_base_->get_context()->get_graph_listener()->add_node(this);
node_base_->get_context()->get_graph_listener()->start_if_not_started();
}
return event;
}
Expand Down
4 changes: 1 addition & 3 deletions rclcpp/test/rclcpp/test_graph_listener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,7 @@ class TestGraphListener : public ::testing::Test
node_graph_ = node_->get_node_graph_interface();
ASSERT_NE(nullptr, node_graph_);

graph_listener_ =
std::make_shared<rclcpp::graph_listener::GraphListener>(
rclcpp::contexts::get_global_default_context());
graph_listener_ = rclcpp::contexts::get_global_default_context()->get_graph_listener();
}

void TearDown()
Expand Down