返回首页

MQTT 协议学习笔记

学习日期: 2026-03-23 笔记编号: #64 相关主题: 物联网通信、消息队列、Zephyr MQTT


一、MQTT 概述

1.1 什么是 MQTT

MQTT(Message Queuing Telemetry Transport)是一种轻量级的发布/订阅消息传输协议,专为物联网和低带宽、高延迟网络设计。

1.2 主要特点

特性描述
轻量级最小消息头仅 2 字节
发布/订阅解耦消息生产者和消费者
QoS 支持三种消息质量等级
持久会话支持离线消息
遗嘱消息连接断开时自动发布
主题过滤灵活的消息路由

1.3 MQTT vs CoAP 对比

特性MQTTCoAP
传输层TCPUDP
模式发布/订阅请求/响应
头部开销2-5 字节4 字节
QoS3 级2 级(CON/NON)
适用场景持续连接、实时推送资源受限、间歇连接
观察模式订阅模式Observe 选项

二、MQTT 协议架构

2.1 角色定义

┌─────────────┐         ┌─────────────┐         ┌─────────────┐
│  Publisher  │────┐    │   Broker    │    ┌────│ Subscriber  │
└─────────────┘    │    │   (Server)  │    │    └─────────────┘
                   │    └─────────────┘    │
┌─────────────┐    │           │          │    ┌─────────────┐
│  Publisher  │────┘           │          └────│ Subscriber  │
└─────────────┘                │               └─────────────┘
                         Topic-based
                           Routing

角色说明

2.2 主题层级

MQTT 使用类似文件路径的主题层级:

home/
├── livingroom/
│   ├── temperature
│   ├── humidity
│   └── light/status
├── bedroom/
│   ├── temperature
│   └── light/control
└── kitchen/
    └── smoke/alarm

通配符


三、MQTT 消息类型

3.1 控制报文类型

类型方向描述
CONNECT1Client→Broker请求连接
CONNACK2Broker→Client连接确认
PUBLISH3双向发布消息
PUBACK4双向QoS 1 确认
PUBREC5双向QoS 2 接收确认
PUBREL6双向QoS 2 释放
PUBCOMP7双向QoS 2 完成
SUBSCRIBE8Client→Broker订阅请求
SUBACK9Broker→Client订阅确认
UNSUBSCRIBE10Client→Broker取消订阅
UNSUBACK11Broker→Client取消确认
PINGREQ12Client→Broker心跳请求
PINGRESP13Broker→Client心跳响应
DISCONNECT14Client→Broker断开连接

3.2 固定头部格式

┌─────────────────────────────────┐
│  Bit 7-4   │    Bit 3-0         │
│ 报文类型   │   标志位            │
├─────────────────────────────────┤
│          剩余长度                │
│       (1-4 字节变长)            │
└─────────────────────────────────┘

四、服务质量(QoS)

4.1 QoS 等级

QoS名称消息传递保证消息流
0At Most Once最多一次,可能丢失PUBLISH
1At Least Once至少一次,可能重复PUBLISH → PUBACK
2Exactly Once恰好一次,保证到达PUBLISH → PUBREC → PUBREL → PUBCOMP

4.2 QoS 0 - 最多一次

Publisher              Broker              Subscriber
    │                    │                    │
    │──── PUBLISH ──────>│                    │
    │                    │──── PUBLISH ──────>│
    │                    │                    │

4.3 QoS 1 - 至少一次

Publisher              Broker              Subscriber
    │                    │                    │
    │──── PUBLISH ──────>│                    │
    │                    │──── PUBLISH ──────>│
    │                    │                    │
    │<─── PUBACK ────────│<─── PUBACK ────────│
    │                    │                    │

4.4 QoS 2 - 恰好一次

Publisher              Broker              Subscriber
    │                    │                    │
    │──── PUBLISH ──────>│                    │
    │<─── PUBREC ────────│                    │
    │──── PUBREL ───────>│                    │
    │                    │──── PUBLISH ──────>│
    │                    │<─── PUBREC ────────│
    │                    │──── PUBREL ───────>│
    │                    │<─── PUBCOMP ───────│
    │<─── PUBCOMP ───────│                    │
    │                    │                    │

五、持久会话与遗嘱消息

5.1 持久会话

连接参数

// Zephyr MQTT 配置
struct mqtt_client client;
struct mqtt_utf8 username = MQTT_UTF8_LITERAL("user");
struct mqtt_utf8 password = MQTT_UTF8_LITERAL("pass");

struct mqtt_utf8 client_id = {
    .utf8 = "nrf54l15_client",
    .size = sizeof("nrf54l15_client") - 1
};

client.client_id = &client_id;
client.user_name = &username;
client.password = &password;
client.clean_session = 0;  // 持久会话

持久会话特点

5.2 遗嘱消息(Last Will)

struct mqtt_will will = {
    .topic = {
        .utf8 = "device/status",
        .size = sizeof("device/status") - 1
    },
    .message = {
        .utf8 = "offline",
        .size = sizeof("offline") - 1
    },
    .qos = MQTT_QOS_1_AT_LEAST_ONCE,
    .retain = 1  // 保留消息
};

client.will = &will;

用途


六、Zephyr MQTT 实现

6.1 头文件

#include <zephyr/net/mqtt.h>
#include <zephyr/net/socket.h>
#include <zephyr/net/tls_credentials.h>

6.2 初始化 MQTT 客户端

#define MQTT_CLIENTID "nrf54l15_mqtt_client"

static uint8_t rx_buffer[256];
static uint8_t tx_buffer[256];

static struct mqtt_client client_ctx;
static struct sockaddr_storage broker;

static void mqtt_client_init(void)
{
    struct mqtt_client *client = &client_ctx;
    
    mqtt_client_init(client);
    
    // 设置缓冲区
    client->rx_buf = rx_buffer;
    client->rx_buf_size = sizeof(rx_buffer);
    client->tx_buf = tx_buffer;
    client->tx_buf_size = sizeof(tx_buffer);
    
    // 设置客户端 ID
    client->client_id.utf8 = (uint8_t *)MQTT_CLIENTID;
    client->client_id.size = strlen(MQTT_CLIENTID);
    
    // 设置用户名密码(可选)
    client->user_name = NULL;
    client->password = NULL;
    
    // 清洁会话
    client->clean_session = 1;
    
    // 协议版本
    client->protocol_version = MQTT_VERSION_3_1_1;
}

6.3 连接 Broker

static int mqtt_connect_to_broker(void)
{
    int err;
    struct zsock_addrinfo hints = {
        .ai_family = AF_INET,
        .ai_socktype = SOCK_STREAM
    };
    struct zsock_addrinfo *result;
    
    // 解析 Broker 地址
    err = zsock_getaddrinfo("test.mosquitto.org", "1883",
                           &hints, &result);
    if (err != 0) {
        return err;
    }
    
    // 设置 Broker 地址
    memcpy(&broker, result->ai_addr, result->ai_addrlen);
    zsock_freeaddrinfo(result);
    
    // 连接
    err = mqtt_connect(&client_ctx);
    if (err != 0) {
        return err;
    }
    
    return 0;
}

6.4 订阅主题

#define SUBSCRIBE_TOPIC "sensors/nrf54l15/commands"

static int mqtt_subscribe(void)
{
    struct mqtt_topic topic = {
        .topic = {
            .utf8 = SUBSCRIBE_TOPIC,
            .size = strlen(SUBSCRIBE_TOPIC)
        },
        .qos = MQTT_QOS_1_AT_LEAST_ONCE
    };
    
    return mqtt_subscribe(&client_ctx, &topic, 1, 1);
}

6.5 发布消息

#define PUBLISH_TOPIC "sensors/nrf54l15/data"

static int mqtt_publish_message(const char *data, size_t len)
{
    struct mqtt_publish_param param = {
        .message = {
            .topic = {
                .utf8 = PUBLISH_TOPIC,
                .size = strlen(PUBLISH_TOPIC)
            },
            .payload = {
                .data = (uint8_t *)data,
                .len = len
            },
            .qos = MQTT_QOS_1_AT_LEAST_ONCE
        },
        .message_id = 1
    };
    
    return mqtt_publish(&client_ctx, &param);
}

6.6 处理 MQTT 事件

static void mqtt_evt_handler(struct mqtt_client *client,
                            const struct mqtt_evt *evt)
{
    switch (evt->type) {
    case MQTT_EVT_CONNACK:
        if (evt->result != 0) {
            printk("MQTT connect failed: %d\n", evt->result);
        } else {
            printk("MQTT connected\n");
            mqtt_subscribe();
        }
        break;
        
    case MQTT_EVT_DISCONNECT:
        printk("MQTT disconnected: %d\n", evt->result);
        break;
        
    case MQTT_EVT_PUBLISH:
        {
            const struct mqtt_publish_param *p = &evt->param.publish;
            printk("Received: topic=%.*s, payload=%.*s\n",
                   p->message.topic.size,
                   (char *)p->message.topic.utf8,
                   p->message.payload.len,
                   (char *)p->message.payload.data);
            
            // 发送 PUBACK(QoS 1)
            if (p->message.qos == MQTT_QOS_1_AT_LEAST_ONCE) {
                mqtt_publish_qos1_ack(client, &p->message_id);
            }
        }
        break;
        
    case MQTT_EVT_PUBACK:
        printk("PUBACK received: id=%d\n", 
               evt->param.puback.message_id);
        break;
        
    case MQTT_EVT_SUBACK:
        printk("SUBACK received: id=%d\n",
               evt->param.suback.message_id);
        break;
        
    default:
        break;
    }
}

6.7 主循环处理

#include <zephyr/posix/fdtable.h>

void mqtt_thread_fn(void)
{
    int err;
    struct zsock_pollfd fds[1];
    
    mqtt_client_init();
    client_ctx.evt_cb = mqtt_evt_handler;
    
    err = mqtt_connect_to_broker();
    if (err != 0) {
        return;
    }
    
    fds[0].fd = client_ctx.transport.tcp.sock;
    fds[0].events = ZSOCK_POLLIN;
    
    while (1) {
        // 等待数据
        err = zsock_poll(fds, 1, 100);
        if (err < 0) {
            break;
        }
        
        // 处理 MQTT 输入
        if (fds[0].revents & ZSOCK_POLLIN) {
            err = mqtt_input(&client_ctx);
            if (err != 0) {
                break;
            }
        }
        
        // 处理 MQTT 心跳
        err = mqtt_live(&client_ctx);
        if (err != 0) {
            break;
        }
    }
    
    mqtt_disconnect(&client_ctx);
}

七、MQTT over TLS(MQTTS)

7.1 安全配置

#include <zephyr/net/tls_credentials.h>

// CA 证书
static const char ca_certificate[] = {
    #include "ca_cert.inc"
};

// TLS 凭据标签
#define TLS_TAG 1

static int mqtt_setup_tls(void)
{
    int err;
    
    // 注册 CA 证书
    err = tls_credential_add(TLS_TAG, TLS_CREDENTIAL_CA_CERTIFICATE,
                            ca_certificate, sizeof(ca_certificate));
    if (err < 0) {
        return err;
    }
    
    // 配置 TLS
    client_ctx.transport.type = MQTT_TRANSPORT_SECURE;
    client_ctx.transport.tls.config = NULL;
    client_ctx.transport.tls.sock = -1;
    client_ctx.transport.tls.tag = TLS_TAG;
    client_ctx.transport.tls.peer_verify = TLS_PEER_VERIFY_REQUIRED;
    client_ctx.transport.tls.hostname = "test.mosquitto.org";
    
    return 0;
}

7.2 连接 MQTTS Broker

// MQTTS 端口 8883
err = zsock_getaddrinfo("test.mosquitto.org", "8883",
                       &hints, &result);

八、MQTT 5.0 新特性

8.1 主要改进

特性描述
增强错误报告详细的原因码
共享订阅负载均衡支持
消息过期TTL 支持
主题别名减少主题名开销
用户属性自定义键值对
请求/响应内置响应模式

8.2 Zephyr MQTT 5.0 支持

// 设置 MQTT 5.0 协议
client->protocol_version = MQTT_VERSION_5_0;

// MQTT 5.0 发布属性
struct mqtt_publish_param param = {
    .message = {
        .topic = { ... },
        .payload = { ... }
    },
    .message_id = 1,
    // MQTT 5.0 属性
    .properties = {
        .message_expiry_interval = 3600,  // 1小时过期
        .topic_alias = 1,
        .user_property_count = 0
    }
};

九、与 nRF Cloud MQTT 集成

9.1 nRF Cloud MQTT 特点

9.2 nRF Cloud 主题格式

// 设备到云端
prod/{account_id}/{device_id}/d2c

// 云端到设备
prod/{account_id}/{device_id}/c2d

// 设备影子
$aws/things/{device_id}/shadow/update
$aws/things/{device_id}/shadow/get

9.3 连接示例

#define NRF_CLOUD_MQTT_HOST "mqtt.nrfcloud.com"
#define NRF_CLOUD_MQTT_PORT 8883

// JWT Token 作为密码
static int connect_to_nrf_cloud(void)
{
    // 生成 JWT Token
    char jwt_token[256];
    generate_jwt_token(jwt_token, sizeof(jwt_token));
    
    client->user_name = &(struct mqtt_utf8){
        .utf8 = device_id,
        .size = strlen(device_id)
    };
    client->password = &(struct mqtt_utf8){
        .utf8 = jwt_token,
        .size = strlen(jwt_token)
    };
    
    // TLS 连接
    client->transport.type = MQTT_TRANSPORT_SECURE;
    
    return mqtt_connect(client);
}

十、最佳实践

10.1 主题设计

// 推荐格式
{project}/{location}/{device_id}/{data_type}

// 示例
smart-home/livingroom/temperature
smart-home/livingroom/humidity
smart-home/kitchen/motion

10.2 QoS 选择

数据类型推荐 QoS理由
实时传感器QoS 0可容忍丢失
告警事件QoS 1需要确认
配置更新QoS 2确保唯一
历史数据QoS 0/1批量重传

10.3 功耗优化

// 低功耗 MQTT 配置
client->keepalive = 300;  // 5 分钟心跳

// 休眠前发送离线状态
mqtt_publish_message("offline", 7);

// 使用保留消息存储最后状态
struct mqtt_will will = {
    .topic = { ... },
    .message = { .utf8 = "offline", ... },
    .retain = 1
};

10.4 断线重连

static void mqtt_reconnect(void)
{
    int retry_count = 0;
    const int max_retries = 5;
    const int retry_delay_ms[] = {1000, 2000, 4000, 8000, 16000};
    
    while (retry_count < max_retries) {
        int err = mqtt_connect(&client_ctx);
        if (err == 0) {
            return;
        }
        
        k_msleep(retry_delay_ms[retry_count]);
        retry_count++;
    }
    
    // 重启网络
    network_restart();
}

十一、常见 MQTT Broker

Broker特点端口
Mosquitto开源、轻量1883/8883
EMQX高性能、企业级1883/8883/8083(WS)
HiveMQ商业、可扩展1883/8883
nRF CloudNordic 云服务8883
AWS IoT CoreAWS 集成8883

十二、调试技巧

12.1 使用 MQTT 客户端工具

# 安装 mosquitto 客户端
sudo apt install mosquitto-clients

# 订阅
mosquitto_sub -h test.mosquitto.org -t "sensors/#" -v

# 发布
mosquitto_pub -h test.mosquitto.org -t "sensors/test" -m "hello"

12.2 Zephyr 调试配置

# prj.conf
CONFIG_MQTT_LOG_LEVEL_DBG=y
CONFIG_NET_LOG=y
CONFIG_NET_SOCKETS_LOG_LEVEL_DBG=y

12.3 抓包分析

# 使用 Wireshark 过滤 MQTT
tcp.port == 1883 || tcp.port == 8883

十三、总结

13.1 MQTT 适用场景

13.2 与 CoAP 对比选择

选择 MQTT选择 CoAP
需要实时推送资源极度受限
持续在线设备间歇连接设备
多订阅者场景单点通信
需要 QoS 2NAT 穿透需求

学习笔记: #64 主题: MQTT 协议、物联网通信、Zephyr MQTT 相关笔记: CoAP 协议 (#63)、BLE Services (#46)、nRF Cloud (#50)