返回首页

Zephyr 多线程编程学习笔记

概述

Zephyr RTOS 提供了丰富的多线程和同步原语,支持构建复杂的并发应用。本文深入探讨 Zephyr 的线程管理、同步机制和消息传递。

线程管理

1. 线程定义

静态定义

#include <zephyr/kernel.h>

// 定义线程栈
K_THREAD_STACK_DEFINE(my_thread_stack, 1024);

// 线程数据结构
static struct k_thread my_thread_data;

// 线程函数
void my_thread_entry(void *p1, void *p2, void *p3)
{
    while (1) {
        // 线程任务
        k_sleep(K_MSEC(100));
    }
}

// 启动线程
k_thread_create(&my_thread_data, my_thread_stack,
                K_THREAD_STACK_SIZEOF(my_thread_stack),
                my_thread_entry, NULL, NULL, NULL,
                7, 0, K_NO_WAIT);

动态定义(K_THREAD_DEFINE 宏)

// 参数:名称、栈大小、入口函数、参数1/2/3、优先级、选项、延迟
K_THREAD_DEFINE(my_thread, 1024, my_thread_entry, 
                NULL, NULL, NULL, 
                7, 0, 0);  // 优先级 7,无选项,立即启动

2. 线程优先级

Zephyr 使用数值优先级,数值越小优先级越高

优先级范围描述
0最高优先级(协作线程)
0 - 15协作线程(不可抢占)
16+抢占线程
// 协作线程(优先级 < CONFIG_NUM_COOP_PRIORITIES)
K_THREAD_DEFINE(coop_thread, 512, coop_func, NULL, NULL, NULL, 
                -5, 0, 0);  // 负数表示协作线程

// 抢占线程
K_THREAD_DEFINE(preempt_thread, 1024, preempt_func, NULL, NULL, NULL,
                7, 0, 0);   // 正数表示抢占线程

3. 线程选项

// 常用线程选项
#define K_ESSENTIAL      (1 << 0)  // 必要线程,崩溃会导致系统重启
#define K_FP_REGS        (1 << 1)  // 使用浮点寄存器
#define K_SSE_REGS       (1 << 2)  // 使用 SSE 寄存器
#define K_USER           (1 << 3)  // 用户模式线程
#define K_INHERIT_PERMS  (1 << 4)  // 继承父线程权限

K_THREAD_DEFINE(fp_thread, 1024, fp_func, NULL, NULL, NULL,
                7, K_FP_REGS, 0);  // 使用浮点

4. 线程控制

// 挂起线程
k_thread_suspend(k_thread_get(thread_id));

// 恢复线程
k_thread_resume(k_thread_get(thread_id));

// 终止线程(自终止)
k_thread_abort(k_current_get());

// 设置优先级
k_thread_priority_set(k_current_get(), 10);

// 让出 CPU
k_yield();

// 让出并等待唤醒
k_sleep(K_FOREVER);  // 休眠直到被唤醒

5. 线程信息获取

// 获取当前线程
k_tid_t tid = k_current_get();

// 获取线程名称
const char *name = k_thread_name_get(tid);

// 获取线程优先级
int prio = k_thread_priority_get(tid);

// 获取线程状态
// Zephyr 不直接提供状态 API,可通过调试工具查看

同步原语

1. 信号量(Semaphore)

基本用法

#include <zephyr/kernel.h>

// 定义信号量
K_SEM_DEFINE(my_sem, 0, 1);  // 初始值 0,最大值 1

// 或动态创建
struct k_sem my_sem;
k_sem_init(&my_sem, 0, 1);

// 等待信号量
k_sem_take(&my_sem, K_FOREVER);  // 永久等待
k_sem_take(&my_sem, K_MSEC(100));  // 超时等待

// 释放信号量
k_sem_give(&my_sem);

// 重置信号量
k_sem_reset(&my_sem);

// 获取当前计数
unsigned int count = k_sem_count_get(&my_sem);

生产者-消费者示例

K_SEM_DEFINE(data_ready, 0, 1);
K_SEM_DEFINE(buffer_free, 1, 1);

static uint8_t buffer[256];

void producer_thread(void)
{
    while (1) {
        // 等待缓冲区空闲
        k_sem_take(&buffer_free, K_FOREVER);
        
        // 产生数据
        fill_buffer(buffer);
        
        // 通知消费者
        k_sem_give(&data_ready);
    }
}

void consumer_thread(void)
{
    while (1) {
        // 等待数据就绪
        k_sem_take(&data_ready, K_FOREVER);
        
        // 处理数据
        process_buffer(buffer);
        
        // 通知生产者缓冲区已空闲
        k_sem_give(&buffer_free);
    }
}

2. 互斥锁(Mutex)

基本用法

#include <zephyr/kernel.h>

// 定义互斥锁
K_MUTEX_DEFINE(my_mutex);

// 或动态创建
struct k_mutex my_mutex;
k_mutex_init(&my_mutex);

// 加锁
k_mutex_lock(&my_mutex, K_FOREVER);

// 临界区代码
shared_resource++;

// 解锁
k_mutex_unlock(&my_mutex);

优先级继承

// Zephyr 互斥锁支持优先级继承,防止优先级反转
// 当高优先级线程等待低优先级线程持有的锁时
// 低优先级线程临时提升到高优先级

K_MUTEX_DEFINE(resource_mutex);

void high_priority_thread(void)
{
    k_mutex_lock(&resource_mutex, K_FOREVER);
    // 使用资源
    k_mutex_unlock(&resource_mutex);
}

void low_priority_thread(void)
{
    k_mutex_lock(&resource_mutex, K_FOREVER);
    // 此时高优先级线程等待此锁
    // 此线程优先级会被临时提升
    // 使用资源
    k_mutex_unlock(&resource_mutex);
    // 解锁后恢复原优先级
}

3. 条件变量(Condition Variable)

#include <zephyr/kernel.h>

K_MUTEX_DEFINE(cond_mutex);
K_CONDVAR_DEFINE(data_condvar);

static bool data_ready = false;

void wait_for_data(void)
{
    k_mutex_lock(&cond_mutex, K_FOREVER);
    
    while (!data_ready) {
        // 等待条件变量信号
        k_condvar_wait(&data_condvar, &cond_mutex, K_FOREVER);
    }
    
    // 处理数据
    process_data();
    data_ready = false;
    
    k_mutex_unlock(&cond_mutex);
}

void signal_data(void)
{
    k_mutex_lock(&cond_mutex, K_FOREVER);
    
    data_ready = true;
    k_condvar_signal(&data_condvar);  // 唤醒一个等待线程
    // 或 k_condvar_broadcast(&data_condvar);  // 唤醒所有等待线程
    
    k_mutex_unlock(&cond_mutex);
}

4. 事件(Event)

#include <zephyr/kernel.h>

// 定义事件
K_EVENT_DEFINE(my_event);

// 等待事件
#define EVENT_BIT_0 (1 << 0)
#define EVENT_BIT_1 (1 << 1)

// 等待任意一个事件
uint32_t events = k_event_wait(&my_event, 
                               EVENT_BIT_0 | EVENT_BIT_1,
                               false,  // 等待任意一个
                               K_FOREVER);

// 等待所有事件
events = k_event_wait_all(&my_event,
                          EVENT_BIT_0 | EVENT_BIT_1,
                          false,  // 不清除
                          K_FOREVER);

// 发送事件
k_event_post(&my_event, EVENT_BIT_0);

// 清除事件
k_event_clear(&my_event, EVENT_BIT_0);

// 设置事件(清除所有后设置)
k_event_set(&my_event, EVENT_BIT_1);

5. 完成回调(Completion)

#include <zephyr/kernel.h>

struct k_completion my_comp;

void async_operation(void)
{
    // 执行异步操作
    do_work();
    
    // 标记完成
    k_completion_complete(&my_comp);
}

void wait_for_completion(void)
{
    // 初始化
    k_completion_init(&my_comp);
    
    // 启动异步操作(可能在另一个线程)
    start_async_op();
    
    // 等待完成
    k_completion_wait(&my_comp, K_FOREVER);
}

消息传递

1. 消息队列(Message Queue)

#include <zephyr/kernel.h>

// 定义消息类型
struct sensor_msg {
    int16_t temperature;
    int16_t humidity;
    uint32_t timestamp;
};

// 定义消息队列
K_MSGQ_DEFINE(sensor_msgq, sizeof(struct sensor_msg), 10, 4);

// 发送消息
void send_sensor_data(struct sensor_msg *msg)
{
    // 普通发送(队列满时等待)
    k_msgq_put(&sensor_msgq, msg, K_FOREVER);
    
    // 立即发送(队列满时丢弃最旧消息)
    // k_msgq_put(&sensor_msgq, msg, K_NO_WAIT);
}

// 接收消息
void receive_sensor_data(void)
{
    struct sensor_msg msg;
    
    while (1) {
        // 等待消息
        if (k_msgq_get(&sensor_msgq, &msg, K_FOREVER) == 0) {
            process_message(&msg);
        }
    }
}

// 清空队列
k_msgq_purge(&sensor_msgq);

// 获取队列信息
int pending = k_msgq_num_used_get(&sensor_msgq);
int free_slots = k_msgq_num_free_get(&sensor_msgq);

2. 消息邮箱(Mailbox)

邮箱可以发送更大的数据块:

#include <zephyr/kernel.h>

K_MBOX_DEFINE(my_mailbox);

struct k_mbox_msg send_msg = {
    .size = sizeof(my_data),
    .tx_data = &my_data,
    .rx_source_thread = K_ANY,
};

// 发送邮件
k_mbox_put(&my_mailbox, &send_msg, K_FOREVER);

// 接收邮件
struct k_mbox_msg recv_msg = {
    .size = sizeof(my_data),
    .rx_data = &my_data,
    .rx_target_thread = K_ANY,
};

k_mbox_get(&my_mailbox, &recv_msg, &my_data, K_FOREVER);

3. 管道(Pipe)

用于字节流传输:

#include <zephyr/kernel.h>

K_PIPE_DEFINE(my_pipe, 1024, 4);  // 1024 字节缓冲,4 字节对齐

// 写入管道
size_t bytes_written;
k_pipe_put(&my_pipe, data, sizeof(data), 
           &bytes_written, sizeof(data), K_FOREVER);

// 读取管道
size_t bytes_read;
k_pipe_get(&my_pipe, buffer, sizeof(buffer),
           &bytes_read, sizeof(buffer), K_FOREVER);

4. 环形缓冲区(Ring Buffer)

#include <zephyr/kernel.h>

// 定义环形缓冲区
RING_BUF_DECLARE(my_ringbuf, 256);  // 256 字节

// 写入数据
uint8_t data[] = {1, 2, 3, 4, 5};
int written = ring_buf_put(&my_ringbuf, data, sizeof(data));

// 读取数据
uint8_t buffer[256];
int read = ring_buf_get(&my_ringbuf, buffer, sizeof(buffer));

// 查看数据(不移除)
int available = ring_buf_size_get(&my_ringbuf);

// 重置
ring_buf_reset(&my_ringbuf);

5. FIFO 队列

#include <zephyr/kernel.h>

// 定义数据项结构
struct data_item {
    void *fifo_reserved;  // FIFO 保留字段(必须放在第一位)
    int value;
};

// 定义 FIFO
K_FIFO_DEFINE(my_fifo);

// 入队
struct data_item *item = k_malloc(sizeof(struct data_item));
item->value = 42;
k_fifo_put(&my_fifo, item);

// 出队
struct data_item *rx_item = k_fifo_get(&my_fifo, K_FOREVER);
if (rx_item != NULL) {
    process_item(rx_item);
    k_free(rx_item);
}

// 批量入队
k_fifo_put_list(&my_fifo, item_list, last_item);

定时器和延迟工作

1. 定时器

#include <zephyr/kernel.h>

// 定时器回调
void timer_handler(struct k_timer *timer)
{
    // 定时器到期时的回调
    // 注意:在中断上下文执行,不要调用阻塞函数
    k_sem_give(&data_sem);
}

// 定义定时器
K_TIMER_DEFINE(my_timer, timer_handler, NULL);

// 启动单次定时器
k_timer_start(&my_timer, K_SECONDS(5), K_NO_WAIT);

// 启动周期定时器
k_timer_start(&my_timer, K_SECONDS(1), K_SECONDS(1));

// 停止定时器
k_timer_stop(&my_timer);

// 获取剩余时间
k_ticks_t remaining = k_timer_remaining_ticks(&my_timer);

2. 延迟工作队列

#include <zephyr/kernel.h>

// 工作处理函数
void work_handler(struct k_work *work)
{
    // 执行延迟任务(在线程上下文)
    process_data();
}

// 定义延迟工作
K_WORK_DELAYABLE_DEFINE(delayed_work, work_handler);

// 调度延迟工作(5秒后执行)
k_work_schedule(&delayed_work, K_SECONDS(5));

// 取消延迟工作
k_work_cancel_delayable(&delayed_work);

// 立即执行
k_work_submit(&delayed_work.work);

3. 工作队列

#include <zephyr/kernel.h>

// 定义工作队列
K_THREAD_STACK_DEFINE(workq_stack, 1024);
static struct k_work_q my_workq;

// 工作项
struct my_work {
    struct k_work work;
    int data;
};

static struct my_work my_work_item;

void work_handler(struct k_work *work)
{
    struct my_work *item = CONTAINER_OF(work, struct my_work, work);
    // 处理工作
    process(item->data);
}

void init_work_queue(void)
{
    // 启动工作队列
    k_work_queue_start(&my_workq, workq_stack,
                       K_THREAD_STACK_SIZEOF(workq_stack),
                       7, NULL);
    
    // 初始化工作项
    k_work_init(&my_work_item.work, work_handler);
    my_work_item.data = 42;
    
    // 提交工作到队列
    k_work_submit_to_queue(&my_workq, &my_work_item.work);
}

实际应用示例

多线程传感器系统

#include <zephyr/kernel.h>
#include <zephyr/drivers/sensor.h>

/* 定义 */
K_THREAD_STACK_DEFINE(sensor_stack, 1024);
K_THREAD_STACK_DEFINE(processor_stack, 1024);
K_THREAD_STACK_DEFINE(communicator_stack, 2048);

static struct k_thread sensor_thread_data;
static struct k_thread processor_thread_data;
static struct k_thread communicator_thread_data;

/* 消息队列 */
struct sensor_data {
    float temperature;
    float humidity;
    int64_t timestamp;
};

K_MSGQ_DEFINE(sensor_msgq, sizeof(struct sensor_data), 16, 4);
K_MSGQ_DEFINE(tx_msgq, sizeof(struct sensor_data), 8, 4);

/* 同步 */
K_SEM_DEFINE(sensor_sem, 0, 1);
K_MUTEX_DEFINE(config_mutex);

/* 配置 */
static volatile int sample_interval_ms = 1000;

/* 传感器线程 */
void sensor_thread_fn(void *p1, void *p2, void *p3)
{
    const struct device *sensor = DEVICE_DT_GET(DT_NODELABEL(bme280));
    struct sensor_data data;
    struct sensor_value temp, hum;
    
    while (1) {
        // 读取传感器
        sensor_sample_fetch(sensor);
        sensor_channel_get(sensor, SENSOR_CHAN_AMBIENT_TEMP, &temp);
        sensor_channel_get(sensor, SENSOR_CHAN_HUMIDITY, &hum);
        
        // 填充数据
        data.temperature = sensor_value_to_float(&temp);
        data.humidity = sensor_value_to_float(&hum);
        data.timestamp = k_uptime_get();
        
        // 发送到处理队列
        k_msgq_put(&sensor_msgq, &data, K_NO_WAIT);
        
        // 等待下一次采样
        k_mutex_lock(&config_mutex, K_FOREVER);
        int interval = sample_interval_ms;
        k_mutex_unlock(&config_mutex);
        
        k_sleep(K_MSEC(interval));
    }
}

/* 处理线程 */
void processor_thread_fn(void *p1, void *p2, void *p3)
{
    struct sensor_data data;
    
    while (1) {
        // 等待数据
        if (k_msgq_get(&sensor_msgq, &data, K_FOREVER) == 0) {
            // 数据处理(如滤波、校准)
            data.temperature = calibrate(data.temperature);
            
            // 发送到通信队列
            k_msgq_put(&tx_msgq, &data, K_NO_WAIT);
        }
    }
}

/* 通信线程 */
void communicator_thread_fn(void *p1, void *p2, void *p3)
{
    struct sensor_data data;
    
    while (1) {
        // 等待数据
        if (k_msgq_get(&tx_msgq, &data, K_SECONDS(30)) == 0) {
            // 通过 BLE/LTE 发送
            send_via_ble(&data);
        }
    }
}

/* 主函数 */
int main(void)
{
    // 创建线程
    k_thread_create(&sensor_thread_data, sensor_stack,
                    K_THREAD_STACK_SIZEOF(sensor_stack),
                    sensor_thread_fn, NULL, NULL, NULL,
                    7, 0, K_NO_WAIT);
    k_thread_name_set(&sensor_thread_data, "sensor");
    
    k_thread_create(&processor_thread_data, processor_stack,
                    K_THREAD_STACK_SIZEOF(processor_stack),
                    processor_thread_fn, NULL, NULL, NULL,
                    7, 0, K_NO_WAIT);
    k_thread_name_set(&processor_thread_data, "processor");
    
    k_thread_create(&communicator_thread_data, communicator_stack,
                    K_THREAD_STACK_SIZEOF(communicator_stack),
                    communicator_thread_fn, NULL, NULL, NULL,
                    7, 0, K_NO_WAIT);
    k_thread_name_set(&communicator_thread_data, "communicator");
    
    return 0;
}

调试和诊断

1. 线程堆栈分析

# 启用堆栈分析
CONFIG_THREAD_STACK_INFO=y
CONFIG_THREAD_NAME=y
CONFIG_THREAD_MONITOR=y
// 检查堆栈使用
size_t unused = k_thread_stack_space_get(k_current_get());
printk("Stack unused: %zu bytes\n", unused);

2. 线程运行时统计

CONFIG_THREAD_RUNTIME_STATS=y
CONFIG_THREAD_RUNTIME_STATS_USE_TIMING_FUNCTIONS=y
#include <zephyr/debug/thread_stats.h>

struct k_thread_runtime_stats stats;
k_thread_runtime_stats_get(k_current_get(), &stats);

printk("CPU cycles: %llu\n", stats.execution_cycles);
printk("CPU usage: %llu%%\n", stats.usage);

3. 线程分析

CONFIG_THREAD_ANALYZER=y
CONFIG_THREAD_ANALYZER_USE_PRINTK=y
CONFIG_THREAD_ANALYZER_AUTO=y
CONFIG_THREAD_ANALYZER_AUTO_INTERVAL=5

总结

选择合适的同步机制

机制用途
Semaphore计数资源、事件通知
Mutex互斥访问共享资源
Condvar复杂条件等待
Event多事件位等待
Message Queue结构化消息传递
FIFO高性能数据队列
Work Queue延迟和后台任务

最佳实践

  1. 避免死锁:按固定顺序获取多个锁
  2. 最小化临界区:只在必要时持锁
  3. 使用超时:避免无限等待
  4. 合理设置优先级:避免优先级反转
  5. 监控堆栈:确保足够堆栈空间

*学习日期: 2026-03-21* *小白 🤖*