嵌入式 C++ 教程:Slab / Arena 实现与比较¶
这里开始的是使用固定池 / Slab / Arena来展开我们涉及到内存分配中剩下的一些内容——Slab、Arena(Bump / Region)。当然,部分内容实际上已经是操作系统的层次的知识了,但是知道了总不赖!
TL;DR¶
- 固定池:固定大小对象、超低碎片、常用于驱动/对象池;实现简单,分配和释放都是 O(1)。
- Slab:内核/复杂嵌入式系统常用,支持多种对象 size-class,减少内存碎片,易于缓存局部性优化。
- Arena(Region):非常适合短生命周期对象或一次性分配的场景(例如解析、启动阶段),分配快(仅移动指针),回收一次性做
reset()。
Slab 分配器¶
Linux kernel 的 slab 概念特别适合多种大小对象的优化:为每个 size-class 维护一个或多个 slab(本质上就是一组固定大小的对象池),并且可以跟踪对象使用情况,做对象构造/销毁优化(缓存 warm objects)。虽然单片机可能不太适合搞很重的slab,但是我们可以仿照类似的设计,对于一些比较高端的芯片设计类似的简化机制的内存管理
简单的说,我们定义若干 size-class(例如 16B、32B、64B、128B...),而每个 size-class 管理若干个 slab。Slab 是一整块内存,被划分为 N 个对象槽(slot),Slab 自己可以有三种状态:empty(全空)、partial(部分使用)、full(无空闲)。这个时候,我们分配时从 partial slab 或 empty slab 中拿 slot;释放时加入回 slab 的 free list。
看起来好像没有什么。但是,我们现在可以做临近的合并措施,不至于内存东一片西一片;而且可以快速的匹配最近的大小,当然,因为都已经池化处理了,我们就可以为不同对象类型做专门优化(构造/析构缓存、debug header)。
精简版的¶
为了篇幅,我们做一个精简版 slab:
- 静态定义 size-class(运行时选择合适 bucket)
- 每个 slab 用一段连续内存和一个位图/链表管理空闲槽
简化的关键结构¶
struct Slab {
uint8_t* data; // 指向对象存储区
uint32_t freeBitmap; // 仅示例,最多32个 slot
Slab* next;
};
struct SlabBucket {
size_t objSize;
Slab* partial;
Slab* full;
Slab* empty;
};
真实系统会需要更复杂的位图、锁策略和扩展机制,但这个示例足以用于嵌入式场景。
Arena(Region / Bump Allocator)¶
Arena 常见于:解析器、一次性分配任务、初始化阶段或短生命周期对象池。它的核心简单到令人发笑:
- 拿一大块内存(或者多个 chunk)
- 用一个指针
head记录当前分配位置 alloc(size)就是把head向前移动size,返回旧的位置reset()把head回退到初始位置(一次性回收所有分配)
所以,Arena的分配速度极快(指针运算),但是又非常适合临时内存,零/低碎片。但是问题也很多:
- 不能单独释放单个对象(除非做更复杂的回收策略)
- 外部生命周期控制由用户负责
class Arena {
public:
Arena(void* buffer, size_t size) : base_(reinterpret_cast<uint8_t*>(buffer)), cap_(size), head_(0) {}
void* alloc(size_t n, size_t align = alignof(std::max_align_t)) {
size_t cur = reinterpret_cast<size_t>(base_) + head_;
size_t aligned = (cur + (align - 1)) & ~(align - 1);
size_t offset = aligned - reinterpret_cast<size_t>(base_);
if (offset + n > cap_) return nullptr;
head_ = offset + n;
return base_ + offset;
}
void reset() { head_ = 0; }
private:
uint8_t* base_;
size_t cap_;
size_t head_;
};
当然上面的代码不是线程安全的,这个需要注意了。
代码示例¶
查看完整可编译示例
#include <iostream>
#include <cstdint>
#include <cstddef>
#include <new>
#include <cassert>
// 固定池分配器 - 最简单的内存池实现
template<size_t BLOCK_SIZE, size_t BLOCK_COUNT>
class FixedPool {
struct Block {
alignas(std::max_align_t) uint8_t data[BLOCK_SIZE];
};
Block pool_[BLOCK_COUNT];
size_t free_list_[BLOCK_COUNT];
size_t free_head_;
size_t used_count_;
public:
FixedPool() : used_count_(0) {
// 初始化空闲链表
for (size_t i = 0; i < BLOCK_COUNT; ++i) {
free_list_[i] = i + 1;
}
free_list_[BLOCK_COUNT - 1] = static_cast<size_t>(-1); // 标记结尾
free_head_ = 0;
}
// 分配一个块
void* allocate() {
if (free_head_ == static_cast<size_t>(-1)) {
return nullptr; // 池已满
}
size_t idx = free_head_;
free_head_ = free_list_[idx];
used_count_++;
return &pool_[idx].data[0];
}
// 释放一个块
void deallocate(void* ptr) {
if (!ptr) return;
size_t idx = static_cast<Block*>(ptr) - pool_;
assert(idx < BLOCK_COUNT && "Pointer not from this pool");
free_list_[idx] = free_head_;
free_head_ = idx;
used_count_--;
}
size_t block_size() const { return BLOCK_SIZE; }
size_t capacity() const { return BLOCK_COUNT; }
size_t used_count() const { return used_count_; }
size_t free_count() const { return BLOCK_COUNT - used_count_; }
bool is_full() const { return free_head_ == static_cast<size_t>(-1); }
bool is_empty() const { return used_count_ == 0; }
// 调试:打印池状态
void print_status() const {
std::cout << "Pool[" << BLOCK_SIZE << " x " << BLOCK_COUNT << "]: "
<< used_count_ << " used, " << free_count() << " free\n";
}
};
// 类型安全的固定池
template<typename T, size_t N>
class TypedFixedPool {
alignas(T) uint8_t storage_[N * sizeof(T)];
size_t free_list_[N];
size_t free_head_;
size_t used_count_;
public:
TypedFixedPool() : used_count_(0) {
for (size_t i = 0; i < N; ++i) {
free_list_[i] = i + 1;
}
free_list_[N - 1] = static_cast<size_t>(-1);
free_head_ = 0;
}
T* allocate() {
if (free_head_ == static_cast<size_t>(-1)) {
return nullptr;
}
size_t idx = free_head_;
free_head_ = free_list_[idx];
used_count_++;
T* obj = reinterpret_cast<T*>(&storage_[idx * sizeof(T)]);
new (obj) T(); // placement new,默认构造
return obj;
}
template<typename... Args>
T* construct(Args&&... args) {
if (free_head_ == static_cast<size_t>(-1)) {
return nullptr;
}
size_t idx = free_head_;
free_head_ = free_list_[idx];
used_count_++;
T* obj = reinterpret_cast<T*>(&storage_[idx * sizeof(T)]);
new (obj) T(std::forward<Args>(args)...);
return obj;
}
void deallocate(T* ptr) {
if (!ptr) return;
ptr->~T(); // 调用析构函数
size_t idx = (reinterpret_cast<uint8_t*>(ptr) - storage_) / sizeof(T);
free_list_[idx] = free_head_;
free_head_ = idx;
used_count_--;
}
size_t capacity() const { return N; }
size_t used_count() const { return used_count_; }
size_t free_count() const { return N - used_count_; }
};
// 使用示例
struct Packet {
uint8_t data[64];
size_t length;
Packet() : length(0) {
std::cout << "Packet constructed\n";
}
~Packet() {
std::cout << "Packet destructed\n";
}
void set_data(const char* msg) {
size_t i = 0;
while (msg[i] && i < sizeof(data) - 1) {
data[i] = static_cast<uint8_t>(msg[i]);
++i;
}
data[i] = '\0';
length = i;
}
void print() const {
std::cout << "Packet[len=" << length << ", data=\"";
for (size_t i = 0; i < length && i < 20; ++i) {
std::cout << static_cast<char>(data[i]);
}
std::cout << "\"]\n";
}
};
void basic_pool_demo() {
std::cout << "=== Basic Fixed Pool Demo ===\n\n";
FixedPool<64, 8> pool;
std::cout << "Initial: ";
pool.print_status();
// 分配一些块
void* blocks[5];
for (int i = 0; i < 5; ++i) {
blocks[i] = pool.allocate();
std::cout << "Allocated block " << i << " at " << blocks[i] << "\n";
}
std::cout << "\nAfter 5 allocations: ";
pool.print_status();
// 释放一些块
pool.deallocate(blocks[1]);
pool.deallocate(blocks[3]);
std::cout << "\nAfter freeing 2 blocks: ";
pool.print_status();
// 再次分配
void* new_block = pool.allocate();
std::cout << "Reallocated at " << new_block << "\n";
std::cout << "Should be same as freed block 1: " << (new_block == blocks[1] ? "yes" : "no") << "\n";
// 清理
for (int i = 0; i < 5; ++i) {
if (i != 1 && i != 3) {
pool.deallocate(blocks[i]);
}
}
pool.deallocate(new_block);
}
void typed_pool_demo() {
std::cout << "\n=== Typed Fixed Pool Demo ===\n\n";
TypedFixedPool<Packet, 4> pool;
std::cout << "Packet pool: " << pool.used_count() << " / " << pool.capacity() << " used\n";
// 分配包
Packet* p1 = pool.construct();
Packet* p2 = pool.construct();
Packet* p3 = pool.construct();
std::cout << "\nAfter 3 allocations: " << pool.used_count() << " / " << pool.capacity() << " used\n";
// 使用包
p1->set_data("Hello");
p2->set_data("World");
p3->set_data("Test");
p1->print();
p2->print();
p3->print();
// 尝试分配超过容量
Packet* p4 = pool.construct();
Packet* p5 = pool.construct(); // 应该失败
std::cout << "\np4 allocated: " << (p4 != nullptr ? "yes" : "no") << "\n";
std::cout << "p5 allocated: " << (p5 != nullptr ? "yes (unexpected!)" : "no (expected)") << "\n";
// 清理
pool.deallocate(p1);
pool.deallocate(p2);
pool.deallocate(p3);
if (p4) pool.deallocate(p4);
}
// 多尺寸池系统
class MultiSizePoolSystem {
FixedPool<32, 16> pool32_;
FixedPool<64, 16> pool64_;
FixedPool<128, 16> pool128_;
FixedPool<256, 16> pool256_;
public:
void* allocate(size_t size) {
if (size <= 32) return pool32_.allocate();
if (size <= 64) return pool64_.allocate();
if (size <= 128) return pool128_.allocate();
if (size <= 256) return pool256_.allocate();
return nullptr;
}
void deallocate(void* ptr, size_t size) {
if (size <= 32) pool32_.deallocate(ptr);
else if (size <= 64) pool64_.deallocate(ptr);
else if (size <= 128) pool128_.deallocate(ptr);
else if (size <= 256) pool256_.deallocate(ptr);
}
void print_stats() const {
std::cout << "Pool Statistics:\n";
std::cout << " 32-byte: "; pool32_.print_status();
std::cout << " 64-byte: "; pool64_.print_status();
std::cout << " 128-byte: "; pool128_.print_status();
std::cout << " 256-byte: "; pool256_.print_status();
}
};
void multi_size_pool_demo() {
std::cout << "\n=== Multi-Size Pool System Demo ===\n\n";
MultiSizePoolSystem pools;
// 分配不同大小的对象
void* a = pools.allocate(20); // 使用32字节池
void* b = pools.allocate(50); // 使用64字节池
void* c = pools.allocate(100); // 使用128字节池
void* d = pools.allocate(200); // 使用256字节池
std::cout << "After allocations:\n";
pools.print_stats();
// 释放
pools.deallocate(a, 20);
pools.deallocate(c, 100);
std::cout << "\nAfter freeing some:\n";
pools.print_stats();
// 再次分配
void* e = pools.allocate(30); // 应该重用32字节池
std::cout << "\nReallocation matches freed: " << (e == a ? "yes" : "no") << "\n";
// 清理
pools.deallocate(b, 50);
pools.deallocate(d, 200);
pools.deallocate(e, 30);
}
int main() {
basic_pool_demo();
typed_pool_demo();
multi_size_pool_demo();
std::cout << "\n=== Key Takeaways ===\n";
std::cout << "1. Fixed pool provides O(1) allocation/deallocation\n";
std::cout << "2. No fragmentation with fixed-size blocks\n";
std::cout << "3. Typed pool adds type safety and construction/destruction\n";
std::cout << "4. Multi-size system serves various allocation needs\n";
std::cout << "5. Predictable memory usage and timing\n";
return 0;
}
#include <iostream>
#include <cstdint>
#include <cstddef>
#include <cstring>
#include <new>
// Slab分配器 - 简化版Linux内核slab
template<size_t OBJECT_SIZE, size_t SLAB_CAPACITY>
class Slab {
public:
struct SlabBlock {
alignas(std::max_align_t) uint8_t memory[SLAB_CAPACITY * OBJECT_SIZE];
uint32_t in_use; // 位图跟踪使用情况(最多32个对象)
SlabBlock* next;
size_t free_count;
SlabBlock() : in_use(0), next(nullptr), free_count(SLAB_CAPACITY) {}
};
private:
SlabBlock* partial_list_; // 部分使用的slab
SlabBlock* full_list_; // 已满的slab
SlabBlock* empty_list_; // 全空的slab
size_t total_slabs_;
// 从slab中分配一个对象
void* allocate_from_slab(SlabBlock* slab) {
for (size_t i = 0; i < SLAB_CAPACITY; ++i) {
if ((slab->in_use & (1U << i)) == 0) {
slab->in_use |= (1U << i);
slab->free_count--;
return &slab->memory[i * OBJECT_SIZE];
}
}
return nullptr;
}
SlabBlock* new_slab() {
SlabBlock* slab = new (std::nothrow) SlabBlock();
if (slab) {
total_slabs_++;
}
return slab;
}
public:
Slab() : partial_list_(nullptr), full_list_(nullptr), empty_list_(nullptr), total_slabs_(0) {
// 预分配一个空slab
empty_list_ = new_slab();
}
~Slab() {
// 释放所有slab
SlabBlock* slab = partial_list_;
while (slab) {
SlabBlock* next = slab->next;
delete slab;
slab = next;
}
slab = full_list_;
while (slab) {
SlabBlock* next = slab->next;
delete slab;
slab = next;
}
slab = empty_list_;
while (slab) {
SlabBlock* next = slab->next;
delete slab;
slab = next;
}
}
void* allocate() {
// 优先从partial slab分配
if (partial_list_) {
void* ptr = allocate_from_slab(partial_list_);
if (ptr) {
// 检查slab是否变满
if (partial_list_->free_count == 0) {
// 移到full list
SlabBlock* full = partial_list_;
partial_list_ = partial_list_->next;
full->next = full_list_;
full_list_ = full;
}
return ptr;
}
}
// 从empty list获取
if (empty_list_) {
void* ptr = allocate_from_slab(empty_list_);
if (ptr) {
// 移到partial list
SlabBlock* partial = empty_list_;
empty_list_ = empty_list_->next;
partial->next = partial_list_;
partial_list_ = partial;
return ptr;
}
}
// 需要分配新slab
SlabBlock* new_slab_block = new_slab();
if (!new_slab_block) {
return nullptr; // 分配失败
}
void* ptr = allocate_from_slab(new_slab_block);
new_slab_block->next = partial_list_;
partial_list_ = new_slab_block;
return ptr;
}
void deallocate(void* ptr) {
if (!ptr) return;
// 查找ptr所属的slab
SlabBlock** slab_ptr = &full_list_;
while (*slab_ptr) {
if (ptr >= (*slab_ptr)->memory &&
ptr < (*slab_ptr)->memory + sizeof((*slab_ptr)->memory)) {
break;
}
slab_ptr = &((*slab_ptr)->next);
}
if (!*slab_ptr) {
slab_ptr = &partial_list_;
while (*slab_ptr) {
if (ptr >= (*slab_ptr)->memory &&
ptr < (*slab_ptr)->memory + sizeof((*slab_ptr)->memory)) {
break;
}
slab_ptr = &((*slab_ptr)->next);
}
}
if (!*slab_ptr) {
return; // 不属于这个分配器
}
SlabBlock* slab = *slab_ptr;
size_t index = (static_cast<uint8_t*>(ptr) - slab->memory) / OBJECT_SIZE;
slab->in_use &= ~(1U << index);
slab->free_count++;
// 根据使用情况移动slab
if (slab->free_count == SLAB_CAPACITY) {
// 全空,移到empty list
*slab_ptr = slab->next;
slab->next = empty_list_;
empty_list_ = slab;
} else if (slab == full_list_) {
// 从full移到partial
*slab_ptr = slab->next;
slab->next = partial_list_;
partial_list_ = slab;
}
}
void print_stats() const {
size_t partial_count = 0;
size_t full_count = 0;
size_t empty_count = 0;
SlabBlock* slab = partial_list_;
while (slab) { partial_count++; slab = slab->next; }
slab = full_list_;
while (slab) { full_count++; slab = slab->next; }
slab = empty_list_;
while (slab) { empty_count++; slab = slab->next; }
std::cout << "Slab stats: " << total_slabs_ << " total ("
<< partial_count << " partial, "
<< full_count << " full, "
<< empty_count << " empty)\n";
}
};
// Arena/Bump分配器
template<size_t ARENA_SIZE>
class ArenaAllocator {
alignas(std::max_align_t) uint8_t memory_[ARENA_SIZE];
size_t offset_;
public:
ArenaAllocator() : offset_(0) {}
void* allocate(size_t size, size_t alignment = alignof(std::max_align_t)) {
uintptr_t addr = reinterpret_cast<uintptr_t>(memory_ + offset_);
uintptr_t aligned = (addr + alignment - 1) & ~(alignment - 1);
size_t aligned_offset = aligned - reinterpret_cast<uintptr_t>(memory_);
if (aligned_offset + size > ARENA_SIZE) {
return nullptr;
}
offset_ = aligned_offset + size;
return reinterpret_cast<void*>(aligned);
}
void reset() {
offset_ = 0;
}
size_t used() const { return offset_; }
size_t available() const { return ARENA_SIZE - offset_; }
};
// 使用示例
struct Task {
int id;
char name[32];
void (*func)(void);
Task(int i, const char* n, void (*f)(void)) : id(i), func(f) {
std::strncpy(name, n, sizeof(name) - 1);
name[sizeof(name) - 1] = '\0';
}
void execute() {
std::cout << "Task " << id << " (" << name << ") executing\n";
if (func) func();
}
};
void slab_demo() {
std::cout << "=== Slab Allocator Demo ===\n\n";
// 创建一个Task类型的slab分配器
// 每个slab最多16个Task对象
Slab<sizeof(Task), 16> task_slab;
std::cout << "Initial state: ";
task_slab.print_stats();
// 分配任务
Task* tasks[20];
for (int i = 0; i < 20; ++i) {
void* mem = task_slab.allocate();
if (mem) {
tasks[i] = new (mem) Task(i, "Task", nullptr);
} else {
tasks[i] = nullptr;
std::cout << "Failed to allocate task " << i << "\n";
}
}
std::cout << "\nAfter allocating 20 tasks: ";
task_slab.print_stats();
// 执行一些任务
for (int i = 0; i < 5; ++i) {
if (tasks[i]) tasks[i]->execute();
}
// 释放一些任务
for (int i = 0; i < 10; ++i) {
if (tasks[i]) {
task_slab.deallocate(tasks[i]);
}
}
std::cout << "\nAfter freeing 10 tasks: ";
task_slab.print_stats();
// 再次分配
Task* new_task = nullptr;
void* mem = task_slab.allocate();
if (mem) {
new_task = new (mem) Task(99, "NewTask", nullptr);
std::cout << "Allocated new task after freeing\n";
}
// 清理剩余
for (int i = 10; i < 20; ++i) {
if (tasks[i]) {
task_slab.deallocate(tasks[i]);
}
}
if (new_task) {
task_slab.deallocate(new_task);
}
}
void arena_demo() {
std::cout << "\n=== Arena Allocator Demo ===\n\n";
ArenaAllocator<4096> arena;
std::cout << "Arena: " << arena.available() << " bytes available\n";
// 分配一些对象
void* a = arena.allocate(sizeof(int), alignof(int));
void* b = arena.allocate(100, 8);
void* c = arena.allocate(sizeof(double), alignof(double));
std::cout << "After allocations: " << arena.used() << " bytes used\n";
// 重置
arena.reset();
std::cout << "After reset: " << arena.used() << " bytes used\n";
// 新的分配从头开始
void* d = arena.allocate(200, 8);
std::cout << "New allocation at " << d << " (should be at start)\n";
}
// 混合使用:Slab + Arena
class HybridAllocator {
Slab<64, 32> small_objects_; // 小对象(<=64字节)
ArenaAllocator<4096> arena_; // 大对象/临时对象
public:
enum class AllocationType {
Small,
Large
};
void* allocate(size_t size, AllocationType type = AllocationType::Small) {
if (type == AllocationType::Small && size <= 64) {
return small_objects_.allocate();
} else {
return arena_.allocate(size);
}
}
void deallocate(void* ptr, AllocationType type) {
if (type == AllocationType::Small) {
small_objects_.deallocate(ptr);
}
// Arena对象不需要单独释放
}
void reset_arena() {
arena_.reset();
}
void print_stats() const {
std::cout << "Small object slab: ";
small_objects_.print_stats();
std::cout << "Arena: " << arena_.used() << " / " << 4096 << " bytes used\n";
}
};
void hybrid_demo() {
std::cout << "\n=== Hybrid Allocator Demo ===\n\n";
HybridAllocator alloc;
// 分配小对象
void* small1 = alloc.allocate(32, HybridAllocator::AllocationType::Small);
void* small2 = alloc.allocate(64, HybridAllocator::AllocationType::Small);
void* small3 = alloc.allocate(16, HybridAllocator::AllocationType::Small);
// 分配大对象(使用arena)
void* large1 = alloc.allocate(512, HybridAllocator::AllocationType::Large);
void* large2 = alloc.allocate(1024, HybridAllocator::AllocationType::Large);
alloc.print_stats();
// 释放小对象
alloc.deallocate(small1, HybridAllocator::AllocationType::Small);
alloc.deallocate(small2, HybridAllocator::AllocationType::Small);
std::cout << "\nAfter freeing small objects:\n";
alloc.print_stats();
// 重置arena
alloc.reset_arena();
std::cout << "\nAfter arena reset:\n";
alloc.print_stats();
// 清理
alloc.deallocate(small3, HybridAllocator::AllocationType::Small);
}
int main() {
slab_demo();
arena_demo();
hybrid_demo();
std::cout << "\n=== Key Takeaways ===\n";
std::cout << "1. Slab: Fixed-size objects, maintains free/partial/full lists\n";
std::cout << "2. Arena: Linear allocation, fast, no individual free\n";
std::cout << "3. Hybrid: Combine both for different allocation patterns\n";
std::cout << "4. Slab good for: object caching, reuse, reducing construction cost\n";
std::cout << "5. Arena good for: initialization, temporary objects, batch processing\n";
return 0;
}