编写监听器(Python)

**目标:**学习如何使用tf2来获取帧变换的访问权限。

教程级别: 中级

时间: 10分钟

背景

在之前的教程中,我们创建了一个tf2广播器,将乌龟的姿态发布到tf2中。

在本教程中,我们将创建一个tf2监听器,开始使用tf2。

先决条件

本教程假设你已经完成了 tf2广播器教程(Python)。在前一个教程中,我们创建了一个名为``learning_tf2_py``的软件包,在这里我们将继续工作。

任务

1 编写监听器节点

让我们首先创建源文件。转到前一个教程中创建的``learning_tf2_py``软件包。在``src/learning_tf2_py/learning_tf2_py``目录中,通过输入以下命令下载示例监听器代码:

wget https://raw.githubusercontent.com/ros/geometry_tutorials/ros2/turtle_tf2_py/turtle_tf2_py/turtle_tf2_listener.py

使用您偏爱的文本编辑器打开该文件。

import math

from geometry_msgs.msg import Twist

import rclpy
from rclpy.node import Node

from tf2_ros import TransformException
from tf2_ros.buffer import Buffer
from tf2_ros.transform_listener import TransformListener

from turtlesim.srv import Spawn


class FrameListener(Node):

    def __init__(self):
        super().__init__('turtle_tf2_frame_listener')

        # Declare and acquire `target_frame` parameter
        self.target_frame = self.declare_parameter(
          'target_frame', 'turtle1').get_parameter_value().string_value

        self.tf_buffer = Buffer()
        self.tf_listener = TransformListener(self.tf_buffer, self)

        # Create a client to spawn a turtle
        self.spawner = self.create_client(Spawn, 'spawn')
        # Boolean values to store the information
        # if the service for spawning turtle is available
        self.turtle_spawning_service_ready = False
        # if the turtle was successfully spawned
        self.turtle_spawned = False

        # Create turtle2 velocity publisher
        self.publisher = self.create_publisher(Twist, 'turtle2/cmd_vel', 1)

        # Call on_timer function every second
        self.timer = self.create_timer(1.0, self.on_timer)

    def on_timer(self):
        # Store frame names in variables that will be used to
        # compute transformations
        from_frame_rel = self.target_frame
        to_frame_rel = 'turtle2'

        if self.turtle_spawning_service_ready:
            if self.turtle_spawned:
                # Look up for the transformation between target_frame and turtle2 frames
                # and send velocity commands for turtle2 to reach target_frame
                try:
                    t = self.tf_buffer.lookup_transform(
                        to_frame_rel,
                        from_frame_rel,
                        rclpy.time.Time())
                except TransformException as ex:
                    self.get_logger().info(
                        f'Could not transform {to_frame_rel} to {from_frame_rel}: {ex}')
                    return

                msg = Twist()
                scale_rotation_rate = 1.0
                msg.angular.z = scale_rotation_rate * math.atan2(
                    t.transform.translation.y,
                    t.transform.translation.x)

                scale_forward_speed = 0.5
                msg.linear.x = scale_forward_speed * math.sqrt(
                    t.transform.translation.x ** 2 +
                    t.transform.translation.y ** 2)

                self.publisher.publish(msg)
            else:
                if self.result.done():
                    self.get_logger().info(
                        f'Successfully spawned {self.result.result().name}')
                    self.turtle_spawned = True
                else:
                    self.get_logger().info('Spawn is not finished')
        else:
            if self.spawner.service_is_ready():
                # Initialize request with turtle name and coordinates
                # Note that x, y and theta are defined as floats in turtlesim/srv/Spawn
                request = Spawn.Request()
                request.name = 'turtle2'
                request.x = float(4)
                request.y = float(2)
                request.theta = float(0)
                # Call request
                self.result = self.spawner.call_async(request)
                self.turtle_spawning_service_ready = True
            else:
                # Check if the service is ready
                self.get_logger().info('Service is not ready')


def main():
    rclpy.init()
    node = FrameListener()
    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        pass

    rclpy.shutdown()

1.1 检查代码

要了解生成海龟背后的服务如何工作,请参考 编写简单服务和客户端(Python) 教程。

现在,让我们看一下与获取帧转换相关的代码。tf2_ros 包提供了一个 TransformListener 的实现,以帮助简化接收转换的任务。

from tf2_ros.transform_listener import TransformListener

在这里,我们创建了一个``TransformListener``对象。一旦创建了监听器,它就开始通过网络接收tf2变换,并将其缓冲最多10秒。

self.tf_listener = TransformListener(self.tf_buffer, self)

最后,我们查询监听器以获取特定的变换。我们使用以下参数调用``lookup_transform``方法:

  1. 目标帧

  2. 源帧

  3. 我们想要进行变换的时间

提供 rclpy.time.Time() 只会获取最新可用的转换。所有这些都包装在 try-except 块中,以处理可能的异常。

t = self.tf_buffer.lookup_transform(
    to_frame_rel,
    from_frame_rel,
    rclpy.time.Time())

1.2 添加入口点

为了让 ros2 run 命令能够运行你的节点,你必须在 setup.py``(位于 ``src/learning_tf2_py 目录中)中添加入口点。

'turtle_tf2_listener = learning_tf2_py.turtle_tf2_listener:main',

2 更新启动文件

使用文本编辑器打开名为 turtle_tf2_demo.launch.py 的启动文件,向启动描述中添加两个新节点,添加一个启动参数,并添加导入语句。结果文件应如下所示:

from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument
from launch.substitutions import LaunchConfiguration

from launch_ros.actions import Node


def generate_launch_description():
    return LaunchDescription([
        Node(
            package='turtlesim',
            executable='turtlesim_node',
            name='sim'
        ),
        Node(
            package='learning_tf2_py',
            executable='turtle_tf2_broadcaster',
            name='broadcaster1',
            parameters=[
                {'turtlename': 'turtle1'}
            ]
        ),
        DeclareLaunchArgument(
            'target_frame', default_value='turtle1',
            description='Target frame name.'
        ),
        Node(
            package='learning_tf2_py',
            executable='turtle_tf2_broadcaster',
            name='broadcaster2',
            parameters=[
                {'turtlename': 'turtle2'}
            ]
        ),
        Node(
            package='learning_tf2_py',
            executable='turtle_tf2_listener',
            name='listener',
            parameters=[
                {'target_frame': LaunchConfiguration('target_frame')}
            ]
        ),
    ])

这将声明一个 target_frame 启动参数,启动一个我们将要生成的第二只乌龟的广播器和订阅这些变换的监听器。

3 构建

在你的工作空间的根目录中运行 rosdep,以检查缺少的依赖项。

rosdep install -i --from-path src --rosdistro humble -y

仍然在工作空间的根目录中,构建您的软件包:

colcon build --packages-select learning_tf2_py

打开一个新终端,进入你的工作空间的根目录,并源化设置文件:

. install/setup.bash

4 运行

现在您可以开始完整的乌龟演示了:

ros2 launch learning_tf2_py turtle_tf2_demo.launch.py

你应该能看到带有两只乌龟的乌龟模拟器。在第二个终端窗口中输入以下命令:

ros2 run turtlesim turtle_teleop_key

为了测试是否正常工作,只需使用箭头键驱动第一只乌龟(确保你的终端窗口处于活动状态,而不是模拟器窗口),你将看到第二只乌龟跟随着第一只乌龟!

总结

在本教程中,你学会了如何使用tf2获取帧变换的访问权限。你还完成了自己编写的乌龟模拟器演示,该演示首次在 tf2介绍教程 中尝试。