实现自定义内存分配器
目标: 本教程将展示如何在编写 ROS 2 C++ 代码时使用自定义内存分配器。
教程级别: 高级
本教程将教你如何为发布者和订阅者集成自定义分配器,以便在你的ROS节点执行时永远不会调用默认的堆分配器。该教程的代码可在`此处<https://github.com/ros2/demos/blob/humble/demo_nodes_cpp/src/topics/allocator_tutorial.cpp>`__获取。
背景
假设您想编写实时安全的代码,并且听说在实时关键部分调用"new"存在许多危险,因为大多数平台上的默认堆分配器是不确定的。
默认情况下,许多C++标准库的结构在增长时会隐式分配内存,比如``std::vector``。然而,这些数据结构还接受一个"Allocator"模板参数。如果您为这些数据结构指定了自定义分配器,它将使用该分配器来代替系统分配器来增长或缩小数据结构。您的自定义分配器可以在堆栈上预分配一块内存池,这可能更适合实时应用程序。
在ROS 2的C++客户端库(rclcpp)中,我们遵循与C++标准库类似的理念。发布者、订阅者和执行器(Executor)接受一个Allocator模板参数,用于控制实体在执行期间进行的分配。
编写一个分配器
要编写与ROS 2的分配器接口兼容的分配器,您的分配器必须与C++标准库的分配器接口兼容。
C++11库提供了一种称为“allocator_traits”的东西。C++11标准规定,自定义分配器只需要满足一组最小要求,就可以按照标准方式分配和释放内存。``allocator_traits``是一个通用的结构,它根据满足最小要求的分配器填充其他属性。
例如,下面这个自定义分配器的声明将满足``allocator_traits``(当然,您仍然需要在该结构中实现声明的函数):
template <class T>
struct custom_allocator {
using value_type = T;
custom_allocator() noexcept;
template <class U> custom_allocator (const custom_allocator<U>&) noexcept;
T* allocate (std::size_t n);
void deallocate (T* p, std::size_t n);
};
template <class T, class U>
constexpr bool operator== (const custom_allocator<T>&, const custom_allocator<U>&) noexcept;
template <class T, class U>
constexpr bool operator!= (const custom_allocator<T>&, const custom_allocator<U>&) noexcept;
然后,您可以通过``allocator_traits``填充的其他函数和成员来访问分配器,如下所示:std::allocator_traits<custom_allocator<T>>::construct(...)
要了解``allocator_traits``的全部功能,请参阅 https://en.cppreference.com/w/cpp/memory/allocator_traits 。
然而,一些只支持部分C++11的编译器(如GCC 4.8)仍然需要分配器实现大量样板代码,以便与标准库结构(如向量和字符串)一起工作,因为这些结构在内部不使用``allocator_traits``。因此,如果您正在使用支持部分C++11的编译器,则您的分配器需要更像这样:
template<typename T>
struct pointer_traits {
using reference = T &;
using const_reference = const T &;
};
// Avoid declaring a reference to void with an empty specialization
template<>
struct pointer_traits<void> {
};
template<typename T = void>
struct MyAllocator : public pointer_traits<T> {
public:
using value_type = T;
using size_type = std::size_t;
using pointer = T *;
using const_pointer = const T *;
using difference_type = typename std::pointer_traits<pointer>::difference_type;
MyAllocator() noexcept;
~MyAllocator() noexcept;
template<typename U>
MyAllocator(const MyAllocator<U> &) noexcept;
T * allocate(size_t size, const void * = 0);
void deallocate(T * ptr, size_t size);
template<typename U>
struct rebind {
typedef MyAllocator<U> other;
};
};
template<typename T, typename U>
constexpr bool operator==(const MyAllocator<T> &,
const MyAllocator<U> &) noexcept;
template<typename T, typename U>
constexpr bool operator!=(const MyAllocator<T> &,
const MyAllocator<U> &) noexcept;
编写一个示例主函数
一旦您编写了一个有效的C++分配器,您必须将其作为共享指针传递给您的发布者、订阅者和执行器。
auto alloc = std::make_shared<MyAllocator<void>>();
rclcpp::PublisherOptionsWithAllocator<MyAllocator<void>> publisher_options;
publisher_options.allocator = alloc;
auto publisher = node->create_publisher<std_msgs::msg::UInt32>(
"allocator_tutorial", 10, publisher_options);
rclcpp::SubscriptionOptionsWithAllocator<MyAllocator<void>> subscription_options;
subscription_options.allocator = alloc;
auto msg_mem_strat = std::make_shared<
rclcpp::message_memory_strategy::MessageMemoryStrategy<
std_msgs::msg::UInt32, MyAllocator<void>>>(alloc);
auto subscriber = node->create_subscription<std_msgs::msg::UInt32>(
"allocator_tutorial", 10, callback, subscription_options, msg_mem_strat);
std::shared_ptr<rclcpp::memory_strategy::MemoryStrategy> memory_strategy =
std::make_shared<AllocatorMemoryStrategy<MyAllocator<void>>>(alloc);
rclcpp::ExecutorOptions options;
options.memory_strategy = memory_strategy;
rclcpp::executors::SingleThreadedExecutor executor(options);
你还需要使用你的分配器来分配通过执行代码路径传递的任何消息。
auto alloc = std::make_shared<MyAllocator<void>>();
一旦你实例化了节点并将执行器添加到节点中,就可以开始运行了:
uint32_t i = 0;
while (rclcpp::ok()) {
msg->data = i;
i++;
publisher->publish(msg);
rclcpp::sleep_for(std::chrono::milliseconds(1));
executor.spin_some();
}
将分配器传递给进程内管道
即使我们在同一个进程中实例化了发布者和订阅者,但我们还没有使用进程内管道。
IntraProcessManager是一个通常对用户隐藏的类,但是为了向其传递自定义分配器,我们需要通过从rclcpp Context中获取它来公开它。IntraProcessManager使用了几个标准库结构,因此如果没有自定义分配器,它将调用默认的new函数。
auto context = rclcpp::contexts::get_global_default_context();
auto options = rclcpp::NodeOptions()
.context(context)
.use_intra_process_comms(true);
auto node = rclcpp::Node::make_shared("allocator_example", options);
请确保在以此方式构建节点后实例化发布者和订阅者。
对代码进行测试和验证。
您如何知道自定义分配器实际上被调用了?
显而易见的做法是统计自定义分配器的``allocate``和``deallocate``函数的调用次数,并将其与``new``和``delete``的调用次数进行比较。
在自定义分配器中添加计数很简单:
T * allocate(size_t size, const void * = 0) {
// ...
num_allocs++;
// ...
}
void deallocate(T * ptr, size_t size) {
// ...
num_deallocs++;
// ...
}
您还可以重载全局的``new``和``delete``运算符:
void operator delete(void * ptr) noexcept {
if (ptr != nullptr) {
if (is_running) {
global_runtime_deallocs++;
}
std::free(ptr);
ptr = nullptr;
}
}
void operator delete(void * ptr, size_t) noexcept {
if (ptr != nullptr) {
if (is_running) {
global_runtime_deallocs++;
}
std::free(ptr);
ptr = nullptr;
}
}
其中我们递增的变量只是全局静态整数,而``is_running``是一个全局静态布尔值,在调用``spin``之前切换。
这个 示例可执行文件 打印变量的值。要运行示例可执行文件,请使用:
ros2 run demo_nodes_cpp allocator_tutorial
或者,要在进程内管道上运行示例:
ros2 run demo_nodes_cpp allocator_tutorial intra
你应该会得到如下的数字:
Global new was called 15590 times during spin
Global delete was called 15590 times during spin
Allocator new was called 27284 times during spin
Allocator delete was called 27281 times during spin
我们捕获到了大约三分之二发生在执行路径上的分配/释放,但是剩下的三分之一从哪里来?
事实上,这些分配/释放操作源于在本示例中使用的底层DDS实现。
证明这一点超出了本教程的范围,但您可以查看作为ROS 2持续集成测试的一部分运行的分配路径测试,该测试通过代码回溯并确定某些函数调用是在rmw实现中还是在DDS实现中发起的:
https://github.com/ros2/realtime_support/blob/humble/tlsf_cpp/test/test_tlsf.cpp#L41
请注意,该测试未使用刚刚创建的自定义分配器,而是使用了TLSF分配器(请参阅下文)。
TLSF 分配器
ROS 2 提供对 TLSF(Two Level Segregate Fit)分配器的支持,该分配器旨在满足实时要求:
https://github.com/ros2/realtime_support/tree/humble/tlsf_cpp
有关 TLSF 的更多信息,请参阅 http://www.gii.upv.es/tlsf/
请注意,TLSF分配器使用双重GPL/LGPL许可证授权。
这里有一个使用TLSF分配器的完整工作示例:https://github.com/ros2/realtime_support/blob/humble/tlsf_cpp/example/allocator_example.cpp