从节点录制包(Python)

目标: 将自己的 Python 节点数据录制到一个包中。

教程级别: 高级

**时间:**20分钟

背景

rosbag2 不仅提供了 ros2 bag 命令行工具,还提供了一个 Python API,用于从你自己的源代码中读取和写入包。这使得你可以订阅一个话题并将接收到的数据同时保存到一个包中,同时对数据进行任何其他处理。例如,你可以保存话题的数据和处理结果,而无需将处理后的数据发送到另一个话题只为了记录它。由于任何数据都可以记录在包中,因此也可以保存由话题以外的其他源生成的数据,例如用于训练集的合成数据。这对于快速生成包含大量样本且播放时间较长的包非常有用。

先决条件

你应该在常规的 ROS 2 配置中安装了 rosbag2 包。

如果你是通过在 Linux 上安装 Debian 包进行安装的,它可能是默认安装的。如果没有安装,可以使用以下命令安装:

sudo apt install ros-humble-rosbag2

本教程讨论了如何使用 ROS 2 bags,包括在终端上的使用。你应该已经完成了 基本 ROS 2 bag 教程

任务

1 创建一个包

在一个新的终端中 源化你的 ROS 2 安装,这样 ros2 命令才能正常工作。

按照:ref:这些指示 创建名为``ros2_ws``的新工作空间。

进入``ros2_ws/src``目录并创建一个新包:

ros2 pkg create --build-type ament_python bag_recorder_nodes_py --dependencies rclpy rosbag2_py example_interfaces std_msgs

终端将返回一条消息,验证您的``bag_recorder_nodes_py``包及其所有必要的文件和文件夹已创建。``--dependencies``参数将自动将必要的依赖行添加到``package.xml``中。在此情况下,该包将使用``rosbag2_py``包和``rclpy``包。还需要对``example_interfaces``包进行依赖,以获取消息定义。

1.1 更新``package.xml``和``setup.py``文件

因为您在包创建过程中使用了``--dependencies``选项,所以您不必手动添加依赖项到``package.xml``中。然而,仍然要确保将描述、维护者电子邮件和名称以及许可证信息添加到``package.xml``中。

<description>Python bag writing tutorial</description>
<maintainer email="you@email.com">Your Name</maintainer>
<license>Apache License 2.0</license>

还要确保将这些信息添加到``setup.py``文件中。

maintainer='Your Name',
maintainer_email='you@email.com',
description='Python bag writing tutorial',
license='Apache License 2.0',

2 编写Python节点

在``ros2_ws/src/bag_recorder_nodes_py/bag_recorder_nodes_py``目录中,创建一个名为``simple_bag_recorder.py``的新文件,并将以下代码粘贴到其中。

import rclpy
from rclpy.node import Node
from rclpy.serialization import serialize_message
from std_msgs.msg import String

import rosbag2_py

class SimpleBagRecorder(Node):
    def __init__(self):
        super().__init__('simple_bag_recorder')
        self.writer = rosbag2_py.SequentialWriter()

        storage_options = rosbag2_py._storage.StorageOptions(
            uri='my_bag',
            storage_id='sqlite3')
        converter_options = rosbag2_py._storage.ConverterOptions('', '')
        self.writer.open(storage_options, converter_options)

        topic_info = rosbag2_py._storage.TopicMetadata(
            name='chatter',
            type='std_msgs/msg/String',
            serialization_format='cdr')
        self.writer.create_topic(topic_info)

        self.subscription = self.create_subscription(
            String,
            'chatter',
            self.topic_callback,
            10)
        self.subscription

    def topic_callback(self, msg):
        self.writer.write(
            'chatter',
            serialize_message(msg),
            self.get_clock().now().nanoseconds)


def main(args=None):
    rclpy.init(args=args)
    sbr = SimpleBagRecorder()
    rclpy.spin(sbr)
    rclpy.shutdown()


if __name__ == '__main__':
    main()

2.1 检查代码

顶部的``import``语句是包的依赖项。注意导入``rosbag2_py``包,用于处理bag文件所需的函数和结构。

在类构造函数中,我们首先创建一个写入器对象,用于将数据写入bag文件。我们创建了一个``SequentialWriter``,它按照接收到的顺序将消息写入bag文件。`rosbag2 <https://github.com/ros2/rosbag2/tree/humble/rosbag2_cpp/include/rosbag2_cpp/writers>`__中可能还提供了其他具有不同行为的写入器。

self.writer = rosbag2_py.SequentialWriter()

现在我们有了一个写入器对象,我们可以使用它打开bag文件。我们指定要创建的bag文件的URI和格式(sqlite3),将其他选项保持默认。使用默认的转换选项,不进行转换,将消息以接收到的序列化格式存储。

storage_options = rosbag2_py._storage.StorageOptions(
    uri='my_bag',
    storage_id='sqlite3')
converter_options = rosbag2_py._storage.ConverterOptions('', '')
self.writer.open(storage_options, converter_options)

接下来,我们需要告诉写入器我们希望存储的话题。这是通过创建一个``TopicMetadata``对象并将其注册到写入器中来完成的。该对象指定了话题名称、话题数据类型和使用的序列化格式。

topic_info = rosbag2_py._storage.TopicMetadata(
    name='chatter',
    type='std_msgs/msg/String',
    serialization_format='cdr')
self.writer.create_topic(topic_info)

现在设置好了writer以记录我们传递给它的数据,我们创建一个订阅并为其指定一个回调函数。我们将在回调函数中向bag写入数据。

self.subscription = self.create_subscription(
    String,
    'chatter',
    self.topic_callback,
    10)
self.subscription

回调函数以未序列化的形式接收消息(这是``rclpy`` API的标准方式),并将消息传递给写入器,指定数据所属的主题和要记录的时间戳。然而,写入器需要序列化的消息来存储在包中。这意味着我们在将数据传递给写入器之前需要对其进行序列化。因此,我们调用``serialize_message()``并将其结果传递给写入器,而不是直接传递消息。

def topic_callback(self, msg):
    self.writer.write(
        'chatter',
        serialize_message(msg),
        self.get_clock().now().nanoseconds)

文件以``main``函数结束,用于创建一个节点实例并启动ROS处理。

def main(args=None):
    rclpy.init(args=args)
    sbr = SimpleBagRecorder()
    rclpy.spin(sbr)
    rclpy.shutdown()

2.2 添加入口点

打开``bag_recorder_nodes_py``包中的``setup.py``文件,并为您的节点添加一个入口点。

entry_points={
    'console_scripts': [
        'simple_bag_recorder = bag_recorder_nodes_py.simple_bag_recorder:main',
    ],
},

3. 构建和运行

返回到你的工作空间的根目录,ros2_ws,并构建你的新包。

colcon build --packages-select bag_recorder_nodes_py

打开一个新终端,进入``ros2_ws``目录,并源化设置文件。

source install/setup.bash

现在运行节点:

ros2 run bag_recorder_nodes_py simple_bag_recorder

打开第二个终端并运行``talker``示例节点。

ros2 run demo_nodes_cpp talker

这将开始在``chatter``主题上发布数据。当包写入节点接收到此数据时,它将将其写入``my_bag``包中。如果``my_bag``目录已经存在,则在运行``simple_bag_recorder``节点之前必须首先删除它。这是因为``rosbag2``默认不会覆盖现有的包,因此目标目录不能存在。

终止这两个节点。然后,在一个终端中启动 listener 示例节点。

ros2 run demo_nodes_cpp listener

在另一个终端中,使用 ros2 bag 命令来播放由你的节点记录的包。

ros2 bag play my_bag

你将看到包中的消息被 listener 节点接收到。

如果您希望再次运行写入包节点,则首先需要删除``my_bag``目录。

4 从节点记录合成数据

任何数据都可以记录到包中,不仅限于通过主题接收到的数据。从自己的节点写入包的常见用例是生成并存储合成数据。在本节中,您将学习如何编写一个生成数据并将其存储在包中的节点。我们将演示两种实现方法。第一种方法使用定时器的节点;这是您在数据生成与节点外部的情况下使用的方法,例如直接从硬件(如相机)读取数据。第二种方法不使用节点;这是您在不需要使用ROS基础设施的任何功能时可以使用的方法。

4.1 编写一个Python节点

ros2_ws/src/bag_recorder_nodes_py/bag_recorder_nodes_py 目录下,创建一个名为 data_generator_node.py 的新文件,并将以下代码粘贴到其中。

import rclpy
from rclpy.node import Node
from rclpy.serialization import serialize_message
from example_interfaces.msg import Int32

import rosbag2_py

class DataGeneratorNode(Node):
    def __init__(self):
        super().__init__('data_generator_node')
        self.data = Int32()
        self.data.data = 0
        self.writer = rosbag2_py.SequentialWriter()

        storage_options = rosbag2_py._storage.StorageOptions(
            uri='timed_synthetic_bag',
            storage_id='sqlite3')
        converter_options = rosbag2_py._storage.ConverterOptions('', '')
        self.writer.open(storage_options, converter_options)

        topic_info = rosbag2_py._storage.TopicMetadata(
            name='synthetic',
            type='example_interfaces/msg/Int32',
            serialization_format='cdr')
        self.writer.create_topic(topic_info)

        self.timer = self.create_timer(1, self.timer_callback)

    def timer_callback(self):
        self.writer.write(
            'synthetic',
            serialize_message(self.data),
            self.get_clock().now().nanoseconds)
        self.data.data += 1


def main(args=None):
    rclpy.init(args=args)
    dgn = DataGeneratorNode()
    rclpy.spin(dgn)
    rclpy.shutdown()


if __name__ == '__main__':
    main()

4.2 检查代码

大部分代码与第一个示例相同。以下是重要的区别描述。

首先,更改了包的名称。

storage_options = rosbag2_py._storage.StorageOptions(
    uri='timed_synthetic_bag',
    storage_id='sqlite3')

主题的名称也已更改,存储的数据类型也已更改。

topic_info = rosbag2_py._storage.TopicMetadata(
    name='synthetic',
    type='example_interfaces/msg/Int32',
    serialization_format='cdr')
self.writer.create_topic(topic_info)

这个节点不是订阅某个话题,而是具有一个定时器。定时器以1秒的周期触发,并在触发时调用给定的成员函数。

self.timer = self.create_timer(1, self.timer_callback)

在定时器回调中,我们生成(或以其他方式获取,例如从连接到某些硬件的串口读取)希望存储在包中的数据。与之前的示例一样,数据尚未序列化,因此我们在将其传递给写入器之前必须对其进行序列化。

self.writer.write(
    'synthetic',
    serialize_message(self.data),
    self.get_clock().now().nanoseconds)

4.3 添加可执行文件

打开``bag_recorder_nodes_py``包中的``setup.py``文件,并为您的节点添加一个入口点。

entry_points={
    'console_scripts': [
        'simple_bag_recorder = bag_recorder_nodes_py.simple_bag_recorder:main',
        'data_generator_node = bag_recorder_nodes_py.data_generator_node:main',
    ],
},

4.4 构建和运行

返回到你的工作空间根目录``ros2_ws``,并构建你的包。

colcon build --packages-select bag_recorder_nodes_py

打开一个新终端,进入``ros2_ws``目录,并源化设置文件。

source install/setup.bash

如果 timed_synthetic_bag 目录已经存在,在运行节点之前必须先删除它。

现在运行节点:

ros2 run bag_recorder_nodes_py data_generator_node

等待大约30秒,然后使用 ctrl-c 终止节点。接下来,播放已创建的 bag 文件。

ros2 bag play timed_synthetic_bag

打开第二个终端并回显 /synthetic 话题。

ros2 topic echo /synthetic

您将在控制台上以每秒一条消息的速度看到生成并存储在 bag 文件中的数据被打印出来。

5 使用可执行文件记录合成数据。

现在你可以创建一个能够存储来自话题以外数据的袋子了,接下来你将学习如何从非节点可执行文件生成和记录合成数据。这种方法的优点是代码更简单,可以快速创建大量数据。

5.1 编写一个 Python 可执行文件

ros2_ws/src/bag_recorder_nodes_py/bag_recorder_nodes_py 目录下,创建一个名为 data_generator_executable.py 的新文件,并将以下代码粘贴到其中。

from rclpy.clock import Clock
from rclpy.duration import Duration
from rclpy.serialization import serialize_message
from example_interfaces.msg import Int32

import rosbag2_py


def main(args=None):
    writer = rosbag2_py.SequentialWriter()

    storage_options = rosbag2_py._storage.StorageOptions(
        uri='big_synthetic_bag',
        storage_id='sqlite3')
    converter_options = rosbag2_py._storage.ConverterOptions('', '')
    writer.open(storage_options, converter_options)

    topic_info = rosbag2_py._storage.TopicMetadata(
        name='synthetic',
        type='example_interfaces/msg/Int32',
        serialization_format='cdr')
    writer.create_topic(topic_info)

    time_stamp = Clock().now()
    for ii in range(0, 100):
        data = Int32()
        data.data = ii
        writer.write(
            'synthetic',
            serialize_message(data),
            time_stamp.nanoseconds)
        time_stamp += Duration(seconds=1)

if __name__ == '__main__':
    main()

5.2 检查代码

通过比较这个示例和之前的示例,我们会发现它们并没有那么不同。唯一的显著差异是使用了for循环来驱动数据生成,而不是使用定时器。

请注意,我们现在还为数据生成时间戳,而不是依赖于当前系统时间为每个样本生成时间戳。时间戳可以是任何您需要的值。数据将按照这些时间戳给出的速度进行播放,因此这是控制样本默认播放速度的一种有用方式。还要注意,虽然每个样本之间的间隔是一秒钟,但这个可执行文件不需要在每个样本之间等待一秒钟。这样我们就能够在比播放所需的时间少得多的时间内生成大量跨越很长时间范围的数据。

time_stamp = Clock().now()
for ii in range(0, 100):
    data = Int32()
    data.data = ii
    writer.write(
        'synthetic',
        serialize_message(data),
        time_stamp.nanoseconds)
    time_stamp += Duration(seconds=1)

5.3 添加可执行文件

打开``bag_recorder_nodes_py``包中的``setup.py``文件,并为您的节点添加一个入口点。

entry_points={
    'console_scripts': [
        'simple_bag_recorder = bag_recorder_nodes_py.simple_bag_recorder:main',
        'data_generator_node = bag_recorder_nodes_py.data_generator_node:main',
        'data_generator_executable = bag_recorder_nodes_py.data_generator_executable:main',
    ],
},

5.4 构建和运行

返回到你的工作空间根目录``ros2_ws``,并构建你的包。

colcon build --packages-select bag_recorder_nodes_py

打开终端,进入 ros2_ws 目录,并加载设置文件。

source install/setup.bash

如果 big_synthetic_bag 目录已经存在,在运行可执行文件之前必须先删除它。

现在运行可执行文件:

ros2 run bag_recorder_nodes_py data_generator_executable

注意可执行文件的运行和完成非常快速。

现在播放创建的包。

ros2 bag play big_synthetic_bag

打开第二个终端并回显 /synthetic 话题。

ros2 topic echo /synthetic

您将会看到以每秒一条消息的速率在控制台打印在包中生成并存储的数据。即使包生成速度很快,回放仍按照时间戳指示的速率进行。

总结

您创建了一个节点,用于将其接收到的数据记录到一个包中。您通过使用该节点进行包记录测试,并通过回放包来验证数据已记录。这种方法可用于记录包含比通过主题接收到的数据更多的附加数据,例如通过处理接收到的数据获得的结果。然后,您继续创建了一个节点和一个可执行文件,用于生成合成数据并将其存储在包中。后一种方法特别适用于生成可用作训练集的合成数据。