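Reference Counting: Implementation and Performance
Writing about reference counting is like explaining a shared parcel ledger: every additional person holding the parcel adds an entry to your ledger, and when the last person lets go, the ledger drops to zero and the parcel can be destroyed. The difference is that in the embedded world the ledger has to be kept carefully in your pocket: it must not get lost, concurrent writers must not corrupt it, and the CPU should not end up in a long queue just to record one entry.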
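What is reference counting
Reference counting attaches a small counter to an object: each time someone takes a reference the counter is incremented, and each time a reference is given back it is decremented. When the counter reaches 0, nobody needs the object any more and it is destroyed. It is simple, intuitive, and needs no global garbage collector. One caveat: if two objects hold references to each other (the classic cycle, like a couple who refuse to let go of each other), neither count ever reaches 0, so the cycle has to be broken by hand or with a weak reference.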
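The cycle problem and the weak-reference fix can be shown with the standard library types. This is a minimal sketch; the `Parent` and `Child` types and the `destroyed_after_scope` helper are hypothetical names introduced only for illustration.

```cpp
#include <memory>

struct Child;

// Hypothetical parent type: owns its child through a strong reference.
struct Parent {
    std::shared_ptr<Child> child;
    explicit Parent(int* destroyed) : destroyed_(destroyed) {}
    ~Parent() { ++*destroyed_; }
private:
    int* destroyed_;
};

// Hypothetical child type: refers back to the parent through a weak
// reference, which does not keep the parent alive.
struct Child {
    std::weak_ptr<Parent> parent;
    explicit Child(int* destroyed) : destroyed_(destroyed) {}
    ~Child() { ++*destroyed_; }
private:
    int* destroyed_;
};

// Links a parent and a child, drops the external references, and returns
// how many of the two objects were actually destroyed.
int destroyed_after_scope() {
    int destroyed = 0;
    {
        auto p = std::make_shared<Parent>(&destroyed);
        auto c = std::make_shared<Child>(&destroyed);
        p->child = c;    // strong: parent -> child
        c->parent = p;   // weak: child -> parent, strong count of p stays 1
    }
    return destroyed;
}
```

With the weak back-reference both destructors run when the scope ends. If `Child::parent` were a `std::shared_ptr<Parent>` instead, the two objects would keep each other alive and neither destructor would run.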
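Two common implementation approaches
Non-intrusive: as in std::shared_ptr, the counter lives in a control block outside the object. The advantage is that the object's class needs no changes, so it adapts to any type. The drawback is the extra allocation (possibly two allocations, object + control block, unless something like std::make_shared combines them), which can be a problem on embedded systems that are memory-constrained or forbid heap allocation.
Intrusive: the counter is placed inside the object (usually as a base-class member). The advantages are zero extra allocation (less memory and less fragmentation) and better performance (one fewer memory access). The drawback is that the class must inherit from an intrusive base or expose a counting interface. That is invasive, but usually acceptable in an in-house codebase.
For embedded work my preference is: go intrusive whenever you can, and use the non-intrusive form only when the class layout cannot be changed.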
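The allocation cost of the non-intrusive form can be made visible by counting control-block allocations. The sketch below is an illustration under assumptions: `CountingAllocator`, `Sensor`, and the two helper functions are hypothetical names, and the counts reflect how the mainstream standard libraries behave rather than a guarantee of the C++ standard.

```cpp
#include <cstddef>
#include <memory>
#include <new>

// Hypothetical minimal allocator that counts how often allocate() is called.
template <typename T>
struct CountingAllocator {
    using value_type = T;
    int* allocations;

    explicit CountingAllocator(int* counter) noexcept : allocations(counter) {}
    template <typename U>
    CountingAllocator(const CountingAllocator<U>& other) noexcept
        : allocations(other.allocations) {}

    T* allocate(std::size_t n) {
        ++*allocations;
        return static_cast<T*>(::operator new(n * sizeof(T)));
    }
    void deallocate(T* p, std::size_t) noexcept { ::operator delete(p); }
};

template <typename T, typename U>
bool operator==(const CountingAllocator<T>& a, const CountingAllocator<U>& b) noexcept {
    return a.allocations == b.allocations;
}
template <typename T, typename U>
bool operator!=(const CountingAllocator<T>& a, const CountingAllocator<U>& b) noexcept {
    return !(a == b);
}

struct Sensor {
    int id;
    explicit Sensor(int i) : id(i) {}
};

// Object created with `new`, control block allocated separately.
int allocations_separate() {
    int counted = 0;
    {
        std::shared_ptr<Sensor> p(new Sensor(1), std::default_delete<Sensor>(),
                                  CountingAllocator<Sensor>(&counted));
    }
    return 1 + counted;  // 1 for `new Sensor`, plus the counted control block
}

// Object and control block requested together through allocate_shared.
int allocations_combined() {
    int counted = 0;
    {
        auto p = std::allocate_shared<Sensor>(CountingAllocator<Sensor>(&counted), 2);
    }
    return counted;
}
```

On common implementations the first helper reports two allocations and the second reports one. An intrusive design reaches the single-allocation case without needing a control block at all.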
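Single-threaded, minimal implementation (embedded-friendly, lock-free, exception-free)
If you are on bare metal, on a single core, with no preemptive threading, reference counting can be very lightweight: use a uint16_t or uint32_t counter and plain ++/--. Example: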
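// IntrusiveRef.h - single-threaded version, for bare metal or code that runs only in the main task
#include <cstdint>
class IntrusiveRefBase {
protected:
uint16_t ref_count_{1}; // the creator is the initial holder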
virtual ~IntrusiveRefBase() = default;
public:
void retain() noexcept { ++ref_count_; }
void release() noexcept {
if (--ref_count_ == 0) delete this;
}
};
template <typename T>
class IntrusivePtr {
T* p_;
public:
IntrusivePtr(T* p = nullptr) : p_(p) {}
IntrusivePtr(const IntrusivePtr& o) : p_(o.p_) { if (p_) p_->retain(); }
IntrusivePtr& operator=(const IntrusivePtr& o) {
if (o.p_) o.p_->retain();
if (p_) p_->release();
p_ = o.p_;
return *this;
}
~IntrusivePtr() { if (p_) p_->release(); }
T* get() const noexcept { return p_; }
T& operator*() const noexcept { return *p_; }
T* operator->() const noexcept { return p_; }
};
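Note: there is no std::atomic and the overhead is minimal, which suits embedded code with no concurrency. Keep in mind that a uint16_t counter wraps around after 65,535 references.
Full compilable example: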
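// Intrusive reference counting example - single-threaded version
// Demonstrates a lightweight reference count suited to embedded targets
#include <cstddef> // size_t
#include <cstdint>
#include <cstdio>
#include <utility> // std::move
// ========== Single-threaded intrusive reference-counting base class ==========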
class IntrusiveRefBase {
protected:
uint16_t ref_count_{1}; // the creator is the initial holder
virtual ~IntrusiveRefBase() = default;
public:
void retain() noexcept {
++ref_count_;
}
void release() noexcept {
if (--ref_count_ == 0) {
delete this;
}
}
uint16_t ref_count() const noexcept {
return ref_count_;
}
};
// ========== Single-threaded smart pointer ==========
template <typename T>
class IntrusivePtr {
T* p_;
public:
IntrusivePtr(T* p = nullptr) : p_(p) {}
IntrusivePtr(const IntrusivePtr& o) : p_(o.p_) {
if (p_) p_->retain();
}
IntrusivePtr(IntrusivePtr&& o) noexcept : p_(o.p_) {
o.p_ = nullptr;
}
~IntrusivePtr() {
if (p_) p_->release();
}
IntrusivePtr& operator=(const IntrusivePtr& o) {
if (o.p_) o.p_->retain();
if (p_) p_->release();
p_ = o.p_;
return *this;
}
IntrusivePtr& operator=(IntrusivePtr&& o) noexcept {
if (this != &o) {
if (p_) p_->release();
p_ = o.p_;
o.p_ = nullptr;
}
return *this;
}
T* get() const noexcept { return p_; }
T& operator*() const noexcept { return *p_; }
T* operator->() const noexcept { return p_; }
explicit operator bool() const noexcept { return p_ != nullptr; }
void reset() {
if (p_) {
p_->release();
p_ = nullptr;
}
}
uint16_t use_count() const noexcept {
return p_ ? p_->ref_count() : 0;
}
};
// ========== Usage examples ==========
class SharedResource : public IntrusiveRefBase {
public:
int id;
int data[64]; // stands in for some payload data
explicit SharedResource(int i) : id(i) {
printf("SharedResource %d created\n", id);
}
~SharedResource() override {
printf("SharedResource %d destroyed (final refcount: %u)\n",
id, ref_count());
}
void process() {
printf("Processing resource %d\n", id);
}
};
void basic_usage() {
printf("=== Basic Intrusive Reference Counting ===\n");
IntrusivePtr<SharedResource> res1(new SharedResource(1));
printf("After creation: use_count = %u\n", res1.use_count());
{
IntrusivePtr<SharedResource> res2 = res1;
printf("After copy: use_count = %u\n", res1.use_count());
IntrusivePtr<SharedResource> res3 = res2;
printf("After second copy: use_count = %u\n", res1.use_count());
}
printf("After inner scope: use_count = %u\n", res1.use_count());
}
void move_semantics() {
printf("\n=== Move Semantics ===\n");
IntrusivePtr<SharedResource> res1(new SharedResource(2));
printf("Initial: res1.use_count = %u\n", res1.use_count());
IntrusivePtr<SharedResource> res2 = std::move(res1);
printf("After move:\n");
printf(" res1 is null: %s\n", !res1 ? "yes" : "no");
printf(" res2.use_count = %u\n", res2.use_count());
}
void container_usage() {
printf("\n=== Container Usage ===\n");
// A simple fixed-capacity container
struct ResourceHolder {
IntrusivePtr<SharedResource> resources[4];
size_t count = 0;
void add(const IntrusivePtr<SharedResource>& res) {
if (count < 4) {
resources[count++] = res;
}
}
void print_all() {
for (size_t i = 0; i < count; ++i) {
printf(" resources[%zu]: id=%d, refcount=%u\n",
i, resources[i]->id, resources[i].use_count());
}
}
};
ResourceHolder holder;
IntrusivePtr<SharedResource> res(new SharedResource(3));
printf("Original refcount: %u\n", res.use_count());
holder.add(res);
printf("After adding to holder: %u\n", res.use_count());
holder.add(res);
printf("After adding again: %u\n", res.use_count());
holder.print_all();
}
// Polymorphism example
class Message : public IntrusiveRefBase {
public:
virtual ~Message() = default;
virtual void execute() = 0;
};
class PrintMessage : public Message {
public:
const char* text;
explicit PrintMessage(const char* t) : text(t) {
printf("PrintMessage created: %s\n", text);
}
~PrintMessage() override {
printf("PrintMessage destroyed\n");
}
void execute() override {
printf("Executing: %s\n", text);
}
};
class BeepMessage : public Message {
public:
int frequency;
explicit BeepMessage(int f) : frequency(f) {
printf("BeepMessage created: %dHz\n", f);
}
~BeepMessage() override {
printf("BeepMessage destroyed\n");
}
void execute() override {
printf("Beeping at %dHz\n", frequency);
}
};
void polymorphism_example() {
printf("\n=== Polymorphism Example ===\n");
IntrusivePtr<Message> msg1(new PrintMessage("Hello, World!"));
IntrusivePtr<Message> msg2(new BeepMessage(440));
msg1->execute();
msg2->execute();
}
// Performance comparison (conceptual)
void performance_notes() {
printf("\n=== Performance Notes ===\n");
printf("Intrusive reference counting advantages:\n");
printf(" - No separate control block allocation\n");
printf(" - No atomic operations (in this single-threaded version)\n");
printf(" - Better cache locality (counter is in the object)\n");
printf(" - Predictable memory layout\n");
printf("\nSize comparison:\n");
printf(" sizeof(IntrusivePtr<SharedResource>): %zu\n",
sizeof(IntrusivePtr<SharedResource>));
printf(" sizeof(SharedResource*): %zu\n", sizeof(SharedResource*));
}
int main() {
basic_usage();
move_semantics();
container_usage();
polymorphism_example();
performance_notes();
printf("\n=== All Examples Complete ===\n");
return 0;
}
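Thread-safe reference counting in multi-threaded / interrupt contexts
When references can be manipulated from different tasks or interrupt handlers, the counter updates must be atomic. The standard approach is to make the counter a std::atomic<uint32_t>. A typical requirement for both performance and correctness: the increment (retain) may use relaxed memory ordering, but the decrement that can reach 0 and trigger deletion must use release/acquire ordering, so that every write made to the object through other references is visible to the thread that deletes it.
A common safe pattern (widely used):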
// Control block style (non-intrusive), thread-safe example (simplified)
#include <atomic>
#include <cstdint>
struct ControlBlock {
std::atomic<uint32_t> ref{1};
// Extensible: weak reference count, custom deleter, etc.
};
template <typename T>
class SharedPtr {
T* ptr_;
ControlBlock* cb_;
public:
SharedPtr(T* p = nullptr): ptr_(p), cb_(p ? new ControlBlock{} : nullptr) {}
SharedPtr(const SharedPtr& o) : ptr_(o.ptr_), cb_(o.cb_) {
if (cb_) cb_->ref.fetch_add(1, std::memory_order_relaxed);
}
SharedPtr& operator=(const SharedPtr& o) {
if (o.cb_) o.cb_->ref.fetch_add(1, std::memory_order_relaxed);
release();
ptr_ = o.ptr_; cb_ = o.cb_;
return *this;
}
~SharedPtr() { release(); }
private:
void release() {
if (!cb_) return;
if (cb_->ref.fetch_sub(1, std::memory_order_acq_rel) == 1) {
std::atomic_thread_fence(std::memory_order_acquire);
delete ptr_;
delete cb_;
}
}
};
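Key points: fetch_add(1, relaxed) improves throughput under contention, because an increment only needs atomicity, not memory synchronization (the caller already holds a valid reference). fetch_sub(1, acq_rel) is what makes the final decrement safe: its release half publishes each thread's earlier writes to the object, and its acquire half makes those writes visible to the thread that performs the delete. With acq_rel on the decrement, the atomic_thread_fence(acquire) that follows is redundant but harmless. The fence is required only in the other common form, where the decrement uses memory_order_release and the acquire fence runs on the deleting path alone. Both forms appear in shared_ptr-style implementations.
Full compilable example: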
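A widely used variant of the final decrement pairs a release-only fetch_sub with an acquire fence that runs only on the deleting path. The following is a minimal sketch of that variant; `RefCounted` and `Payload` are hypothetical names, and the `bool` return value of `release()` exists only to make the behavior observable.

```cpp
#include <atomic>
#include <cstdint>

// Hypothetical intrusive base using the release-decrement + acquire-fence form.
class RefCounted {
public:
    void retain() noexcept {
        // The caller already holds a reference, so atomicity alone is enough.
        count_.fetch_add(1, std::memory_order_relaxed);
    }

    // Returns true when this call dropped the last reference and deleted the object.
    bool release() noexcept {
        // release: publish this thread's writes to the object.
        if (count_.fetch_sub(1, std::memory_order_release) == 1) {
            // acquire: observe the writes published by all earlier releasers.
            std::atomic_thread_fence(std::memory_order_acquire);
            delete this;
            return true;
        }
        return false;
    }

    std::uint32_t use_count() const noexcept {
        return count_.load(std::memory_order_relaxed);
    }

protected:
    virtual ~RefCounted() = default;

private:
    std::atomic<std::uint32_t> count_{1};
};

// Hypothetical payload type used to exercise the base class.
struct Payload : RefCounted {
    int value = 0;
};
```

The design point is that only the thread that sees the count reach zero pays for the acquire ordering; every other decrement is a plain release operation.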
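// Thread-safe reference counting example
// Demonstrates reference counting for multi-threaded / interrupt contexts
#include <cstdint>
#include <cstdio>
#include <atomic>
#include <chrono> // std::chrono::steady_clock
#include <thread>
#include <vector>
// ========== Thread-safe intrusive reference-counting base class ==========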
class AtomicRefBase {
protected:
std::atomic<uint32_t> ref_count_{1};
virtual ~AtomicRefBase() = default;
public:
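void retain() noexcept {
// relaxed is enough: the caller already holds a reference, only the count changes
ref_count_.fetch_add(1, std::memory_order_relaxed);
}
void release() noexcept {
// acq_rel: publish this thread's writes and observe those of other releasers
if (ref_count_.fetch_sub(1, std::memory_order_acq_rel) == 1) {
// Last reference. This fence is redundant after acq_rel; it is needed
// only when the fetch_sub above uses memory_order_release.
std::atomic_thread_fence(std::memory_order_acquire);
delete this;
}
}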
uint32_t ref_count() const noexcept {
return ref_count_.load(std::memory_order_relaxed);
}
bool is_lock_free() const noexcept {
return ref_count_.is_always_lock_free;
}
};
// ========== Thread-safe smart pointer ==========
template <typename T>
class AtomicPtr {
T* p_;
public:
AtomicPtr(T* p = nullptr) : p_(p) {}
AtomicPtr(const AtomicPtr& o) : p_(o.p_) {
if (p_) p_->retain();
}
AtomicPtr(AtomicPtr&& o) noexcept : p_(o.p_) {
o.p_ = nullptr;
}
~AtomicPtr() {
if (p_) p_->release();
}
AtomicPtr& operator=(const AtomicPtr& o) {
if (o.p_) o.p_->retain();
if (p_) p_->release();
p_ = o.p_;
return *this;
}
AtomicPtr& operator=(AtomicPtr&& o) noexcept {
if (this != &o) {
if (p_) p_->release();
p_ = o.p_;
o.p_ = nullptr;
}
return *this;
}
T* get() const noexcept { return p_; }
T& operator*() const noexcept { return *p_; }
T* operator->() const noexcept { return p_; }
explicit operator bool() const noexcept { return p_ != nullptr; }
uint32_t use_count() const noexcept {
return p_ ? p_->ref_count() : 0;
}
};
// ========== Usage example ==========
class ThreadSafeResource : public AtomicRefBase {
public:
int id;
std::atomic<int> value; // atomic: several threads increment it concurrently
ThreadSafeResource(int i, int v) : id(i), value(v) {
printf("Resource %d created (value=%d)\n", id, v);
}
~ThreadSafeResource() override {
printf("Resource %d destroyed (final refcount: %u)\n",
id, ref_count());
}
void increment() noexcept {
// The reference count protects the object's lifetime, not its data,
// so the payload needs its own synchronization.
value.fetch_add(1, std::memory_order_relaxed);
}
};
// ========== Multi-threaded test ==========
void multi_thread_test() {
printf("=== Multi-Thread Reference Counting Test ===\n");
AtomicPtr<ThreadSafeResource> shared(new ThreadSafeResource(1, 0));
printf("Initial refcount: %u\n", shared.use_count());
printf("Atomic is lock-free: %s\n",
shared->is_lock_free() ? "yes" : "no");
const int NUM_THREADS = 4;
const int OPERATIONS = 10000;
std::vector<std::thread> threads;
for (int i = 0; i < NUM_THREADS; ++i) {
threads.emplace_back([&shared, OPERATIONS]() {
// Each thread holds its own copy
AtomicPtr<ThreadSafeResource> local = shared;
for (int j = 0; j < OPERATIONS; ++j) {
local->increment();
// Occasionally create an extra copy to bump the reference count
if (j % 100 == 0) {
AtomicPtr<ThreadSafeResource> temp = local;
(void)temp; // silence the unused-variable warning
}
}
});
}
for (auto& t : threads) {
t.join();
}
printf("\nAfter all threads complete:\n");
printf(" Final refcount: %u\n", shared.use_count());
printf(" Final value: %d (expected: %d)\n",
shared->value.load(), NUM_THREADS * OPERATIONS);
}
// ========== Performance test ==========
void performance_comparison() {
printf("\n=== Performance Comparison ===\n");
const int N = 100000;
// Measure the overhead of atomic reference counting
auto start = std::chrono::steady_clock::now();
{
AtomicPtr<ThreadSafeResource> ptr(new ThreadSafeResource(2, 0));
for (int i = 0; i < N; ++i) {
AtomicPtr<ThreadSafeResource> temp = ptr;
// temp goes out of scope here
}
}
auto end = std::chrono::steady_clock::now();
auto atomic_time = std::chrono::duration_cast<std::chrono::microseconds>(
end - start).count();
// The tick type of microseconds is platform-dependent, so cast before printing
printf("Atomic refcount (%d cycles): %lld us\n", N,
static_cast<long long>(atomic_time));
printf("Average per copy/destroy: %.3f us\n",
atomic_time / (2.0 * N));
}
// ========== Memory ordering notes ==========
void memory_order_explanation() {
printf("\n=== Memory Order Explanation ===\n");
printf("\nWhy relaxed for increment?\n");
printf(" - We're just tracking how many references exist\n");
printf(" - No other memory needs to be synchronized yet\n");
printf("\nWhy acq_rel for decrement to zero?\n");
printf(" - When reaching zero, we delete the object\n");
printf(" - Must ensure all prior writes to the object are visible\n");
printf("\nWhy acquire fence before delete?\n");
printf(" - Redundant after an acq_rel decrement; needed only with a release-only decrement\n");
}
int main() {
multi_thread_test();
performance_comparison();
memory_order_explanation();
printf("\n=== All Examples Complete ===\n");
return 0;
}
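Performance pitfalls (where engineering outcomes are really decided)
In embedded systems the cost of reference counting is not just "one ++ or --". It also includes the following, which are often overlooked:
- Cost of atomic operations: on multi-core systems an atomic write makes the cache line bounce between cores (cache line bouncing), which is very expensive for short-lived objects that are touched at high frequency. On single-core systems an atomic may compile down to ordinary instructions, but care is still needed when interrupts are involved.
- False sharing / alignment: placing the counter in the same cache line as other frequently accessed members causes needless synchronization traffic. Aligning the counter to its own cache line can noticeably improve concurrent performance (at the cost of more memory).
- The extra hop to the control block: the non-intrusive form needs one more heap read (the control block), so every copy or assignment may trigger an additional memory access, which affects both speed and energy use.
- Contention on short-lived objects: if an object is retained and released frequently from several threads, the atomic contention becomes the bottleneck. In that case consider an object pool (reuse), or narrow the granularity of what is shared.
- Interrupts and locks: when the counter is modified in an ISR and atomics cannot be used (or are not available), you may need to disable interrupts around the critical section, which affects interrupt latency.
In short: if references to your objects are copied heavily and in quick succession across cores and tasks, reference counting will eat CPU time. If the objects are relatively long-lived (resource handles, for example), the overhead is usually acceptable.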
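The false-sharing point can be illustrated with `alignas`. This is a minimal sketch that assumes 64-byte cache lines; `kCacheLine`, `PackedCounters`, and `PaddedCounters` are hypothetical names, and the right line size has to come from the documentation of the target core.

```cpp
#include <atomic>
#include <cstddef>
#include <cstdint>

// Assumption: 64-byte cache lines. Check the data sheet of the target core.
constexpr std::size_t kCacheLine = 64;

// Both counters fit in 8 bytes, so they very likely share one cache line:
// a core hammering hot_stat also invalidates ref_count for other cores.
struct PackedCounters {
    std::atomic<std::uint32_t> ref_count{1};
    std::atomic<std::uint32_t> hot_stat{0};
};

// Each counter starts on its own cache-line boundary, so updates to one
// do not invalidate the line that holds the other.
struct PaddedCounters {
    alignas(kCacheLine) std::atomic<std::uint32_t> ref_count{1};
    alignas(kCacheLine) std::atomic<std::uint32_t> hot_stat{0};
};

static_assert(alignof(PaddedCounters) == kCacheLine,
              "padded layout must be cache-line aligned");
static_assert(sizeof(PaddedCounters) == 2 * kCacheLine,
              "each counter occupies its own cache line");
```

The memory cost is visible in the sizes: 8 bytes for the packed layout against 128 bytes for the padded one. On a small single-core microcontroller without a data cache the padding buys nothing, so this only pays off on multi-core targets.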
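Practical advice for embedded systems
If you are writing embedded C++, you can choose an implementation strategy in the following order of priority:
Start by checking whether concurrency or interrupts are involved. With no concurrency, an intrusive, non-atomic counter is the cheapest option. If there is concurrency but the system is single-core and interrupts can be disabled around critical sections, do the counter updates inside an interrupts-off critical section (this can cost less than full atomics and suits cases that are latency-sensitive but see little contention). For true multi-core concurrency, use a std::atomic-based implementation, preferably intrusive to reduce memory accesses. If you cannot go intrusive and heap allocation is expensive, consider an object pool or pre-allocated control blocks.
Also avoid reference cycles: in a parent-child relationship, let the parent hold a strong reference to the child and the child hold a weak reference to the parent, or state explicitly in the documentation who is responsible for the lifetime (simplicity beats cleverness).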
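The single-core, interrupts-off option can be sketched with an RAII guard around the counter update. Everything in the `port` namespace is a hypothetical placeholder: on a real target those hooks would save, disable, and restore the interrupt state, while here they are host-side stubs that only track nesting depth so the sketch can run anywhere.

```cpp
#include <cstdint>

// Hypothetical platform hooks (host-side stubs, not a real port).
namespace port {
inline int& depth() { static int d = 0; return d; }
inline void enter_critical() noexcept { ++depth(); }  // would disable interrupts
inline void exit_critical() noexcept { --depth(); }   // would restore interrupts
}  // namespace port

// RAII guard: the critical section ends when the guard goes out of scope.
class CriticalSection {
public:
    CriticalSection() noexcept { port::enter_critical(); }
    ~CriticalSection() noexcept { port::exit_critical(); }
    CriticalSection(const CriticalSection&) = delete;
    CriticalSection& operator=(const CriticalSection&) = delete;
};

// Hypothetical intrusive base whose counter is a plain integer that is
// only touched inside a critical section.
class IrqSafeRefBase {
public:
    void retain() noexcept {
        CriticalSection guard;
        ++ref_count_;
    }

    // Returns true when this call dropped the last reference.
    bool release() noexcept {
        bool last;
        {
            CriticalSection guard;  // keep the interrupts-off window short
            last = (--ref_count_ == 0);
        }
        if (last) delete this;      // destructor runs outside the critical section
        return last;
    }

protected:
    virtual ~IrqSafeRefBase() = default;

private:
    std::uint16_t ref_count_{1};
};

// Hypothetical payload type.
struct Buffer : IrqSafeRefBase {
    std::uint8_t bytes[32]{};
};
```

Only the read-modify-write of the counter sits inside the critical section, so interrupt latency grows by a few instructions rather than by the cost of a destructor. Whether `delete` may run in the context that drops the last reference depends on the allocator in use and is outside the scope of this sketch.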
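Suggestions for a simple micro-benchmark
No elaborate tooling is needed. Write a benchmark that measures two things: the single-threaded cost of each copy/destroy (new + std::shared_ptr vs. intrusive), and the throughput when N threads copy and release references concurrently at high frequency. Time with std::chrono::steady_clock, and include a warm-up phase and a large number of iterations (on the order of millions) to suppress timer noise. Record CPU usage and power consumption as well, if the device can measure them.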
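A timing harness along those lines might look like the sketch below. The names `ns_per_op` and `bench_shared_ptr_copy` are hypothetical, the warm-up ratio is an arbitrary choice, and only the std::shared_ptr side is shown; an intrusive pointer can be passed to the same helper for comparison.

```cpp
#include <chrono>
#include <cstdint>
#include <memory>

// Runs `op` for a warm-up pass, then times `iterations` calls and returns
// the average cost in nanoseconds per call. `iterations` must be non-zero.
template <typename Op>
double ns_per_op(Op op, std::uint32_t iterations) {
    for (std::uint32_t i = 0; i < iterations / 10; ++i) op();  // warm-up
    const auto start = std::chrono::steady_clock::now();
    for (std::uint32_t i = 0; i < iterations; ++i) op();
    const auto stop = std::chrono::steady_clock::now();
    const std::chrono::duration<double, std::nano> elapsed = stop - start;
    return elapsed.count() / iterations;
}

// Measures one copy plus one destroy of a std::shared_ptr<int>.
double bench_shared_ptr_copy(std::uint32_t iterations) {
    auto owner = std::make_shared<int>(42);
    volatile long sink = 0;  // keeps the optimizer from discarding the loop body
    return ns_per_op([&] {
        std::shared_ptr<int> copy = owner;
        sink = sink + copy.use_count();
    }, iterations);
}
```

Absolute numbers from a host build say little about the target, so the comparison is only meaningful when both variants are built with the same flags and run on the same device.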
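Going deeper (additional notes)
To implement weak_ptr properly you need a second, weak reference count, and when the last strong reference goes away the control block must not be deleted until the weak count also reaches 0. This adds complexity, but it is what makes weak references a safe way to break cycles. An alternative is observers plus manual release, combined with an explicit lifetime hierarchy (owner/borrower): in embedded systems, explicit ownership is often more reliable than automatic reclamation.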
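The two-counter rule can be sketched as a small single-threaded control block. All names here (`WeakControlBlock`, `release_strong`, `release_weak`, `acquire_weak`, `try_lock`) are hypothetical, and the sketch assumes one common convention: the group of strong references collectively holds one weak reference, so the block is freed exactly when the weak count reaches zero.

```cpp
#include <cstdint>

// Hypothetical single-threaded control block with two counters.
template <typename T>
struct WeakControlBlock {
    T* object;
    std::uint32_t strong;  // number of owners
    std::uint32_t weak;    // weak observers, plus 1 while any owner exists

    explicit WeakControlBlock(T* p) : object(p), strong(1), weak(1) {}
};

// Drops one weak reference. Returns true if the control block was freed.
template <typename T>
bool release_weak(WeakControlBlock<T>* cb) {
    if (--cb->weak == 0) {
        delete cb;
        return true;
    }
    return false;
}

// Registers one more weak observer.
template <typename T>
void acquire_weak(WeakControlBlock<T>* cb) {
    ++cb->weak;
}

// Drops one strong reference. The object dies with the last owner, but the
// block survives until the weak count is zero. Returns true if the control
// block was freed as well.
template <typename T>
bool release_strong(WeakControlBlock<T>* cb) {
    if (--cb->strong == 0) {
        delete cb->object;
        cb->object = nullptr;
        return release_weak(cb);  // give up the owners' shared weak reference
    }
    return false;
}

// Equivalent of weak_ptr::lock: succeeds only while the object is alive.
template <typename T>
bool try_lock(WeakControlBlock<T>* cb) {
    if (cb->strong == 0) return false;
    ++cb->strong;
    return true;
}
```

A thread-safe version would need atomic counters, and `try_lock` would have to become a compare-and-swap loop so that it cannot resurrect an object whose strong count has already reached zero.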
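Conclusion: in embedded systems, reference counting is both a lifesaver and a source of trouble
Reference counting gives us a resource-management model that is easy to reason about, but in the resource-constrained, highly concurrent, real-time-sensitive embedded world it has to be used with care. My experience: use a simple intrusive implementation for short tasks and low concurrency; in complex multi-core scenarios use atomics combined with object pools or context confinement (to reduce global sharing); and never forget to guard against cycles (weak references or refactoring) and to measure (measure, measure, measure). Write solid unit tests and stress tests so that your reference counting holds up under real load.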