编写一个动作服务器和客户端(Python)

目标: 在Python中实现一个动作服务器和客户端。

教程级别: 中级

时间: 15分钟

背景

在ROS 2中,动作是一种异步通信形式。*动作客户端*向*动作服务器*发送目标请求。*动作服务器*向*动作客户端*发送目标反馈和结果。

先决条件

你需要使用``action_tutorials_interfaces``包和前面教程中定义的``Fibonacci.action``接口。参见 创建一个动作

任务

1 编写动作服务器

让我们专注于编写一个使用我们在 创建一个动作 教程中创建的操作来计算斐波那契数列的操作服务器。

到目前为止,您已经创建了包并使用``ros2 run``运行节点。但是,在本教程中,为了保持简单,我们将将动作服务器限定在一个单独的文件中。如果您想查看动作教程的完整包是什么样子,请查看`action_tutorials <https://github.com/ros2/demos/tree/humble/action_tutorials>`__。

在您的主目录中打开一个新文件,我们将其命名为``fibonacci_action_server.py``,然后添加以下代码:

import rclpy
from rclpy.action import ActionServer
from rclpy.node import Node

from action_tutorials_interfaces.action import Fibonacci


class FibonacciActionServer(Node):

    def __init__(self):
        super().__init__('fibonacci_action_server')
        self._action_server = ActionServer(
            self,
            Fibonacci,
            'fibonacci',
            self.execute_callback)

    def execute_callback(self, goal_handle):
        self.get_logger().info('Executing goal...')
        result = Fibonacci.Result()
        return result


def main(args=None):
    rclpy.init(args=args)

    fibonacci_action_server = FibonacciActionServer()

    rclpy.spin(fibonacci_action_server)


if __name__ == '__main__':
    main()

第8行定义了一个名为``FibonacciActionServer``的类,它是``Node``的子类。通过调用``Node``构造函数来初始化该类,并将节点命名为``fibonacci_action_server``:

        super().__init__('fibonacci_action_server')

在构造函数中,我们还实例化了一个新的动作服务器:

        self._action_server = ActionServer(
            self,
            Fibonacci,
            'fibonacci',
            self.execute_callback)

一个操作服务器需要四个参数:

  1. 要添加操作客户端的ROS 2节点:self

  2. 操作的类型:``Fibonacci``(在第5行导入)。

  3. 动作名称:'fibonacci'

  4. 用于执行已接受的目标的回调函数:self.execute_callback。该回调函数**必须**返回该操作类型的结果消息。

我们还在类中定义了一个``execute_callback``方法:

    def execute_callback(self, goal_handle):
        self.get_logger().info('Executing goal...')
        result = Fibonacci.Result()
        return result

这是一旦接受目标就会被调用来执行目标的方法。

让我们尝试运行我们的动作服务器:

python3 fibonacci_action_server.py

在另一个终端中,我们可以使用命令行界面发送一个目标:

ros2 action send_goal fibonacci action_tutorials_interfaces/action/Fibonacci "{order: 5}"

在正在运行动作服务器的终端中,您应该看到一个记录的消息“正在执行目标...”,然后是一个警告,说明目标状态未设置。默认情况下,如果在执行回调中未设置目标处理状态,则假定为“中止”状态。

我们可以使用方法` succeed() <http://docs.ros2.org/latest/api/rclpy/api/actions.html#rclpy.action.server.ServerGoalHandle.succeed>`_ 来表示目标已成功:

    def execute_callback(self, goal_handle):
        self.get_logger().info('Executing goal...')
        goal_handle.succeed()
        result = Fibonacci.Result()
        return result

现在,如果重新启动动作服务器并发送另一个目标,您应该看到目标以``SUCCEEDED``状态完成。

现在让我们实际计算并返回请求的斐波那契数列:

    def execute_callback(self, goal_handle):
        self.get_logger().info('Executing goal...')

        sequence = [0, 1]

        for i in range(1, goal_handle.request.order):
            sequence.append(sequence[i] + sequence[i-1])

        goal_handle.succeed()

        result = Fibonacci.Result()
        result.sequence = sequence
        return result

计算序列后,我们在返回之前将其分配给结果消息字段。

然后重新启动动作服务器并发送另一个目标。您应该看到目标以正确的结果序列完成。

1.2 发布反馈

动作的一个好处是在目标执行期间向动作客户端提供反馈。我们可以通过调用目标处理器的 publish_feedback() 方法,使我们的动作服务器为动作客户端发布反馈。

我们将替换 sequence 变量,并使用反馈消息来存储序列。在 for 循环中每次更新反馈消息后,我们都会发布反馈消息并休眠以产生戏剧效果:

import time

import rclpy
from rclpy.action import ActionServer
from rclpy.node import Node

from action_tutorials_interfaces.action import Fibonacci


class FibonacciActionServer(Node):

    def __init__(self):
        super().__init__('fibonacci_action_server')
        self._action_server = ActionServer(
            self,
            Fibonacci,
            'fibonacci',
            self.execute_callback)

    def execute_callback(self, goal_handle):
        self.get_logger().info('Executing goal...')

        feedback_msg = Fibonacci.Feedback()
        feedback_msg.partial_sequence = [0, 1]

        for i in range(1, goal_handle.request.order):
            feedback_msg.partial_sequence.append(
                feedback_msg.partial_sequence[i] + feedback_msg.partial_sequence[i-1])
            self.get_logger().info('Feedback: {0}'.format(feedback_msg.partial_sequence))
            goal_handle.publish_feedback(feedback_msg)
            time.sleep(1)

        goal_handle.succeed()

        result = Fibonacci.Result()
        result.sequence = feedback_msg.partial_sequence
        return result


def main(args=None):
    rclpy.init(args=args)

    fibonacci_action_server = FibonacciActionServer()

    rclpy.spin(fibonacci_action_server)


if __name__ == '__main__':
    main()

重新启动动作服务器后,我们可以使用带有 --feedback 选项的命令行工具来确认现在已发布反馈:

ros2 action send_goal --feedback fibonacci action_tutorials_interfaces/action/Fibonacci "{order: 5}"

2 编写一个动作客户端

我们还将将动作客户端限定在单个文件范围内。打开一个新文件,我们称之为``fibonacci_action_client.py``,然后添加以下样板代码:

import rclpy
from rclpy.action import ActionClient
from rclpy.node import Node

from action_tutorials_interfaces.action import Fibonacci


class FibonacciActionClient(Node):

    def __init__(self):
        super().__init__('fibonacci_action_client')
        self._action_client = ActionClient(self, Fibonacci, 'fibonacci')

    def send_goal(self, order):
        goal_msg = Fibonacci.Goal()
        goal_msg.order = order

        self._action_client.wait_for_server()

        return self._action_client.send_goal_async(goal_msg)


def main(args=None):
    rclpy.init(args=args)

    action_client = FibonacciActionClient()

    future = action_client.send_goal(10)

    rclpy.spin_until_future_complete(action_client, future)


if __name__ == '__main__':
    main()

我们定义了一个名为``FibonacciActionClient``的类,它是``Node``的子类。通过调用``Node``构造函数来初始化该类,将我们的节点命名为``fibonacci_action_client``:

        super().__init__('fibonacci_action_client')

在类的构造函数中,我们还使用之前教程中的自定义动作定义来创建一个动作客户端: 创建一个动作

        self._action_client = ActionClient(self, Fibonacci, 'fibonacci')

我们通过传递三个参数来创建一个``ActionClient``:

  1. 要添加动作客户端的 ROS 2 节点:self

  2. 动作的类型:Fibonacci

  3. 动作的名称:'fibonacci'

我们的动作客户端将能够与具有相同名称和类型的动作服务器进行通信。

我们还在``FibonacciActionClient``类中定义了一个方法``send_goal``:

    def send_goal(self, order):
        goal_msg = Fibonacci.Goal()
        goal_msg.order = order

        self._action_client.wait_for_server()

        return self._action_client.send_goal_async(goal_msg)

该方法等待动作服务器可用,然后向服务器发送一个目标。它返回一个未来对象,我们可以稍后等待该对象。

在类定义之后,我们定义了一个函数``main()``,它初始化ROS 2并创建``FibonacciActionClient``节点的一个实例。然后它发送一个目标并等待该目标完成。

最后,在我们的Python程序入口点调用``main()``。

让我们通过首先运行之前构建的动作服务器来测试我们的动作客户端:

python3 fibonacci_action_server.py

在另一个终端中运行动作客户端:

python3 fibonacci_action_client.py

当动作服务器成功执行目标时,您应该会看到由其打印的消息:

[INFO] [fibonacci_action_server]: Executing goal...
[INFO] [fibonacci_action_server]: Feedback: array('i', [0, 1, 1])
[INFO] [fibonacci_action_server]: Feedback: array('i', [0, 1, 1, 2])
[INFO] [fibonacci_action_server]: Feedback: array('i', [0, 1, 1, 2, 3])
[INFO] [fibonacci_action_server]: Feedback: array('i', [0, 1, 1, 2, 3, 5])
# etc.

动作客户端应该启动,并快速完成。此时,我们拥有一个功能齐全的动作客户端,但我们看不到任何结果或获得任何反馈。

2.1 获取结果

那么我们可以发送一个目标,但我们如何知道何时完成呢?我们可以通过以下几个步骤获取结果信息。首先,我们需要为发送的目标获取一个目标句柄。然后,我们可以使用目标句柄来请求结果。

以下是这个示例的完整代码:

import rclpy
from rclpy.action import ActionClient
from rclpy.node import Node

from action_tutorials_interfaces.action import Fibonacci


class FibonacciActionClient(Node):

    def __init__(self):
        super().__init__('fibonacci_action_client')
        self._action_client = ActionClient(self, Fibonacci, 'fibonacci')

    def send_goal(self, order):
        goal_msg = Fibonacci.Goal()
        goal_msg.order = order

        self._action_client.wait_for_server()

        self._send_goal_future = self._action_client.send_goal_async(goal_msg)

        self._send_goal_future.add_done_callback(self.goal_response_callback)

    def goal_response_callback(self, future):
        goal_handle = future.result()
        if not goal_handle.accepted:
            self.get_logger().info('Goal rejected :(')
            return

        self.get_logger().info('Goal accepted :)')

        self._get_result_future = goal_handle.get_result_async()
        self._get_result_future.add_done_callback(self.get_result_callback)

    def get_result_callback(self, future):
        result = future.result().result
        self.get_logger().info('Result: {0}'.format(result.sequence))
        rclpy.shutdown()


def main(args=None):
    rclpy.init(args=args)

    action_client = FibonacciActionClient()

    action_client.send_goal(10)

    rclpy.spin(action_client)


if __name__ == '__main__':
    main()

ActionClient.send_goal_async() 方法返回一个对目标句柄的 future。首先,我们为 future 完成时注册一个回调函数:

        self._send_goal_future.add_done_callback(self.goal_response_callback)

请注意,当一个动作服务器接受或拒绝目标请求时,future 将会完成。让我们更详细地查看 goal_response_callback。我们可以检查目标是否被拒绝,并在此处提前返回,因为我们知道将不会有结果:

    def goal_response_callback(self, future):
        goal_handle = future.result()
        if not goal_handle.accepted:
            self.get_logger().info('Goal rejected :(')
            return

        self.get_logger().info('Goal accepted :)')

现在我们已经获得了一个目标句柄,我们可以使用它来使用 get_result_async() 方法请求结果。与发送目标类似,我们将得到一个 future,该 future 将在结果准备好时完成。让我们注册一个与目标响应时类似的回调函数:

        self._get_result_future = goal_handle.get_result_async()
        self._get_result_future.add_done_callback(self.get_result_callback)

在回调函数中,我们记录结果序列并优雅地关闭 ROS 2 以完成退出:

    def get_result_callback(self, future):
        result = future.result().result
        self.get_logger().info('Result: {0}'.format(result.sequence))
        rclpy.shutdown()

在一个独立的终端中运行动作服务器后,可以尝试运行我们的斐波那契动作客户端!

python3 fibonacci_action_client.py

你应该会看到记录的消息,显示目标已被接受和最终结果。

2.2 获取反馈

我们的动作客户端可以发送目标。很好!但是如果我们能够从动作服务器获取一些关于发送的目标的反馈信息就更好了。

以下是这个示例的完整代码:

import rclpy
from rclpy.action import ActionClient
from rclpy.node import Node

from action_tutorials_interfaces.action import Fibonacci


class FibonacciActionClient(Node):

    def __init__(self):
        super().__init__('fibonacci_action_client')
        self._action_client = ActionClient(self, Fibonacci, 'fibonacci')

    def send_goal(self, order):
        goal_msg = Fibonacci.Goal()
        goal_msg.order = order

        self._action_client.wait_for_server()

        self._send_goal_future = self._action_client.send_goal_async(goal_msg, feedback_callback=self.feedback_callback)

        self._send_goal_future.add_done_callback(self.goal_response_callback)

    def goal_response_callback(self, future):
        goal_handle = future.result()
        if not goal_handle.accepted:
            self.get_logger().info('Goal rejected :(')
            return

        self.get_logger().info('Goal accepted :)')

        self._get_result_future = goal_handle.get_result_async()
        self._get_result_future.add_done_callback(self.get_result_callback)

    def get_result_callback(self, future):
        result = future.result().result
        self.get_logger().info('Result: {0}'.format(result.sequence))
        rclpy.shutdown()

    def feedback_callback(self, feedback_msg):
        feedback = feedback_msg.feedback
        self.get_logger().info('Received feedback: {0}'.format(feedback.partial_sequence))


def main(args=None):
    rclpy.init(args=args)

    action_client = FibonacciActionClient()

    action_client.send_goal(10)

    rclpy.spin(action_client)


if __name__ == '__main__':
    main()

这是用于反馈消息的回调函数:

    def feedback_callback(self, feedback_msg):
        feedback = feedback_msg.feedback
        self.get_logger().info('Received feedback: {0}'.format(feedback.partial_sequence))

在回调函数中,我们获取消息的反馈部分并将``partial_sequence``字段打印到屏幕上。

我们需要在动作客户端中注册回调函数。当我们发送一个目标时,可以通过将回调函数附加到动作客户端来实现:

        self._send_goal_future = self._action_client.send_goal_async(goal_msg, feedback_callback=self.feedback_callback)

一切准备就绪。如果我们运行我们的动作客户端,你应该会在屏幕上看到打印出的反馈信息。

总结

在本教程中,你逐行组装了一个Python动作服务器和动作客户端,并配置它们进行目标、反馈和结果的交换。