Flutter for OpenHarmony:mqtt_client 连接 MQTT 代理,实现物联网(IoT)设备实时状态监控(轻量级发布订阅协议) 深度解析与鸿蒙适配指南
本文介绍了MQTT协议在OpenHarmony平台上的应用,重点讲解了mqtt_client库的使用方法。文章首先介绍了MQTT的基础概念(Broker、Topic、QoS)和原理图解,然后详细解析了核心API的使用,包括连接建立、消息订阅和发布。通过智能家居控制、环境监测和实时聊天三个典型场景,展示了MQTT在物联网中的实际应用。最后提供了完整的示例代码,演示如何在OpenHarmony应用中实
欢迎加入开源鸿蒙跨平台社区:https://openharmonycrossplatform.csdn.net

前言
MQTT (Message Queuing Telemetry Transport) 是一种极轻量级的发布/订阅消息传输协议,广泛应用于物联网(IoT)、移动应用和车载设备。在智能家居控制、设备状态上报等场景中,APP 往往需要实时接收设备发来的消息。
mqtt_client 是 Dart 生态中最流行的 MQTT 客户端库,支持 MQTT 3.1 和 3.1.1 协议。它能够在 OpenHarmony 应用中稳定运行,帮助开发者轻松构建物联网控制端。
一、概念介绍/原理解析
1.1 基础概念
- Broker (代理): 消息的转发服务器(如 EMQX, Mosquitto)。
- Topic (主题): 消息的分类标签(如
home/livingroom/temp)。 - QoS (服务质量):
- 0: 最多一次 (Fire and Forget)
- 1: 至少一次 (Acknowledged delivery)
- 2: 只有一次 (Exact delivery)
1.2 进阶概念
在移动端使用 MQTT,通常需要关注断线重连(Keep Alive)和Clean Session(是否接收离线消息)。
二、核心 API/组件详解
2.1 基础用法
创建客户端并连接。
import 'package:mqtt_client/mqtt_client.dart';
import 'package:mqtt_client/mqtt_server_client.dart'; // 使用 Server Client
Future<void> connect() async {
final client = MqttServerClient('broker.emqx.io', 'flutter_client_id');
client.logging(on: false);
client.keepAlivePeriod = 20;
try {
await client.connect();
print('已连接');
} catch (e) {
print('连接失败: $e');
client.disconnect();
}
}

2.2 高级定制
处理连接状态回调和订阅消息。
// 设置回调
client.onConnected = () => print('Connected');
client.onDisconnected = () => print('Disconnected');
client.onSubscribed = (topic) => print('Subscribed to $topic');
// 订阅主题
client.subscribe('test/topic', MqttQos.atLeastOnce);
// 监听消息
client.updates!.listen((List<MqttReceivedMessage<MqttMessage?>> c) {
final MqttPublishMessage recMess = c[0].payload as MqttPublishMessage;
final pt = MqttPublishPayload.bytesToStringAsString(recMess.payload.message);
print('收到消息: $pt');
});

三、常见应用场景
3.1 场景 1:智能家居控制
用户点击开关,发送控制指令;同时订阅设备状态更新 UI。
// 发送指令:打开客厅灯
void turnOnLight() {
final builder = MqttClientPayloadBuilder();
builder.addString('{"state": "ON"}');
client.publishMessage('home/livingroom/light/set', MqttQos.atLeastOnce, builder.payload!);
}

3.2 场景 2:环境数据监测
实时接收温度、湿度传感器上报的数据,并绘制成图表。
client.updates!.listen((List<MqttReceivedMessage<MqttMessage?>> c) {
final MqttPublishMessage recMess = c[0].payload as MqttPublishMessage;
final pt = MqttPublishPayload.bytesToStringAsString(recMess.payload.message);
// 更新图表数据
chartData.add(parseSensorData(pt));
});

3.3 场景 3:实时聊天/通知
虽然 IM 通常用 WebSocket,但在低带宽环境下,MQTT 也是不错的选择。
// 订阅个人消息主题
client.subscribe('users/$myUserId/inbox', MqttQos.exactlyOnce);
// 收到消息弹窗
client.updates!.listen((event) {
// 显示通知栏消息
});

四、OpenHarmony 平台适配
4.1 网络权限
"requestPermissions": [
{ "name": "ohos.permission.INTERNET" }
]
4.2 后台保活
MQTT 需要保持长连接心跳。OpenHarmony 系统可能会在后台冻结应用网络。如果是关键业务,可能需要申请长时任务(Continuous Task)或使用推送服务辅助唤醒。
五、完整示例代码
本示例展示一个简单的 MQTT 控制台,连接公共 Broker,订阅并发送消息。
import 'dart:io';
import 'package:flutter/material.dart';
import 'package:mqtt_client/mqtt_client.dart';
import 'package:mqtt_client/mqtt_server_client.dart';
void main() {
runApp(const MaterialApp(home: MqttPage()));
}
class MqttPage extends StatefulWidget {
const MqttPage({super.key});
State<MqttPage> createState() => _MqttPageState();
}
class _MqttPageState extends State<MqttPage> {
// 使用 EMQX 的公共测试服务器
final client = MqttServerClient('broker.emqx.io', 'ohos_client_${DateTime.now().millisecondsSinceEpoch}');
String _status = '未连接';
final List<String> _messages = [];
final TextEditingController _msgController = TextEditingController();
final String _topic = 'flutter/ohos/test';
Future<void> _connect() async {
setState(() => _status = '连接中...');
client.logging(on: true);
client.keepAlivePeriod = 60;
client.onDisconnected = () => setState(() => _status = '已断开');
client.onConnected = () => setState(() => _status = '已连接');
// 设置连接消息
final connMess = MqttConnectMessage()
.withClientIdentifier('ohos_client')
.withWillTopic('willtopic')
.withWillMessage('My Will message')
.startClean()
.withWillQos(MqttQos.atLeastOnce);
client.connectionMessage = connMess;
try {
await client.connect();
} on NoConnectionException catch (e) {
// 客户端异常
print('Client exception: $e');
client.disconnect();
} on SocketException catch (e) {
// Socket 异常
print('Socket exception: $e');
client.disconnect();
}
if (client.connectionStatus!.state == MqttConnectionState.connected) {
_subscribe();
} else {
setState(() => _status = '连接失败');
client.disconnect();
}
}
void _subscribe() {
client.subscribe(_topic, MqttQos.atLeastOnce);
client.updates!.listen((List<MqttReceivedMessage<MqttMessage?>> c) {
final MqttPublishMessage recMess = c[0].payload as MqttPublishMessage;
final payload = MqttPublishPayload.bytesToStringAsString(recMess.payload.message);
setState(() {
_messages.insert(0, '[收到] $payload');
});
});
}
void _publish() {
final builder = MqttClientPayloadBuilder();
builder.addString(_msgController.text);
client.publishMessage(_topic, MqttQos.exactlyOnce, builder.payload!);
setState(() {
_messages.insert(0, '[发送] ${_msgController.text}');
_msgController.clear();
});
}
void dispose() {
client.disconnect();
super.dispose();
}
Widget build(BuildContext context) {
return Scaffold(
appBar: AppBar(title: const Text('MQTT Client Demo')),
body: Column(
children: [
Container(
padding: const EdgeInsets.all(16),
color: _status == '已连接' ? Colors.green[100] : Colors.red[100],
child: Row(
mainAxisAlignment: MainAxisAlignment.spaceBetween,
children: [
Text('状态: $_status', style: const TextStyle(fontWeight: FontWeight.bold)),
if (_status == '未连接' || _status == '已断开' || _status == '连接失败')
ElevatedButton(onPressed: _connect, child: const Text('连接'))
else
ElevatedButton(onPressed: client.disconnect, child: const Text('断开')),
],
),
),
Padding(
padding: const EdgeInsets.all(8.0),
child: Row(
children: [
Expanded(
child: TextField(
controller: _msgController,
decoration: InputDecoration(
labelText: '发送消息到 $_topic',
border: const OutlineInputBorder(),
),
),
),
IconButton(icon: const Icon(Icons.send), onPressed: _status == '已连接' ? _publish : null),
],
),
),
Expanded(
child: ListView.builder(
itemCount: _messages.length,
itemBuilder: (context, index) => ListTile(
title: Text(_messages[index]),
leading: _messages[index].startsWith('[发送]')
? const Icon(Icons.arrow_upward, color: Colors.blue)
: const Icon(Icons.arrow_downward, color: Colors.green),
),
),
),
],
),
);
}
}

六、总结
mqtt_client 提供了灵活且强大的 MQTT 能力。
最佳实践:
- QoS 选择:一般场景使用 QoS 1 即可,QoS 2 开销较大。
- 异常捕获:网络环境复杂,务必捕获
SocketException。 - 资源管理:页面销毁时及时
disconnect,避免内存泄漏。
更多推荐

所有评论(0)