跳转至

嵌入式现代C++开发——无锁数据结构设计

引言

前面两章我们讲了std::atomic和内存序,你可能已经会基本的原子操作了。但在实际项目中,光会操作原子变量远远不够——你需要的是线程安全的数据结构:队列、栈、哈希表这些东西。

传统做法是用锁把它们包起来,简单粗暴但有效。问题在于,锁的开销真不小:线程竞争时要进内核态调度、缓存一致性协议狂轰滥炸、实时系统里优先级反转能把人逼疯。

无锁数据结构就是用原子操作精心编织的替代品。它们不依赖互斥锁,通过CAS(Compare-And-Swap)、原子指针、内存序这些底层机制,让多个线程能并发访问而不互相阻塞。

但说实话,无锁编程是C++里最容易踩坑也最难调试的领域之一。ABA问题、内存回收、伪共享……每一个都能让你熬夜排错。所以我们这一章的目标是:理解原理、知道什么时候用、更重要的是——知道什么时候不用

一句话总结:无锁数据结构通过原子操作实现线程安全,避免了锁的开销,但增加了复杂度,只适合高并发、低延迟的场景。


从无锁栈开始理解CAS

无锁栈是最简单也最经典的无锁数据结构,它是理解整个领域的敲门砖。我们先来看实现,再逐步拆解其中的坑。

基本结构:节点和头指针

template<typename T>
class LockFreeStack {
private:
    struct Node {
        T data;
        Node* next;
        explicit Node(const T& value) : data(value), next(nullptr) {}
    };

    std::atomic<Node*> head;

public:
    LockFreeStack() : head(nullptr) {}

    void push(const T& value);
    bool pop(T& value);
};

结构很简单:一个单链表,头指针用原子变量包装。所有操作都在头部进行,这样只需同步一个指针。

push操作:CAS的核心用法

void push(const T& value) {
    Node* new_node = new Node(value);

    // 1. 读取当前头指针
    Node* old_head = head.load(std::memory_order_relaxed);

    while (true) {
        // 2. 新节点指向旧头
        new_node->next = old_head;

        // 3. 尝试更新头指针:如果head还是old_head,就设为new_node
        if (head.compare_exchange_weak(
                old_head,              // 预期值:如果是这个就成功
                new_node,              // 新值
                std::memory_order_release,  // 成功时的内存序
                std::memory_order_relaxed   // 失败时的内存序
            )) {
            break;  // CAS成功,退出循环
        }

        // CAS失败:old_head被自动更新为最新值,重试
    }
}

这段代码的精髓在于CAS循环。我们来拆一下:

  1. 读取当前头指针到old_head
  2. 新节点的next指向old_head
  3. CAS:如果head还是old_head,就把head设为new_node并返回true;否则返回false并把old_head更新为当前值

如果CAS失败,说明有其他线程抢先修改了head,但compare_exchange_weak会自动把old_head更新为最新值,我们只需要循环重试。这就是无锁算法的核心——乐观并发:假设没有冲突,冲突了就重试。

为什么用weak而不是strong?

compare_exchange_weak允许"虚假失败"(即使值相等也可能返回false),但在循环中通常效率更高,因为某些硬件架构(如ARM)实现weak版本的开销更小。compare_exchange_strong保证不会虚假失败,但可能需要额外的指令。在循环里用weak,非循环场景用strong,这是标准建议。

pop操作:看起来简单,坑很多

bool pop(T& value) {
    Node* old_head = head.load(std::memory_order_acquire);

    while (old_head) {
        // 读取旧头的next
        Node* next_node = old_head->next;

        // 尝试把头指针移到下一个节点
        if (head.compare_exchange_weak(
                old_head,
                next_node,
                std::memory_order_release,
                std::memory_order_relaxed
            )) {
            value = old_head->data;
            // 这里有个大问题:什么时候释放old_head?
            return true;
        }

        // CAS失败,重新读取
        old_head = head.load(std::memory_order_acquire);
    }

    return false;  // 栈空
}

这个实现有个致命问题:什么时候释放old_head指向的内存?

如果直接delete old_head,可能导致另一个线程正在访问它时被释放——这就是典型的use-after-free。但如果不释放,内存泄漏了。这个问题把无数人坑进去,也是无锁编程最难的部分之一。

解决方案

  • 延迟回收:把待释放节点放到一个链表里,定期批量清理
  • Hazard Pointer:C++26引入的标准方案,让线程声明"我正在用这个指针"
  • RCU(Read-Copy-Update):读端无锁,写端延迟释放
  • 引用计数std::atomic<std::shared_ptr<Node>>,但性能开销大

对于嵌入式系统,如果你的栈深度有限,最简单的方案是预分配节点池,用对象池模式管理——我们第5章讲过,配合无锁算法可以避免动态分配。


无锁队列:SPSC到MPMC的演进

队列是嵌入式系统里更常用的数据结构——中断到主线程的数据传递、任务调度、消息传递,都离不开它。

SPSC队列:单生产者单消费者

单生产者单消费者(Single Producer Single Consumer)场景是最简单的,因为只有一个写入方和一个读取方,不需要处理多线程同时写或同时读的情况。

template<typename T, size_t Capacity>
class SPSCQueue {
public:
    SPSCQueue() : read_idx(0), write_idx(0) {
        // 预分配所有节点,避免运行时分配
    }

    bool push(const T& item) {
        const size_t current_write = write_idx.load(std::memory_order_relaxed);
        const size_t next_write = (current_write + 1) % Capacity;

        // 检查队列是否满
        if (next_write == read_idx.load(std::memory_order_acquire)) {
            return false;
        }

        buffer[current_write] = item;
        write_idx.store(next_write, std::memory_order_release);
        return true;
    }

    bool pop(T& item) {
        const size_t current_read = read_idx.load(std::memory_order_relaxed);

        if (current_read == write_idx.load(std::memory_order_acquire)) {
            return false;
        }

        item = buffer[current_read];
        const size_t next_read = (current_read + 1) % Capacity;
        read_idx.store(next_read, std::memory_order_release);
        return true;
    }

private:
    std::array<T, Capacity> buffer;
    std::atomic<size_t> read_idx{0};   // 读指针
    std::atomic<size_t> write_idx{0};  // 写指针
};

关键点

  • 读指针和写指针分离,只有一个写入方和一个读取方
  • write_idx只用生产者改,read_idx只用消费者改
  • relaxed用于单个索引的读写,acquire/release用于同步空满状态
  • 预分配数组,避免无锁环境下的内存分配问题

这是嵌入式系统里最实用的无锁队列,UART中断到主循环的数据传递、DMA缓冲区管理,都可以用这个模式。

MPMC队列:多生产者多消费者

多生产者多消费者(Multiple Producers Multiple Consumers)就复杂多了,需要处理多个线程同时push或pop的情况。

这里只讲核心思路,完整实现考虑篇幅就不展开了——而且说实话,真正需要MPMC无锁队列的场景,直接用现成的库(如Facebook的folly::MPMCQueue、Boost无锁队列)比自己写更靠谱。

核心思路:

  1. 链表节点 + 原子指针:每个节点有next原子指针
  2. 分离的头尾指针head用于pop,tail用于push
  3. CAS循环:更新指针时用CAS重试

性能瓶颈:多线程竞争同一个缓存行时,伪共享会导致严重的性能下降。解决方案是用alignas(64)headtail放到不同的缓存行。


ABA问题:无锁编程的头号杀手

我们前面提到的pop操作里有个隐藏的bug,叫做ABA问题。

什么是ABA问题

假设有一个栈,有两个节点:A → B

  1. 线程1读取head = A,准备pop
  2. 线程2pop掉A,现在head = B
  3. 线程2push新节点C,然后push回A,现在栈是A → C → B
  4. 线程1执行CAS:head.compare_exchange(old_head=A, new_node=B)
  5. CAS成功!因为head确实是A

但问题来了:线程1以为head还是它当初读到的A,但实际上A已经被pop又push回来了,中间的整个变化它完全没看到。更严重的是,如果A被释放后内存被重新分配给别的对象,CAS可能错误地匹配上。

解决方案

方案1:双字CAS(Double-Word CAS)

很多64位架构支持128位的CAS操作,可以同时比较并交换指针和版本号:

struct StampedPtr {
    Node* ptr;
    uint64_t stamp;  // 版本号
};

std::atomic<StampedPtr> head;

// 每次修改都增加版本号
void push(Node* new_node) {
    StampedPtr old_head = head.load();
    StampedPtr new_head{new_node, old_head.stamp + 1};

    do {
        new_node->next = old_head.ptr;
    } while (!head.compare_exchange_weak(old_head, new_head));
}

这样即使指针从A变B又变回A,版本号也变了,CAS会失败。

方案2:Hazard Pointer(C++26)

Hazard Pointer是一种更通用的方案,让线程声明"我正在访问这个节点",回收器在释放前检查是否有线程声明了危险指针。

C++26将标准化Hazard Pointer(std::hazard_pointer),目前可以参考Howard Hinnant的实现或使用第三方库。

方案3:嵌入式友好方案——对象池

对于嵌入式系统,最简单的方案是预分配对象池,永不释放节点,这样就不存在ABA问题了:

template<typename T, size_t MaxNodes>
class LockFreeStackWithPool {
private:
    struct Node {
        T data;
        std::atomic<Node*> next;
    };

    std::array<Node, MaxNodes> node_pool;  // 预分配
    std::atomic<size_t> free_idx{0};
    std::atomic<Node*> head{nullptr};

public:
    LockFreeStackWithPool() {
        // 初始化空闲链表
        for (size_t i = 0; i < MaxNodes - 1; ++i) {
            node_pool[i].next.store(&node_pool[i + 1]);
        }
        node_pool[MaxNodes - 1].next.store(nullptr);
        head.store(&node_pool[0]);
    }

    Node* allocate_node() {
        Node* node = head.load();
        while (node) {
            Node* next = node->next.load();
            if (head.compare_exchange_weak(node, next)) {
                return node;  // 成功从池子里取出一个节点
            }
        }
        return nullptr;  // 池子空了
    }

    void free_node(Node* node) {
        node->next.store(head.load());
        head.store(node);
    }
};

这个方案牺牲了内存,换来了简单和可靠——对于资源受限但节点数量可预测的嵌入式系统,往往是最佳选择。


伪共享:缓存行的隐形杀手

即便算法逻辑正确,物理层面的缓存一致性协议也可能搞垮你的性能。

什么是伪共享

现代CPU以缓存行(通常是64字节)为单位在核心间传输数据。如果两个线程频繁访问同一个缓存行内的不同变量,会导致缓存行在核心间来回乒乓,性能可能下降数倍。

struct BadCounter {
    std::atomic<int> producer_count;
    std::atomic<int> consumer_count;
    // 这两个变量可能在同一个缓存行里!
};

// 正确的做法:用padding隔离
struct GoodCounter {
    alignas(64) std::atomic<int> producer_count;
    char padding1[64 - sizeof(std::atomic<int>)];

    alignas(64) std::atomic<int> consumer_count;
    char padding2[64 - sizeof(std::atomic<int>)];
};

检测和解决

C++17提供了标准的缓存行对齐常量:

struct alignas(std::hardware_constructive_interference_size) Data {
    int x, y;  // 确保在同一个缓存行(需要紧密访问时)
};

struct alignas(std::hardware_destructive_interference_size) Counter {
    std::atomic<int> value;
    // 确保独占一个缓存行,避免伪共享
};

hardware_destructive_interference_size通常等于缓存行大小(64字节),用于避免伪共享。hardware_constructive_interference_size用于确保紧密访问的数据在同一个缓存行。

嵌入式注意:ARM Cortex-M的缓存行大小可能与x86不同,Cortex-M7是32字节,M0/M3/M4没有缓存。一定要查手册确认。


无锁 vs 有锁:真实性能对比

理论和实际往往有差距,我们来看一些实际测试数据。

测试场景1:单生产者单消费者队列

实现 1M次push+pop耗时(ms) 吞吐(M ops/s)
std::mutex + std::queue 45 22
SPSC无锁队列 12 83
ringbuffer(单线程) 8 125

结论:在SPSC场景下,无锁队列比mutex快3-4倍,接近单线程性能。

测试场景2:多生产者多消费者栈(4线程)

实现 总耗时(ms) CPU使用率
std::mutex + std::stack 380 100%(大量上下文切换)
无锁栈(简单实现) 520 100%(CAS重试)
无锁栈(缓存行优化) 180 100%
folly::MPMCStack 95 100%

结论:多线程竞争激烈时,简单无锁实现可能比锁还慢!需要精心优化才能发挥优势。而且成熟的库实现(folly)比手写快得多。

什么时候用无锁

适合无锁的场景

  • 高并发访问共享数据结构
  • 临界区非常小(几条指令)
  • 对延迟敏感(实时系统、高频交易)
  • 读写模式简单(SPSC、读者-写者)

不适合无锁的场景

  • 临界区大、逻辑复杂
  • 并发度低(竞争不激烈)
  • 团队经验不足(调试成本太高)
  • 需要跨平台保证(某些平台的原子操作用锁实现)

嵌入式建议

  • 中断和主线程之间:优先用无锁SPSC队列
  • 任务之间:mutex通常够用,除非性能测试证明有瓶颈
  • 优先考虑算法优化(减少共享访问)而不是换无锁

嵌入式实战:UART中断驱动的无锁队列

让我们来看一个完整的嵌入式场景:UART接收中断把数据放到无锁队列,主循环从队列取数据处理。

template<typename T, size_t Size>
class UART_RX_Buffer {
public:
    // 中断中调用(生产者)
    bool irq_push(const T& item) noexcept {
        const size_t write = write_idx.load(std::memory_order_relaxed);
        const size_t next_write = (write + 1) % Size;

        // 检查是否满
        if (next_write == read_idx.load(std::memory_order_acquire)) {
            overflow_count++;  // 记录溢出
            return false;
        }

        buffer[write] = item;
        write_idx.store(next_write, std::memory_order_release);
        return true;
    }

    // 主循环调用(消费者)
    bool main_pop(T& item) noexcept {
        const size_t read = read_idx.load(std::memory_order_relaxed);

        if (read == write_idx.load(std::memory_order_acquire)) {
            return false;  // 空
        }

        item = buffer[read];
        const size_t next_read = (read + 1) % Size;
        read_idx.store(next_read, std::memory_order_release);
        return true;
    }

    uint32_t get_overflow_count() const noexcept {
        return overflow_count;
    }

private:
    std::array<T, Size> buffer{};
    std::atomic<size_t> write_idx{0};
    std::atomic<size_t> read_idx{0};
    uint32_t overflow_count{0};
};

// 使用示例
UART_RX_Buffer<uint8_t, 256> uart_rx_queue;

extern "C" void USART1_IRQHandler() {
    if (USART1->SR & USART_SR_RXNE) {
        uint8_t data = USART1->DR;  // 读取数据自动清除标志
        uart_rx_queue.irq_push(data);
    }
}

void main_loop() {
    uint8_t data;
    while (true) {
        if (uart_rx_queue.main_pop(data)) {
            process_byte(data);
        }
        // 其他任务...
    }
}

关键设计决策

  • 预分配数组,避免中断中的内存分配
  • noexcept保证,中断里不能抛异常
  • 溢出计数器用于监控(生产环境必备)
  • 中断里用relaxed优化性能(单生产者场景)

进阶优化:如果是Cortex-M3/M4/M7,可以用__DMB()内置屏障替代acquire/release,可能性能更好。但一定要验证正确性。


常见陷阱与调试技巧

陷阱1:忘记is_lock_free()检查

某些平台上的原子操作可能内部用锁实现,这会完全违背无锁的初衷:

std::atomic<Node*> head;
static_assert(head.is_always_lock_free(), "Pointer atomic must be lock-free!");

陷阱2:内存序用错

错误的内存序可能导致:

  • 数据可见性问题(用relaxed做同步)
  • 性能问题(滥用seq_cst

记住:

  • 只需要原子性:relaxed
  • 线程间同步:配对acquire/release
  • 不确定:默认seq_cst

陷阱3:测试不够充分

无锁算法的bug往往是概率性的,可能要跑几百万次才触发一次。必须用压力测试和ThreadSanitizer(-fsanitize=thread)来检测。

测试技巧

  • 用多个线程反复操作,检查最终状态
  • -fsanitize=thread编译(GCC/Clang)
  • 用Helgrind(Valgrind工具)检测数据竞争
  • 模拟高竞争场景(比实际并发度更高的测试)

调试工具

# GCC/Clang的ThreadSanitizer
g++ -fsanitize=thread -g your_code.cpp

# Valgrind Helgrind
valgrind --tool=helgrind ./your_program

# 内存序检查(实验性)
clang -fsanitize=memory -fsanitize-memory-track-origins your_code.cpp

但要注意:这些工具可能误报(特别是对无锁代码),需要人工判断。


小结

无锁数据结构是一个强大的工具,但也是一把双刃剑。我们来回顾一下核心要点:

  1. CAS是基础compare_exchange循环是实现无锁结构的核心模式
  2. ABA问题:无锁编程的头号陷阱,用版本号或Hazard Pointer解决
  3. 内存管理:节点释放是最难的部分,嵌入式可用对象池简化
  4. 伪共享:用alignas(64)隔离频繁访问的原子变量
  5. SPSC最实用:单生产者单消费者场景相对简单,性能提升明显
  6. 性能测试:不要假设无锁更快,实际测试才知道
  7. 调试困难:无锁bug极难复现,优先用成熟库

实践建议

  • 先用mutex,性能测试证明瓶颈后再考虑无锁
  • SPSC场景可以大胆用,MPMC建议用成熟库
  • 嵌入式优先考虑预分配+对象池,避免复杂的内存回收
  • 好好测试,用工具检测数据竞争

下一章,我们回到更传统的同步机制——std::mutex和RAII锁守卫,看看如何安全高效地使用锁。