鸿蒙进阶:鸿蒙应用集成 MQTT-原理解析与实战开发(基于 @ohos/mqtt)
鸿蒙应用集成 MQTT:原理解析与实战开发(基于 @ohos/mqtt)

MQTT(Message Queuing Telemetry Transport)是物联网领域应用最广泛的通信协议之一,以轻量、低带宽、低功耗的特性适配设备端与服务端的双向通信场景。本文将从 MQTT 核心原理入手,结合鸿蒙(HarmonyOS)系统的@ohos/mqtt API,完整讲解 MQTT 通信机制与鸿蒙应用中实现 MQTT 消息发送的实战过程。
一、MQTT 核心原理深度解析
1.1 MQTT 协议定位与核心设计
MQTT 是基于 TCP/IP 的发布 / 订阅(Publish/Subscribe)模式协议,由 IBM 在 1999 年设计,专为低带宽、高延迟、不可靠网络的物联网场景优化。其核心设计目标:
- 轻量级:协议头最小仅 2 字节,适配嵌入式设备、移动终端等资源受限场景;
- 异步通信:基于发布 / 订阅解耦消息生产者(Publisher)和消费者(Subscriber);
- 可靠性:支持 QoS(Quality of Service)消息质量等级,保障消息交付;
- 断线重连:内置心跳机制(Keep Alive),维护设备与服务器的长连接。
1.2 MQTT 核心架构
MQTT 通信包含三个核心角色:
| 角色 | 作用 |
|---|---|
| 发布者(Publisher) | 消息生产者,向指定主题(Topic)发布消息(如鸿蒙应用、传感器设备) |
| 订阅者(Subscriber) | 消息消费者,订阅感兴趣的主题,接收该主题下的所有消息(如鸿蒙应用、云平台) |
| 代理服务器(Broker) | 核心中转节点,接收发布者消息并推送给所有订阅该主题的订阅者(如 EMQ X、Mosquitto、阿里云 IoT) |
1.3 MQTT 关键概念
(1)主题(Topic)
消息的 “分类标签”,采用层级结构(类似文件路径),支持通配符:
+:匹配单层层级,如sensor/+匹配sensor/temp、sensor/hum,不匹配sensor/temp/room1;#:匹配多层层级(仅能放在末尾),如sensor/#匹配sensor/temp、sensor/temp/room1。
(2)QoS(消息质量等级)
MQTT 定义 3 级 QoS,平衡消息可靠性与传输效率:
- QoS 0:最多一次交付(At most once),消息发送后无需确认,可能丢失(适用于非关键数据,如实时温度采集);
- QoS 1:至少一次交付(At least once),消息需接收方确认,可能重复(适用于关键数据,如设备控制指令);
- QoS 2:恰好一次交付(Exactly once),通过四次握手确保消息仅送达一次(适用于金融、计费等核心场景)。
(3)保留消息(Retained Message)
发布消息时标记为 “保留”,Broker 会存储该主题的最后一条保留消息;新订阅者上线时,会立即收到该保留消息(适用于设备上线后获取最新状态,如设备离线前的最后温度)。
(4)清洁会话(Clean Session)
cleanSession=true:客户端断开连接后,Broker 清除该客户端的订阅关系、未完成的 QoS 消息,重连后需重新订阅;cleanSession=false:Broker 保留客户端的订阅和未完成消息,重连后继续推送(适用于设备断网后恢复通信)。
(5)心跳机制(Keep Alive)
客户端与 Broker 约定心跳间隔(秒),客户端需在间隔内发送心跳包(PINGREQ),Broker 回复 PINGRESP 确认连接;超时未收到心跳,Broker 判定客户端离线,触发断线逻辑。
二、鸿蒙 @ohos/mqtt API 核心能力
OpenHarmony三方库中心仓提供@ohos/mqtt模块,封装 MQTT 协议的核心操作,支持:
- 客户端创建与连接配置;
- 消息发布 / 订阅 / 取消订阅;
- 连接状态监听与断线重连;
- SSL/TLS 加密通信(适配安全场景);
- MQTT 3.1.1/5.0 版本兼容。
核心类 / 接口说明
| 类 / 接口 | 作用 |
|---|---|
MqttClient |
MQTT 客户端核心类,负责创建连接、发送 / 接收消息 |
MqttClientOptions |
客户端初始化配置(URL、ClientId、持久化方式) |
MqttConnectOptions |
连接配置(用户名密码、超时时间、心跳、清洁会话) |
MqttPublishOptions |
发布消息配置(主题、负载、QoS、保留消息) |
MqttSubscribeOptions |
订阅配置(主题、QoS) |
MqttQos |
QoS 枚举(0/1/2) |
MqttMessage |
接收消息封装(主题、负载、QoS 等) |
三、鸿蒙应用 MQTT 实战开发
3.1 开发前置条件
- 鸿蒙 SDK 版本≥API 9(需支持
@ohos/mqtt模块); - 已部署 MQTT Broker(如本地 EMQ X、阿里云 IoT 物联网平台);
- 应用配置网络权限(
module.json5)
{
"module": {
"reqPermissions": [
{
"name": "ohos.permission.INTERNET"
}
]
}
}
3.2 封装 MQTT 工具类(核心代码)
基于单例模式封装 MQTT 工具类,简化连接、发布、订阅操作(完整代码见文末附录),核心逻辑如下:
import {
MqttClient,
MqttClientOptions,
MqttConnectOptions,
MqttPublishOptions,
MqttSubscribeOptions,
MqttResponse,
MqttQos,
MqttMessage
} from '@ohos/mqtt';
// MQTT连接配置接口
export interface MqttConfig {
url: string; // 格式: ip:port
clientId: string;
username?: string;
password?: string;
connectTimeout?: number;
MQTTVersion?: number;
keepAlive?: number;
cleanSession?: boolean;
ssl?: boolean;
sslOptions?: SslOptions;
}
// SSL配置接口
export interface SslOptions {
enableServerCertAuth?: boolean;
trustStore?: string;
}
// MQTT消息回调接口
export interface MqttCallback {
onConnect?: (success: boolean) => void;
onMessage?: (topic: string, message: string) => void;
onDisconnect?: () => void;
onError?: (error: string | MqttResponse) => void;
onSubscribe?: (success: boolean) => void;
onUnsubscribe?: (success: boolean) => void;
onPublish?: (success: boolean) => void;
}
export class MqttUtil {
private static instance: MqttUtil;
private client: MqttClient | null = null;
private config: MqttConfig | null = null;
private callbacks: MqttCallback = {};
private isConnected: boolean = false;
// 获取单例实例
public static getInstance(): MqttUtil {
if (!MqttUtil.instance) {
MqttUtil.instance = new MqttUtil();
}
return MqttUtil.instance;
}
// 初始化MQTT客户端
public init(config: MqttConfig): void {
this.config = config;
// 创建MQTT客户端实例
const clientOptions: MqttClientOptions = {
url: config.url,
clientId: config.clientId,
persistenceType: 1, // 1: 内存持久化, 2: 文件持久化
MQTTVersion: config.MQTTVersion || 5
};
try {
this.client = new MqttClient(clientOptions);
console.info('MQTT客户端创建成功');
} catch (error) {
console.error(`MQTT客户端创建失败: ${JSON.stringify(error)}`);
if (this.callbacks.onError) {
this.callbacks.onError(JSON.stringify(error));
}
}
}
// 设置回调函数
public setCallbacks(callbacks: MqttCallback): void {
this.callbacks = callbacks;
}
// 连接MQTT服务器
public async connect(): Promise<void> {
if (!this.client || !this.config) {
console.error('MQTT客户端未初始化');
if (this.callbacks.onError) {
this.callbacks.onError('MQTT客户端未初始化');
}
return;
}
try {
const connectOptions: MqttConnectOptions = {
userName: this.config.username || '',
password: this.config.password || '',
connectTimeout: this.config.connectTimeout || 10,
MQTTVersion: this.config.MQTTVersion || 5,
keepAliveInterval: this.config.keepAlive || 60,
// cleanSession: this.config.cleanSession || true,
// sslOptions: this.config.sslOptions
};
console.info(`正在连接MQTT服务器: ${this.config.url}`);
// 使用Promise方式连接
await this.client.connect(connectOptions);
this.isConnected = true;
console.info(`MQTT连接成功`);
// 设置消息监听器
this.client.messageArrived((err: Error, data: MqttMessage) => {
if(this.callbacks.onMessage){
this.callbacks.onMessage(data.topic,data.payload);
}
});
if (this.callbacks.onConnect) {
this.callbacks.onConnect(true);
}
} catch (error) {
this.isConnected = false;
console.error(`MQTT连接失败: ${JSON.stringify(error)}`);
if (this.callbacks.onError) {
this.callbacks.onError(error as MqttResponse);
}
if (this.callbacks.onConnect) {
this.callbacks.onConnect(false);
}
}
}
// 断开MQTT连接
public async disconnect(): Promise<void> {
if (!this.client) {
console.error('MQTT客户端未初始化');
return;
}
try {
// 调用disconnect方法
await this.client.disconnect();
this.isConnected = false;
console.info('MQTT连接已断开');
if (this.callbacks.onDisconnect) {
this.callbacks.onDisconnect();
}
} catch (error) {
console.error(`MQTT断开连接失败: ${JSON.stringify(error)}`);
if (this.callbacks.onError) {
this.callbacks.onError(error as MqttResponse);
}
}
}
// 发布消息
public async publish(topic: string, message: string, qos: MqttQos = 0, retained: boolean = false): Promise<void> {
if (!this.client || !this.isConnected) {
console.error('MQTT客户端未连接');
if (this.callbacks.onError) {
this.callbacks.onError('MQTT客户端未连接');
}
return;
}
try {
const publishOptions: MqttPublishOptions = {
topic: topic,
payload: message,
qos: qos,
retained: retained,
properties: {
contentType: "application/json",
userProperties: [
["region", "China"],
["type", "JSON"],
]
},
};
const result = await this.client.publish(publishOptions);
console.info(`MQTT消息发布成功, 主题: ${topic}, 消息: ${message}`);
if (this.callbacks.onPublish) {
this.callbacks.onPublish(true);
}
} catch (error) {
console.error(`MQTT消息发布失败: ${JSON.stringify(error)}`);
if (this.callbacks.onError) {
this.callbacks.onError(error as MqttResponse);
}
if (this.callbacks.onPublish) {
this.callbacks.onPublish(false);
}
}
}
// 订阅主题
public async subscribe(topic: string, qos: MqttQos = 0): Promise<void> {
if (!this.client || !this.isConnected) {
console.error('MQTT客户端未连接');
if (this.callbacks.onError) {
this.callbacks.onError('MQTT客户端未连接');
}
return;
}
try {
const subscribeOptions: MqttSubscribeOptions = {
topic: topic,
qos: qos
};
await this.client.subscribe(subscribeOptions).then((data: MqttResponse) => {
console.log("mqtt subscribe success " + JSON.stringify(data));
console.info(`MQTT订阅主题成功: ${topic}`);
if (this.callbacks.onSubscribe) {
this.callbacks.onSubscribe(true);
}
}).catch((err: MqttResponse) => {
console.log("mqtt subscribe fail " + JSON.stringify(err));
console.error(`MQTT订阅主题失败: ${JSON.stringify(err)}`);
});
} catch (error) {
console.error(`MQTT订阅主题失败: ${JSON.stringify(error)}`);
if (this.callbacks.onError) {
this.callbacks.onError(error as MqttResponse);
}
if (this.callbacks.onSubscribe) {
this.callbacks.onSubscribe(false);
}
}
}
// 取消订阅主题
public async unsubscribe(topic: string): Promise<void> {
if (!this.client || !this.isConnected) {
console.error('MQTT客户端未连接');
if (this.callbacks.onError) {
this.callbacks.onError('MQTT客户端未连接');
}
return;
}
try {
let unOptions: MqttSubscribeOptions = {
topic: topic,
qos: 0
}
await this.client.unsubscribe(unOptions);
console.info(`MQTT取消订阅主题成功: ${topic}`);
if (this.callbacks.onUnsubscribe) {
this.callbacks.onUnsubscribe(true);
}
} catch (error) {
console.error(`MQTT取消订阅主题失败: ${JSON.stringify(error)}`);
if (this.callbacks.onError) {
this.callbacks.onError(error as MqttResponse);
}
if (this.callbacks.onUnsubscribe) {
this.callbacks.onUnsubscribe(false);
}
}
}
// 获取连接状态
public getConnectedState(): boolean {
return this.isConnected;
}
// 获取MQTT客户端实例
public getClient(): MqttClient | null {
return this.client;
}
}
// 导出单例实例
const mqttUtil = MqttUtil.getInstance();
export default mqttUtil;
3.3业务层调用示例
在鸿蒙应用的页面 / 服务中调用 MQTT 工具类,实现连接、发布消息:
import mqttUtil from './MqttUtil';
// 1. 配置MQTT连接参数
const mqttConfig = {
url: 'tcp://192.168.1.100:1883', // EMQ X本地服务器地址
clientId: `harmony_mqtt_${Date.now()}`, // 唯一ClientId
username: 'admin', // Broker认证用户名
password: '123456', // Broker认证密码
connectTimeout: 10,
keepAlive: 60,
cleanSession: true,
MQTTVersion: 5
};
// 2. 设置回调
mqttUtil.setCallbacks({
onConnect: (success) => {
if (success) {
console.info('MQTT连接成功,开始发布消息');
// 3. 连接成功后发布消息
publishTestMessage();
} else {
console.error('MQTT连接失败');
}
},
onMessage: (topic, message) => {
console.info(`收到消息: 主题=${topic}, 内容=${message}`);
},
onError: (error) => {
console.error(`MQTT错误: ${JSON.stringify(error)}`);
}
});
// 3. 初始化并连接
mqttUtil.init(mqttConfig);
mqttUtil.connect();
// 4. 发布测试消息
async function publishTestMessage() {
// 发布JSON格式消息,QoS=1,保留消息=false
const message = JSON.stringify({
deviceId: 'harmony_device_001',
temp: 25.5,
hum: 60,
timestamp: Date.now()
});
await mqttUtil.publish('sensor/environment', message, 1, false);
}
3.4 关键注意事项
- ClientId 唯一性:同一 Broker 下 ClientId 不可重复,否则会导致旧连接被断开(建议拼接时间戳 / 设备 ID);
- 断线重连:可在
onDisconnect回调中实现重连逻辑(需添加重连间隔,避免频繁重试); - SSL 加密:若需安全通信,配置
url: 'ssl://ip:8883',并设置sslOptions验证服务端证书; - 资源释放:应用退后台 / 销毁时,调用
disconnect()断开连接,避免内存泄漏; - QoS 选择:非关键数据用 QoS 0,关键指令用 QoS 1,避免滥用 QoS 2 导致性能损耗。
四、常见问题排查
- 连接失败:检查 Broker 地址 / 端口是否正确、网络权限是否配置、Broker 是否开启认证;
- 消息发布成功但订阅收不到:检查主题是否匹配、QoS 是否兼容、Broker 是否正常转发;
- 心跳超时:调整
keepAlive间隔(建议 30-60 秒),确保网络稳定; - MQTT 5.0 兼容问题:部分 Broker 对 5.0 版本支持不全,可降级为 3.1.1(
MQTTVersion: 3)。
五、总结
MQTT 基于发布 / 订阅模式实现了轻量级、高可靠的物联网通信,而鸿蒙@ohos/mqtt模块为应用层提供了简洁的 API 封装。通过本文的原理解析与实战代码,开发者可快速在鸿蒙应用中实现 MQTT 连接、消息发布 / 订阅等核心能力。实际开发中,需结合业务场景选择合适的 QoS、清洁会话、心跳间隔等参数,确保通信的稳定性与可靠性。
附@ohos/mqtt地址:https://ohpm.openharmony.cn/#/cn/detail/@ohos%2Fmqtt#mqttconnectoptions
更多推荐




所有评论(0)