鸿蒙应用集成 MQTT:原理解析与实战开发(基于 @ohos/mqtt)

MQTT(Message Queuing Telemetry Transport)是物联网领域应用最广泛的通信协议之一,以轻量、低带宽、低功耗的特性适配设备端与服务端的双向通信场景。本文将从 MQTT 核心原理入手,结合鸿蒙(HarmonyOS)系统的@ohos/mqtt API,完整讲解 MQTT 通信机制与鸿蒙应用中实现 MQTT 消息发送的实战过程。

一、MQTT 核心原理深度解析

1.1 MQTT 协议定位与核心设计

MQTT 是基于 TCP/IP 的发布 / 订阅(Publish/Subscribe)模式协议,由 IBM 在 1999 年设计,专为低带宽、高延迟、不可靠网络的物联网场景优化。其核心设计目标:

  • 轻量级:协议头最小仅 2 字节,适配嵌入式设备、移动终端等资源受限场景;
  • 异步通信:基于发布 / 订阅解耦消息生产者(Publisher)和消费者(Subscriber);
  • 可靠性:支持 QoS(Quality of Service)消息质量等级,保障消息交付;
  • 断线重连:内置心跳机制(Keep Alive),维护设备与服务器的长连接。

1.2 MQTT 核心架构

MQTT 通信包含三个核心角色:

角色 作用
发布者(Publisher) 消息生产者,向指定主题(Topic)发布消息(如鸿蒙应用、传感器设备)
订阅者(Subscriber) 消息消费者,订阅感兴趣的主题,接收该主题下的所有消息(如鸿蒙应用、云平台)
代理服务器(Broker) 核心中转节点,接收发布者消息并推送给所有订阅该主题的订阅者(如 EMQ X、Mosquitto、阿里云 IoT)

1.3 MQTT 关键概念

(1)主题(Topic)

消息的 “分类标签”,采用层级结构(类似文件路径),支持通配符:

  • +:匹配单层层级,如sensor/+匹配sensor/tempsensor/hum,不匹配sensor/temp/room1
  • #:匹配多层层级(仅能放在末尾),如sensor/#匹配sensor/tempsensor/temp/room1
(2)QoS(消息质量等级)

MQTT 定义 3 级 QoS,平衡消息可靠性与传输效率:

  • QoS 0:最多一次交付(At most once),消息发送后无需确认,可能丢失(适用于非关键数据,如实时温度采集);
  • QoS 1:至少一次交付(At least once),消息需接收方确认,可能重复(适用于关键数据,如设备控制指令);
  • QoS 2:恰好一次交付(Exactly once),通过四次握手确保消息仅送达一次(适用于金融、计费等核心场景)。
(3)保留消息(Retained Message)

发布消息时标记为 “保留”,Broker 会存储该主题的最后一条保留消息;新订阅者上线时,会立即收到该保留消息(适用于设备上线后获取最新状态,如设备离线前的最后温度)。

(4)清洁会话(Clean Session)
  • cleanSession=true:客户端断开连接后,Broker 清除该客户端的订阅关系、未完成的 QoS 消息,重连后需重新订阅;
  • cleanSession=false:Broker 保留客户端的订阅和未完成消息,重连后继续推送(适用于设备断网后恢复通信)。
(5)心跳机制(Keep Alive)

客户端与 Broker 约定心跳间隔(秒),客户端需在间隔内发送心跳包(PINGREQ),Broker 回复 PINGRESP 确认连接;超时未收到心跳,Broker 判定客户端离线,触发断线逻辑。

二、鸿蒙 @ohos/mqtt API 核心能力

OpenHarmony三方库中心仓提供@ohos/mqtt模块,封装 MQTT 协议的核心操作,支持:

  • 客户端创建与连接配置;
  • 消息发布 / 订阅 / 取消订阅;
  • 连接状态监听与断线重连;
  • SSL/TLS 加密通信(适配安全场景);
  • MQTT 3.1.1/5.0 版本兼容。

核心类 / 接口说明

类 / 接口 作用
MqttClient MQTT 客户端核心类,负责创建连接、发送 / 接收消息
MqttClientOptions 客户端初始化配置(URL、ClientId、持久化方式)
MqttConnectOptions 连接配置(用户名密码、超时时间、心跳、清洁会话)
MqttPublishOptions 发布消息配置(主题、负载、QoS、保留消息)
MqttSubscribeOptions 订阅配置(主题、QoS)
MqttQos QoS 枚举(0/1/2)
MqttMessage 接收消息封装(主题、负载、QoS 等)

三、鸿蒙应用 MQTT 实战开发

3.1 开发前置条件

  1. 鸿蒙 SDK 版本≥API 9(需支持@ohos/mqtt模块);
  2. 已部署 MQTT Broker(如本地 EMQ X、阿里云 IoT 物联网平台);
  3. 应用配置网络权限(module.json5
{
  "module": {
    "reqPermissions": [
      {
        "name": "ohos.permission.INTERNET"
      }
    ]
  }
}

3.2 封装 MQTT 工具类(核心代码)

基于单例模式封装 MQTT 工具类,简化连接、发布、订阅操作(完整代码见文末附录),核心逻辑如下:

import {
  MqttClient,
  MqttClientOptions,
  MqttConnectOptions,
  MqttPublishOptions,
  MqttSubscribeOptions,
  MqttResponse,
  MqttQos,
  MqttMessage
} from '@ohos/mqtt';

// MQTT连接配置接口
export interface MqttConfig {
  url: string; // 格式: ip:port
  clientId: string;
  username?: string;
  password?: string;
  connectTimeout?: number;
  MQTTVersion?: number;
  keepAlive?: number;
  cleanSession?: boolean;
  ssl?: boolean;
  sslOptions?: SslOptions;
}

// SSL配置接口
export interface SslOptions {
  enableServerCertAuth?: boolean;
  trustStore?: string;
}

// MQTT消息回调接口
export interface MqttCallback {
  onConnect?: (success: boolean) => void;
  onMessage?: (topic: string, message: string) => void;
  onDisconnect?: () => void;
  onError?: (error: string | MqttResponse) => void;
  onSubscribe?: (success: boolean) => void;
  onUnsubscribe?: (success: boolean) => void;
  onPublish?: (success: boolean) => void;
}

export class MqttUtil {
  private static instance: MqttUtil;
  private client: MqttClient | null = null;
  private config: MqttConfig | null = null;
  private callbacks: MqttCallback = {};
  private isConnected: boolean = false;

  // 获取单例实例
  public static getInstance(): MqttUtil {
    if (!MqttUtil.instance) {
      MqttUtil.instance = new MqttUtil();
    }
    return MqttUtil.instance;
  }

  // 初始化MQTT客户端
  public init(config: MqttConfig): void {
    this.config = config;

    // 创建MQTT客户端实例
    const clientOptions: MqttClientOptions = {
      url: config.url,
      clientId: config.clientId,
      persistenceType: 1, // 1: 内存持久化, 2: 文件持久化
      MQTTVersion: config.MQTTVersion || 5
    };

    try {
      this.client = new MqttClient(clientOptions);
      console.info('MQTT客户端创建成功');
    } catch (error) {
      console.error(`MQTT客户端创建失败: ${JSON.stringify(error)}`);
      if (this.callbacks.onError) {
        this.callbacks.onError(JSON.stringify(error));
      }
    }
  }

  // 设置回调函数
  public setCallbacks(callbacks: MqttCallback): void {
    this.callbacks = callbacks;
  }

  // 连接MQTT服务器
  public async connect(): Promise<void> {
    if (!this.client || !this.config) {
      console.error('MQTT客户端未初始化');
      if (this.callbacks.onError) {
        this.callbacks.onError('MQTT客户端未初始化');
      }
      return;
    }

    try {
      const connectOptions: MqttConnectOptions = {
        userName: this.config.username || '',
        password: this.config.password || '',
        connectTimeout: this.config.connectTimeout || 10,
        MQTTVersion: this.config.MQTTVersion || 5,
        keepAliveInterval: this.config.keepAlive || 60,
        // cleanSession: this.config.cleanSession || true,
        // sslOptions: this.config.sslOptions
      };

      console.info(`正在连接MQTT服务器: ${this.config.url}`);

      // 使用Promise方式连接
      await this.client.connect(connectOptions);
      this.isConnected = true;
      console.info(`MQTT连接成功`);

      // 设置消息监听器
      this.client.messageArrived((err: Error, data: MqttMessage) => {
        if(this.callbacks.onMessage){
          this.callbacks.onMessage(data.topic,data.payload);
        }
      });

      if (this.callbacks.onConnect) {
        this.callbacks.onConnect(true);
      }
    } catch (error) {
      this.isConnected = false;
      console.error(`MQTT连接失败: ${JSON.stringify(error)}`);
      if (this.callbacks.onError) {
        this.callbacks.onError(error as MqttResponse);
      }
      if (this.callbacks.onConnect) {
        this.callbacks.onConnect(false);
      }
    }
  }

  // 断开MQTT连接
  public async disconnect(): Promise<void> {
    if (!this.client) {
      console.error('MQTT客户端未初始化');
      return;
    }

    try {
      // 调用disconnect方法
      await this.client.disconnect();
      this.isConnected = false;
      console.info('MQTT连接已断开');

      if (this.callbacks.onDisconnect) {
        this.callbacks.onDisconnect();
      }
    } catch (error) {
      console.error(`MQTT断开连接失败: ${JSON.stringify(error)}`);
      if (this.callbacks.onError) {
        this.callbacks.onError(error as MqttResponse);
      }
    }
  }

  // 发布消息
  public async publish(topic: string, message: string, qos: MqttQos = 0, retained: boolean = false): Promise<void> {
    if (!this.client || !this.isConnected) {
      console.error('MQTT客户端未连接');
      if (this.callbacks.onError) {
        this.callbacks.onError('MQTT客户端未连接');
      }
      return;
    }

    try {
      const publishOptions: MqttPublishOptions = {
        topic: topic,
        payload: message,
        qos: qos,
        retained: retained,
        properties: {
          contentType: "application/json",
          userProperties: [
            ["region", "China"],
            ["type", "JSON"],
          ]
        },
      };

      const result = await this.client.publish(publishOptions);
      console.info(`MQTT消息发布成功, 主题: ${topic}, 消息: ${message}`);

      if (this.callbacks.onPublish) {
        this.callbacks.onPublish(true);
      }
    } catch (error) {
      console.error(`MQTT消息发布失败: ${JSON.stringify(error)}`);
      if (this.callbacks.onError) {
        this.callbacks.onError(error as MqttResponse);
      }
      if (this.callbacks.onPublish) {
        this.callbacks.onPublish(false);
      }
    }
  }

  // 订阅主题
  public async subscribe(topic: string, qos: MqttQos = 0): Promise<void> {
    if (!this.client || !this.isConnected) {
      console.error('MQTT客户端未连接');
      if (this.callbacks.onError) {
        this.callbacks.onError('MQTT客户端未连接');
      }
      return;
    }

    try {
      const subscribeOptions: MqttSubscribeOptions = {
        topic: topic,
        qos: qos
      };

      await this.client.subscribe(subscribeOptions).then((data: MqttResponse) => {
        console.log("mqtt subscribe success " + JSON.stringify(data));
        console.info(`MQTT订阅主题成功: ${topic}`);
        if (this.callbacks.onSubscribe) {
          this.callbacks.onSubscribe(true);
        }
      }).catch((err: MqttResponse) => {
        console.log("mqtt subscribe fail " + JSON.stringify(err));
        console.error(`MQTT订阅主题失败: ${JSON.stringify(err)}`);

      });


    } catch (error) {
      console.error(`MQTT订阅主题失败: ${JSON.stringify(error)}`);
      if (this.callbacks.onError) {
        this.callbacks.onError(error as MqttResponse);
      }
      if (this.callbacks.onSubscribe) {
        this.callbacks.onSubscribe(false);
      }
    }
  }

  // 取消订阅主题
  public async unsubscribe(topic: string): Promise<void> {
    if (!this.client || !this.isConnected) {
      console.error('MQTT客户端未连接');
      if (this.callbacks.onError) {
        this.callbacks.onError('MQTT客户端未连接');
      }
      return;
    }


    try {

      let unOptions: MqttSubscribeOptions = {
        topic: topic,
        qos: 0
      }
      await this.client.unsubscribe(unOptions);
      console.info(`MQTT取消订阅主题成功: ${topic}`);

      if (this.callbacks.onUnsubscribe) {
        this.callbacks.onUnsubscribe(true);
      }
    } catch (error) {
      console.error(`MQTT取消订阅主题失败: ${JSON.stringify(error)}`);
      if (this.callbacks.onError) {
        this.callbacks.onError(error as MqttResponse);
      }
      if (this.callbacks.onUnsubscribe) {
        this.callbacks.onUnsubscribe(false);
      }
    }
  }

  // 获取连接状态
  public getConnectedState(): boolean {
    return this.isConnected;
  }

  // 获取MQTT客户端实例
  public getClient(): MqttClient | null {
    return this.client;
  }
}

// 导出单例实例
const mqttUtil = MqttUtil.getInstance();

export default mqttUtil;

3.3业务层调用示例

在鸿蒙应用的页面 / 服务中调用 MQTT 工具类,实现连接、发布消息:

import mqttUtil from './MqttUtil';

// 1. 配置MQTT连接参数
const mqttConfig = {
  url: 'tcp://192.168.1.100:1883', // EMQ X本地服务器地址
  clientId: `harmony_mqtt_${Date.now()}`, // 唯一ClientId
  username: 'admin', // Broker认证用户名
  password: '123456', // Broker认证密码
  connectTimeout: 10,
  keepAlive: 60,
  cleanSession: true,
  MQTTVersion: 5
};

// 2. 设置回调
mqttUtil.setCallbacks({
  onConnect: (success) => {
    if (success) {
      console.info('MQTT连接成功,开始发布消息');
      // 3. 连接成功后发布消息
      publishTestMessage();
    } else {
      console.error('MQTT连接失败');
    }
  },
  onMessage: (topic, message) => {
    console.info(`收到消息: 主题=${topic}, 内容=${message}`);
  },
  onError: (error) => {
    console.error(`MQTT错误: ${JSON.stringify(error)}`);
  }
});

// 3. 初始化并连接
mqttUtil.init(mqttConfig);
mqttUtil.connect();

// 4. 发布测试消息
async function publishTestMessage() {
  // 发布JSON格式消息,QoS=1,保留消息=false
  const message = JSON.stringify({
    deviceId: 'harmony_device_001',
    temp: 25.5,
    hum: 60,
    timestamp: Date.now()
  });
  await mqttUtil.publish('sensor/environment', message, 1, false);
}

3.4 关键注意事项

  1. ClientId 唯一性:同一 Broker 下 ClientId 不可重复,否则会导致旧连接被断开(建议拼接时间戳 / 设备 ID);
  2. 断线重连:可在onDisconnect回调中实现重连逻辑(需添加重连间隔,避免频繁重试);
  3. SSL 加密:若需安全通信,配置url: 'ssl://ip:8883',并设置sslOptions验证服务端证书;
  4. 资源释放:应用退后台 / 销毁时,调用disconnect()断开连接,避免内存泄漏;
  5. QoS 选择:非关键数据用 QoS 0,关键指令用 QoS 1,避免滥用 QoS 2 导致性能损耗。

四、常见问题排查

  1. 连接失败:检查 Broker 地址 / 端口是否正确、网络权限是否配置、Broker 是否开启认证;
  2. 消息发布成功但订阅收不到:检查主题是否匹配、QoS 是否兼容、Broker 是否正常转发;
  3. 心跳超时:调整keepAlive间隔(建议 30-60 秒),确保网络稳定;
  4. MQTT 5.0 兼容问题:部分 Broker 对 5.0 版本支持不全,可降级为 3.1.1(MQTTVersion: 3)。

五、总结

MQTT 基于发布 / 订阅模式实现了轻量级、高可靠的物联网通信,而鸿蒙@ohos/mqtt模块为应用层提供了简洁的 API 封装。通过本文的原理解析与实战代码,开发者可快速在鸿蒙应用中实现 MQTT 连接、消息发布 / 订阅等核心能力。实际开发中,需结合业务场景选择合适的 QoS、清洁会话、心跳间隔等参数,确保通信的稳定性与可靠性。

附@ohos/mqtt地址:https://ohpm.openharmony.cn/#/cn/detail/@ohos%2Fmqtt#mqttconnectoptions

Logo

作为“人工智能6S店”的官方数字引擎,为AI开发者与企业提供一个覆盖软硬件全栈、一站式门户。

更多推荐