学习日期: 2026-03-23 笔记编号: #64 相关主题: 物联网通信、消息队列、Zephyr MQTT
MQTT(Message Queuing Telemetry Transport)是一种轻量级的发布/订阅消息传输协议,专为物联网和低带宽、高延迟网络设计。
| 特性 | 描述 |
|---|---|
| 轻量级 | 最小消息头仅 2 字节 |
| 发布/订阅 | 解耦消息生产者和消费者 |
| QoS 支持 | 三种消息质量等级 |
| 持久会话 | 支持离线消息 |
| 遗嘱消息 | 连接断开时自动发布 |
| 主题过滤 | 灵活的消息路由 |
| 特性 | MQTT | CoAP |
|---|---|---|
| 传输层 | TCP | UDP |
| 模式 | 发布/订阅 | 请求/响应 |
| 头部开销 | 2-5 字节 | 4 字节 |
| QoS | 3 级 | 2 级(CON/NON) |
| 适用场景 | 持续连接、实时推送 | 资源受限、间歇连接 |
| 观察模式 | 订阅模式 | Observe 选项 |
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Publisher │────┐ │ Broker │ ┌────│ Subscriber │
└─────────────┘ │ │ (Server) │ │ └─────────────┘
│ └─────────────┘ │
┌─────────────┐ │ │ │ ┌─────────────┐
│ Publisher │────┘ │ └────│ Subscriber │
└─────────────┘ │ └─────────────┘
Topic-based
Routing
角色说明:
MQTT 使用类似文件路径的主题层级:
home/
├── livingroom/
│ ├── temperature
│ ├── humidity
│ └── light/status
├── bedroom/
│ ├── temperature
│ └── light/control
└── kitchen/
└── smoke/alarm
通配符:
+: 单级通配符,如 home/+/temperature#: 多级通配符,如 home/#(匹配所有子主题)| 类型 | 值 | 方向 | 描述 |
|---|---|---|---|
| CONNECT | 1 | Client→Broker | 请求连接 |
| CONNACK | 2 | Broker→Client | 连接确认 |
| PUBLISH | 3 | 双向 | 发布消息 |
| PUBACK | 4 | 双向 | QoS 1 确认 |
| PUBREC | 5 | 双向 | QoS 2 接收确认 |
| PUBREL | 6 | 双向 | QoS 2 释放 |
| PUBCOMP | 7 | 双向 | QoS 2 完成 |
| SUBSCRIBE | 8 | Client→Broker | 订阅请求 |
| SUBACK | 9 | Broker→Client | 订阅确认 |
| UNSUBSCRIBE | 10 | Client→Broker | 取消订阅 |
| UNSUBACK | 11 | Broker→Client | 取消确认 |
| PINGREQ | 12 | Client→Broker | 心跳请求 |
| PINGRESP | 13 | Broker→Client | 心跳响应 |
| DISCONNECT | 14 | Client→Broker | 断开连接 |
┌─────────────────────────────────┐
│ Bit 7-4 │ Bit 3-0 │
│ 报文类型 │ 标志位 │
├─────────────────────────────────┤
│ 剩余长度 │
│ (1-4 字节变长) │
└─────────────────────────────────┘
| QoS | 名称 | 消息传递保证 | 消息流 |
|---|---|---|---|
| 0 | At Most Once | 最多一次,可能丢失 | PUBLISH |
| 1 | At Least Once | 至少一次,可能重复 | PUBLISH → PUBACK |
| 2 | Exactly Once | 恰好一次,保证到达 | PUBLISH → PUBREC → PUBREL → PUBCOMP |
Publisher Broker Subscriber
│ │ │
│──── PUBLISH ──────>│ │
│ │──── PUBLISH ──────>│
│ │ │
Publisher Broker Subscriber
│ │ │
│──── PUBLISH ──────>│ │
│ │──── PUBLISH ──────>│
│ │ │
│<─── PUBACK ────────│<─── PUBACK ────────│
│ │ │
Publisher Broker Subscriber
│ │ │
│──── PUBLISH ──────>│ │
│<─── PUBREC ────────│ │
│──── PUBREL ───────>│ │
│ │──── PUBLISH ──────>│
│ │<─── PUBREC ────────│
│ │──── PUBREL ───────>│
│ │<─── PUBCOMP ───────│
│<─── PUBCOMP ───────│ │
│ │ │
连接参数:
// Zephyr MQTT 配置
struct mqtt_client client;
struct mqtt_utf8 username = MQTT_UTF8_LITERAL("user");
struct mqtt_utf8 password = MQTT_UTF8_LITERAL("pass");
struct mqtt_utf8 client_id = {
.utf8 = "nrf54l15_client",
.size = sizeof("nrf54l15_client") - 1
};
client.client_id = &client_id;
client.user_name = &username;
client.password = &password;
client.clean_session = 0; // 持久会话
持久会话特点:
struct mqtt_will will = {
.topic = {
.utf8 = "device/status",
.size = sizeof("device/status") - 1
},
.message = {
.utf8 = "offline",
.size = sizeof("offline") - 1
},
.qos = MQTT_QOS_1_AT_LEAST_ONCE,
.retain = 1 // 保留消息
};
client.will = &will;
用途:
#include <zephyr/net/mqtt.h>
#include <zephyr/net/socket.h>
#include <zephyr/net/tls_credentials.h>
#define MQTT_CLIENTID "nrf54l15_mqtt_client"
static uint8_t rx_buffer[256];
static uint8_t tx_buffer[256];
static struct mqtt_client client_ctx;
static struct sockaddr_storage broker;
static void mqtt_client_init(void)
{
struct mqtt_client *client = &client_ctx;
mqtt_client_init(client);
// 设置缓冲区
client->rx_buf = rx_buffer;
client->rx_buf_size = sizeof(rx_buffer);
client->tx_buf = tx_buffer;
client->tx_buf_size = sizeof(tx_buffer);
// 设置客户端 ID
client->client_id.utf8 = (uint8_t *)MQTT_CLIENTID;
client->client_id.size = strlen(MQTT_CLIENTID);
// 设置用户名密码(可选)
client->user_name = NULL;
client->password = NULL;
// 清洁会话
client->clean_session = 1;
// 协议版本
client->protocol_version = MQTT_VERSION_3_1_1;
}
static int mqtt_connect_to_broker(void)
{
int err;
struct zsock_addrinfo hints = {
.ai_family = AF_INET,
.ai_socktype = SOCK_STREAM
};
struct zsock_addrinfo *result;
// 解析 Broker 地址
err = zsock_getaddrinfo("test.mosquitto.org", "1883",
&hints, &result);
if (err != 0) {
return err;
}
// 设置 Broker 地址
memcpy(&broker, result->ai_addr, result->ai_addrlen);
zsock_freeaddrinfo(result);
// 连接
err = mqtt_connect(&client_ctx);
if (err != 0) {
return err;
}
return 0;
}
#define SUBSCRIBE_TOPIC "sensors/nrf54l15/commands"
static int mqtt_subscribe(void)
{
struct mqtt_topic topic = {
.topic = {
.utf8 = SUBSCRIBE_TOPIC,
.size = strlen(SUBSCRIBE_TOPIC)
},
.qos = MQTT_QOS_1_AT_LEAST_ONCE
};
return mqtt_subscribe(&client_ctx, &topic, 1, 1);
}
#define PUBLISH_TOPIC "sensors/nrf54l15/data"
static int mqtt_publish_message(const char *data, size_t len)
{
struct mqtt_publish_param param = {
.message = {
.topic = {
.utf8 = PUBLISH_TOPIC,
.size = strlen(PUBLISH_TOPIC)
},
.payload = {
.data = (uint8_t *)data,
.len = len
},
.qos = MQTT_QOS_1_AT_LEAST_ONCE
},
.message_id = 1
};
return mqtt_publish(&client_ctx, ¶m);
}
static void mqtt_evt_handler(struct mqtt_client *client,
const struct mqtt_evt *evt)
{
switch (evt->type) {
case MQTT_EVT_CONNACK:
if (evt->result != 0) {
printk("MQTT connect failed: %d\n", evt->result);
} else {
printk("MQTT connected\n");
mqtt_subscribe();
}
break;
case MQTT_EVT_DISCONNECT:
printk("MQTT disconnected: %d\n", evt->result);
break;
case MQTT_EVT_PUBLISH:
{
const struct mqtt_publish_param *p = &evt->param.publish;
printk("Received: topic=%.*s, payload=%.*s\n",
p->message.topic.size,
(char *)p->message.topic.utf8,
p->message.payload.len,
(char *)p->message.payload.data);
// 发送 PUBACK(QoS 1)
if (p->message.qos == MQTT_QOS_1_AT_LEAST_ONCE) {
mqtt_publish_qos1_ack(client, &p->message_id);
}
}
break;
case MQTT_EVT_PUBACK:
printk("PUBACK received: id=%d\n",
evt->param.puback.message_id);
break;
case MQTT_EVT_SUBACK:
printk("SUBACK received: id=%d\n",
evt->param.suback.message_id);
break;
default:
break;
}
}
#include <zephyr/posix/fdtable.h>
void mqtt_thread_fn(void)
{
int err;
struct zsock_pollfd fds[1];
mqtt_client_init();
client_ctx.evt_cb = mqtt_evt_handler;
err = mqtt_connect_to_broker();
if (err != 0) {
return;
}
fds[0].fd = client_ctx.transport.tcp.sock;
fds[0].events = ZSOCK_POLLIN;
while (1) {
// 等待数据
err = zsock_poll(fds, 1, 100);
if (err < 0) {
break;
}
// 处理 MQTT 输入
if (fds[0].revents & ZSOCK_POLLIN) {
err = mqtt_input(&client_ctx);
if (err != 0) {
break;
}
}
// 处理 MQTT 心跳
err = mqtt_live(&client_ctx);
if (err != 0) {
break;
}
}
mqtt_disconnect(&client_ctx);
}
#include <zephyr/net/tls_credentials.h>
// CA 证书
static const char ca_certificate[] = {
#include "ca_cert.inc"
};
// TLS 凭据标签
#define TLS_TAG 1
static int mqtt_setup_tls(void)
{
int err;
// 注册 CA 证书
err = tls_credential_add(TLS_TAG, TLS_CREDENTIAL_CA_CERTIFICATE,
ca_certificate, sizeof(ca_certificate));
if (err < 0) {
return err;
}
// 配置 TLS
client_ctx.transport.type = MQTT_TRANSPORT_SECURE;
client_ctx.transport.tls.config = NULL;
client_ctx.transport.tls.sock = -1;
client_ctx.transport.tls.tag = TLS_TAG;
client_ctx.transport.tls.peer_verify = TLS_PEER_VERIFY_REQUIRED;
client_ctx.transport.tls.hostname = "test.mosquitto.org";
return 0;
}
// MQTTS 端口 8883
err = zsock_getaddrinfo("test.mosquitto.org", "8883",
&hints, &result);
| 特性 | 描述 |
|---|---|
| 增强错误报告 | 详细的原因码 |
| 共享订阅 | 负载均衡支持 |
| 消息过期 | TTL 支持 |
| 主题别名 | 减少主题名开销 |
| 用户属性 | 自定义键值对 |
| 请求/响应 | 内置响应模式 |
// 设置 MQTT 5.0 协议
client->protocol_version = MQTT_VERSION_5_0;
// MQTT 5.0 发布属性
struct mqtt_publish_param param = {
.message = {
.topic = { ... },
.payload = { ... }
},
.message_id = 1,
// MQTT 5.0 属性
.properties = {
.message_expiry_interval = 3600, // 1小时过期
.topic_alias = 1,
.user_property_count = 0
}
};
// 设备到云端
prod/{account_id}/{device_id}/d2c
// 云端到设备
prod/{account_id}/{device_id}/c2d
// 设备影子
$aws/things/{device_id}/shadow/update
$aws/things/{device_id}/shadow/get
#define NRF_CLOUD_MQTT_HOST "mqtt.nrfcloud.com"
#define NRF_CLOUD_MQTT_PORT 8883
// JWT Token 作为密码
static int connect_to_nrf_cloud(void)
{
// 生成 JWT Token
char jwt_token[256];
generate_jwt_token(jwt_token, sizeof(jwt_token));
client->user_name = &(struct mqtt_utf8){
.utf8 = device_id,
.size = strlen(device_id)
};
client->password = &(struct mqtt_utf8){
.utf8 = jwt_token,
.size = strlen(jwt_token)
};
// TLS 连接
client->transport.type = MQTT_TRANSPORT_SECURE;
return mqtt_connect(client);
}
// 推荐格式
{project}/{location}/{device_id}/{data_type}
// 示例
smart-home/livingroom/temperature
smart-home/livingroom/humidity
smart-home/kitchen/motion
| 数据类型 | 推荐 QoS | 理由 |
|---|---|---|
| 实时传感器 | QoS 0 | 可容忍丢失 |
| 告警事件 | QoS 1 | 需要确认 |
| 配置更新 | QoS 2 | 确保唯一 |
| 历史数据 | QoS 0/1 | 批量重传 |
// 低功耗 MQTT 配置
client->keepalive = 300; // 5 分钟心跳
// 休眠前发送离线状态
mqtt_publish_message("offline", 7);
// 使用保留消息存储最后状态
struct mqtt_will will = {
.topic = { ... },
.message = { .utf8 = "offline", ... },
.retain = 1
};
static void mqtt_reconnect(void)
{
int retry_count = 0;
const int max_retries = 5;
const int retry_delay_ms[] = {1000, 2000, 4000, 8000, 16000};
while (retry_count < max_retries) {
int err = mqtt_connect(&client_ctx);
if (err == 0) {
return;
}
k_msleep(retry_delay_ms[retry_count]);
retry_count++;
}
// 重启网络
network_restart();
}
| Broker | 特点 | 端口 |
|---|---|---|
| Mosquitto | 开源、轻量 | 1883/8883 |
| EMQX | 高性能、企业级 | 1883/8883/8083(WS) |
| HiveMQ | 商业、可扩展 | 1883/8883 |
| nRF Cloud | Nordic 云服务 | 8883 |
| AWS IoT Core | AWS 集成 | 8883 |
# 安装 mosquitto 客户端
sudo apt install mosquitto-clients
# 订阅
mosquitto_sub -h test.mosquitto.org -t "sensors/#" -v
# 发布
mosquitto_pub -h test.mosquitto.org -t "sensors/test" -m "hello"
# prj.conf
CONFIG_MQTT_LOG_LEVEL_DBG=y
CONFIG_NET_LOG=y
CONFIG_NET_SOCKETS_LOG_LEVEL_DBG=y
# 使用 Wireshark 过滤 MQTT
tcp.port == 1883 || tcp.port == 8883
| 选择 MQTT | 选择 CoAP |
|---|---|
| 需要实时推送 | 资源极度受限 |
| 持续在线设备 | 间歇连接设备 |
| 多订阅者场景 | 单点通信 |
| 需要 QoS 2 | NAT 穿透需求 |
学习笔记: #64 主题: MQTT 协议、物联网通信、Zephyr MQTT 相关笔记: CoAP 协议 (#63)、BLE Services (#46)、nRF Cloud (#50)