创建内容过滤订阅

目标: 创建一个内容过滤订阅。

教程级别: 高级

时间: 15分钟

概述

ROS 2应用程序通常由主题组成,用于将数据从发布者传输到订阅者。基本上,订阅者接收来自主题发布者的所有已发布数据。但有时,订阅者可能只对发布者发送的数据的子集感兴趣。内容过滤订阅允许只接收应用程序所关注的数据。

在这个演示中,我们将重点介绍如何创建一个内容过滤订阅以及它们的工作原理。

RMW支持

内容过滤订阅需要RMW实现支持。

内容过滤订阅支持状态

rmw_fastrtps

支持

rmw_connextdds

支持

rmw_cyclonedds

不支持

目前,所有支持内容过滤订阅的RMW实现都是基于`DDS <https://www.omg.org/omg-dds-portal/>`__的。这意味着支持的过滤表达式和参数也依赖于`DDS <https://www.omg.org/omg-dds-portal/>`__,您可以参考`DDS规范 <https://www.omg.org/spec/DDS/1.4/PDF>`__的``附录B - 查询和过滤语法``了解详细信息。

安装演示程序

详细了解安装ROS 2的方法,请参阅:doc:安装说明

如果您已从软件包中安装了ROS 2,请确保已安装``ros-humble-demo-nodes-cpp``。如果您从存档中下载或从源代码构建ROS 2,它已经包含在安装中。

温度过滤演示

该演示展示了如何使用内容过滤订阅仅接收超出可接受温度范围的温度值,以检测紧急情况。内容过滤订阅会过滤掉无趣的温度数据,以避免发出订阅回调。

内容过滤发布者:

https://github.com/ros2/demos/blob/humble/demo_nodes_cpp/src/topics/content_filtering_publisher.cpp

#include <chrono>
#include <cstdio>
#include <memory>
#include <utility>

#include "rclcpp/rclcpp.hpp"
#include "rclcpp_components/register_node_macro.hpp"

#include "std_msgs/msg/float32.hpp"

#include "demo_nodes_cpp/visibility_control.h"

using namespace std::chrono_literals;

namespace demo_nodes_cpp
{
// The simulated temperature data starts from -100.0 and ends at 150.0 with a step size of 10.0
constexpr std::array<float, 3> TEMPERATURE_SETTING {-100.0f, 150.0f, 10.0f};

// Create a ContentFilteringPublisher class that subclasses the generic rclcpp::Node base class.
// The main function below will instantiate the class as a ROS node.
class ContentFilteringPublisher : public rclcpp::Node
{
public:
  DEMO_NODES_CPP_PUBLIC
  explicit ContentFilteringPublisher(const rclcpp::NodeOptions & options)
  : Node("content_filtering_publisher", options)
  {
    // Create a function for when messages are to be sent.
    setvbuf(stdout, NULL, _IONBF, BUFSIZ);
    auto publish_message =
      [this]() -> void
      {
        msg_ = std::make_unique<std_msgs::msg::Float32>();
        msg_->data = temperature_;
        temperature_ += TEMPERATURE_SETTING[2];
        if (temperature_ > TEMPERATURE_SETTING[1]) {
          temperature_ = TEMPERATURE_SETTING[0];
        }
        RCLCPP_INFO(this->get_logger(), "Publishing: '%f'", msg_->data);
        // Put the message into a queue to be processed by the middleware.
        // This call is non-blocking.
        pub_->publish(std::move(msg_));
      };
    // Create a publisher with a custom Quality of Service profile.
    // Uniform initialization is suggested so it can be trivially changed to
    // rclcpp::KeepAll{} if the user wishes.
    // (rclcpp::KeepLast(7) -> rclcpp::KeepAll() fails to compile)
    rclcpp::QoS qos(rclcpp::KeepLast{7});
    pub_ = this->create_publisher<std_msgs::msg::Float32>("temperature", qos);

    // Use a timer to schedule periodic message publishing.
    timer_ = this->create_wall_timer(1s, publish_message);
  }

private:
  float temperature_ = TEMPERATURE_SETTING[0];
  std::unique_ptr<std_msgs::msg::Float32> msg_;
  rclcpp::Publisher<std_msgs::msg::Float32>::SharedPtr pub_;
  rclcpp::TimerBase::SharedPtr timer_;
};

}  // namespace demo_nodes_cpp

内容过滤器在订阅端定义,发布者不需要以任何特殊方式进行配置以允许内容过滤。ContentFilteringPublisher 节点每秒发布模拟温度数据,从-100.0开始,以10.0的步长逐渐增加,直到达到150.0。

我们可以通过运行``ros2 run demo_nodes_cpp content_filtering_publisher``可执行文件来运行演示(不要忘记首先源化设置文件):

$ ros2 run demo_nodes_cpp content_filtering_publisher
[INFO] [1651094594.822753479] [content_filtering_publisher]: Publishing: '-100.000000'
[INFO] [1651094595.822723857] [content_filtering_publisher]: Publishing: '-90.000000'
[INFO] [1651094596.822752996] [content_filtering_publisher]: Publishing: '-80.000000'
[INFO] [1651094597.822752475] [content_filtering_publisher]: Publishing: '-70.000000'
[INFO] [1651094598.822721485] [content_filtering_publisher]: Publishing: '-60.000000'
[INFO] [1651094599.822696188] [content_filtering_publisher]: Publishing: '-50.000000'
[INFO] [1651094600.822699217] [content_filtering_publisher]: Publishing: '-40.000000'
[INFO] [1651094601.822744113] [content_filtering_publisher]: Publishing: '-30.000000'
[INFO] [1651094602.822694805] [content_filtering_publisher]: Publishing: '-20.000000'
[INFO] [1651094603.822735805] [content_filtering_publisher]: Publishing: '-10.000000'
[INFO] [1651094604.822722094] [content_filtering_publisher]: Publishing: '0.000000'
[INFO] [1651094605.822699960] [content_filtering_publisher]: Publishing: '10.000000'
[INFO] [1651094606.822748946] [content_filtering_publisher]: Publishing: '20.000000'
[INFO] [1651094607.822694017] [content_filtering_publisher]: Publishing: '30.000000'
[INFO] [1651094608.822708798] [content_filtering_publisher]: Publishing: '40.000000'
[INFO] [1651094609.822692417] [content_filtering_publisher]: Publishing: '50.000000'
[INFO] [1651094610.822696426] [content_filtering_publisher]: Publishing: '60.000000'
[INFO] [1651094611.822751913] [content_filtering_publisher]: Publishing: '70.000000'
[INFO] [1651094612.822692231] [content_filtering_publisher]: Publishing: '80.000000'
[INFO] [1651094613.822745549] [content_filtering_publisher]: Publishing: '90.000000'
[INFO] [1651094614.822701982] [content_filtering_publisher]: Publishing: '100.000000'
[INFO] [1651094615.822691465] [content_filtering_publisher]: Publishing: '110.000000'
[INFO] [1651094616.822649070] [content_filtering_publisher]: Publishing: '120.000000'
[INFO] [1651094617.822693616] [content_filtering_publisher]: Publishing: '130.000000'
[INFO] [1651094618.822691832] [content_filtering_publisher]: Publishing: '140.000000'
[INFO] [1651094619.822688452] [content_filtering_publisher]: Publishing: '150.000000'
[INFO] [1651094620.822645327] [content_filtering_publisher]: Publishing: '-100.000000'
[INFO] [1651094621.822689219] [content_filtering_publisher]: Publishing: '-90.000000'
[INFO] [1651094622.822694292] [content_filtering_publisher]: Publishing: '-80.000000'
[...]

ContentFilteringSubscriber:

https://github.com/ros2/demos/blob/humble/demo_nodes_cpp/src/topics/content_filtering_subscriber.cpp

#include "rclcpp/rclcpp.hpp"
#include "rclcpp_components/register_node_macro.hpp"
#include "rcpputils/join.hpp"

#include "std_msgs/msg/float32.hpp"

#include "demo_nodes_cpp/visibility_control.h"

namespace demo_nodes_cpp
{
// Emergency temperature data less than -30 or greater than 100
constexpr std::array<float, 2> EMERGENCY_TEMPERATURE {-30.0f, 100.0f};

// Create a ContentFilteringSubscriber class that subclasses the generic rclcpp::Node base class.
// The main function below will instantiate the class as a ROS node.
class ContentFilteringSubscriber : public rclcpp::Node
{
public:
  DEMO_NODES_CPP_PUBLIC
  explicit ContentFilteringSubscriber(const rclcpp::NodeOptions & options)
  : Node("content_filtering_subscriber", options)
  {
    setvbuf(stdout, NULL, _IONBF, BUFSIZ);
    // Create a callback function for when messages are received.
    auto callback =
      [this](const std_msgs::msg::Float32 & msg) -> void
      {
        if (msg.data < EMERGENCY_TEMPERATURE[0] || msg.data > EMERGENCY_TEMPERATURE[1]) {
          RCLCPP_INFO(
            this->get_logger(),
            "I receive an emergency temperature data: [%f]", msg.data);
        } else {
          RCLCPP_INFO(this->get_logger(), "I receive a temperature data: [%f]", msg.data);
        }
      };

    // Initialize a subscription with a content filter to receive emergency temperature data that
    // are less than -30 or greater than 100.
    rclcpp::SubscriptionOptions sub_options;
    sub_options.content_filter_options.filter_expression = "data < %0 OR data > %1";
    sub_options.content_filter_options.expression_parameters = {
      std::to_string(EMERGENCY_TEMPERATURE[0]),
      std::to_string(EMERGENCY_TEMPERATURE[1])
    };

    sub_ = create_subscription<std_msgs::msg::Float32>("temperature", 10, callback, sub_options);

    if (!sub_->is_cft_enabled()) {
      RCLCPP_WARN(
        this->get_logger(), "Content filter is not enabled since it's not supported");
    } else {
      RCLCPP_INFO(
        this->get_logger(),
        "subscribed to topic \"%s\" with content filter options \"%s, {%s}\"",
        sub_->get_topic_name(),
        sub_options.content_filter_options.filter_expression.c_str(),
        rcpputils::join(sub_options.content_filter_options.expression_parameters, ", ").c_str());
    }
  }

private:
  rclcpp::Subscription<std_msgs::msg::Float32>::SharedPtr sub_;
};

}  // namespace demo_nodes_cpp

为了启用内容过滤,应用程序可以在``SubscriptionOptions``中设置过滤表达式和表达式参数。应用程序还可以检查订阅中是否启用了内容过滤。

在这个演示中,``ContentFilteringSubscriber``节点创建了一个内容过滤订阅,只有当温度值小于-30.0或大于100.0时才接收消息。

如前所述,内容过滤订阅的支持取决于RMW实现。应用程序可以使用``is_cft_enabled``方法来检查订阅是否实际上启用了内容过滤。

为了测试内容过滤订阅,让我们运行它:

$ ros2 run demo_nodes_cpp content_filtering_subscriber
[INFO] [1651094590.682660703] [content_filtering_subscriber]: subscribed to topic "/temperature" with content filter options "data < %0 OR data > %1, {-30.000000, 100.000000}"
[INFO] [1651094594.823805294] [content_filtering_subscriber]: I receive an emergency temperature data: [-100.000000]
[INFO] [1651094595.823419993] [content_filtering_subscriber]: I receive an emergency temperature data: [-90.000000]
[INFO] [1651094596.823410859] [content_filtering_subscriber]: I receive an emergency temperature data: [-80.000000]
[INFO] [1651094597.823350377] [content_filtering_subscriber]: I receive an emergency temperature data: [-70.000000]
[INFO] [1651094598.823282657] [content_filtering_subscriber]: I receive an emergency temperature data: [-60.000000]
[INFO] [1651094599.823297857] [content_filtering_subscriber]: I receive an emergency temperature data: [-50.000000]
[INFO] [1651094600.823355597] [content_filtering_subscriber]: I receive an emergency temperature data: [-40.000000]
[INFO] [1651094615.823315377] [content_filtering_subscriber]: I receive an emergency temperature data: [110.000000]
[INFO] [1651094616.823258458] [content_filtering_subscriber]: I receive an emergency temperature data: [120.000000]
[INFO] [1651094617.823323525] [content_filtering_subscriber]: I receive an emergency temperature data: [130.000000]
[INFO] [1651094618.823315527] [content_filtering_subscriber]: I receive an emergency temperature data: [140.000000]
[INFO] [1651094619.823331424] [content_filtering_subscriber]: I receive an emergency temperature data: [150.000000]
[INFO] [1651094620.823271748] [content_filtering_subscriber]: I receive an emergency temperature data: [-100.000000]
[INFO] [1651094621.823343550] [content_filtering_subscriber]: I receive an emergency temperature data: [-90.000000]
[INFO] [1651094622.823286326] [content_filtering_subscriber]: I receive an emergency temperature data: [-80.000000]
[INFO] [1651094623.823371031] [content_filtering_subscriber]: I receive an emergency temperature data: [-70.000000]
[INFO] [1651094624.823333112] [content_filtering_subscriber]: I receive an emergency temperature data: [-60.000000]
[INFO] [1651094625.823266469] [content_filtering_subscriber]: I receive an emergency temperature data: [-50.000000]
[INFO] [1651094626.823284093] [content_filtering_subscriber]: I receive an emergency temperature data: [-40.000000]

如果温度值小于-30.0或大于100.0,你应该会看到显示所使用的内容过滤选项和每个接收到的消息的日志。

如果RMW实现不支持内容过滤,订阅仍然会被创建,但不会启用内容过滤。我们可以通过执行``RMW_IMPLEMENTATION=rmw_cyclonedds_cpp ros2 run demo_nodes_cpp content_filtering_publisher``来尝试这一点。

$ RMW_IMPLEMENTATION=rmw_cyclonedds_cpp ros2 run demo_nodes_cpp content_filtering_subscriber
[WARN] [1651096637.893842072] [content_filtering_subscriber]: Content filter is not enabled since it is not supported
[INFO] [1651096641.246043703] [content_filtering_subscriber]: I receive an emergency temperature data: [-100.000000]
[INFO] [1651096642.245833527] [content_filtering_subscriber]: I receive an emergency temperature data: [-90.000000]
[INFO] [1651096643.245743471] [content_filtering_subscriber]: I receive an emergency temperature data: [-80.000000]
[INFO] [1651096644.245833932] [content_filtering_subscriber]: I receive an emergency temperature data: [-70.000000]
[INFO] [1651096645.245916679] [content_filtering_subscriber]: I receive an emergency temperature data: [-60.000000]
[INFO] [1651096646.245861895] [content_filtering_subscriber]: I receive an emergency temperature data: [-50.000000]
[INFO] [1651096647.245946352] [content_filtering_subscriber]: I receive an emergency temperature data: [-40.000000]
[INFO] [1651096648.245934569] [content_filtering_subscriber]: I receive a temperature data: [-30.000000]
[INFO] [1651096649.245877906] [content_filtering_subscriber]: I receive a temperature data: [-20.000000]
[INFO] [1651096650.245939068] [content_filtering_subscriber]: I receive a temperature data: [-10.000000]
[INFO] [1651096651.245911450] [content_filtering_subscriber]: I receive a temperature data: [0.000000]
[INFO] [1651096652.245879830] [content_filtering_subscriber]: I receive a temperature data: [10.000000]
[INFO] [1651096653.245858329] [content_filtering_subscriber]: I receive a temperature data: [20.000000]
[INFO] [1651096654.245916370] [content_filtering_subscriber]: I receive a temperature data: [30.000000]
[INFO] [1651096655.245933741] [content_filtering_subscriber]: I receive a temperature data: [40.000000]
[INFO] [1651096656.245833975] [content_filtering_subscriber]: I receive a temperature data: [50.000000]
[INFO] [1651096657.245971483] [content_filtering_subscriber]: I receive a temperature data: [60.000000]

您可以看到消息``Content filter is not enabled``,这是因为底层RMW实现不支持该功能,但演示仍然成功创建了正常的订阅以接收所有温度数据。