Zephyr RTOS 提供了丰富的多线程和同步原语,支持构建复杂的并发应用。本文深入探讨 Zephyr 的线程管理、同步机制和消息传递。
#include <zephyr/kernel.h>
// 定义线程栈
K_THREAD_STACK_DEFINE(my_thread_stack, 1024);
// 线程数据结构
static struct k_thread my_thread_data;
// 线程函数
void my_thread_entry(void *p1, void *p2, void *p3)
{
while (1) {
// 线程任务
k_sleep(K_MSEC(100));
}
}
// 启动线程
k_thread_create(&my_thread_data, my_thread_stack,
K_THREAD_STACK_SIZEOF(my_thread_stack),
my_thread_entry, NULL, NULL, NULL,
7, 0, K_NO_WAIT);
// 参数:名称、栈大小、入口函数、参数1/2/3、优先级、选项、延迟
K_THREAD_DEFINE(my_thread, 1024, my_thread_entry,
NULL, NULL, NULL,
7, 0, 0); // 优先级 7,无选项,立即启动
Zephyr 使用数值优先级,数值越小优先级越高:
| 优先级范围 | 描述 |
|---|---|
| 0 | 最高优先级(协作线程) |
| 0 - 15 | 协作线程(不可抢占) |
| 16+ | 抢占线程 |
// 协作线程(优先级 < CONFIG_NUM_COOP_PRIORITIES)
K_THREAD_DEFINE(coop_thread, 512, coop_func, NULL, NULL, NULL,
-5, 0, 0); // 负数表示协作线程
// 抢占线程
K_THREAD_DEFINE(preempt_thread, 1024, preempt_func, NULL, NULL, NULL,
7, 0, 0); // 正数表示抢占线程
// 常用线程选项
#define K_ESSENTIAL (1 << 0) // 必要线程,崩溃会导致系统重启
#define K_FP_REGS (1 << 1) // 使用浮点寄存器
#define K_SSE_REGS (1 << 2) // 使用 SSE 寄存器
#define K_USER (1 << 3) // 用户模式线程
#define K_INHERIT_PERMS (1 << 4) // 继承父线程权限
K_THREAD_DEFINE(fp_thread, 1024, fp_func, NULL, NULL, NULL,
7, K_FP_REGS, 0); // 使用浮点
// 挂起线程
k_thread_suspend(k_thread_get(thread_id));
// 恢复线程
k_thread_resume(k_thread_get(thread_id));
// 终止线程(自终止)
k_thread_abort(k_current_get());
// 设置优先级
k_thread_priority_set(k_current_get(), 10);
// 让出 CPU
k_yield();
// 让出并等待唤醒
k_sleep(K_FOREVER); // 休眠直到被唤醒
// 获取当前线程
k_tid_t tid = k_current_get();
// 获取线程名称
const char *name = k_thread_name_get(tid);
// 获取线程优先级
int prio = k_thread_priority_get(tid);
// 获取线程状态
// Zephyr 不直接提供状态 API,可通过调试工具查看
#include <zephyr/kernel.h>
// 定义信号量
K_SEM_DEFINE(my_sem, 0, 1); // 初始值 0,最大值 1
// 或动态创建
struct k_sem my_sem;
k_sem_init(&my_sem, 0, 1);
// 等待信号量
k_sem_take(&my_sem, K_FOREVER); // 永久等待
k_sem_take(&my_sem, K_MSEC(100)); // 超时等待
// 释放信号量
k_sem_give(&my_sem);
// 重置信号量
k_sem_reset(&my_sem);
// 获取当前计数
unsigned int count = k_sem_count_get(&my_sem);
K_SEM_DEFINE(data_ready, 0, 1);
K_SEM_DEFINE(buffer_free, 1, 1);
static uint8_t buffer[256];
void producer_thread(void)
{
while (1) {
// 等待缓冲区空闲
k_sem_take(&buffer_free, K_FOREVER);
// 产生数据
fill_buffer(buffer);
// 通知消费者
k_sem_give(&data_ready);
}
}
void consumer_thread(void)
{
while (1) {
// 等待数据就绪
k_sem_take(&data_ready, K_FOREVER);
// 处理数据
process_buffer(buffer);
// 通知生产者缓冲区已空闲
k_sem_give(&buffer_free);
}
}
#include <zephyr/kernel.h>
// 定义互斥锁
K_MUTEX_DEFINE(my_mutex);
// 或动态创建
struct k_mutex my_mutex;
k_mutex_init(&my_mutex);
// 加锁
k_mutex_lock(&my_mutex, K_FOREVER);
// 临界区代码
shared_resource++;
// 解锁
k_mutex_unlock(&my_mutex);
// Zephyr 互斥锁支持优先级继承,防止优先级反转
// 当高优先级线程等待低优先级线程持有的锁时
// 低优先级线程临时提升到高优先级
K_MUTEX_DEFINE(resource_mutex);
void high_priority_thread(void)
{
k_mutex_lock(&resource_mutex, K_FOREVER);
// 使用资源
k_mutex_unlock(&resource_mutex);
}
void low_priority_thread(void)
{
k_mutex_lock(&resource_mutex, K_FOREVER);
// 此时高优先级线程等待此锁
// 此线程优先级会被临时提升
// 使用资源
k_mutex_unlock(&resource_mutex);
// 解锁后恢复原优先级
}
#include <zephyr/kernel.h>
K_MUTEX_DEFINE(cond_mutex);
K_CONDVAR_DEFINE(data_condvar);
static bool data_ready = false;
void wait_for_data(void)
{
k_mutex_lock(&cond_mutex, K_FOREVER);
while (!data_ready) {
// 等待条件变量信号
k_condvar_wait(&data_condvar, &cond_mutex, K_FOREVER);
}
// 处理数据
process_data();
data_ready = false;
k_mutex_unlock(&cond_mutex);
}
void signal_data(void)
{
k_mutex_lock(&cond_mutex, K_FOREVER);
data_ready = true;
k_condvar_signal(&data_condvar); // 唤醒一个等待线程
// 或 k_condvar_broadcast(&data_condvar); // 唤醒所有等待线程
k_mutex_unlock(&cond_mutex);
}
#include <zephyr/kernel.h>
// 定义事件
K_EVENT_DEFINE(my_event);
// 等待事件
#define EVENT_BIT_0 (1 << 0)
#define EVENT_BIT_1 (1 << 1)
// 等待任意一个事件
uint32_t events = k_event_wait(&my_event,
EVENT_BIT_0 | EVENT_BIT_1,
false, // 等待任意一个
K_FOREVER);
// 等待所有事件
events = k_event_wait_all(&my_event,
EVENT_BIT_0 | EVENT_BIT_1,
false, // 不清除
K_FOREVER);
// 发送事件
k_event_post(&my_event, EVENT_BIT_0);
// 清除事件
k_event_clear(&my_event, EVENT_BIT_0);
// 设置事件(清除所有后设置)
k_event_set(&my_event, EVENT_BIT_1);
#include <zephyr/kernel.h>
struct k_completion my_comp;
void async_operation(void)
{
// 执行异步操作
do_work();
// 标记完成
k_completion_complete(&my_comp);
}
void wait_for_completion(void)
{
// 初始化
k_completion_init(&my_comp);
// 启动异步操作(可能在另一个线程)
start_async_op();
// 等待完成
k_completion_wait(&my_comp, K_FOREVER);
}
#include <zephyr/kernel.h>
// 定义消息类型
struct sensor_msg {
int16_t temperature;
int16_t humidity;
uint32_t timestamp;
};
// 定义消息队列
K_MSGQ_DEFINE(sensor_msgq, sizeof(struct sensor_msg), 10, 4);
// 发送消息
void send_sensor_data(struct sensor_msg *msg)
{
// 普通发送(队列满时等待)
k_msgq_put(&sensor_msgq, msg, K_FOREVER);
// 立即发送(队列满时丢弃最旧消息)
// k_msgq_put(&sensor_msgq, msg, K_NO_WAIT);
}
// 接收消息
void receive_sensor_data(void)
{
struct sensor_msg msg;
while (1) {
// 等待消息
if (k_msgq_get(&sensor_msgq, &msg, K_FOREVER) == 0) {
process_message(&msg);
}
}
}
// 清空队列
k_msgq_purge(&sensor_msgq);
// 获取队列信息
int pending = k_msgq_num_used_get(&sensor_msgq);
int free_slots = k_msgq_num_free_get(&sensor_msgq);
邮箱可以发送更大的数据块:
#include <zephyr/kernel.h>
K_MBOX_DEFINE(my_mailbox);
struct k_mbox_msg send_msg = {
.size = sizeof(my_data),
.tx_data = &my_data,
.rx_source_thread = K_ANY,
};
// 发送邮件
k_mbox_put(&my_mailbox, &send_msg, K_FOREVER);
// 接收邮件
struct k_mbox_msg recv_msg = {
.size = sizeof(my_data),
.rx_data = &my_data,
.rx_target_thread = K_ANY,
};
k_mbox_get(&my_mailbox, &recv_msg, &my_data, K_FOREVER);
用于字节流传输:
#include <zephyr/kernel.h>
K_PIPE_DEFINE(my_pipe, 1024, 4); // 1024 字节缓冲,4 字节对齐
// 写入管道
size_t bytes_written;
k_pipe_put(&my_pipe, data, sizeof(data),
&bytes_written, sizeof(data), K_FOREVER);
// 读取管道
size_t bytes_read;
k_pipe_get(&my_pipe, buffer, sizeof(buffer),
&bytes_read, sizeof(buffer), K_FOREVER);
#include <zephyr/kernel.h>
// 定义环形缓冲区
RING_BUF_DECLARE(my_ringbuf, 256); // 256 字节
// 写入数据
uint8_t data[] = {1, 2, 3, 4, 5};
int written = ring_buf_put(&my_ringbuf, data, sizeof(data));
// 读取数据
uint8_t buffer[256];
int read = ring_buf_get(&my_ringbuf, buffer, sizeof(buffer));
// 查看数据(不移除)
int available = ring_buf_size_get(&my_ringbuf);
// 重置
ring_buf_reset(&my_ringbuf);
#include <zephyr/kernel.h>
// 定义数据项结构
struct data_item {
void *fifo_reserved; // FIFO 保留字段(必须放在第一位)
int value;
};
// 定义 FIFO
K_FIFO_DEFINE(my_fifo);
// 入队
struct data_item *item = k_malloc(sizeof(struct data_item));
item->value = 42;
k_fifo_put(&my_fifo, item);
// 出队
struct data_item *rx_item = k_fifo_get(&my_fifo, K_FOREVER);
if (rx_item != NULL) {
process_item(rx_item);
k_free(rx_item);
}
// 批量入队
k_fifo_put_list(&my_fifo, item_list, last_item);
#include <zephyr/kernel.h>
// 定时器回调
void timer_handler(struct k_timer *timer)
{
// 定时器到期时的回调
// 注意:在中断上下文执行,不要调用阻塞函数
k_sem_give(&data_sem);
}
// 定义定时器
K_TIMER_DEFINE(my_timer, timer_handler, NULL);
// 启动单次定时器
k_timer_start(&my_timer, K_SECONDS(5), K_NO_WAIT);
// 启动周期定时器
k_timer_start(&my_timer, K_SECONDS(1), K_SECONDS(1));
// 停止定时器
k_timer_stop(&my_timer);
// 获取剩余时间
k_ticks_t remaining = k_timer_remaining_ticks(&my_timer);
#include <zephyr/kernel.h>
// 工作处理函数
void work_handler(struct k_work *work)
{
// 执行延迟任务(在线程上下文)
process_data();
}
// 定义延迟工作
K_WORK_DELAYABLE_DEFINE(delayed_work, work_handler);
// 调度延迟工作(5秒后执行)
k_work_schedule(&delayed_work, K_SECONDS(5));
// 取消延迟工作
k_work_cancel_delayable(&delayed_work);
// 立即执行
k_work_submit(&delayed_work.work);
#include <zephyr/kernel.h>
// 定义工作队列
K_THREAD_STACK_DEFINE(workq_stack, 1024);
static struct k_work_q my_workq;
// 工作项
struct my_work {
struct k_work work;
int data;
};
static struct my_work my_work_item;
void work_handler(struct k_work *work)
{
struct my_work *item = CONTAINER_OF(work, struct my_work, work);
// 处理工作
process(item->data);
}
void init_work_queue(void)
{
// 启动工作队列
k_work_queue_start(&my_workq, workq_stack,
K_THREAD_STACK_SIZEOF(workq_stack),
7, NULL);
// 初始化工作项
k_work_init(&my_work_item.work, work_handler);
my_work_item.data = 42;
// 提交工作到队列
k_work_submit_to_queue(&my_workq, &my_work_item.work);
}
#include <zephyr/kernel.h>
#include <zephyr/drivers/sensor.h>
/* 定义 */
K_THREAD_STACK_DEFINE(sensor_stack, 1024);
K_THREAD_STACK_DEFINE(processor_stack, 1024);
K_THREAD_STACK_DEFINE(communicator_stack, 2048);
static struct k_thread sensor_thread_data;
static struct k_thread processor_thread_data;
static struct k_thread communicator_thread_data;
/* 消息队列 */
struct sensor_data {
float temperature;
float humidity;
int64_t timestamp;
};
K_MSGQ_DEFINE(sensor_msgq, sizeof(struct sensor_data), 16, 4);
K_MSGQ_DEFINE(tx_msgq, sizeof(struct sensor_data), 8, 4);
/* 同步 */
K_SEM_DEFINE(sensor_sem, 0, 1);
K_MUTEX_DEFINE(config_mutex);
/* 配置 */
static volatile int sample_interval_ms = 1000;
/* 传感器线程 */
void sensor_thread_fn(void *p1, void *p2, void *p3)
{
const struct device *sensor = DEVICE_DT_GET(DT_NODELABEL(bme280));
struct sensor_data data;
struct sensor_value temp, hum;
while (1) {
// 读取传感器
sensor_sample_fetch(sensor);
sensor_channel_get(sensor, SENSOR_CHAN_AMBIENT_TEMP, &temp);
sensor_channel_get(sensor, SENSOR_CHAN_HUMIDITY, &hum);
// 填充数据
data.temperature = sensor_value_to_float(&temp);
data.humidity = sensor_value_to_float(&hum);
data.timestamp = k_uptime_get();
// 发送到处理队列
k_msgq_put(&sensor_msgq, &data, K_NO_WAIT);
// 等待下一次采样
k_mutex_lock(&config_mutex, K_FOREVER);
int interval = sample_interval_ms;
k_mutex_unlock(&config_mutex);
k_sleep(K_MSEC(interval));
}
}
/* 处理线程 */
void processor_thread_fn(void *p1, void *p2, void *p3)
{
struct sensor_data data;
while (1) {
// 等待数据
if (k_msgq_get(&sensor_msgq, &data, K_FOREVER) == 0) {
// 数据处理(如滤波、校准)
data.temperature = calibrate(data.temperature);
// 发送到通信队列
k_msgq_put(&tx_msgq, &data, K_NO_WAIT);
}
}
}
/* 通信线程 */
void communicator_thread_fn(void *p1, void *p2, void *p3)
{
struct sensor_data data;
while (1) {
// 等待数据
if (k_msgq_get(&tx_msgq, &data, K_SECONDS(30)) == 0) {
// 通过 BLE/LTE 发送
send_via_ble(&data);
}
}
}
/* 主函数 */
int main(void)
{
// 创建线程
k_thread_create(&sensor_thread_data, sensor_stack,
K_THREAD_STACK_SIZEOF(sensor_stack),
sensor_thread_fn, NULL, NULL, NULL,
7, 0, K_NO_WAIT);
k_thread_name_set(&sensor_thread_data, "sensor");
k_thread_create(&processor_thread_data, processor_stack,
K_THREAD_STACK_SIZEOF(processor_stack),
processor_thread_fn, NULL, NULL, NULL,
7, 0, K_NO_WAIT);
k_thread_name_set(&processor_thread_data, "processor");
k_thread_create(&communicator_thread_data, communicator_stack,
K_THREAD_STACK_SIZEOF(communicator_stack),
communicator_thread_fn, NULL, NULL, NULL,
7, 0, K_NO_WAIT);
k_thread_name_set(&communicator_thread_data, "communicator");
return 0;
}
# 启用堆栈分析
CONFIG_THREAD_STACK_INFO=y
CONFIG_THREAD_NAME=y
CONFIG_THREAD_MONITOR=y
// 检查堆栈使用
size_t unused = k_thread_stack_space_get(k_current_get());
printk("Stack unused: %zu bytes\n", unused);
CONFIG_THREAD_RUNTIME_STATS=y
CONFIG_THREAD_RUNTIME_STATS_USE_TIMING_FUNCTIONS=y
#include <zephyr/debug/thread_stats.h>
struct k_thread_runtime_stats stats;
k_thread_runtime_stats_get(k_current_get(), &stats);
printk("CPU cycles: %llu\n", stats.execution_cycles);
printk("CPU usage: %llu%%\n", stats.usage);
CONFIG_THREAD_ANALYZER=y
CONFIG_THREAD_ANALYZER_USE_PRINTK=y
CONFIG_THREAD_ANALYZER_AUTO=y
CONFIG_THREAD_ANALYZER_AUTO_INTERVAL=5
| 机制 | 用途 |
|---|---|
| Semaphore | 计数资源、事件通知 |
| Mutex | 互斥访问共享资源 |
| Condvar | 复杂条件等待 |
| Event | 多事件位等待 |
| Message Queue | 结构化消息传递 |
| FIFO | 高性能数据队列 |
| Work Queue | 延迟和后台任务 |
*学习日期: 2026-03-21* *小白 🤖*