目标:掌握 OpenHarmony 轻量系统的消息队列 API,实现多线程间的安全数据传递
前置条件:已完成 Day 8 的信号量教程


一、工程结构

app/
├── BUILD.gn
└── 08_message_queue/             # 模块目录
    ├── BUILD.gn
    └── demo.c                    # 消息队列测试代码

1.1 app/BUILD.gn

import("//build/lite/config/component/lite_component.gni")

lite_component("app") {
  features = [
    "08_message_queue:msgq_demo",  # 引用 08_message_queue 模块
  ]
}

1.2 08_message_queue/BUILD.gn

static_library("msgq_demo") {
    sources = [
        "demo.c"
    ]

    include_dirs = [
        "//utils/native/lite/include",
        "//kernel/liteos_m/components/cmsis/2.0",
    ]
}

二、完整代码详解

2.1 头文件与宏定义

#include <stdio.h>
#include <unistd.h>
#include "ohos_init.h"
#include "cmsis_os2.h"

#define STACK_SIZE      (1024)    // 线程栈大小
#define DELAY_TICKS_3   (3)       // 接收线程延时
#define DELAY_TICKS_5   (5)       // 发送线程延时
#define DELAY_TICKS_20  (20)      // 主线程等待时间
#define DELAY_TICKS_80  (80)      // 主线程终止前等待

#define QUEUE_SIZE      (3)       // 消息队列容量(最多缓存3条消息)

2.2 消息结构体

// 定义消息格式
typedef struct {
    osThreadId_t tid;    // 发送者线程 ID
    int count;           // 消息序号
} message_entry;

osMessageQueueId_t qid;  // 全局消息队列 ID

关键:消息队列传递的是数据副本,不是指针。发送方把 message_entry 复制到队列,接收方从队列复制出来。

2.3 发送线程(多生产者)

void sender_thread(void)
{
    static int count = 0;           // 静态变量,记录发送次数
    message_entry sentry;           // 消息结构体

    while (1) {
        // 1. 填充消息内容
        sentry.tid = osThreadGetId();                          // 记录发送者 ID
        sentry.count = count;                                   // 记录序号

        printf("[Message Test] %s send %d to message queue.\r\n",
               osThreadGetName(osThreadGetId()), count);

        // 2. 发送消息到队列
        // 参数:队列ID, 数据指针, 优先级(0), 超时时间
        osMessageQueuePut(qid, (const void *)&sentry, 0, osWaitForever);

        count++;                                                // 序号递增
        osDelay(DELAY_TICKS_5);                                 // 延时 5 tick
    }
}

发送流程

填充消息 → osMessageQueuePut → 队列满则阻塞等待 → 发送成功继续

2.4 接收线程(多消费者)

void receiver_thread(void)
{
    message_entry rentry;           // 接收缓冲区

    while (1) {
        // 1. 从队列获取消息
        // 参数:队列ID, 接收缓冲区, 优先级指针(NULL), 超时时间
        osMessageQueueGet(qid, (void *)&rentry, NULL, osWaitForever);

        // 2. 处理消息
        printf("[Message Test] %s get %d from %s by message queue.\r\n",
               osThreadGetName(osThreadGetId()),           // 接收者名字
               rentry.count,                                 // 消息序号
               osThreadGetName(rentry.tid));                // 发送者名字

        osDelay(DELAY_TICKS_3);                             // 处理延时 3 tick
    }
}

接收流程

osMessageQueueGet → 队列空则阻塞等待 → 获取消息 → 处理消息

2.5 线程创建辅助函数

osThreadId_t newThread(char *name, osThreadFunc_t func, char *arg)
{
    osThreadAttr_t attr = {
        name,                   // 线程名称
        0,                      // 属性位
        NULL,                   // 控制块内存
        0,                      // 控制块大小
        NULL,                   // 栈内存
        STACK_SIZE * 2,         // 栈大小 2048 字节
        osPriorityNormal,       // 普通优先级
        0,                      // 保留
        0                       // 保留
    };

    osThreadId_t tid = osThreadNew(func, (void *)arg, &attr);
    if (tid == NULL) {
        printf("[Message Test] osThreadNew(%s) failed.\r\n", name);
    } else {
        printf("[Message Test] osThreadNew(%s) success, thread id: %d.\r\n", name, tid);
    }
    return tid;
}

2.6 主控制线程

void rtosv2_msgq_main(void)
{
    // 1. 创建消息队列
    // 参数:队列容量, 单个消息大小, 属性(NULL=默认)
    qid = osMessageQueueNew(QUEUE_SIZE, sizeof(message_entry), NULL);

    // 2. 创建 2 个接收者 + 3 个发送者(多生产者-多消费者模型)
    osThreadId_t ctid1 = newThread("recevier1", receiver_thread, NULL);
    osThreadId_t ctid2 = newThread("recevier2", receiver_thread, NULL);
    osThreadId_t ptid1 = newThread("sender1", sender_thread, NULL);
    osThreadId_t ptid2 = newThread("sender2", sender_thread, NULL);
    osThreadId_t ptid3 = newThread("sender3", sender_thread, NULL);

    // 3. 等待队列运行一段时间
    osDelay(DELAY_TICKS_20);

    // 4. 查询队列状态
    uint32_t cap = osMessageQueueGetCapacity(qid);
    printf("[Message Test] osMessageQueueGetCapacity, capacity: %u.\r\n", cap);

    uint32_t msg_size = osMessageQueueGetMsgSize(qid);
    printf("[Message Test] osMessageQueueGetMsgSize, size: %u.\r\n", msg_size);

    uint32_t count = osMessageQueueGetCount(qid);
    printf("[Message Test] osMessageQueueGetCount, count: %u.\r\n", count);

    uint32_t space = osMessageQueueGetSpace(qid);
    printf("[Message Test] osMessageQueueGetSpace, space: %u.\r\n", space);

    // 5. 继续运行一段时间
    osDelay(DELAY_TICKS_80);

    // 6. 清理资源
    osThreadTerminate(ctid1);
    osThreadTerminate(ctid2);
    osThreadTerminate(ptid1);
    osThreadTerminate(ptid2);
    osThreadTerminate(ptid3);
    osMessageQueueDelete(qid);
}

2.7 系统入口

static void MessageTestTask(void)
{
    osThreadAttr_t attr = {
        .name = "rtosv2_msgq_main",
        .attr_bits = 0U,
        .cb_mem = NULL,
        .cb_size = 0U,
        .stack_mem = NULL,
        .stack_size = STACK_SIZE,
        .priority = osPriorityNormal,
    };

    if (osThreadNew((osThreadFunc_t)rtosv2_msgq_main, NULL, &attr) == NULL) {
        printf("[MessageTestTask] Failed to create rtosv2_msgq_main!\n");
    }
}

APP_FEATURE_INIT(MessageTestTask);

三、消息队列 API 详解

3.1 osMessageQueueNew — 创建队列

osMessageQueueId_t osMessageQueueNew(uint32_t msg_count, uint32_t msg_size, const osMessageQueueAttr_t *attr);
参数 说明
msg_count 队列容量(最多缓存多少条消息)
msg_size 单条消息的字节大小
attr 属性,NULL 为默认

本例osMessageQueueNew(3, sizeof(message_entry), NULL)

参数 含义
msg_count 3 最多缓存 3 条消息,第 4 条发送会阻塞
msg_size sizeof(message_entry) 每条消息大小为结构体大小

3.2 osMessageQueuePut — 发送消息

osStatus_t osMessageQueuePut(osMessageQueueId_t mq_id, const void *msg_ptr, uint8_t msg_prio, uint32_t timeout);
参数 说明
mq_id 队列 ID
msg_ptr 消息数据指针(内容会被复制到队列)
msg_prio 消息优先级(0 = 不区分优先级)
timeout 超时时间,osWaitForever = 永久等待

阻塞行为

  • 队列未满:立即复制数据,返回 osOK
  • 队列已满:阻塞等待,直到有空间或超时

3.3 osMessageQueueGet — 接收消息

osStatus_t osMessageQueueGet(osMessageQueueId_t mq_id, void *msg_ptr, uint8_t *msg_prio, uint32_t timeout);
参数 说明
mq_ptr 接收缓冲区指针(数据从队列复制到这里)
msg_prio 输出参数,获取消息的优先级(NULL = 不关心)
timeout 超时时间

阻塞行为

  • 队列非空:立即取出数据,返回 osOK
  • 队列已空:阻塞等待,直到有消息或超时

3.4 状态查询 API

API 功能
osMessageQueueGetCapacity 获取队列总容量
osMessageQueueGetMsgSize 获取单条消息大小
osMessageQueueGetCount 获取当前队列中的消息数
osMessageQueueGetSpace 获取剩余可用空间

3.5 osMessageQueueDelete — 删除队列

osStatus_t osMessageQueueDelete(osMessageQueueId_t mq_id);

四、多生产者-多消费者模型

┌─────────────────────────────────────────┐
│         消息队列(Queue Size = 3)        │
│  ┌─────┐ ┌─────┐ ┌─────┐              │
│  │msg0 │ │msg1 │ │msg2 │              │
│  └─────┘ └─────┘ └─────┘              │
│   写指针→              ←读指针           │
│  [满: Put阻塞]  [空: Get阻塞]           │
└─────────────────────────────────────────┘
        ↑                    ↓
   ┌────┴────┐          ┌────┴────┐
   │ 发送者  │          │ 接收者  │
   │ sender1 │          │ recevier1│
   │ sender2 │          │ recevier2│
   │ sender3 │          └─────────┘
   └─────────┘

特点:
- 3 个发送者竞争写入队列
- 2 个接收者竞争读取队列
- 队列满时发送者阻塞
- 队列空时接收者阻塞
- 天然线程安全,无需额外互斥锁

五、消息队列 vs 全局变量+互斥锁

方式 代码复杂度 耦合度 实时性 适用场景
全局变量+Mutex 高(直接访问) 立即 简单数据共享
消息队列 低(间接传递) 有延迟(入队/出队) 复杂数据、解耦、异步

消息队列的优势

  • 发送方和接收方完全解耦,不需要知道对方存在
  • 天然线程安全,无需手动加锁
  • 支持异步通信,发送方不用等接收方处理完
  • 支持多对多通信模型

六、编译与验证

6.1 编译烧录

VSCode 点击 BuildUpload,串口波特率 115200

6.2 预期输出

[Message Test] osThreadNew(recevier1) success, thread id: 3.
[Message Test] osThreadNew(recevier2) success, thread id: 4.
[Message Test] osThreadNew(sender1) success, thread id: 5.
[Message Test] osThreadNew(sender2) success, thread id: 6.
[Message Test] osThreadNew(sender3) success, thread id: 7.
[Message Test] sender1 send 0 to message queue.
[Message Test] sender2 send 0 to message queue.
[Message Test] sender3 send 0 to message queue.
[Message Test] recevier1 get 0 from sender1 by message queue.
[Message Test] sender1 send 1 to message queue.
[Message Test] recevier2 get 0 from sender2 by message queue.
...
[Message Test] osMessageQueueGetCapacity, capacity: 3.
[Message Test] osMessageQueueGetMsgSize, size: 8.
[Message Test] osMessageQueueGetCount, count: 1.
[Message Test] osMessageQueueGetSpace, space: 2.

七、总结

要点 内容
消息队列本质 线程安全的 FIFO 缓冲区,传递数据副本
创建参数 容量 + 单条消息大小
发送 osMessageQueuePut,满则阻塞
接收 osMessageQueueGet,空则阻塞
多生产者-多消费者 天然支持,无需额外同步
解耦优势 发送方和接收方互不依赖
状态查询 Capacity / MsgSize / Count / Space

八、下一步

Day 10 预告:GPIO 控制 LED 闪烁 —— 从软件到硬件控制。

Logo

作为“人工智能6S店”的官方数字引擎,为AI开发者与企业提供一个覆盖软硬件全栈、一站式门户。

更多推荐