使用回调组
在多线程执行器中运行节点时,ROS 2提供了回调组作为控制不同回调的执行的工具。本页面旨在指导如何有效地使用回调组。假设读者对 执行器 的概念有基本的了解。
回调组的基础知识
在使用多线程执行器(Multi-Threaded Executor)运行节点时,ROS 2 提供了两种不同类型的回调组(callback groups)来控制回调函数的执行:
互斥回调组(Mutually Exclusive Callback Group)
可重入回调组(Reentrant Callback Group)
这些回调组以不同的方式限制其回调函数的执行。简而言之
互斥回调组阻止其回调并行执行 - 实际上使得回调组中的回调像由单线程执行器执行一样。
可重入回调组允许执行器按照任意方式调度和执行组中的回调,没有任何限制。这意味着,除了不同的回调之间可以并行运行外,同一个回调的不同实例也可以同时执行。
属于不同回调组(任何类型)的回调始终可以并行执行。
还要注意的是,不同的ROS 2实体将其回调组传递给它们生成的所有回调。例如,如果将回调组分配给一个动作客户端,客户端创建的所有回调都将分配给该回调组。
回调组可以通过节点的 create_callback_group
函数在 rclcpp 中创建,并通过在 rclpy 中调用组的构造函数进行创建。当创建订阅、定时器等时,可以将回调组作为参数/选项进行传递。
my_callback_group = create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive);
rclcpp::SubscriptionOptions options;
options.callback_group = my_callback_group;
my_subscription = create_subscription<Int32>("/topic", rclcpp::SensorDataQoS(),
callback, options);
my_callback_group = MutuallyExclusiveCallbackGroup()
my_subscription = self.create_subscription(Int32, "/topic", self.callback, qos_profile=1,
callback_group=my_callback_group)
如果用户在创建订阅、定时器等时未指定任何回调组,则该实体将被分配给节点的默认回调组。默认回调组是一个互斥回调组,可以通过 NodeBaseInterface::get_default_callback_group()
在 rclcpp 中查询,并通过 Node.default_callback_group
在 rclpy 中查询。
关于回调
在 ROS 2 和执行器的上下文中,回调是指由执行器处理调度和执行的函数。在这个上下文中,回调的例子有:
订阅回调(从主题接收和处理数据),
定时器回调,
服务回调(用于在服务器中执行服务请求),
动作服务器和客户端中的不同回调,
Futures的完成回调。
以下是在使用回调组时需要记住的一些重要要点。
在ROS 2中,几乎所有的东西都是回调!每个由执行器运行的函数都是回调的定义。在ROS 2系统中,非回调函数主要位于系统的边缘(用户和传感器输入等)。
有时,回调是隐藏的,从用户/开发者API来看可能并不明显。这尤其适用于对服务或操作进行任何类型的“同步”调用(在rclpy中)。例如,对服务进行同步调用``Client.call(request)``会添加一个Future的完成回调,该回调需要在函数调用期间执行,但此回调对用户来说不直接可见。
控制执行
为了使用回调组来控制执行,可以考虑以下准则。
对于一个回调函数与其自身的交互:
如果希望它能够与自身并行执行,将其注册到一个可重入回调组中。例如,一个需要能够并行处理多个动作调用的动作/服务服务器。
如果它**永远不应**与自身并行执行,请将其注册到一个互斥回调组中。一个示例情况可能是一个定时器回调,它运行一个控制循环以发布控制命令。
对于不同回调之间的相互作用:
如果它们**永远不应**并行执行,请将它们注册到同一个互斥回调组中。一个示例情况可能是这些回调正在访问共享的关键资源和非线程安全资源。
如果它们应该并行执行,根据个别回调是否可以重叠,您有两个选择:
将它们注册到不同的互斥回调组(个别回调之间不重叠)
将它们注册到可重入回调组(个别回调之间有重叠)
并行运行不同回调函数的一个示例情况是一个具有同步服务客户端和定时器调用该服务的节点。请参考下面的详细示例。
避免死锁
设置节点的回调组不正确可能会导致死锁(或其他不希望的行为),特别是如果希望在回调中使用同步调用服务或动作。事实上,ROS 2 的 API 文档甚至提到在回调中不应该进行同步调用动作或服务,因为这可能会导致死锁。虽然在这方面使用异步调用确实更安全,但同步调用也可以正常工作。另一方面,同步调用也有其优点,例如使代码更简单易懂。因此,本节提供了一些关于如何正确设置节点的回调组以避免死锁的指南。
这里需要注意的第一点是,每个节点的默认回调组是互斥的回调组(Mutually Exclusive Callback Group)。如果用户在创建定时器、订阅、客户端等时没有指定其他回调组,那么这些实体创建的任何回调都将使用节点的默认回调组。此外,如果节点中的所有内容都使用相同的互斥回调组,即使指定了多线程执行器,该节点实际上也会像单线程执行器一样处理!因此,当决定使用多线程执行器时,应始终指定一个或多个回调组,以使执行器的选择有意义。
在上述内容的基础上,以下是一些指南,可帮助避免死锁:
如果在任何类型的回调中进行同步调用,该回调和进行调用的客户端需要属于
不同的回调组(任意类型),或者
一个可重入回调组。
如果由于其他要求(如线程安全和/或在等待结果时阻塞其他回调)而无法满足上述配置要求,或者您希望确保绝对不会发生死锁的可能性,请使用异步调用。
不满足第一个要点将始终导致死锁。这种情况的一个示例是在定时器回调中进行同步服务调用(请参阅下一节的示例)。
示例
让我们来看一些不同回调组设置的简单示例。下面的演示代码考虑在计时器回调中同步调用一个服务。
演示代码
我们有两个节点 - 一个提供简单服务:
#include <memory>
#include "rclcpp/rclcpp.hpp"
#include "std_srvs/srv/empty.hpp"
using namespace std::placeholders;
namespace cb_group_demo
{
class ServiceNode : public rclcpp::Node
{
public:
ServiceNode() : Node("service_node")
{
service_ptr_ = this->create_service<std_srvs::srv::Empty>(
"test_service",
std::bind(&ServiceNode::service_callback, this, _1, _2, _3)
);
}
private:
rclcpp::Service<std_srvs::srv::Empty>::SharedPtr service_ptr_;
void service_callback(
const std::shared_ptr<rmw_request_id_t> request_header,
const std::shared_ptr<std_srvs::srv::Empty::Request> request,
const std::shared_ptr<std_srvs::srv::Empty::Response> response)
{
(void)request_header;
(void)request;
(void)response;
RCLCPP_INFO(this->get_logger(), "Received request, responding...");
}
}; // class ServiceNode
} // namespace cb_group_demo
int main(int argc, char* argv[])
{
rclcpp::init(argc, argv);
auto service_node = std::make_shared<cb_group_demo::ServiceNode>();
RCLCPP_INFO(service_node->get_logger(), "Starting server node, shut down with CTRL-C");
rclcpp::spin(service_node);
RCLCPP_INFO(service_node->get_logger(), "Keyboard interrupt, shutting down.\n");
rclcpp::shutdown();
return 0;
}
import rclpy
from rclpy.node import Node
from std_srvs.srv import Empty
class ServiceNode(Node):
def __init__(self):
super().__init__('service_node')
self.srv = self.create_service(Empty, 'test_service', callback=self.service_callback)
def service_callback(self, request, result):
self.get_logger().info('Received request, responding...')
return result
if __name__ == '__main__':
rclpy.init()
node = ServiceNode()
try:
node.get_logger().info("Starting server node, shut down with CTRL-C")
rclpy.spin(node)
except KeyboardInterrupt:
node.get_logger().info('Keyboard interrupt, shutting down.\n')
node.destroy_node()
rclpy.shutdown()
另一个包含一个客户端和一个用于进行服务调用的计时器:
注意: rclcpp 中的服务客户端 API 不提供类似于 rclpy 中的同步调用方法,因此我们等待 future 对象来模拟同步调用的效果。
#include <chrono>
#include <memory>
#include "rclcpp/rclcpp.hpp"
#include "std_srvs/srv/empty.hpp"
using namespace std::chrono_literals;
namespace cb_group_demo
{
class DemoNode : public rclcpp::Node
{
public:
DemoNode() : Node("client_node")
{
client_cb_group_ = nullptr;
timer_cb_group_ = nullptr;
client_ptr_ = this->create_client<std_srvs::srv::Empty>("test_service", rmw_qos_profile_services_default,
client_cb_group_);
timer_ptr_ = this->create_wall_timer(1s, std::bind(&DemoNode::timer_callback, this),
timer_cb_group_);
}
private:
rclcpp::CallbackGroup::SharedPtr client_cb_group_;
rclcpp::CallbackGroup::SharedPtr timer_cb_group_;
rclcpp::Client<std_srvs::srv::Empty>::SharedPtr client_ptr_;
rclcpp::TimerBase::SharedPtr timer_ptr_;
void timer_callback()
{
RCLCPP_INFO(this->get_logger(), "Sending request");
auto request = std::make_shared<std_srvs::srv::Empty::Request>();
auto result_future = client_ptr_->async_send_request(request);
std::future_status status = result_future.wait_for(10s); // timeout to guarantee a graceful finish
if (status == std::future_status::ready) {
RCLCPP_INFO(this->get_logger(), "Received response");
}
}
}; // class DemoNode
} // namespace cb_group_demo
int main(int argc, char* argv[])
{
rclcpp::init(argc, argv);
auto client_node = std::make_shared<cb_group_demo::DemoNode>();
rclcpp::executors::MultiThreadedExecutor executor;
executor.add_node(client_node);
RCLCPP_INFO(client_node->get_logger(), "Starting client node, shut down with CTRL-C");
executor.spin();
RCLCPP_INFO(client_node->get_logger(), "Keyboard interrupt, shutting down.\n");
rclcpp::shutdown();
return 0;
}
import rclpy
from rclpy.executors import MultiThreadedExecutor
from rclpy.callback_groups import MutuallyExclusiveCallbackGroup, ReentrantCallbackGroup
from rclpy.node import Node
from std_srvs.srv import Empty
class CallbackGroupDemo(Node):
def __init__(self):
super().__init__('client_node')
client_cb_group = None
timer_cb_group = None
self.client = self.create_client(Empty, 'test_service', callback_group=client_cb_group)
self.call_timer = self.create_timer(1, self._timer_cb, callback_group=timer_cb_group)
def _timer_cb(self):
self.get_logger().info('Sending request')
_ = self.client.call(Empty.Request())
self.get_logger().info('Received response')
if __name__ == '__main__':
rclpy.init()
node = CallbackGroupDemo()
executor = MultiThreadedExecutor()
executor.add_node(node)
try:
node.get_logger().info('Beginning client, shut down with CTRL-C')
executor.spin()
except KeyboardInterrupt:
node.get_logger().info('Keyboard interrupt, shutting down.\n')
node.destroy_node()
rclpy.shutdown()
客户端节点的构造函数包含设置服务客户端和定时器的回调组的选项。在上述默认设置中(都为 nullptr
/ None
),定时器和客户端都将使用节点的默认互斥回调组。
问题
由于我们使用 1 秒定时器进行服务调用,预期的结果是服务每秒调用一次,客户端始终收到响应并打印 Received response
。如果我们尝试在终端中运行服务器和客户端节点,则会得到以下输出。
[INFO] [1653034371.758739131] [client_node]: Starting client node, shut down with CTRL-C
[INFO] [1653034372.755865649] [client_node]: Sending request
^C[INFO] [1653034398.161674869] [client_node]: Keyboard interrupt, shutting down.
[INFO] [1653034355.308958238] [service_node]: Starting server node, shut down with CTRL-C
[INFO] [1653034372.758197320] [service_node]: Received request, responding...
^C[INFO] [1653034416.021962246] [service_node]: Keyboard interrupt, shutting down.
所以,事实证明,服务被反复调用,但是第一次调用的响应从未接收到,之后客户端节点似乎陷入了困境,并且不再进行进一步的调用。也就是说,执行在一个僵局中停止了!
导致这种情况的原因是计时器回调和客户端使用了相同的互斥回调组(节点的默认组)。当进行服务调用时,客户端将其回调组传递给了Future对象(在Python版本中隐藏在调用方法内部),而该回调需要执行以便使服务调用的结果可用。但是,由于该回调和计时器回调位于相同的互斥组中,并且计时器回调仍在执行(等待服务调用的结果),因此该回调永远不会执行。卡住的计时器回调还会阻止它本身的任何其他执行,因此计时器不会第二次触发。
解决方案
我们可以很容易地解决这个问题 - 例如 - 通过将定时器和客户端分配给不同的回调组。因此,让我们将客户端节点构造函数的前两行更改为如下(其他所有内容保持不变):
client_cb_group_ = this->create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive);
timer_cb_group_ = this->create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive);
client_cb_group = MutuallyExclusiveCallbackGroup()
timer_cb_group = MutuallyExclusiveCallbackGroup()
现在我们得到了预期的结果,即定时器重复触发,并且每个服务调用都按照应有的方式得到结果:
[INFO] [1653067523.431731177] [client_node]: Starting client node, shut down with CTRL-C
[INFO] [1653067524.431912821] [client_node]: Sending request
[INFO] [1653067524.433230445] [client_node]: Received response
[INFO] [1653067525.431869330] [client_node]: Sending request
[INFO] [1653067525.432912803] [client_node]: Received response
[INFO] [1653067526.431844726] [client_node]: Sending request
[INFO] [1653067526.432893954] [client_node]: Received response
[INFO] [1653067527.431828287] [client_node]: Sending request
[INFO] [1653067527.432848369] [client_node]: Received response
^C[INFO] [1653067528.400052749] [client_node]: Keyboard interrupt, shutting down.
[INFO] [1653067522.052866001] [service_node]: Starting server node, shut down with CTRL-C
[INFO] [1653067524.432577720] [service_node]: Received request, responding...
[INFO] [1653067525.432365009] [service_node]: Received request, responding...
[INFO] [1653067526.432300261] [service_node]: Received request, responding...
[INFO] [1653067527.432272441] [service_node]: Received request, responding...
^C[INFO] [1653034416.021962246] [service_node]: KeyboardInterrupt, shutting down.
有人可能会考虑是否仅仅避免使用节点的默认回调组就足够了。但事实并非如此:用不同的互斥组替换默认组并没有改变什么。因此,以下配置也会导致之前发现的死锁问题。
client_cb_group_ = this->create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive);
timer_cb_group_ = client_cb_group_;
client_cb_group = MutuallyExclusiveCallbackGroup()
timer_cb_group = client_cb_group
事实上,在这种情况下,一切正常工作的确切条件是计时器和客户端不能属于同一互斥组。因此,以下所有配置(以及其他一些配置)都会产生所需的结果,即计时器重复触发并完成服务调用。
client_cb_group_ = this->create_callback_group(rclcpp::CallbackGroupType::Reentrant);
timer_cb_group_ = client_cb_group_;
或
client_cb_group_ = this->create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive);
timer_cb_group_ = nullptr;
或
client_cb_group_ = nullptr;
timer_cb_group_ = this->create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive);
或
client_cb_group_ = this->create_callback_group(rclcpp::CallbackGroupType::Reentrant);
timer_cb_group_ = nullptr;
client_cb_group = ReentrantCallbackGroup()
timer_cb_group = client_cb_group
或
client_cb_group = MutuallyExclusiveCallbackGroup()
timer_cb_group = None
或
client_cb_group = None
timer_cb_group = MutuallyExclusiveCallbackGroup()
或
client_cb_group = ReentrantCallbackGroup()
timer_cb_group = None