Embedded C++ Tutorial: The Object Pool Pattern
Foreword
Memory allocation comes up constantly; there is no way around discussing it. Any object (call it a struct or a variable if you like) whose lifetime you need to control yourself, rather than leave to automatic storage, needs memory that outlives its scope. Even though an MCU may not draw such strict boundaries, we always need some persistently allocated objects.
On a desktop host we usually allocate directly with new/delete (which wrap malloc/free underneath). On a typical MCU, however, new/delete easily fragments memory, has nondeterministic latency, and on some platforms carries an unacceptable risk of failure.
Such real-time constraints rarely allow us to use new/delete or malloc/free as casually and frequently as we do on a host.
This is where the Object Pool comes in, a common and practical pattern: preallocate a group of objects (or memory blocks), borrow an object from the pool at runtime and return it when done, achieving deterministic memory usage and low-latency allocation and reclamation.
When to Use an Object Pool
An object pool can be seen as an aggregate of some fixed number of objects. It is a good fit when:
- the scenario is fixed, so object sizes and counts can be estimated in advance (or bounded), as is typical in embedded work;
- allocation is frequent and needs deterministic latency (network packet buffers, task objects, driver contexts);
- the system cannot tolerate runtime memory fragmentation (long-running, unattended devices).
In more complex situations, such as when object sizes or peak concurrency cannot be estimated up front, or when elastic growth is required, an object pool is probably not the right tool.
API Design
// High-level semantics
template<typename T, size_t N, typename SyncPolicy>
class ObjectPool;
// Usage (pseudocode)
static ObjectPool<MyObj, 16, NoLockPolicy> pool;
auto ptr = pool.try_acquire(); // nullptr means the pool is exhausted
ptr->init(...);
// ... use the object ...
pool.release(ptr);
We provide acquire() (blocking, or asserting on exhaustion) together with try_acquire() (non-blocking, returns nullptr).
Core Implementation
Let's look at one possible implementation:
#pragma once
#include <cstddef>
#include <cstdint>
#include <new>
#include <type_traits>
// Simple assertion (replace with your project's assert)
#ifndef EP_ASSERT
#include <cassert>
#define EP_ASSERT(x) assert(x)
#endif
// ========== Synchronization policies ==========
// These policies are empty shells or implement platform-specific protection
struct NoLockPolicy {
static void lock() {}
static void unlock() {}
};
// Interrupt-masking protection (sketch; to be implemented per platform).
// The saved state is static, so critical sections guarded by this policy must not nest.
struct InterruptLockPolicy {
static unsigned primask_save() { unsigned p = 0; /* read PRIMASK, disable interrupts */ return p; }
static void primask_restore(unsigned p) { /* write PRIMASK */ (void)p; }
static void lock() { saved() = primask_save(); }
static void unlock() { primask_restore(saved()); }
private:
static unsigned& saved() { static unsigned s = 0; return s; }
};
// Mutex-based protection (RTOS)
struct MutexLockPolicy {
static void lock();  // implemented in a platform-specific file
static void unlock();
};
// ========== Object pool implementation ==========
template<typename T, size_t N, typename Sync = NoLockPolicy>
class ObjectPool {
public:
static_assert(N > 0, "Pool size must be > 0");
static_assert(std::is_default_constructible<T>::value,
"T must be default constructible for placement-new usage");
ObjectPool() {
for (size_t i = 0; i < N; ++i) {
next_idx_[i] = (i + 1 < N) ? i + 1 : kInvalidIndex;
}
free_head_ = 0;
}
// Non-blocking borrow; returns nullptr when exhausted
T* try_acquire() {
Sync::lock();
if (free_head_ == kInvalidIndex) {
Sync::unlock();
return nullptr;
}
size_t idx = free_head_;
free_head_ = next_idx_[idx];
used_count_++;
Sync::unlock();
T* obj = reinterpret_cast<T*>(&storage_[idx]);
// placement-new initialization
new (obj) T();
return obj;
}
// Borrow that asserts on exhaustion
T* acquire() {
T* obj = try_acquire();
EP_ASSERT(obj != nullptr && "Pool exhausted");
return obj;
}
// Return an object (it must come from this pool)
void release(T* obj) {
EP_ASSERT(obj != nullptr);
size_t idx = ptr_to_index(obj);
EP_ASSERT(idx < N);
// run the destructor
obj->~T();
Sync::lock();
next_idx_[idx] = free_head_;
free_head_ = idx;
used_count_--;
Sync::unlock();
}
// Current free/used counts
size_t free_count() const {
return N - used_count_;
}
size_t used_count() const { return used_count_; }
private:
static constexpr size_t kInvalidIndex = static_cast<size_t>(-1);
// Uninitialized raw storage
typename std::aligned_storage<sizeof(T), alignof(T)>::type storage_[N];
size_t next_idx_[N];
size_t free_head_ = kInvalidIndex;
size_t used_count_ = 0;
// Note: not static, since it reads this pool's storage_ array
size_t ptr_to_index(T* ptr) const {
uintptr_t base = reinterpret_cast<uintptr_t>(&storage_[0]);
uintptr_t p = reinterpret_cast<uintptr_t>(ptr);
EP_ASSERT(p >= base);
return (p - base) / sizeof(storage_[0]);
}
};
Note:
The interrupt save/restore in InterruptLockPolicy is platform-specific and must be replaced with the target MCU's implementation (e.g. PRIMASK access on ARM Cortex-M). If you use FreeRTOS, map MutexLockPolicy's lock()/unlock() to xSemaphoreTake()/xSemaphoreGive() or taskENTER_CRITICAL()/taskEXIT_CRITICAL().
How do we use it?
// Suppose we have a packet buffer object
struct Packet {
uint8_t buf[256];
size_t len;
void init() { len = 0; }
};
// Allocate the pool in global or module-static storage
static ObjectPool<Packet, 8, NoLockPolicy> pktPool;
void on_receive() {
Packet* p = pktPool.try_acquire();
if (!p) {
// pool exhausted: drop the packet or log an error
return;
}
p->init();
// fill p->buf, p->len ...
// done with the object
pktPool.release(p);
}
If you allocate or release in interrupt context (an ISR), be sure to use InterruptLockPolicy or a lock-free algorithm; avoid complex initialization inside the ISR, and ideally only borrow the object there, deferring the processing to task context.
Quick Recap
In embedded development the object pool is an extremely practical tool: it brings the unpredictability of runtime memory management down to a controllable range while providing an efficient allocation and reclamation path. An implementation has to balance thread safety, ISR scenarios, object construction cost, and diagnostics.
Code Examples
View the full compilable example
#include <iostream>
#include <cstddef>
#include <cstdint>
#include <cstring>   // std::memcpy in Packet::append
#include <new>
#include <type_traits>
#include <cassert>
// Object pool implementation (the complete code from the article)
// Simple assertion (replace with your project's assert)
#ifndef EP_ASSERT
#define EP_ASSERT(x) assert(x)
#endif
// ========== Synchronization policies ==========
struct NoLockPolicy {
static void lock() {}
static void unlock() {}
};
// Interrupt-masking protection (sketch; to be implemented per platform).
// The saved state is static, so critical sections guarded by this policy must not nest.
struct InterruptLockPolicy {
static unsigned primask_save() {
unsigned p = 0;
// a real implementation reads the PRIMASK register and disables interrupts
return p;
}
static void primask_restore(unsigned p) {
// a real implementation writes the PRIMASK register
(void)p;
}
static void lock() { saved() = primask_save(); }
static void unlock() { primask_restore(saved()); }
private:
static unsigned& saved() { static unsigned s = 0; return s; }
};
// Mutex-based protection (RTOS)
struct MutexLockPolicy {
static void lock() {
// implemented in a platform file, e.g.:
// xSemaphoreTake(mutex, portMAX_DELAY);
}
static void unlock() {
// xSemaphoreGive(mutex);
}
};
// ========== Object pool implementation ==========
template<typename T, size_t N, typename Sync = NoLockPolicy>
class ObjectPool {
public:
static_assert(N > 0, "Pool size must be > 0");
static_assert(std::is_default_constructible<T>::value,
"T must be default constructible");
ObjectPool() {
for (size_t i = 0; i < N; ++i) {
next_idx_[i] = (i + 1 < N) ? i + 1 : kInvalidIndex;
}
free_head_ = 0;
}
// Non-blocking borrow; returns nullptr when exhausted
T* try_acquire() {
Sync::lock();
if (free_head_ == kInvalidIndex) {
Sync::unlock();
return nullptr;
}
size_t idx = free_head_;
free_head_ = next_idx_[idx];
used_count_++;
Sync::unlock();
T* obj = reinterpret_cast<T*>(&storage_[idx]);
// placement-new initialization
new (obj) T();
return obj;
}
// Borrow that asserts on exhaustion
T* acquire() {
T* obj = try_acquire();
EP_ASSERT(obj != nullptr && "Pool exhausted");
return obj;
}
// Return an object (it must come from this pool)
void release(T* obj) {
EP_ASSERT(obj != nullptr);
size_t idx = ptr_to_index(obj);
EP_ASSERT(idx < N);
// run the destructor
obj->~T();
Sync::lock();
next_idx_[idx] = free_head_;
free_head_ = idx;
used_count_--;
Sync::unlock();
}
// Current free/used counts
size_t free_count() const {
return N - used_count_;
}
size_t used_count() const { return used_count_; }
size_t capacity() const { return N; }
private:
static constexpr size_t kInvalidIndex = static_cast<size_t>(-1);
// Uninitialized raw storage
typename std::aligned_storage<sizeof(T), alignof(T)>::type storage_[N];
size_t next_idx_[N];
size_t free_head_ = kInvalidIndex;
size_t used_count_ = 0;
// Note: not static, since it reads this pool's storage_ array
size_t ptr_to_index(T* ptr) const {
uintptr_t base = reinterpret_cast<uintptr_t>(&storage_[0]);
uintptr_t p = reinterpret_cast<uintptr_t>(ptr);
EP_ASSERT(p >= base);
return (p - base) / sizeof(storage_[0]);
}
};
// ========== Usage example ==========
// Network packet buffer object
struct Packet {
uint8_t buf[256];
size_t len;
Packet() : len(0) {
std::cout << "Packet constructed\n";
}
~Packet() {
std::cout << "Packet destructed\n";
}
void init() {
len = 0;
for (size_t i = 0; i < sizeof(buf); ++i) {
buf[i] = 0;
}
}
void append(const char* data, size_t n) {
size_t copy_len = (len + n <= sizeof(buf)) ? n : sizeof(buf) - len;
std::memcpy(buf + len, data, copy_len);
len += copy_len;
}
void print() const {
std::cout << "Packet[len=" << len << "] ";
for (size_t i = 0; i < len && i < 20; ++i) {
std::cout << static_cast<char>(buf[i]);
}
if (len > 20) std::cout << "...";
std::cout << "\n";
}
};
void packet_pool_demo() {
std::cout << "=== Packet Pool Demo ===\n\n";
// a pool in global or module-static storage
static ObjectPool<Packet, 4, NoLockPolicy> pktPool;
std::cout << "Pool capacity: " << pktPool.capacity() << "\n";
std::cout << "Free slots: " << pktPool.free_count() << "\n\n";
std::cout << "--- Acquiring packets ---\n";
Packet* p1 = pktPool.try_acquire();
Packet* p2 = pktPool.try_acquire();
Packet* p3 = pktPool.try_acquire();
Packet* p4 = pktPool.try_acquire();
std::cout << "Free slots after acquiring 4: " << pktPool.free_count() << "\n";
std::cout << "\n--- Trying to acquire from empty pool ---\n";
Packet* p5 = pktPool.try_acquire();
if (!p5) {
std::cout << "Pool exhausted (expected)\n";
}
std::cout << "\n--- Using packets ---\n";
p1->init();
p1->append("Hello, ", 7);
p1->append("World!", 6);
p1->print();
p2->init();
p2->append("Packet 2", 8);
p2->print();
std::cout << "\n--- Releasing packets ---\n";
pktPool.release(p1);
std::cout << "After releasing p1, free slots: " << pktPool.free_count() << "\n";
pktPool.release(p2);
std::cout << "After releasing p2, free slots: " << pktPool.free_count() << "\n";
std::cout << "\n--- Acquire again ---\n";
Packet* p6 = pktPool.try_acquire();
if (p6) {
p6->init();
p6->append("Reused packet", 13);
p6->print();
}
// clean up
pktPool.release(p3);
pktPool.release(p4);
pktPool.release(p6);
}
// Demonstrate custom synchronization policies
void thread_safe_pool_demo() {
std::cout << "\n=== Thread-Safe Pool Demo ===\n\n";
// Pool using MutexLockPolicy (RTOS environment)
// static ObjectPool<Packet, 8, MutexLockPolicy> threadSafePool;
// Pool using InterruptLockPolicy (ISR environment)
static ObjectPool<Packet, 8, InterruptLockPolicy> isrSafePool;
std::cout << "ISR-safe pool capacity: " << isrSafePool.capacity() << "\n";
std::cout << "Free slots: " << isrSafePool.free_count() << "\n";
}
// RAII wrapper
template<typename T, typename PoolType>
class PooledPtr {
T* ptr_;
PoolType* pool_;
public:
PooledPtr(T* ptr, PoolType* pool) : ptr_(ptr), pool_(pool) {}
~PooledPtr() {
if (ptr_) {
pool_->release(ptr_);
}
}
// non-copyable
PooledPtr(const PooledPtr&) = delete;
PooledPtr& operator=(const PooledPtr&) = delete;
// move support
PooledPtr(PooledPtr&& other) noexcept : ptr_(other.ptr_), pool_(other.pool_) {
other.ptr_ = nullptr;
}
PooledPtr& operator=(PooledPtr&& other) noexcept {
if (this != &other) {
if (ptr_) pool_->release(ptr_);
ptr_ = other.ptr_;
pool_ = other.pool_;
other.ptr_ = nullptr;
}
return *this;
}
T* operator->() { return ptr_; }
T& operator*() { return *ptr_; }
T* get() { return ptr_; }
explicit operator bool() const { return ptr_ != nullptr; }
};
void raii_pool_demo() {
std::cout << "\n=== RAII Pool Wrapper Demo ===\n\n";
static ObjectPool<Packet, 4, NoLockPolicy> pool;
{
auto p1 = PooledPtr<Packet, decltype(pool)>(pool.try_acquire(), &pool);
auto p2 = PooledPtr<Packet, decltype(pool)>(pool.try_acquire(), &pool);
if (p1) {
p1->init();
p1->append("RAII packet", 11);
p1->print();
}
std::cout << "Free slots inside scope: " << pool.free_count() << "\n";
// released automatically here
}
std::cout << "Free slots after scope: " << pool.free_count() << "\n";
}
int main() {
packet_pool_demo();
thread_safe_pool_demo();
raii_pool_demo();
std::cout << "\n=== Key Takeaways ===\n";
std::cout << "1. Object pool provides deterministic memory management\n";
std::cout << "2. Use try_acquire() for non-blocking, acquire() for assert-on-fail\n";
std::cout << "3. Always release objects back to the pool\n";
std::cout << "4. Use RAII wrapper for exception safety\n";
std::cout << "5. Choose sync policy based on context (none, ISR, RTOS)\n";
return 0;
}
#include <iostream>
#include <cstddef>
#include <cstdint>
#include <cstdio>    // std::snprintf
#include <cstring>
#include <new>
// Specialized object pool examples (a second, standalone file)
// optimized for particular types
// ========== Fixed-size integer pool ==========
// For small objects such as message IDs and handles
template<size_t N>
class UInt32Pool {
uint32_t storage_[N];
bool used_[N];
size_t free_head_; // hint: where to start searching for a free slot
public:
UInt32Pool() {
std::memset(used_, 0, sizeof(used_));
free_head_ = 0;
}
// Allocate a slot; on success writes its index and returns true
bool allocate(size_t& out_index) {
for (size_t i = 0; i < N; ++i) {
size_t idx = (free_head_ + i) % N;
if (!used_[idx]) {
used_[idx] = true;
out_index = idx;
free_head_ = (idx + 1) % N; // next search starts after this slot
return true;
}
}
return false; // pool exhausted
}
void free(size_t index) {
if (index < N && used_[index]) {
used_[index] = false;
if (index < free_head_) {
free_head_ = index; // remember the earliest free slot
}
}
}
uint32_t& operator[](size_t index) { return storage_[index]; }
const uint32_t& operator[](size_t index) const { return storage_[index]; }
};
// ========== Bitmap object pool ==========
// Tracks slot state with a bitmap; very memory-efficient
template<typename T, size_t N>
class BitmapObjectPool {
alignas(T) uint8_t storage_[N * sizeof(T)];
uint32_t bitmap_[ (N + 31) / 32 ]; // one bit tracks one slot
size_t last_hint_;
static constexpr size_t bitmap_size = (N + 31) / 32;
public:
BitmapObjectPool() : last_hint_(0) {
std::memset(bitmap_, 0xFF, sizeof(bitmap_)); // all slots start free (1 = free)
}
T* allocate() {
// start searching from the hint
for (size_t word = 0; word < bitmap_size; ++word) {
size_t w = (last_hint_ / 32 + word) % bitmap_size;
if (bitmap_[w] != 0) { // this word has a free bit
uint32_t bits = bitmap_[w];
for (size_t bit = 0; bit < 32; ++bit) {
if (bits & (1U << bit)) {
size_t idx = w * 32 + bit;
if (idx < N) {
bitmap_[w] &= ~(1U << bit); // mark as used
last_hint_ = idx;
return reinterpret_cast<T*>(&storage_[idx * sizeof(T)]);
}
}
}
}
}
return nullptr; // pool is full
}
void free(T* ptr) {
auto addr = reinterpret_cast<uint8_t*>(ptr);
size_t idx = (addr - storage_) / sizeof(T);
if (idx < N) {
bitmap_[idx / 32] |= (1U << (idx % 32)); // mark as free
}
}
size_t used_count() const {
size_t count = 0;
for (size_t w = 0; w < bitmap_size; ++w) {
count += __builtin_popcount(~bitmap_[w]); // GCC/Clang builtin; a cleared bit means used
}
return count;
}
size_t free_count() const { return N - used_count(); }
};
// ========== Per-type layered pools ==========
// Give each type its own capacity and synchronization policy
struct SmallObject { int data; };
struct MediumObject { int data[16]; };
struct LargeObject { int data[64]; };
// A class cannot be defined inside an alias-declaration,
// so the no-op policy needs a name of its own
struct NoSyncPolicy {
static void lock() {}
static void unlock() {}
};
template<typename T>
struct PoolTraits;
template<>
struct PoolTraits<SmallObject> {
static constexpr size_t capacity = 32;
using SyncPolicy = NoSyncPolicy;
};
template<>
struct PoolTraits<MediumObject> {
static constexpr size_t capacity = 16;
using SyncPolicy = NoSyncPolicy;
};
template<>
struct PoolTraits<LargeObject> {
static constexpr size_t capacity = 8;
using SyncPolicy = NoSyncPolicy;
};
template<typename T>
class TypedObjectPool {
alignas(T) uint8_t storage_[PoolTraits<T>::capacity * sizeof(T)];
size_t free_list_[PoolTraits<T>::capacity]; // size_t, so the size_t(-1) sentinel compares correctly with free_head_
size_t free_head_;
size_t used_count_;
using Sync = typename PoolTraits<T>::SyncPolicy;
public:
TypedObjectPool() : used_count_(0) {
for (size_t i = 0; i < PoolTraits<T>::capacity; ++i) {
free_list_[i] = i + 1;
}
free_list_[PoolTraits<T>::capacity - 1] = static_cast<size_t>(-1);
free_head_ = 0;
}
T* allocate() {
Sync::lock();
if (free_head_ == static_cast<size_t>(-1)) {
Sync::unlock();
return nullptr;
}
size_t idx = free_head_;
free_head_ = free_list_[idx];
used_count_++;
Sync::unlock();
T* obj = reinterpret_cast<T*>(&storage_[idx * sizeof(T)]);
new (obj) T();
return obj;
}
void free(T* ptr) {
if (!ptr) return;
ptr->~T();
size_t idx = (reinterpret_cast<uint8_t*>(ptr) - storage_) / sizeof(T);
Sync::lock();
free_list_[idx] = free_head_;
free_head_ = idx;
used_count_--;
Sync::unlock();
}
size_t used() const { return used_count_; }
size_t capacity() const { return PoolTraits<T>::capacity; }
};
// ========== Usage examples ==========
void uint32_pool_demo() {
std::cout << "=== UInt32 Pool Demo ===\n\n";
UInt32Pool<16> pool;
std::cout << "Allocating indices:\n";
for (int i = 0; i < 10; ++i) {
size_t idx;
if (pool.allocate(idx)) {
pool[idx] = i * 100;
std::cout << " Allocated index " << idx << " = " << pool[idx] << "\n";
}
}
std::cout << "\nFreeing some indices:\n";
pool.free(2);
pool.free(5);
pool.free(8);
std::cout << "\nAllocating after free:\n";
for (int i = 0; i < 3; ++i) {
size_t idx;
if (pool.allocate(idx)) {
pool[idx] = 999;
std::cout << " Allocated index " << idx << " = " << pool[idx] << "\n";
}
}
}
struct MyData {
int value;
char name[32];
MyData() : value(0) {
std::strcpy(name, "default");
}
};
void bitmap_pool_demo() {
std::cout << "\n=== Bitmap Pool Demo ===\n\n";
BitmapObjectPool<MyData, 64> pool;
std::cout << "Pool capacity: " << 64 << "\n";
std::cout << "Free slots: " << pool.free_count() << "\n";
MyData* items[10];
for (int i = 0; i < 10; ++i) {
items[i] = pool.allocate();
if (items[i]) {
items[i]->value = i;
std::snprintf(items[i]->name, sizeof(items[i]->name), "item%d", i);
}
}
std::cout << "Used slots after allocating 10: " << pool.used_count() << "\n";
for (int i = 0; i < 10; ++i) {
if (items[i]) {
std::cout << " " << items[i]->name << " = " << items[i]->value << "\n";
}
}
// free a few
for (int i = 0; i < 5; ++i) {
pool.free(items[i]);
}
std::cout << "Used slots after freeing 5: " << pool.used_count() << "\n";
}
void typed_pool_demo() {
std::cout << "\n=== Typed Pool Demo ===\n\n";
TypedObjectPool<SmallObject> small_pool;
TypedObjectPool<MediumObject> medium_pool;
TypedObjectPool<LargeObject> large_pool;
std::cout << "SmallObject pool: " << small_pool.used() << "/" << small_pool.capacity() << "\n";
std::cout << "MediumObject pool: " << medium_pool.used() << "/" << medium_pool.capacity() << "\n";
std::cout << "LargeObject pool: " << large_pool.used() << "/" << large_pool.capacity() << "\n";
SmallObject* s = small_pool.allocate();
MediumObject* m = medium_pool.allocate();
LargeObject* l = large_pool.allocate();
if (s) s->data = 1;
if (m) m->data[0] = 2;
if (l) l->data[0] = 3;
std::cout << "\nAfter allocation:\n";
std::cout << "SmallObject pool: " << small_pool.used() << "/" << small_pool.capacity() << "\n";
std::cout << "MediumObject pool: " << medium_pool.used() << "/" << medium_pool.capacity() << "\n";
std::cout << "LargeObject pool: " << large_pool.used() << "/" << large_pool.capacity() << "\n";
small_pool.free(s);
medium_pool.free(m);
large_pool.free(l);
}
int main() {
uint32_pool_demo();
bitmap_pool_demo();
typed_pool_demo();
std::cout << "\n=== Key Takeaways ===\n";
std::cout << "1. Specialize pools for specific use cases\n";
std::cout << "2. Bitmap pool saves memory for many small objects\n";
std::cout << "3. Typed pool provides type safety with traits\n";
std::cout << "4. Choose pool implementation based on object size and count\n";
return 0;
}