回调机制的零开销实现¶
引言¶
上一章我们讨论了std::function和函数指针的权衡。std::function功能强大但有运行时开销,函数指针零开销但无法携带上下文。在嵌入式开发中,我们经常需要既能捕获上下文,又没有动态分配和间接调用开销的回调机制。
好消息是:C++的模板系统给了我们实现这种"两全其美"的工具。通过类型擦除和小对象优化,我们可以自己实现一个零开销的回调容器。
一句话总结:通过模板元编程和精心设计,可以实现既能存储任意可调用对象,又无堆分配、调用开销可完全内联的回调机制。
问题分析:std::function的开销来源¶
在实现零开销版本之前,先理解std::function的开销到底来自哪里。
三个主要开销¶
- 类型擦除的存储成本
std::function<int(int)> f = [](int x) { return x * 2; };
// f内部存储:
// - 函数指针/调用器指针
// - 可能的管理对象指针
// - 小对象优化缓冲区(通常16-32字节)
- 动态分配的可能性
// 大型Lambda:捕获太多变量,超出SOO
std::function<void()> f = [big = std::array<int, 100>()](){ /*...*/ };
// 编译器需要在堆上分配存储
- 间接调用
零开销的目标¶
- 编译期多态:不同类型实例化不同模板,无虚函数
- 无堆分配:固定大小,使用栈/静态存储
- 可内联:编译器能看到完整调用链
方案一:模板化回调(编译期多态)¶
最简单直接的方式:用模板代替类型擦除。
基本实现¶
template<typename Signature>
class Callback;
template<typename R, typename... Args>
class Callback<R(Args...)> {
// 抽象接口
struct Concept {
virtual ~Concept() = default;
virtual R invoke(Args...) = 0;
};
// 具体模型:存储任意可调用对象
template<typename T>
struct Model : Concept {
T callable;
Model(T&& c) : callable(std::forward<T>(c)) {}
R invoke(Args... args) override {
return callable(args...);
}
};
std::unique_ptr<Concept> object; // 问题:堆分配!
public:
template<typename T>
Callback(T&& t) : object(std::make_unique<Model<T>>(std::forward<T>(t)) {}
R operator()(Args... args) {
return object->invoke(args...);
}
};
问题:堆分配!虽然在嵌入式可以用自定义分配器,但仍不是零开销。
改进:固定大小存储¶
template<typename Signature, size_t StorageSize = 32>
class SmallCallback;
template<size_t StorageSize, typename R, typename... Args>
class SmallCallback<R(Args...), StorageSize> {
struct Concept {
virtual ~Concept() = default;
virtual R invoke(Args...) = 0;
virtual void move_to(void* dest) = 0;
};
template<typename T>
struct Model : Concept {
T callable;
Model(T&& c) : callable(std::forward<T>(c)) {}
R invoke(Args... args) override {
return callable(std...);
}
void move_to(void* dest) override {
new(dest) Model(std::move(callable));
}
};
// 固定大小的栈上存储
alignas(std::max_align_t) std::byte storage[StorageSize];
Concept* object = nullptr;
public:
SmallCallback() = default;
template<typename T>
SmallCallback(T&& t) {
static_assert(sizeof(T) <= StorageSize, "Callable too large");
object = new(storage) Model<T>(std::forward<T>(t));
}
~SmallCallback() {
if (object) object->~Concept();
}
SmallCallback(const SmallCallback&) = delete;
SmallCallback(SmallCallback&& other) noexcept {
if (other.object) {
other.object->move_to(storage);
object = reinterpret_cast<Concept*>(storage);
other.object = nullptr;
}
}
R operator()(Args... args) {
return object->invoke(args...);
}
};
改进:无堆分配,但仍用虚函数(间接调用)。
方案二:完全零开销的内联回调¶
关键洞察:大多数回调在注册时就确定类型,不需要运行时多态。
核心思想:直接存储可调用对象¶
template<typename Signature>
class InlineCallback;
template<typename R, typename... Args>
class InlineCallback<R(Args...)> {
// 不用虚函数,直接存储
struct CallableBase {
virtual ~CallableBase() = default;
virtual R invoke(Args...) = 0;
};
template<typename T>
struct CallableImpl : CallableBase {
T callable;
CallableImpl(T&& c) : callable(std::forward<T>(c)) {}
R invoke(Args... args) override { return callable(args...); }
};
std::unique_ptr<CallableBase> impl; // 还是堆分配
};
这样没解决问题。让我们换个思路。
真正的零开销:模板化容器¶
// 回调容器:直接存储特定类型
template<typename T>
class CallbackBox {
T callable;
public:
CallbackBox(T&& c) : callable(std::forward<T>(c)) {}
template<typename... Args>
auto operator()(Args&&... args) -> decltype(callable(args...)) {
return callable(std::forward<Args>(args)...);
}
};
// 使用:类型明确,零开销
CallbackBox lambda([](int x) { return x * 2; });
int result = lambda(21); // 完全内联
问题:每个CallbackBox是不同类型,不能放进同一容器。
方案三:自定义类型擦除(推荐)¶
结合前两种方案的优势:手动实现类型擦除,避免虚函数和堆分配。
实现原理¶
不用虚函数表,改用函数指针表:
template<typename Signature, size_t Size = 32>
class ZeroCallback;
template<size_t Size, typename R, typename... Args>
class ZeroCallback<R(Args...), Size> {
// 操作表:函数指针而非虚函数
struct VTable {
void (*move)(void* dest, void* src);
void (*destroy)(void* obj);
R (*invoke)(void* obj, Args...);
};
// 存储空间
alignas(std::max_align_t) std::byte storage[Size];
const VTable* vtable = nullptr;
// 为每个可调用类型生成VTable
template<typename T>
struct VTableFor {
static void do_move(void* dest, void* src) {
new(dest) T(std::move(*reinterpret_cast<T*>(src)));
}
static void do_destroy(void* obj) {
reinterpret_cast<T*>(obj)->~T();
}
static R do_invoke(void* obj, Args... args) {
return (*reinterpret_cast<T*>(obj))(args...);
}
static constexpr VTable value = {do_move, do_destroy, do_invoke};
};
public:
ZeroCallback() = default;
// 构造:任意可调用对象
template<typename T>
ZeroCallback(T&& callable) {
using TDecay = std::decay_t<T>;
static_assert(sizeof(TDecay) <= Size, "Callable too large");
new(storage) TDecay(std::forward<T>(callable));
vtable = &VTableFor<TDecay>::value;
}
// 移动构造
ZeroCallback(ZeroCallback&& other) noexcept : vtable(other.vtable) {
if (vtable) {
vtable->move(storage, other.storage);
other.vtable = nullptr;
}
}
// 析构
~ZeroCallback() {
if (vtable) {
vtable->destroy(storage);
}
}
// 禁止拷贝
ZeroCallback(const ZeroCallback&) = delete;
ZeroCallback& operator=(const ZeroCallback&) = delete;
// 调用
R operator()(Args... args) const {
return vtable->invoke(const_cast<std::byte*>(storage), args...);
}
bool empty() const { return vtable == nullptr; }
};
查看完整可编译示例
// Zero Overhead Callback Implementation
// Demonstrates a type-erased callback with manual vtable
#include <iostream>
#include <cstdint>
#include <cstring>
template<typename Signature, size_t Size = 32>
class ZeroCallback;
template<size_t Size, typename R, typename... Args>
class ZeroCallback<R(Args...), Size> {
// VTable with function pointers instead of virtual functions
struct VTable {
void (*move)(void* dest, void* src);
void (*destroy)(void* obj);
R (*invoke)(void* obj, Args...);
};
// Storage for the callable
alignas(std::max_align_t) std::byte storage[Size];
const VTable* vtable = nullptr;
// Generate VTable for each callable type
template<typename T>
struct VTableFor {
static void do_move(void* dest, void* src) {
new(dest) T(std::move(*reinterpret_cast<T*>(src)));
}
static void do_destroy(void* obj) {
reinterpret_cast<T*>(obj)->~T();
}
static R do_invoke(void* obj, Args... args) {
return (*reinterpret_cast<T*>(obj))(args...);
}
static constexpr VTable value = {do_move, do_destroy, do_invoke};
};
public:
ZeroCallback() = default;
// Accept any callable object
template<typename T>
ZeroCallback(T&& callable) {
using TDecay = std::decay_t<T>;
static_assert(sizeof(TDecay) <= Size, "Callable too large for ZeroCallback");
new(storage) TDecay(std::forward<T>(callable));
vtable = &VTableFor<TDecay>::value;
}
// Move constructor
ZeroCallback(ZeroCallback&& other) noexcept : vtable(other.vtable) {
if (vtable) {
vtable->move(storage, other.storage);
other.vtable = nullptr;
}
}
// Destructor
~ZeroCallback() {
if (vtable) {
vtable->destroy(storage);
}
}
// No copy
ZeroCallback(const ZeroCallback&) = delete;
ZeroCallback& operator=(const ZeroCallback&) = delete;
// Invoke the callback
R operator()(Args... args) const {
return vtable->invoke(const_cast<std::byte*>(storage), args...);
}
bool empty() const { return vtable == nullptr; }
};
// Demo usage
int main() {
std::cout << "=== Zero Overhead Callback Demo ===" << std::endl;
// Store a simple lambda
ZeroCallback<int(int)> cb = [](int x) { return x * 2; };
std::cout << "cb(21) = " << cb(21) << std::endl; // 42
// Store a lambda with capture
int multiplier = 10;
ZeroCallback<int(int)> captured = [multiplier](int x) {
return x * multiplier;
};
std::cout << "captured(5) = " << captured(5) << std::endl; // 50
// Store a function pointer
auto add = [](int a, int b) { return a + b; };
ZeroCallback<int(int, int), 16> fp_cb = add;
std::cout << "fp_cb(3, 4) = " << fp_cb(3, 4) << std::endl; // 7
// Size comparison
std::cout << "\n=== Size Comparison ===" << std::endl;
std::cout << "sizeof(ZeroCallback<int(int), 32>) = "
<< sizeof(ZeroCallback<int(int), 32>) << " bytes" << std::endl;
std::cout << "sizeof(std::function<int(int)>) = "
<< sizeof(std::function<int(int)>) << " bytes" << std::endl;
std::cout << "sizeof(int(*)(int)) = " << sizeof(int(*)(int)) << " bytes" << std::endl;
// Performance characteristics
std::cout << "\n=== Performance Characteristics ===" << std::endl;
std::cout << "- No heap allocation (for small callables)" << std::endl;
std::cout << "- One level of indirection (function pointer)" << std::endl;
std::cout << "- Can be inlined by compiler in some cases" << std::endl;
std::cout << "- Fixed size, stack-allocated storage" << std::endl;
return 0;
}
使用示例¶
// 存储Lambda
ZeroCallback<int(int)> cb = [](int x) { return x * 2; };
int result = cb(21); // result = 42
// 存储捕获上下文的Lambda
int counter = 0;
ZeroCallback<void()> counter_cb = [counter]() mutable {
counter++;
};
// 存储函数指针
int add(int a, int b) { return a + b; }
ZeroCallback<int(int, int)> fp_cb = add;
性能特性¶
| 特性 | 说明 |
|---|---|
| 大小 | Size参数控制(默认32字节) |
| 堆分配 | 无,全部栈上 |
| 间接调用 | 一层(通过函数指针) |
| 内联机会 | 中等(编译器可能内联函数指针指向的函数) |
方案四:编译期固定回调(极致性能)¶
如果回调类型在编译期完全确定,可以用更激进的模板。
延迟实例化模板¶
// 回调注册器:在回调类型确定时才实例化
template<typename Signature>
class CallbackRegistry;
template<typename R, typename... Args>
class CallbackRegistry<R(Args...)> {
// 不存储回调,而是接受模板参数的函数
public:
template<typename Callback>
static void register_callback(Callback&& cb) {
// 直接使用cb,编译期类型已知
current_callback = std::forward<Callback>(cb);
}
static R invoke(Args... args) {
return current_callback(args...);
}
private:
// 问题:current_callback类型不确定
};
这不能直接工作。我们需要另一种方法。
静态多态:CRTP模式¶
// 基类模板
template<typename Derived>
class CallbackProvider {
public:
template<typename... Args>
auto callback(Args&&... args) {
return static_cast<Derived*>(this)->invoke(std::forward<Args>(args)...);
}
};
// 用户派生
class MyCallbacks : public CallbackProvider<MyCallbacks> {
public:
int invoke(int x) {
return x * 2;
}
};
这在某些场景有效,但限制了灵活性。
最终方案:混合策略¶
// 小回调:栈上存储,无堆分配
template<typename Signature, size_t Size = 32>
using StackCallback = ZeroCallback<Signature, Size>;
// 快速回调:模板化,编译期类型
template<typename Signature>
struct FastCallback;
template<typename R, typename... Args>
struct FastCallback<R(Args...)> {
// 直接存储函数指针(无上下文)
using FnPtr = R(*)(Args...);
FnPtr ptr = nullptr;
FastCallback() = default;
FastCallback(FnPtr p) : ptr(p) {}
R operator()(Args... args) const {
return ptr(args...);
}
bool empty() const { return ptr == nullptr; }
};
// 使用:根据需求选择
void example() {
// 需要上下文:用StackCallback
int ctx = 42;
StackCallback<int(int)> cb1 = [ctx](int x) { return x + ctx; };
// 无需上下文:用FastCallback(纯函数指针)
FastCallback<int(int, int)> cb2 = [](int a, int b) { return a + b; };
}
嵌入式实战:零开销事件系统¶
结合以上技术,构建一个完整的事件处理系统。
设计¶
#include <cstdint>
#include <array>
// 零开销回调类型
template<typename Signature, size_t Size = 24>
using Callback = ZeroCallback<Signature, Size>;
// 事件类型
enum class EventType : uint8_t {
GPIO_CHANGED,
ADC_COMPLETE,
TIMER_EXPIRED,
UART_RX,
MAX_EVENTS
};
// 事件系统
class EventSystem {
public:
using Handler = Callback<void(uint32_t), 24>;
struct Slot {
Handler callback;
uint32_t user_data;
Slot() = default;
Slot(Handler cb, uint32_t data = 0)
: callback(std::move(cb)), user_data(data) {}
};
// 注册事件处理器
bool register_handler(EventType evt, Handler cb, uint32_t data = 0) {
auto idx = static_cast<size_t>(evt);
if (handlers[idx].callback.empty()) {
handlers[idx] = {std::move(cb), data};
return true;
}
return false; // 槽已占用
}
// 触发事件(从中断调用)
void __attribute__((always_inline)) trigger(EventType evt, uint32_t param = 0) {
auto idx = static_cast<size_t>(evt);
if (!handlers[idx].callback.empty()) {
handlers[idx].callback(param | handlers[idx].user_data);
}
}
private:
std::array<Slot, static_cast<size_t>(EventType::MAX_EVENTS)> handlers;
};
查看完整可编译示例
// Zero Overhead Event System
// A complete event handling system for embedded systems
#include <iostream>
#include <array>
#include <cstdint>
#include <functional>
// ZeroCallback implementation
template<typename Signature, size_t Size = 24>
class ZeroCallback;
template<size_t Size, typename R, typename... Args>
class ZeroCallback<R(Args...), Size> {
struct VTable {
void (*move)(void* dest, void* src);
void (*destroy)(void* obj);
R (*invoke)(void* obj, Args...);
};
alignas(std::max_align_t) std::byte storage[Size];
const VTable* vtable = nullptr;
template<typename T>
struct VTableFor {
static void do_move(void* dest, void* src) {
new(dest) T(std::move(*reinterpret_cast<T*>(src)));
}
static void do_destroy(void* obj) {
reinterpret_cast<T*>(obj)->~T();
}
static R do_invoke(void* obj, Args... args) {
return (*reinterpret_cast<T*>(obj))(args...);
}
static constexpr VTable value = {do_move, do_destroy, do_invoke};
};
public:
ZeroCallback() = default;
template<typename T>
ZeroCallback(T&& callable) {
using TDecay = std::decay_t<T>;
static_assert(sizeof(TDecay) <= Size, "Callable too large");
new(storage) TDecay(std::forward<T>(callable));
vtable = &VTableFor<TDecay>::value;
}
ZeroCallback(ZeroCallback&& other) noexcept : vtable(other.vtable) {
if (vtable) {
vtable->move(storage, other.storage);
other.vtable = nullptr;
}
}
~ZeroCallback() {
if (vtable) {
vtable->destroy(storage);
}
}
ZeroCallback(const ZeroCallback&) = delete;
ZeroCallback& operator=(const ZeroCallback&) = delete;
R operator()(Args... args) const {
return vtable->invoke(const_cast<std::byte*>(storage), args...);
}
bool empty() const { return vtable == nullptr; }
};
// Event types
enum class EventType : uint8_t {
GPIO_CHANGED,
ADC_COMPLETE,
TIMER_EXPIRED,
UART_RX,
MAX_EVENTS
};
// Event system
class EventSystem {
public:
using Handler = ZeroCallback<void(uint32_t), 24>;
struct Slot {
Handler callback;
uint32_t user_data;
Slot() = default;
Slot(Handler cb, uint32_t data = 0)
: callback(std::move(cb)), user_data(data) {}
};
bool register_handler(EventType evt, Handler cb, uint32_t data = 0) {
auto idx = static_cast<size_t>(evt);
if (!handlers[idx].callback.empty()) {
return false; // Slot already occupied
}
handlers[idx] = {std::move(cb), data};
return true;
}
void trigger(EventType evt, uint32_t param = 0) {
auto idx = static_cast<size_t>(evt);
if (!handlers[idx].callback.empty()) {
handlers[idx].callback(param | handlers[idx].user_data);
}
}
private:
std::array<Slot, static_cast<size_t>(EventType::MAX_EVENTS)> handlers{};
};
// Button driver
class ButtonDriver {
int pin;
int press_count = 0;
public:
ButtonDriver(int p) : pin(p) {}
void init(EventSystem& events) {
events.register_handler(EventType::GPIO_CHANGED,
[this](uint32_t pins) {
if (pins & (1 << pin)) {
press_count++;
on_press();
}
});
}
void on_press() {
std::cout << "Button on pin " << pin << " pressed (count: "
<< press_count << ")" << std::endl;
}
};
// LED controller
class LEDController {
int pin;
bool state = false;
public:
LEDController(int p) : pin(p) {}
void init(EventSystem& events) {
events.register_handler(EventType::TIMER_EXPIRED,
[this](uint32_t) {
state = !state;
std::cout << "LED pin " << pin << " -> "
<< (state ? "ON" : "OFF") << std::endl;
});
}
};
// ADC handler
void setup_adc(EventSystem& events) {
events.register_handler(EventType::ADC_COMPLETE,
[](uint32_t channel) {
std::cout << "ADC conversion complete, channel: "
<< channel << ", value: " << (2048 + channel * 100) << std::endl;
});
}
int main() {
std::cout << "=== Zero Overhead Event System Demo ===" << std::endl;
EventSystem g_events;
// Initialize devices
ButtonDriver btn(5);
btn.init(g_events);
LEDController led(10);
led.init(g_events);
setup_adc(g_events);
// Simulate interrupts
std::cout << "\n--- Simulating Events ---" << std::endl;
g_events.trigger(EventType::GPIO_CHANGED, 0x20); // Button press
g_events.trigger(EventType::TIMER_EXPIRED, 0); // LED toggle
g_events.trigger(EventType::ADC_COMPLETE, 0); // ADC done
g_events.trigger(EventType::GPIO_CHANGED, 0x20); // Another button press
g_events.trigger(EventType::TIMER_EXPIRED, 0); // LED toggle
std::cout << "\n=== Performance Characteristics ===" << std::endl;
std::cout << "- No heap allocation (stack storage only)" << std::endl;
std::cout << "- Single indirection through function pointer" << std::endl;
std::cout << "- Compiler can inline in many cases" << std::endl;
std::cout << "- Suitable for interrupt handlers" << std::endl;
std::cout << "- Fixed overhead per callback (~24 bytes)" << std::endl;
return 0;
}
使用示例¶
// 全局事件系统
EventSystem g_events;
// 按键处理
class ButtonDriver {
int pin;
int press_count = 0;
public:
ButtonDriver(int p) : pin(p) {}
void init() {
// 注册回调,捕获this指针
g_events.register_handler(EventType::GPIO_CHANGED,
[this](uint32_t pins) {
if (pins & (1 << pin)) {
press_count++;
on_press();
}
});
}
void on_press() {
// 处理按键逻辑
}
};
// ADC处理(无需上下文,可用纯Lambda)
void setup_adc() {
g_events.register_handler(EventType::ADC_COMPLETE,
[](uint32_t channel) {
int16_t value = *ADC_DR;
process_adc_value(channel, value);
});
}
// 中断处理程序
extern "C" void EXTI_IRQHandler() {
uint32_t pending = *EXTI_PR;
*EXTI_PR = pending; // 清除标志
g_events.trigger(EventType::GPIO_CHANGED, pending);
}
性能分析¶
; 触发事件的汇编(-O2优化)
; 假设EventType::GPIO_CHANGED = 0
EXTI_IRQHandler:
ldr r0, [pc, #EXTI_PR] ; 加载EXTI_PR地址
ldr r1, [r0] ; 读取pending
str r1, [r0] ; 清除标志
; 调用trigger
ldr r0, =g_events ; 加载EventSystem地址
ldr r2, [r0, #0] ; 加载handlers[0]地址
cmp r2, #0 ; 检查是否为空
beq 1f
; 调用回调(通过函数指针,但可能被内联)
mov r0, r1 ; 参数:pending
blx r2 ; 间接调用
1: bx lr ; 返回
```
每个事件处理约10-15条指令,非常适合中断处理。
------
## 进阶:可扩展的多播系统
有时一个事件需要多个监听器。
### 实现
```cpp
template<typename Signature, size_t Size = 24, size_t MaxListeners = 4>
class MulticastCallback {
using Handler = Callback<Signature, Size>;
std::array<Handler, MaxListeners> handlers;
size_t count = 0;
public:
bool subscribe(Handler h) {
if (count >= MaxListeners) return false;
handlers[count++] = std::move(h);
return true;
}
template<typename... Args>
void publish(Args&&... args) {
for (size_t i = 0; i < count; ++i) {
handlers[i](args...);
}
}
};
// 使用
void example_multicast() {
MulticastCallback<void(int), 24, 4> adc_event;
// 订阅者1:记录日志
adc_event.subscribe([](int value) {
log_adc(value);
});
// 订阅者2:更新显示
adc_event.subscribe([](int value) {
display_update(value);
});
// 订阅者3:存储数据
adc_event.subscribe([](int value) {
buffer_push(value);
});
// 发布事件
adc_event.publish(2048);
}
```
------
## 对比总结
| 方案 | 堆分配 | 间接调用 | 通用性 | 复杂度 |
|------|--------|----------|--------|--------|
| 函数指针 | 无 | 1层 | 低(无上下文) | 简单 |
| std::function | 可能有 | 2层 | 高 | 中等 |
| ZeroCallback | 无 | 1层 | 高 | 中等 |
| 模板化容器 | 无 | 0层(可内联) | 中等 | 简单 |
### 选择建议
```cpp
// 1. 无上下文的热路径:函数指针
void fast_path(int (*cb)(int)) {
// 零开销调用
}
// 2. 有上下文的冷路径:std::function
void slow_path(std::function<void()> cb) {
// 通用但稍慢
}
// 3. 有上下文的热路径:ZeroCallback
void optimized_path(ZeroCallback<void(int), 24> cb) {
// 无堆分配,单层间接调用
}
// 4. 编译期确定的类型:模板
template<typename CB>
void compile_time_path(CB&& cb) {
// 完全零开销,可内联
}
小结¶
零开销回调机制的核心要点:
- 理解开销来源:堆分配、间接调用、类型擦除
- 选择合适方案:根据场景权衡通用性和性能
- 手动类型擦除:用函数指针表代替虚函数,减少开销
- 固定大小存储:避免堆分配,使用栈上缓冲区
- 编译期多态:模板化设计在编译期确定类型
在嵌入式开发中:
- 中断处理:用函数指针或ZeroCallback
- 事件系统:用固定大小的Callback容器
- 应用层:可用std::function牺牲性能换取便利
记住:零开销不是不花钱,而是"你没用到的特性不花钱"。通过精心设计,我们可以同时获得现代C++的便利和接近C的性能。