嵌入式C++教程——循环缓冲区¶
在嵌入式世界里,有一类问题反复出现:数据源不停地产生数据,消费者慢慢地处理数据,中间还不想 malloc。于是,一个古老但永不过时的数据结构登场了——循环缓冲区(Circular Buffer / Ring Buffer)。
你可以把它理解为一个仓库,只有固定大小,装满了就从头再来。没有扩容、没有碎片、没有"new 失败",非常适合 MCU、驱动、中断、DMA、串口、音频流等场景。
为什么嵌入式这么爱循环缓冲区?¶
在 PC 世界,我们可以随便 new、随便 std::vector::push_back。但在嵌入式里,这些操作听起来就很危险:
- 堆内存小,还容易碎片化
- 中断上下文里不能 malloc
- 实时系统里不希望出现不可控延迟
而循环缓冲区的特性,几乎是为嵌入式量身定做的:
- 固定大小,编译期或初始化时确定
- O(1) 入队 / 出队
- 内存连续,Cache 友好
- 不需要动态分配
- 实现简单,容易做成 lock-free / 中断安全
一句话总结:
它不聪明,但它很可靠。
循环缓冲区的核心思想(其实非常朴素)¶
循环缓冲区本质上就是:
- 一块固定大小的数组
- 两个索引:
head:写入位置tail:读取位置
当索引走到数组末尾,就绕回开头,像一个圆。
写入数据:移动 head
读取数据:移动 tail
重点只有一个问题需要想清楚: 👉 如何区分"满"和"空"?
如何区分"空"和"满"?(经典难题)¶
有三种常见方案:
- 浪费一个元素(最常见)
- 额外维护一个
count - 用一个额外的
full标志位
在嵌入式里,方案 1 最受欢迎:简单、无歧义、逻辑清晰。规则是:
- 缓冲区大小为
N - 实际最多只能存
N - 1个元素 - 条件判断:
- 空:
head == tail - 满:
(head + 1) % N == tail
是的,我们牺牲了一个格子,换来一生的安宁。
一个干净的 C++ 循环缓冲区实现¶
下面是一个无动态内存、模板化、适合嵌入式的实现。
基本接口设计¶
#pragma once
#include <cstddef>
#include <array>
template<typename T, std::size_t Capacity>
class RingBuffer {
public:
bool push(const T& value);
bool pop(T& out);
bool empty() const;
bool full() const;
std::size_t size() const;
std::size_t capacity() const { return Capacity - 1; }
private:
std::array<T, Capacity> buffer_{};
std::size_t head_ = 0;
std::size_t tail_ = 0;
};
注意一个细节:
👉 Capacity 实际数组大小 = 用户可用容量 + 1
入队(push):向前走一步¶
template<typename T, std::size_t Capacity>
bool RingBuffer<T, Capacity>::push(const T& value)
{
if (full()) {
return false; // 缓冲区满了
}
buffer_[head_] = value;
head_ = (head_ + 1) % Capacity;
return true;
}
这里没有任何黑魔法:
- 先判断是否满
- 写数据
- 移动
head - 如果走到末尾,绕回开头
O(1),永远不会慢。
出队(pop):消费者登场¶
template<typename T, std::size_t Capacity>
bool RingBuffer<T, Capacity>::pop(T& out)
{
if (empty()) {
return false; // 没数据
}
out = buffer_[tail_];
tail_ = (tail_ + 1) % Capacity;
return true;
}
同样简单:
- 空就失败
- 读数据
- 移动
tail
状态判断函数¶
template<typename T, std::size_t Capacity>
bool RingBuffer<T, Capacity>::empty() const
{
return head_ == tail_;
}
template<typename T, std::size_t Capacity>
bool RingBuffer<T, Capacity>::full() const
{
return (head_ + 1) % Capacity == tail_;
}
template<typename T, std::size_t Capacity>
std::size_t RingBuffer<T, Capacity>::size() const
{
if (head_ >= tail_) {
return head_ - tail_;
}
return Capacity - (tail_ - head_);
}
size() 这个写法在嵌入式里很常见,
避免分支复杂化,也没有使用额外计数器。
一个真实的嵌入式使用场景¶
串口接收(ISR + 主循环)¶
RingBuffer<uint8_t, 128> rx_buffer;
void USART_IRQHandler()
{
uint8_t data = UART_Read();
rx_buffer.push(data); // 中断里只做这件事
}
int main()
{
while (1) {
uint8_t ch;
if (rx_buffer.pop(ch)) {
process_char(ch);
}
}
}
这种写法有几个非常嵌入式的优点:
- ISR 里逻辑极短
- 不 malloc
- 主循环慢慢处理
- 即使处理慢一点,也不会阻塞中断
关于线程安全 / 中断安全的一点现实提醒¶
上面的实现是:
- 单生产者 + 单消费者
- 一个在中断,一个在主循环
在很多 MCU 上,这是天然安全的(只要索引读写是原子的)。
但如果你遇到下面情况之一:
- 多线程
- 多个生产者
- SMP
- RTOS 任务间通信
那你需要:
- 关中断
- 原子变量
- 或者 mutex / spinlock
和 std::queue / std::vector 比一比¶
| 方案 | 是否动态分配 | 是否确定性 | 嵌入式友好 |
|---|---|---|---|
| std::vector | 是 | 否 | ❌ |
| std::queue | 取决于底层 | 否 | ❌ |
| 循环缓冲区 | 否 | 是 | ✅ |
查看完整可编译示例
// ring_buffer.h - 简单的循环缓冲区实现
#pragma once
#include <cstddef>
#include <array>
template<typename T, std::size_t Capacity>
class RingBuffer {
public:
bool push(const T& value) {
if (full()) {
return false; // 缓冲区满了
}
buffer_[head_] = value;
head_ = (head_ + 1) % Capacity;
return true;
}
bool pop(T& out) {
if (empty()) {
return false; // 没数据
}
out = buffer_[tail_];
tail_ = (tail_ + 1) % Capacity;
return true;
}
bool empty() const {
return head_ == tail_;
}
bool full() const {
return (head_ + 1) % Capacity == tail_;
}
std::size_t size() const {
if (head_ >= tail_) {
return head_ - tail_;
}
return Capacity - (tail_ - head_);
}
std::size_t capacity() const {
return Capacity - 1; // 实际可用容量
}
void clear() {
head_ = tail_ = 0;
}
private:
std::array<T, Capacity> buffer_{};
std::size_t head_ = 0;
std::size_t tail_ = 0;
};
// basic_example.cpp - 循环缓冲区基本用法演示
#include "ring_buffer.h"
#include <iostream>
int main() {
// 创建一个容量为 16 的循环缓冲区(实际可用 15)
RingBuffer<int, 16> buffer;
std::cout << "Ring Buffer Demo\n";
std::cout << "Capacity: " << buffer.capacity() << "\n\n";
// 测试:填充和读取
std::cout << "=== Pushing 10 elements ===\n";
for (int i = 0; i < 10; ++i) {
if (buffer.push(i)) {
std::cout << "Pushed: " << i << ", size: " << buffer.size() << '\n';
}
}
std::cout << "\n=== Popping 5 elements ===\n";
for (int i = 0; i < 5; ++i) {
int value;
if (buffer.pop(value)) {
std::cout << "Popped: " << value << ", size: " << buffer.size() << '\n';
}
}
std::cout << "\n=== Pushing 8 more elements ===\n";
for (int i = 10; i < 18; ++i) {
if (buffer.push(i)) {
std::cout << "Pushed: " << i << ", size: " << buffer.size() << '\n';
} else {
std::cout << "Failed to push: " << i << " (buffer full)\n";
}
}
std::cout << "\n=== Remaining contents ===\n";
while (!buffer.empty()) {
int value;
buffer.pop(value);
std::cout << value << ' ';
}
std::cout << "\n";
std::cout << "\nFinal size: " << buffer.size() << '\n';
std::cout << "Empty: " << (buffer.empty() ? "yes" : "no") << '\n';
std::cout << "Full: " << (buffer.full() ? "yes" : "no") << '\n';
return 0;
}
查看更多示例:UART 模拟、线程安全版本
// uart_example.cpp - 模拟 UART 串口接收场景
#include "ring_buffer.h"
#include <iostream>
#include <thread>
#include <chrono>
#include <atomic>
#include <csignal>
#include <cstdlib>
// 模拟硬件寄存器
std::atomic<bool> uart_data_ready{false};
std::atomic<uint8_t> uart_rx_reg{0};
// 接收缓冲区
RingBuffer<uint8_t, 128> rx_buffer;
// 模拟 UART 硬件接收中断
void USART_IRQHandler() {
// 在真实硬件中,这是由硬件触发的中断
uint8_t data = uart_rx_reg.load();
uart_data_ready.store(false);
// 中断里只做这件事:把数据放入缓冲区
if (!rx_buffer.push(data)) {
std::cout << "[ISR] Buffer overrun! Data lost.\n";
}
}
// 模拟 UART 接收线程(模拟硬件产生数据)
void uart_rx_thread() {
const char* test_message = "Hello, Embedded World!";
size_t index = 0;
while (test_message[index] != '\0') {
// 模拟硬件接收一个字节
uart_rx_reg.store(static_cast<uint8_t>(test_message[index]));
uart_data_ready.store(true);
// 触发中断(模拟)
USART_IRQHandler();
++index;
std::this_thread::sleep_for(std::chrono::milliseconds(50));
}
}
// 处理单个字符
void process_char(uint8_t ch) {
if (ch >= 32 && ch < 127) {
std::cout << "Processing: '" << static_cast<char>(ch) << "'\n";
} else {
std::cout << "Processing: 0x" << std::hex << static_cast<int>(ch) << std::dec << '\n';
}
}
int main() {
std::cout << "=== UART Circular Buffer Example ===\n\n";
// 启动模拟 UART 接收线程
std::thread uart_thread(uart_rx_thread);
// 主循环:处理接收到的数据
int idle_count = 0;
while (idle_count < 50) { // 运行一段时间后退出
uint8_t ch;
if (rx_buffer.pop(ch)) {
process_char(ch);
idle_count = 0;
} else {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
++idle_count;
}
}
uart_thread.join();
std::cout << "\n=== Summary ===\n";
std::cout << "Buffer size: " << rx_buffer.size() << '\n';
std::cout << "This pattern demonstrates:\n";
std::cout << "- ISR pushes data quickly\n";
std::cout << "- Main loop processes at its own pace\n";
std::cout << "- No malloc, deterministic behavior\n";
return 0;
}
// atomic_ring_buffer.h - 线程安全的循环缓冲区实现
#pragma once
#include <cstddef>
#include <array>
#include <atomic>
template<typename T, std::size_t Capacity>
class AtomicRingBuffer {
public:
AtomicRingBuffer() : head_(0), tail_(0) {}
bool push(const T& value) {
std::size_t head = head_.load(std::memory_order_relaxed);
std::size_t next_head = (head + 1) % Capacity;
std::size_t tail = tail_.load(std::memory_order_acquire);
if (next_head == tail) {
return false; // 缓冲区满了
}
buffer_[head] = value;
head_.store(next_head, std::memory_order_release);
return true;
}
bool pop(T& out) {
std::size_t tail = tail_.load(std::memory_order_relaxed);
std::size_t head = head_.load(std::memory_order_acquire);
if (tail == head) {
return false; // 没数据
}
out = buffer_[tail];
std::size_t next_tail = (tail + 1) % Capacity;
tail_.store(next_tail, std::memory_order_release);
return true;
}
bool empty() const {
std::size_t head = head_.load(std::memory_order_acquire);
std::size_t tail = tail_.load(std::memory_order_acquire);
return head == tail;
}
bool full() const {
std::size_t head = head_.load(std::memory_order_acquire);
std::size_t tail = tail_.load(std::memory_order_acquire);
return (head + 1) % Capacity == tail;
}
std::size_t size() const {
std::size_t head = head_.load(std::memory_order_acquire);
std::size_t tail = tail_.load(std::memory_order_acquire);
if (head >= tail) {
return head - tail;
}
return Capacity - (tail - head);
}
std::size_t capacity() const {
return Capacity - 1; // 实际可用容量
}
private:
std::array<T, Capacity> buffer_;
std::atomic<std::size_t> head_;
std::atomic<std::size_t> tail_;
};
// thread_safe_example.cpp - 多线程安全的循环缓冲区演示
#include "atomic_ring_buffer.h"
#include <iostream>
#include <thread>
#include <vector>
#include <chrono>
// 多生产者-多消费者测试
const int PRODUCER_COUNT = 2;
const int CONSUMER_COUNT = 2;
const int ITEMS_PER_PRODUCER = 100;
AtomicRingBuffer<int, 64> shared_buffer;
// 生产者线程
void producer(int id) {
for (int i = 0; i < ITEMS_PER_PRODUCER; ++i) {
int value = id * 1000 + i;
while (!shared_buffer.push(value)) {
// 缓冲区满,等待
std::this_thread::yield();
}
// 模拟生产时间
std::this_thread::sleep_for(std::chrono::microseconds(rand() % 100));
}
std::cout << "Producer " << id << " finished.\n";
}
// 消费者线程
void consumer(int id) {
int consumed = 0;
int value;
while (consumed < (PRODUCER_COUNT * ITEMS_PER_PRODUCER) / CONSUMER_COUNT) {
if (shared_buffer.pop(value)) {
++consumed;
// std::cout << "Consumer " << id << " got: " << value << '\n';
} else {
// 缓冲区空,等待
std::this_thread::yield();
}
}
std::cout << "Consumer " << id << " consumed " << consumed << " items.\n";
}
int main() {
std::cout << "=== Thread-Safe Ring Buffer Demo ===\n\n";
std::vector<std::thread> producers;
std::vector<std::thread> consumers;
auto start = std::chrono::steady_clock::now();
// 启动消费者
for (int i = 0; i < CONSUMER_COUNT; ++i) {
consumers.emplace_back(consumer, i);
}
// 启动生产者
for (int i = 0; i < PRODUCER_COUNT; ++i) {
producers.emplace_back(producer, i);
}
// 等待所有生产者完成
for (auto& p : producers) {
p.join();
}
// 等待所有消费者完成
for (auto& c : consumers) {
c.join();
}
auto end = std::chrono::steady_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
std::cout << "\n=== Test Completed ===\n";
std::cout << "Time taken: " << duration.count() << " ms\n";
std::cout << "Final buffer size: " << shared_buffer.size() << '\n';
std::cout << "Buffer empty: " << (shared_buffer.empty() ? "yes" : "no") << '\n';
std::cout << "\nKey points:\n";
std::cout << "- Uses atomic operations for lock-free synchronization\n";
std::cout << "- Memory ordering ensures correct visibility across threads\n";
std::cout << "- Suitable for single-producer single-consumer scenarios\n";
std::cout << "- For multiple producers/consumers, consider external locking\n";
return 0;
}