欢迎加入开源鸿蒙跨平台社区:https://openharmonycrossplatform.csdn.net

在这里插入图片描述

前言

MQTT (Message Queuing Telemetry Transport) 是一种极轻量级的发布/订阅消息传输协议,广泛应用于物联网(IoT)、移动应用和车载设备。在智能家居控制、设备状态上报等场景中,APP 往往需要实时接收设备发来的消息。

mqtt_client 是 Dart 生态中最流行的 MQTT 客户端库,支持 MQTT 3.1 和 3.1.1 协议。它能够在 OpenHarmony 应用中稳定运行,帮助开发者轻松构建物联网控制端。

一、概念介绍/原理解析

1.1 基础概念

  • Broker (代理): 消息的转发服务器(如 EMQX, Mosquitto)。
  • Topic (主题): 消息的分类标签(如 home/livingroom/temp)。
  • QoS (服务质量):
    • 0: 最多一次 (Fire and Forget)
    • 1: 至少一次 (Acknowledged delivery)
    • 2: 只有一次 (Exact delivery)

发布: 25°C

转发

订阅主题

呈现

传感器设备

环境主题

MQTT 代理服务器

OpenHarmony 应用

界面显示: 25°C

1.2 进阶概念

在移动端使用 MQTT,通常需要关注断线重连(Keep Alive)和Clean Session(是否接收离线消息)。

二、核心 API/组件详解

2.1 基础用法

创建客户端并连接。

import 'package:mqtt_client/mqtt_client.dart';
import 'package:mqtt_client/mqtt_server_client.dart'; // 使用 Server Client

Future<void> connect() async {
  final client = MqttServerClient('broker.emqx.io', 'flutter_client_id');
  client.logging(on: false);
  client.keepAlivePeriod = 20;

  try {
    await client.connect();
    print('已连接');
  } catch (e) {
    print('连接失败: $e');
    client.disconnect();
  }
}

在这里插入图片描述

2.2 高级定制

处理连接状态回调和订阅消息。

// 设置回调
client.onConnected = () => print('Connected');
client.onDisconnected = () => print('Disconnected');
client.onSubscribed = (topic) => print('Subscribed to $topic');

// 订阅主题
client.subscribe('test/topic', MqttQos.atLeastOnce);

// 监听消息
client.updates!.listen((List<MqttReceivedMessage<MqttMessage?>> c) {
  final MqttPublishMessage recMess = c[0].payload as MqttPublishMessage;
  final pt = MqttPublishPayload.bytesToStringAsString(recMess.payload.message);
  print('收到消息: $pt');
});

在这里插入图片描述

三、常见应用场景

3.1 场景 1:智能家居控制

用户点击开关,发送控制指令;同时订阅设备状态更新 UI。

// 发送指令:打开客厅灯
void turnOnLight() {
  final builder = MqttClientPayloadBuilder();
  builder.addString('{"state": "ON"}');
  client.publishMessage('home/livingroom/light/set', MqttQos.atLeastOnce, builder.payload!);
}

在这里插入图片描述

3.2 场景 2:环境数据监测

实时接收温度、湿度传感器上报的数据,并绘制成图表。

client.updates!.listen((List<MqttReceivedMessage<MqttMessage?>> c) {
  final MqttPublishMessage recMess = c[0].payload as MqttPublishMessage;
  final pt = MqttPublishPayload.bytesToStringAsString(recMess.payload.message);
  
  // 更新图表数据
  chartData.add(parseSensorData(pt));
});

在这里插入图片描述

3.3 场景 3:实时聊天/通知

虽然 IM 通常用 WebSocket,但在低带宽环境下,MQTT 也是不错的选择。

// 订阅个人消息主题
client.subscribe('users/$myUserId/inbox', MqttQos.exactlyOnce);

// 收到消息弹窗
client.updates!.listen((event) {
  // 显示通知栏消息
});

在这里插入图片描述

四、OpenHarmony 平台适配

4.1 网络权限

"requestPermissions": [
  { "name": "ohos.permission.INTERNET" }
]

4.2 后台保活

MQTT 需要保持长连接心跳。OpenHarmony 系统可能会在后台冻结应用网络。如果是关键业务,可能需要申请长时任务(Continuous Task)或使用推送服务辅助唤醒。

五、完整示例代码

本示例展示一个简单的 MQTT 控制台,连接公共 Broker,订阅并发送消息。

import 'dart:io';
import 'package:flutter/material.dart';
import 'package:mqtt_client/mqtt_client.dart';
import 'package:mqtt_client/mqtt_server_client.dart';

void main() {
  runApp(const MaterialApp(home: MqttPage()));
}

class MqttPage extends StatefulWidget {
  const MqttPage({super.key});

  
  State<MqttPage> createState() => _MqttPageState();
}

class _MqttPageState extends State<MqttPage> {
  // 使用 EMQX 的公共测试服务器
  final client = MqttServerClient('broker.emqx.io', 'ohos_client_${DateTime.now().millisecondsSinceEpoch}');
  String _status = '未连接';
  final List<String> _messages = [];
  final TextEditingController _msgController = TextEditingController();
  final String _topic = 'flutter/ohos/test';

  Future<void> _connect() async {
    setState(() => _status = '连接中...');
    
    client.logging(on: true);
    client.keepAlivePeriod = 60;
    client.onDisconnected = () => setState(() => _status = '已断开');
    client.onConnected = () => setState(() => _status = '已连接');
    
    // 设置连接消息
    final connMess = MqttConnectMessage()
        .withClientIdentifier('ohos_client')
        .withWillTopic('willtopic')
        .withWillMessage('My Will message')
        .startClean()
        .withWillQos(MqttQos.atLeastOnce);
    client.connectionMessage = connMess;

    try {
      await client.connect();
    } on NoConnectionException catch (e) {
      // 客户端异常
      print('Client exception: $e');
      client.disconnect();
    } on SocketException catch (e) {
      // Socket 异常
      print('Socket exception: $e');
      client.disconnect();
    }

    if (client.connectionStatus!.state == MqttConnectionState.connected) {
      _subscribe();
    } else {
      setState(() => _status = '连接失败');
      client.disconnect();
    }
  }

  void _subscribe() {
    client.subscribe(_topic, MqttQos.atLeastOnce);
    client.updates!.listen((List<MqttReceivedMessage<MqttMessage?>> c) {
      final MqttPublishMessage recMess = c[0].payload as MqttPublishMessage;
      final payload = MqttPublishPayload.bytesToStringAsString(recMess.payload.message);
      
      setState(() {
        _messages.insert(0, '[收到] $payload');
      });
    });
  }

  void _publish() {
    final builder = MqttClientPayloadBuilder();
    builder.addString(_msgController.text);
    client.publishMessage(_topic, MqttQos.exactlyOnce, builder.payload!);
    setState(() {
      _messages.insert(0, '[发送] ${_msgController.text}');
      _msgController.clear();
    });
  }

  
  void dispose() {
    client.disconnect();
    super.dispose();
  }

  
  Widget build(BuildContext context) {
    return Scaffold(
      appBar: AppBar(title: const Text('MQTT Client Demo')),
      body: Column(
        children: [
          Container(
            padding: const EdgeInsets.all(16),
            color: _status == '已连接' ? Colors.green[100] : Colors.red[100],
            child: Row(
              mainAxisAlignment: MainAxisAlignment.spaceBetween,
              children: [
                Text('状态: $_status', style: const TextStyle(fontWeight: FontWeight.bold)),
                if (_status == '未连接' || _status == '已断开' || _status == '连接失败')
                  ElevatedButton(onPressed: _connect, child: const Text('连接'))
                else
                  ElevatedButton(onPressed: client.disconnect, child: const Text('断开')),
              ],
            ),
          ),
          Padding(
            padding: const EdgeInsets.all(8.0),
            child: Row(
              children: [
                Expanded(
                  child: TextField(
                    controller: _msgController,
                    decoration: InputDecoration(
                      labelText: '发送消息到 $_topic',
                      border: const OutlineInputBorder(),
                    ),
                  ),
                ),
                IconButton(icon: const Icon(Icons.send), onPressed: _status == '已连接' ? _publish : null),
              ],
            ),
          ),
          Expanded(
            child: ListView.builder(
              itemCount: _messages.length,
              itemBuilder: (context, index) => ListTile(
                title: Text(_messages[index]),
                leading: _messages[index].startsWith('[发送]') 
                    ? const Icon(Icons.arrow_upward, color: Colors.blue)
                    : const Icon(Icons.arrow_downward, color: Colors.green),
              ),
            ),
          ),
        ],
      ),
    );
  }
}

在这里插入图片描述

六、总结

mqtt_client 提供了灵活且强大的 MQTT 能力。

最佳实践

  1. QoS 选择:一般场景使用 QoS 1 即可,QoS 2 开销较大。
  2. 异常捕获:网络环境复杂,务必捕获 SocketException
  3. 资源管理:页面销毁时及时 disconnect,避免内存泄漏。
Logo

作为“人工智能6S店”的官方数字引擎,为AI开发者与企业提供一个覆盖软硬件全栈、一站式门户。

更多推荐