鸿蒙南向开发教程 Day 9:消息队列(Message Queue)
·
目标:掌握 OpenHarmony 轻量系统的消息队列 API,实现多线程间的安全数据传递
前置条件:已完成 Day 8 的信号量教程
一、工程结构
app/
├── BUILD.gn
└── 08_message_queue/ # 模块目录
├── BUILD.gn
└── demo.c # 消息队列测试代码
1.1 app/BUILD.gn
import("//build/lite/config/component/lite_component.gni")
lite_component("app") {
features = [
"08_message_queue:msgq_demo", # 引用 08_message_queue 模块
]
}
1.2 08_message_queue/BUILD.gn
static_library("msgq_demo") {
sources = [
"demo.c"
]
include_dirs = [
"//utils/native/lite/include",
"//kernel/liteos_m/components/cmsis/2.0",
]
}
二、完整代码详解
2.1 头文件与宏定义
#include <stdio.h>
#include <unistd.h>
#include "ohos_init.h"
#include "cmsis_os2.h"
#define STACK_SIZE (1024) // 线程栈大小
#define DELAY_TICKS_3 (3) // 接收线程延时
#define DELAY_TICKS_5 (5) // 发送线程延时
#define DELAY_TICKS_20 (20) // 主线程等待时间
#define DELAY_TICKS_80 (80) // 主线程终止前等待
#define QUEUE_SIZE (3) // 消息队列容量(最多缓存3条消息)
2.2 消息结构体
// 定义消息格式
typedef struct {
osThreadId_t tid; // 发送者线程 ID
int count; // 消息序号
} message_entry;
osMessageQueueId_t qid; // 全局消息队列 ID
关键:消息队列传递的是数据副本,不是指针。发送方把 message_entry 复制到队列,接收方从队列复制出来。
2.3 发送线程(多生产者)
void sender_thread(void)
{
static int count = 0; // 静态变量,记录发送次数
message_entry sentry; // 消息结构体
while (1) {
// 1. 填充消息内容
sentry.tid = osThreadGetId(); // 记录发送者 ID
sentry.count = count; // 记录序号
printf("[Message Test] %s send %d to message queue.\r\n",
osThreadGetName(osThreadGetId()), count);
// 2. 发送消息到队列
// 参数:队列ID, 数据指针, 优先级(0), 超时时间
osMessageQueuePut(qid, (const void *)&sentry, 0, osWaitForever);
count++; // 序号递增
osDelay(DELAY_TICKS_5); // 延时 5 tick
}
}
发送流程:
填充消息 → osMessageQueuePut → 队列满则阻塞等待 → 发送成功继续
2.4 接收线程(多消费者)
void receiver_thread(void)
{
message_entry rentry; // 接收缓冲区
while (1) {
// 1. 从队列获取消息
// 参数:队列ID, 接收缓冲区, 优先级指针(NULL), 超时时间
osMessageQueueGet(qid, (void *)&rentry, NULL, osWaitForever);
// 2. 处理消息
printf("[Message Test] %s get %d from %s by message queue.\r\n",
osThreadGetName(osThreadGetId()), // 接收者名字
rentry.count, // 消息序号
osThreadGetName(rentry.tid)); // 发送者名字
osDelay(DELAY_TICKS_3); // 处理延时 3 tick
}
}
接收流程:
osMessageQueueGet → 队列空则阻塞等待 → 获取消息 → 处理消息
2.5 线程创建辅助函数
osThreadId_t newThread(char *name, osThreadFunc_t func, char *arg)
{
osThreadAttr_t attr = {
name, // 线程名称
0, // 属性位
NULL, // 控制块内存
0, // 控制块大小
NULL, // 栈内存
STACK_SIZE * 2, // 栈大小 2048 字节
osPriorityNormal, // 普通优先级
0, // 保留
0 // 保留
};
osThreadId_t tid = osThreadNew(func, (void *)arg, &attr);
if (tid == NULL) {
printf("[Message Test] osThreadNew(%s) failed.\r\n", name);
} else {
printf("[Message Test] osThreadNew(%s) success, thread id: %d.\r\n", name, tid);
}
return tid;
}
2.6 主控制线程
void rtosv2_msgq_main(void)
{
// 1. 创建消息队列
// 参数:队列容量, 单个消息大小, 属性(NULL=默认)
qid = osMessageQueueNew(QUEUE_SIZE, sizeof(message_entry), NULL);
// 2. 创建 2 个接收者 + 3 个发送者(多生产者-多消费者模型)
osThreadId_t ctid1 = newThread("recevier1", receiver_thread, NULL);
osThreadId_t ctid2 = newThread("recevier2", receiver_thread, NULL);
osThreadId_t ptid1 = newThread("sender1", sender_thread, NULL);
osThreadId_t ptid2 = newThread("sender2", sender_thread, NULL);
osThreadId_t ptid3 = newThread("sender3", sender_thread, NULL);
// 3. 等待队列运行一段时间
osDelay(DELAY_TICKS_20);
// 4. 查询队列状态
uint32_t cap = osMessageQueueGetCapacity(qid);
printf("[Message Test] osMessageQueueGetCapacity, capacity: %u.\r\n", cap);
uint32_t msg_size = osMessageQueueGetMsgSize(qid);
printf("[Message Test] osMessageQueueGetMsgSize, size: %u.\r\n", msg_size);
uint32_t count = osMessageQueueGetCount(qid);
printf("[Message Test] osMessageQueueGetCount, count: %u.\r\n", count);
uint32_t space = osMessageQueueGetSpace(qid);
printf("[Message Test] osMessageQueueGetSpace, space: %u.\r\n", space);
// 5. 继续运行一段时间
osDelay(DELAY_TICKS_80);
// 6. 清理资源
osThreadTerminate(ctid1);
osThreadTerminate(ctid2);
osThreadTerminate(ptid1);
osThreadTerminate(ptid2);
osThreadTerminate(ptid3);
osMessageQueueDelete(qid);
}
2.7 系统入口
static void MessageTestTask(void)
{
osThreadAttr_t attr = {
.name = "rtosv2_msgq_main",
.attr_bits = 0U,
.cb_mem = NULL,
.cb_size = 0U,
.stack_mem = NULL,
.stack_size = STACK_SIZE,
.priority = osPriorityNormal,
};
if (osThreadNew((osThreadFunc_t)rtosv2_msgq_main, NULL, &attr) == NULL) {
printf("[MessageTestTask] Failed to create rtosv2_msgq_main!\n");
}
}
APP_FEATURE_INIT(MessageTestTask);
三、消息队列 API 详解
3.1 osMessageQueueNew — 创建队列
osMessageQueueId_t osMessageQueueNew(uint32_t msg_count, uint32_t msg_size, const osMessageQueueAttr_t *attr);
| 参数 | 说明 |
|---|---|
msg_count |
队列容量(最多缓存多少条消息) |
msg_size |
单条消息的字节大小 |
attr |
属性,NULL 为默认 |
本例:osMessageQueueNew(3, sizeof(message_entry), NULL)
| 参数 | 值 | 含义 |
|---|---|---|
msg_count |
3 | 最多缓存 3 条消息,第 4 条发送会阻塞 |
msg_size |
sizeof(message_entry) |
每条消息大小为结构体大小 |
3.2 osMessageQueuePut — 发送消息
osStatus_t osMessageQueuePut(osMessageQueueId_t mq_id, const void *msg_ptr, uint8_t msg_prio, uint32_t timeout);
| 参数 | 说明 |
|---|---|
mq_id |
队列 ID |
msg_ptr |
消息数据指针(内容会被复制到队列) |
msg_prio |
消息优先级(0 = 不区分优先级) |
timeout |
超时时间,osWaitForever = 永久等待 |
阻塞行为:
- 队列未满:立即复制数据,返回
osOK - 队列已满:阻塞等待,直到有空间或超时
3.3 osMessageQueueGet — 接收消息
osStatus_t osMessageQueueGet(osMessageQueueId_t mq_id, void *msg_ptr, uint8_t *msg_prio, uint32_t timeout);
| 参数 | 说明 |
|---|---|
mq_ptr |
接收缓冲区指针(数据从队列复制到这里) |
msg_prio |
输出参数,获取消息的优先级(NULL = 不关心) |
timeout |
超时时间 |
阻塞行为:
- 队列非空:立即取出数据,返回
osOK - 队列已空:阻塞等待,直到有消息或超时
3.4 状态查询 API
| API | 功能 |
|---|---|
osMessageQueueGetCapacity |
获取队列总容量 |
osMessageQueueGetMsgSize |
获取单条消息大小 |
osMessageQueueGetCount |
获取当前队列中的消息数 |
osMessageQueueGetSpace |
获取剩余可用空间 |
3.5 osMessageQueueDelete — 删除队列
osStatus_t osMessageQueueDelete(osMessageQueueId_t mq_id);
四、多生产者-多消费者模型
┌─────────────────────────────────────────┐
│ 消息队列(Queue Size = 3) │
│ ┌─────┐ ┌─────┐ ┌─────┐ │
│ │msg0 │ │msg1 │ │msg2 │ │
│ └─────┘ └─────┘ └─────┘ │
│ 写指针→ ←读指针 │
│ [满: Put阻塞] [空: Get阻塞] │
└─────────────────────────────────────────┘
↑ ↓
┌────┴────┐ ┌────┴────┐
│ 发送者 │ │ 接收者 │
│ sender1 │ │ recevier1│
│ sender2 │ │ recevier2│
│ sender3 │ └─────────┘
└─────────┘
特点:
- 3 个发送者竞争写入队列
- 2 个接收者竞争读取队列
- 队列满时发送者阻塞
- 队列空时接收者阻塞
- 天然线程安全,无需额外互斥锁
五、消息队列 vs 全局变量+互斥锁
| 方式 | 代码复杂度 | 耦合度 | 实时性 | 适用场景 |
|---|---|---|---|---|
| 全局变量+Mutex | 低 | 高(直接访问) | 立即 | 简单数据共享 |
| 消息队列 | 中 | 低(间接传递) | 有延迟(入队/出队) | 复杂数据、解耦、异步 |
消息队列的优势:
- 发送方和接收方完全解耦,不需要知道对方存在
- 天然线程安全,无需手动加锁
- 支持异步通信,发送方不用等接收方处理完
- 支持多对多通信模型
六、编译与验证
6.1 编译烧录
VSCode 点击 Build → Upload,串口波特率 115200。
6.2 预期输出
[Message Test] osThreadNew(recevier1) success, thread id: 3.
[Message Test] osThreadNew(recevier2) success, thread id: 4.
[Message Test] osThreadNew(sender1) success, thread id: 5.
[Message Test] osThreadNew(sender2) success, thread id: 6.
[Message Test] osThreadNew(sender3) success, thread id: 7.
[Message Test] sender1 send 0 to message queue.
[Message Test] sender2 send 0 to message queue.
[Message Test] sender3 send 0 to message queue.
[Message Test] recevier1 get 0 from sender1 by message queue.
[Message Test] sender1 send 1 to message queue.
[Message Test] recevier2 get 0 from sender2 by message queue.
...
[Message Test] osMessageQueueGetCapacity, capacity: 3.
[Message Test] osMessageQueueGetMsgSize, size: 8.
[Message Test] osMessageQueueGetCount, count: 1.
[Message Test] osMessageQueueGetSpace, space: 2.
七、总结
| 要点 | 内容 |
|---|---|
| 消息队列本质 | 线程安全的 FIFO 缓冲区,传递数据副本 |
| 创建参数 | 容量 + 单条消息大小 |
| 发送 | osMessageQueuePut,满则阻塞 |
| 接收 | osMessageQueueGet,空则阻塞 |
| 多生产者-多消费者 | 天然支持,无需额外同步 |
| 解耦优势 | 发送方和接收方互不依赖 |
| 状态查询 | Capacity / MsgSize / Count / Space |
八、下一步
Day 10 预告:GPIO 控制 LED 闪烁 —— 从软件到硬件控制。
更多推荐


所有评论(0)