跳转至

嵌入式C++教程——循环缓冲区

在嵌入式世界里,有一类问题反复出现:数据源不停地产生数据,消费者慢慢地处理数据,中间还不想 malloc。于是,一个古老但永不过时的数据结构登场了——循环缓冲区(Circular Buffer / Ring Buffer)

你可以把它理解为一个仓库,只有固定大小,装满了就从头再来。没有扩容、没有碎片、没有"new 失败",非常适合 MCU、驱动、中断、DMA、串口、音频流等场景。


为什么嵌入式这么爱循环缓冲区?

在 PC 世界,我们可以随便 new、随便 std::vector::push_back。但在嵌入式里,这些操作听起来就很危险:

  • 堆内存小,还容易碎片化
  • 中断上下文里不能 malloc
  • 实时系统里不希望出现不可控延迟

而循环缓冲区的特性,几乎是为嵌入式量身定做的:

  • 固定大小,编译期或初始化时确定
  • O(1) 入队 / 出队
  • 内存连续,Cache 友好
  • 不需要动态分配
  • 实现简单,容易做成 lock-free / 中断安全

一句话总结:

它不聪明,但它很可靠。


循环缓冲区的核心思想(其实非常朴素)

循环缓冲区本质上就是:

  • 一块固定大小的数组
  • 两个索引:
  • head:写入位置
  • tail:读取位置

当索引走到数组末尾,就绕回开头,像一个圆。

[ 0 ][ 1 ][ 2 ][ 3 ][ 4 ][ 5 ]
                 
      tail      head

写入数据:移动 head 读取数据:移动 tail

重点只有一个问题需要想清楚: 👉 如何区分"满"和"空"?


如何区分"空"和"满"?(经典难题)

有三种常见方案:

  1. 浪费一个元素(最常见)
  2. 额外维护一个 count
  3. 用一个额外的 full 标志位

在嵌入式里,方案 1 最受欢迎:简单、无歧义、逻辑清晰。规则是:

  • 缓冲区大小为 N
  • 实际最多只能存 N - 1 个元素
  • 条件判断:
  • 空:head == tail
  • 满:(head + 1) % N == tail

是的,我们牺牲了一个格子,换来一生的安宁。


一个干净的 C++ 循环缓冲区实现

下面是一个无动态内存、模板化、适合嵌入式的实现。

基本接口设计

#pragma once
#include <cstddef>
#include <array>

template<typename T, std::size_t Capacity>
class RingBuffer {
public:
    bool push(const T& value);
    bool pop(T& out);

    bool empty() const;
    bool full() const;

    std::size_t size() const;
    std::size_t capacity() const { return Capacity - 1; }

private:
    std::array<T, Capacity> buffer_{};
    std::size_t head_ = 0;
    std::size_t tail_ = 0;
};

注意一个细节: 👉 Capacity 实际数组大小 = 用户可用容量 + 1


入队(push):向前走一步

template<typename T, std::size_t Capacity>
bool RingBuffer<T, Capacity>::push(const T& value)
{
    if (full()) {
        return false;  // 缓冲区满了
    }

    buffer_[head_] = value;
    head_ = (head_ + 1) % Capacity;
    return true;
}

这里没有任何黑魔法:

  • 先判断是否满
  • 写数据
  • 移动 head
  • 如果走到末尾,绕回开头

O(1),永远不会慢。


出队(pop):消费者登场

template<typename T, std::size_t Capacity>
bool RingBuffer<T, Capacity>::pop(T& out)
{
    if (empty()) {
        return false;  // 没数据
    }

    out = buffer_[tail_];
    tail_ = (tail_ + 1) % Capacity;
    return true;
}

同样简单:

  • 空就失败
  • 读数据
  • 移动 tail

状态判断函数

template<typename T, std::size_t Capacity>
bool RingBuffer<T, Capacity>::empty() const
{
    return head_ == tail_;
}

template<typename T, std::size_t Capacity>
bool RingBuffer<T, Capacity>::full() const
{
    return (head_ + 1) % Capacity == tail_;
}

template<typename T, std::size_t Capacity>
std::size_t RingBuffer<T, Capacity>::size() const
{
    if (head_ >= tail_) {
        return head_ - tail_;
    }
    return Capacity - (tail_ - head_);
}

size() 这个写法在嵌入式里很常见, 避免分支复杂化,也没有使用额外计数器。


一个真实的嵌入式使用场景

串口接收(ISR + 主循环)

RingBuffer<uint8_t, 128> rx_buffer;

void USART_IRQHandler()
{
    uint8_t data = UART_Read();
    rx_buffer.push(data);  // 中断里只做这件事
}

int main()
{
    while (1) {
        uint8_t ch;
        if (rx_buffer.pop(ch)) {
            process_char(ch);
        }
    }
}

这种写法有几个非常嵌入式的优点:

  • ISR 里逻辑极短
  • 不 malloc
  • 主循环慢慢处理
  • 即使处理慢一点,也不会阻塞中断

关于线程安全 / 中断安全的一点现实提醒

上面的实现是:

  • 单生产者 + 单消费者
  • 一个在中断,一个在主循环

在很多 MCU 上,这是天然安全的(只要索引读写是原子的)。

但如果你遇到下面情况之一:

  • 多线程
  • 多个生产者
  • SMP
  • RTOS 任务间通信

那你需要:

  • 关中断
  • 原子变量
  • 或者 mutex / spinlock

和 std::queue / std::vector 比一比

方案 是否动态分配 是否确定性 嵌入式友好
std::vector
std::queue 取决于底层
循环缓冲区
查看完整可编译示例
// ring_buffer.h - 简单的循环缓冲区实现
#pragma once
#include <cstddef>
#include <array>

template<typename T, std::size_t Capacity>
class RingBuffer {
public:
    bool push(const T& value) {
        if (full()) {
            return false;  // 缓冲区满了
        }

        buffer_[head_] = value;
        head_ = (head_ + 1) % Capacity;
        return true;
    }

    bool pop(T& out) {
        if (empty()) {
            return false;  // 没数据
        }

        out = buffer_[tail_];
        tail_ = (tail_ + 1) % Capacity;
        return true;
    }

    bool empty() const {
        return head_ == tail_;
    }

    bool full() const {
        return (head_ + 1) % Capacity == tail_;
    }

    std::size_t size() const {
        if (head_ >= tail_) {
            return head_ - tail_;
        }
        return Capacity - (tail_ - head_);
    }

    std::size_t capacity() const {
        return Capacity - 1;  // 实际可用容量
    }

    void clear() {
        head_ = tail_ = 0;
    }

private:
    std::array<T, Capacity> buffer_{};
    std::size_t head_ = 0;
    std::size_t tail_ = 0;
};
// basic_example.cpp - 循环缓冲区基本用法演示
#include "ring_buffer.h"
#include <iostream>

int main() {
    // 创建一个容量为 16 的循环缓冲区(实际可用 15)
    RingBuffer<int, 16> buffer;

    std::cout << "Ring Buffer Demo\n";
    std::cout << "Capacity: " << buffer.capacity() << "\n\n";

    // 测试:填充和读取
    std::cout << "=== Pushing 10 elements ===\n";
    for (int i = 0; i < 10; ++i) {
        if (buffer.push(i)) {
            std::cout << "Pushed: " << i << ", size: " << buffer.size() << '\n';
        }
    }

    std::cout << "\n=== Popping 5 elements ===\n";
    for (int i = 0; i < 5; ++i) {
        int value;
        if (buffer.pop(value)) {
            std::cout << "Popped: " << value << ", size: " << buffer.size() << '\n';
        }
    }

    std::cout << "\n=== Pushing 8 more elements ===\n";
    for (int i = 10; i < 18; ++i) {
        if (buffer.push(i)) {
            std::cout << "Pushed: " << i << ", size: " << buffer.size() << '\n';
        } else {
            std::cout << "Failed to push: " << i << " (buffer full)\n";
        }
    }

    std::cout << "\n=== Remaining contents ===\n";
    while (!buffer.empty()) {
        int value;
        buffer.pop(value);
        std::cout << value << ' ';
    }
    std::cout << "\n";

    std::cout << "\nFinal size: " << buffer.size() << '\n';
    std::cout << "Empty: " << (buffer.empty() ? "yes" : "no") << '\n';
    std::cout << "Full: " << (buffer.full() ? "yes" : "no") << '\n';

    return 0;
}
查看更多示例:UART 模拟、线程安全版本
// uart_example.cpp - 模拟 UART 串口接收场景
#include "ring_buffer.h"
#include <iostream>
#include <thread>
#include <chrono>
#include <atomic>
#include <csignal>
#include <cstdlib>

// 模拟硬件寄存器
std::atomic<bool> uart_data_ready{false};
std::atomic<uint8_t> uart_rx_reg{0};

// 接收缓冲区
RingBuffer<uint8_t, 128> rx_buffer;

// 模拟 UART 硬件接收中断
void USART_IRQHandler() {
    // 在真实硬件中,这是由硬件触发的中断
    uint8_t data = uart_rx_reg.load();
    uart_data_ready.store(false);

    // 中断里只做这件事:把数据放入缓冲区
    if (!rx_buffer.push(data)) {
        std::cout << "[ISR] Buffer overrun! Data lost.\n";
    }
}

// 模拟 UART 接收线程(模拟硬件产生数据)
void uart_rx_thread() {
    const char* test_message = "Hello, Embedded World!";
    size_t index = 0;

    while (test_message[index] != '\0') {
        // 模拟硬件接收一个字节
        uart_rx_reg.store(static_cast<uint8_t>(test_message[index]));
        uart_data_ready.store(true);

        // 触发中断(模拟)
        USART_IRQHandler();

        ++index;
        std::this_thread::sleep_for(std::chrono::milliseconds(50));
    }
}

// 处理单个字符
void process_char(uint8_t ch) {
    if (ch >= 32 && ch < 127) {
        std::cout << "Processing: '" << static_cast<char>(ch) << "'\n";
    } else {
        std::cout << "Processing: 0x" << std::hex << static_cast<int>(ch) << std::dec << '\n';
    }
}

int main() {
    std::cout << "=== UART Circular Buffer Example ===\n\n";

    // 启动模拟 UART 接收线程
    std::thread uart_thread(uart_rx_thread);

    // 主循环:处理接收到的数据
    int idle_count = 0;
    while (idle_count < 50) {  // 运行一段时间后退出
        uint8_t ch;
        if (rx_buffer.pop(ch)) {
            process_char(ch);
            idle_count = 0;
        } else {
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
            ++idle_count;
        }
    }

    uart_thread.join();

    std::cout << "\n=== Summary ===\n";
    std::cout << "Buffer size: " << rx_buffer.size() << '\n';
    std::cout << "This pattern demonstrates:\n";
    std::cout << "- ISR pushes data quickly\n";
    std::cout << "- Main loop processes at its own pace\n";
    std::cout << "- No malloc, deterministic behavior\n";

    return 0;
}
// atomic_ring_buffer.h - 线程安全的循环缓冲区实现
#pragma once
#include <cstddef>
#include <array>
#include <atomic>

template<typename T, std::size_t Capacity>
class AtomicRingBuffer {
public:
    AtomicRingBuffer() : head_(0), tail_(0) {}

    bool push(const T& value) {
        std::size_t head = head_.load(std::memory_order_relaxed);
        std::size_t next_head = (head + 1) % Capacity;
        std::size_t tail = tail_.load(std::memory_order_acquire);

        if (next_head == tail) {
            return false;  // 缓冲区满了
        }

        buffer_[head] = value;
        head_.store(next_head, std::memory_order_release);
        return true;
    }

    bool pop(T& out) {
        std::size_t tail = tail_.load(std::memory_order_relaxed);
        std::size_t head = head_.load(std::memory_order_acquire);

        if (tail == head) {
            return false;  // 没数据
        }

        out = buffer_[tail];
        std::size_t next_tail = (tail + 1) % Capacity;
        tail_.store(next_tail, std::memory_order_release);
        return true;
    }

    bool empty() const {
        std::size_t head = head_.load(std::memory_order_acquire);
        std::size_t tail = tail_.load(std::memory_order_acquire);
        return head == tail;
    }

    bool full() const {
        std::size_t head = head_.load(std::memory_order_acquire);
        std::size_t tail = tail_.load(std::memory_order_acquire);
        return (head + 1) % Capacity == tail;
    }

    std::size_t size() const {
        std::size_t head = head_.load(std::memory_order_acquire);
        std::size_t tail = tail_.load(std::memory_order_acquire);
        if (head >= tail) {
            return head - tail;
        }
        return Capacity - (tail - head);
    }

    std::size_t capacity() const {
        return Capacity - 1;  // 实际可用容量
    }

private:
    std::array<T, Capacity> buffer_;
    std::atomic<std::size_t> head_;
    std::atomic<std::size_t> tail_;
};
// thread_safe_example.cpp - 多线程安全的循环缓冲区演示
#include "atomic_ring_buffer.h"
#include <iostream>
#include <thread>
#include <vector>
#include <chrono>

// 多生产者-多消费者测试
const int PRODUCER_COUNT = 2;
const int CONSUMER_COUNT = 2;
const int ITEMS_PER_PRODUCER = 100;

AtomicRingBuffer<int, 64> shared_buffer;

// 生产者线程
void producer(int id) {
    for (int i = 0; i < ITEMS_PER_PRODUCER; ++i) {
        int value = id * 1000 + i;
        while (!shared_buffer.push(value)) {
            // 缓冲区满,等待
            std::this_thread::yield();
        }
        // 模拟生产时间
        std::this_thread::sleep_for(std::chrono::microseconds(rand() % 100));
    }
    std::cout << "Producer " << id << " finished.\n";
}

// 消费者线程
void consumer(int id) {
    int consumed = 0;
    int value;
    while (consumed < (PRODUCER_COUNT * ITEMS_PER_PRODUCER) / CONSUMER_COUNT) {
        if (shared_buffer.pop(value)) {
            ++consumed;
            // std::cout << "Consumer " << id << " got: " << value << '\n';
        } else {
            // 缓冲区空,等待
            std::this_thread::yield();
        }
    }
    std::cout << "Consumer " << id << " consumed " << consumed << " items.\n";
}

int main() {
    std::cout << "=== Thread-Safe Ring Buffer Demo ===\n\n";

    std::vector<std::thread> producers;
    std::vector<std::thread> consumers;

    auto start = std::chrono::steady_clock::now();

    // 启动消费者
    for (int i = 0; i < CONSUMER_COUNT; ++i) {
        consumers.emplace_back(consumer, i);
    }

    // 启动生产者
    for (int i = 0; i < PRODUCER_COUNT; ++i) {
        producers.emplace_back(producer, i);
    }

    // 等待所有生产者完成
    for (auto& p : producers) {
        p.join();
    }

    // 等待所有消费者完成
    for (auto& c : consumers) {
        c.join();
    }

    auto end = std::chrono::steady_clock::now();
    auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);

    std::cout << "\n=== Test Completed ===\n";
    std::cout << "Time taken: " << duration.count() << " ms\n";
    std::cout << "Final buffer size: " << shared_buffer.size() << '\n';
    std::cout << "Buffer empty: " << (shared_buffer.empty() ? "yes" : "no") << '\n';

    std::cout << "\nKey points:\n";
    std::cout << "- Uses atomic operations for lock-free synchronization\n";
    std::cout << "- Memory ordering ensures correct visibility across threads\n";
    std::cout << "- Suitable for single-producer single-consumer scenarios\n";
    std::cout << "- For multiple producers/consumers, consider external locking\n";

    return 0;
}