Flutter+OpenHarmony智能家居开发Day8|MQTT实时通信+设备状态双向同步全流程
实现 MQTT Broker 连接参数的统一管理,避免硬编码,同时解决鸿蒙系统中的连接参数适配问题,为后续 MQTT 工具类封装奠定基础。
Flutter+OpenHarmony 智能家居开发 Day8|MQTT 实时通信 + 设备状态双向同步全流程
欢迎加入开源鸿蒙跨平台社区: https://openharmonycrossplatform.csdn.net
大家好~ 咱们接着前 7 天的开发节奏,继续推进智能家居 APP 的开发!经过 Day7 的实操,我们已经搞定了 ObjectBox 本地持久化和离线设备管理,终于解决了 “断网可用” 的痛点 —— 现在 APP 就算没网,也能正常查看、筛选已保存的设备。但新的问题马上就来了,不知道大家有没有发现:设备状态没法实时同步。
举个很直观的例子:你在客厅用手机 APP 打开了灯光,卧室的平板端 APP 还是显示灯光关闭;如果设备突然断电、离线,APP 也不会及时提醒,只能手动刷新或者等后端定时推送。这显然不符合智能家居的核心体验,所以 Day8 我们就专门解决这个问题,核心就是集成 MQTT 协议,实现设备与 APP、APP 与后端的实时双向通信,打通 “本地 - 设备 - 后端” 的状态同步链路,让设备操作、状态变化能够毫秒级反馈,同时结合 Day7 的本地持久化能力,实现 “实时同步 + 离线兜底” 的双重保障。
MQTT(Message Queuing Telemetry Transport,消息队列遥测传输)是一种轻量级、低带宽、低延迟的发布 / 订阅模式通信协议,非常适合物联网(IoT)场景 —— 智能家居设备大多带宽有限、算力较低,MQTT 的 “轻量特性” 能完美适配,同时其发布 / 订阅模式可以轻松实现 “一对多”“多对多” 的实时通信(如一个设备状态更新,多端 APP 同时接收),这也是它成为智能家居实时通信首选协议的核心原因。
本文全程适配 CSDN 发布规范,所有代码块用 CSDN 兼容格式(``` 标注语言),无需修改直接复制就能显示高亮,延续前 7 天「逐步骤拆解 + 逐行代码解析 + 全场景踩坑解决」的风格,拒绝空谈理论,全程围绕 “实际开发落地” 展开:从 MQTT 协议基础、依赖集成、工具类封装,到 MQTT 连接管理、消息发布 / 订阅、设备状态双向同步,再到鸿蒙多终端适配、异常处理、高可用优化,每一步都包含「为什么做、怎么做、可能出什么问题、怎么解决」,所有代码均可直接复制复用,兼顾新手友好性和实际开发实用性。
本文适合 Flutter+OpenHarmony 跨平台开发学习者、智能家居 APP 开发者,尤其适合需要实现设备实时通信的开发者。无论你是刚跟随系列教程入门,还是有一定的物联网开发经验,都能从本文中获取完整的 MQTT 集成方案、状态同步逻辑,以及贴合鸿蒙生态的适配技巧。话不多说,我们正式开启 Day8 的详细开发之旅!
一、Day8 核心目标(明确方向,不走弯路)
Day8 的开发基于 Day3-Day7 已完成的项目架构,核心是在原有数据层、业务层中集成 MQTT 实时通信能力,衔接 Day7 的本地持久化逻辑,实现设备状态的双向实时同步,所有开发工作围绕以下核心目标展开,确保不偏离 “实时性、稳定性、兼容性” 的核心诉求:
- 理解 MQTT 协议核心概念(发布 / 订阅、主题、客户端、 Broker),掌握智能家居场景下的 MQTT 主题设计规范;
- 集成 MQTT 核心依赖(mqtt_client),完成基础配置,解决鸿蒙系统下的依赖兼容、编译报错问题;
- 封装 MQTT 全局工具类(单例模式),实现连接、断开、重连、发布消息、订阅主题、取消订阅等核心功能;
- 设计智能家居专属 MQTT 主题体系,规范设备控制、状态上报、异常通知等场景的主题格式,确保通信有序;
- 实现 MQTT 消息格式封装与解析,兼容后端、设备的消息协议,关联 Day3 定义的设备模型,实现消息与设备数据的无缝映射;
- 封装业务层 MQTT 逻辑,实现「MQTT 连接状态管理」,支持自动连接、断网重连、后台保活(适配鸿蒙多终端);
- 实现设备状态双向同步:
- 正向同步(APP→设备):APP 发送控制指令(如开灯)→ MQTT 发布消息 → 设备接收指令并执行 → 设备上报执行结果;
- 反向同步(设备→APP):设备主动上报状态(如温度变化、离线)→ MQTT 订阅消息 → APP 接收消息并更新本地数据库 + UI;
- 衔接 Day7 的 ObjectBox 本地持久化,实现「实时消息落地本地」,确保断网后重新联网时,未接收的消息能正常同步、状态不丢失;
- 处理 MQTT 通信全场景异常:连接失败、消息发送失败、消息解析失败、重连失败、设备无响应等,给出友好提示并兜底;
- 完成 OpenHarmony 专属适配:解决 DAYU200 开发板 MQTT 连接稳定性、鸿蒙手机后台保活权限、平板端消息接收延迟等问题;
- 优化 MQTT 通信性能:实现消息防抖、重复消息过滤、心跳包参数优化,确保 100 + 台设备同时通信无卡顿、无消息丢失;
- 实现多终端测试验证:在模拟器、OpenHarmony 手机、DAYU200 开发板上验证 MQTT 连接、消息收发、状态同步全场景,确保实时性和稳定性;
- 规范代码结构,将 MQTT 逻辑与原有网络逻辑、本地数据库逻辑解耦,为后续 Day9 的批量控制、场景联动奠定基础。
二、前置准备(衔接前文,避免脱节)
Day8 的开发紧密依赖 Day3-Day7 已完成的项目架构,核心是集成 MQTT 实时通信能力,衔接 Day7 的本地持久化逻辑,复用 Day3 的设备模型、Day6 的搜索筛选逻辑,因此在开始开发前,需先确认前置环境和已有代码是否就绪,避免开发过程中出现衔接问题。
2.1 已有架构确认(重点衔接)
我们先回顾 Day3-Day7 已完成的核心代码和架构,确保所有依赖和基础组件都能支撑 Day8 的 MQTT 开发:
- 数据层:已完成 BaseDevice 及子类数据模型(空调 / 灯光 / 窗帘)、HttpClient 网络工具、DeviceRepository 仓库(网络请求 + 本地 CRUD)、ObjectBox 本地数据库工具类,可直接复用设备模型和本地持久化能力;
- 业务层:已封装 DeviceProvider 状态管理,管理设备列表、控制状态、搜索筛选状态,支持乐观更新、并发控制,可扩展用于管理 MQTT 连接状态;
- UI 层:已完成设备列表、设备卡片、搜索框、筛选栏等组件,支持多终端布局适配,可扩展用于显示 MQTT 连接状态、实时消息提示;
- 工具类:已封装防抖工具、全局异常处理器、日志工具、设备类型判断工具、依赖注入工具(getIt),可直接复用;
- 鸿蒙适配:已配置网络权限、存储权限、多终端触控适配、鸿蒙原生桥接基础,可在此基础上扩展 MQTT 所需的后台保活权限;
- 核心能力:已实现设备网络请求、本地持久化、离线管理,Day8 只需在此基础上添加 MQTT 实时通信,实现 “实时 + 离线” 双重保障。
2.2 依赖检查与补充(核心:MQTT 适配)
Day8 的核心新增依赖是「MQTT 客户端依赖」,我们选用主流的mqtt_client(支持 Flutter 跨平台,已完成鸿蒙适配),同时需要确认已有依赖是否正确配置,避免出现依赖冲突导致的编译报错。
2.2.1 新增 MQTT 核心依赖
在 pubspec.yaml 中添加 mqtt_client 相关依赖,重点注意 OpenHarmony 适配版本,确保依赖版本与 Flutter、鸿蒙 SDK 版本兼容,直接复制以下配置到你的 pubspec.yaml 中即可:
yaml
dependencies:
flutter:
sdk: flutter
# 原有依赖(Day3-Day7,无需修改)
dio: ^5.4.0 # 网络请求框架
json_annotation: ^4.8.1 # JSON解析注解
provider: ^6.1.1 # 状态管理
get_it: ^7.2.0 # 依赖注入
logger: ^1.1.0 # 日志工具
device_info_plus: ^9.1.0 # 设备信息获取(判断鸿蒙系统)
flutter_windowmanager: ^0.2.0 # 窗口管理(适配多终端)
permission_handler: ^11.0.1 # 动态权限申请(兼容鸿蒙)
shared_preferences: ^2.2.2 # 轻量存储(用于保存同步状态)
objectbox: ^2.0.0 # ObjectBox核心库
objectbox_flutter_libs: ^2.0.0 # ObjectBox Flutter插件(含鸿蒙适配)
connectivity_plus: ^5.0.2 # 网络状态判断(Day7新增,用于MQTT重连)
# 新增:MQTT核心依赖(适配OpenHarmony,直接复制)
mqtt_client: ^10.0.0 # MQTT客户端核心库(支持鸿蒙)
mqtt_shared: ^2.0.0 # MQTT共享订阅(可选,用于多端同步)
crypto: ^3.0.3 # 加密工具(用于MQTT客户端ID加密、消息加密)
dev_dependencies:
flutter_test:
sdk: flutter
# 原有开发依赖(无需修改)
json_serializable: ^6.7.1 # JSON解析代码生成
build_runner: ^2.4.6 # 代码生成工具
flutter_lints: ^2.0.0 # 代码规范检查
objectbox_generator: ^2.0.0 # ObjectBox实体代码生成
2.2.2 执行依赖安装命令
在项目根目录执行以下命令,确保所有依赖(原有 + 新增)都能正常加载,重点注意 mqtt_client 的鸿蒙编译依赖是否下载成功,命令直接复制执行即可:
bash
运行
flutter pub get
安装完成后,可在项目的.pub-cache 文件夹中查看 mqtt_client 的版本,确认是否为 ^10.0.0(鸿蒙适配版本);如果安装失败,先执行flutter clean清除缓存,再重新执行flutter pub get。
2.2.3 可能出现的问题与解决方案(高频踩坑)
这一步很多同学会遇到鸿蒙适配相关的报错,这里提前整理好所有高频问题,对照排查即可,不用浪费时间找解决方案:
表格
| 问题场景 | 根因 | 解决方案 |
|---|---|---|
| mqtt_client 依赖安装失败,提示 “OpenHarmony 平台不支持” | mqtt_client 版本过低,未适配 OpenHarmony 系统 | 确认依赖版本为 ^10.0.0(官方适配鸿蒙的最低版本),执行flutter clean清除缓存后重新执行flutter pub get;若仍失败,从 OpenHarmony 社区下载 mqtt_client 鸿蒙适配源码手动集成 |
| 执行 pub get 后,提示 “crypto 依赖冲突” | 其他依赖(如 dio)也引入了 crypto,版本不兼容 | 在 pubspec.yaml 中指定 crypto 版本为 ^3.0.3,与 mqtt_client 要求的版本一致;若仍冲突,执行flutter pub deps查看依赖树,排除冲突依赖 |
| 鸿蒙开发板编译失败,提示 “找不到 mqtt_client 相关库” | mqtt_client 的鸿蒙编译产物未正确生成 | 1. 确认 Flutter for OpenHarmony 插件已更新至最新版本;2. 重新执行flutter pub get后,执行flutter build ohos生成鸿蒙编译产物;3. 检查 DevEco Studio 的依赖配置,确保引入了 mqtt_client 的鸿蒙库 |
| 与 connectivity_plus 冲突,提示 “网络状态监听异常” | 两个依赖同时监听网络状态,导致端口占用 | 封装统一的网络状态监听工具类,复用 connectivity_plus 的监听逻辑,让 MQTT 工具类通过该工具类获取网络状态,避免重复监听 |
| 执行代码生成命令时,提示 “mqtt_client 相关类未找到” | 依赖未安装成功,或未重新生成代码 | 1. 确认 mqtt_client 依赖安装成功;2. 重新执行flutter pub run build_runner build --delete-conflicting-outputs |
2.3 MQTT 基础概念前置(必懂,避免开发踩坑)
在开始开发前,必须先掌握 MQTT 协议的核心概念,尤其是智能家居场景下的应用方式,否则后续主题设计、消息收发会出现逻辑混乱,这里用最通俗的语言讲解,不用死记硬背,理解即可:
- MQTT Broker(消息代理):核心中转节点,相当于 “消息邮局”——APP 发送消息先给 Broker,设备发送消息也先给 Broker,Broker 再把消息转发给所有需要接收的终端。智能家居场景中,通常由后端部署 Broker(推荐 EMQ X、Mosquitto,开源免费,支持鸿蒙适配),APP、设备均作为客户端连接到 Broker。
- MQTT Client(客户端):发起连接、发布消息、订阅消息的终端,本文中 Flutter APP 就是一个客户端,每个智能家居设备(空调、灯光)也是一个客户端;每个客户端必须有唯一的 Client ID,Broker 通过 Client ID 区分不同终端。
- 主题(Topic):消息的分类标识,类似 “文件夹路径”,比如 “/smart_home/device/123/control”,客户端通过订阅同一主题接收消息,通过发布到指定主题发送消息。比如 APP 要控制设备 123,就往该设备的控制主题发消息,设备 123 订阅自己的控制主题,就能收到指令。
- 发布(Publish):客户端向 Broker 发送消息的操作,需指定主题和消息内容,比如 APP 向设备 123 的控制主题发布 “开灯” 指令,就是发布操作。
- 订阅(Subscribe):客户端向 Broker 申请接收某一主题消息的操作,比如 APP 订阅设备 123 的状态主题,设备 123 上报状态时,APP 就能实时收到。
- QoS(服务质量):消息传输的可靠性等级,智能家居场景推荐这样搭配:
- QoS 1(至少一次送达):用于控制指令(如开灯、调节温度),确保指令不丢失;
- QoS 0(最多一次送达):用于非关键消息(如设备状态上报),节省带宽;
- 客户端 ID(Client ID):每个客户端的唯一标识,必须保证不重复(比如 APP 的 Client ID 可以是 “app_设备唯一标识”,设备的 Client ID 可以是 “device_设备 ID”),否则会导致连接冲突、消息发送错误。
- 心跳包(Keep Alive):连接保活机制,客户端定期向 Broker 发送心跳消息,告诉 Broker “我还在线”;如果 Broker 长时间未收到心跳,会断开连接;反之,客户端长时间未收到 Broker 的响应,也会触发重连。
- 重连机制:客户端与 Broker 断开连接后,自动重新发起连接的逻辑,是保证 MQTT 通信稳定性的核心,必须处理断网、Broker 重启、网络波动等场景。
2.4 主题设计规范(核心,确保通信有序)
主题设计是 MQTT 通信的核心,若主题格式混乱,会导致消息收发错误、多设备干扰等问题,后续修改起来非常麻烦。结合智能家居场景,我们设计统一的主题规范,所有主题均以 “/smart_home” 为根路径,清晰区分不同设备、不同操作的消息,支持多终端、多设备同时通信,直接复用这套规范即可,不用自己设计。
核心设计原则
根路径→模块→设备 ID→操作类型,比如 “/smart_home/device/ 设备 ID/control”,简洁清晰,便于维护和扩展。
2.4.1 设备控制主题(APP→设备)
用于 APP 向设备发送控制指令(如开灯、调节温度),主题格式(直接复用):
text
/smart_home/device/{deviceId}/control
参数说明:
- {deviceId}:设备唯一 ID(与后端、本地数据库中的 deviceId 一致),比如 “dev_123456”;
- 示例:
/smart_home/device/dev_123456/control(向 ID 为 dev_123456 的设备发送控制指令)。
2.4.2 设备状态上报主题(设备→APP / 后端)
用于设备向 APP、后端上报自身状态(如开启 / 关闭、温度变化、在线 / 离线),主题格式(直接复用):
text
/smart_home/device/{deviceId}/state
参数说明:
- {deviceId}:设备唯一 ID;
- 示例:
/smart_home/device/dev_123456/state(ID 为 dev_123456 的设备上报状态)。
2.4.3 设备异常通知主题(设备→APP / 后端)
用于设备向 APP、后端上报异常状态(如故障、断电、网络异常),主题格式(直接复用):
text
/smart_home/device/{deviceId}/alert
参数说明:
- {deviceId}:设备唯一 ID;
- 示例:
/smart_home/device/dev_123456/alert(ID 为 dev_123456 的设备上报异常)。
2.4.4 全局广播主题(后端→所有 APP / 设备)
用于后端向所有 APP、设备发送全局通知(如系统升级、批量控制指令),主题格式(直接复用):
text
/smart_home/global/broadcast
2.4.5 主题设计注意事项(必看)
- 主题区分大小写(如 “/control” 与 “/Control” 是两个不同主题),开发时必须严格遵循规范,不能写错;
- 设备 ID 必须唯一,避免多个设备使用同一主题,导致消息干扰;
- 主题路径不宜过长(建议不超过 5 级),避免影响消息传输效率;
- APP 需订阅所有已绑定设备的「状态上报主题」和「异常通知主题」,以及全局广播主题;
- 设备只需订阅自身的「控制主题」,发布自身的「状态上报主题」和「异常通知主题」;
- 所有主题统一由常量类管理(后续步骤会封装),避免手动拼接主题出现错误。
2.5 消息格式设计(核心,兼容设备与后端)
MQTT 消息内容采用 JSON 格式(轻量、易解析,兼容设备和后端),结合 Day3 定义的设备模型,设计统一的消息格式,分为「控制消息」「状态消息」「异常消息」三类,确保 APP、设备、后端能正常解析,直接复用这套格式,不用修改。
2.5.1 控制消息格式(APP→设备)
用于 APP 向设备发送控制指令,核心包含设备 ID、控制参数、时间戳等信息,格式如下(直接复制复用):
json
{
"deviceId": "dev_123456", // 设备唯一ID(必传)
"controlParams": { // 控制参数(根据设备类型不同,参数不同)
"status": "on", // 设备开关状态(所有设备通用)
"temperature": 26, // 空调专属参数(可选)
"brightness": 80, // 灯光专属参数(可选)
"fanSpeed": "high" // 空调/窗帘专属参数(可选)
},
"timestamp": 1758067200000, // 消息发送时间戳(毫秒,必传)
"clientId": "app_flutter_789", // 发送端客户端ID(APP的Client ID,必传)
"qos": 1 // 消息服务质量(必传,推荐1)
}
说明:
- controlParams 根据设备类型动态变化,比如灯光设备可传 brightness(亮度)、color(颜色),窗帘设备可传 position(开合度);
- timestamp 用于消息去重、排序,避免重复执行控制指令;
- clientId 用于设备识别发送端(区分 APP、其他设备)。
2.5.2 状态消息格式(设备→APP / 后端)
用于设备向 APP、后端上报自身状态,核心包含设备 ID、当前状态、参数、时间戳等信息,格式如下(直接复制复用):
json
{
"deviceId": "dev_123456", // 设备唯一ID(必传)
"state": { // 设备当前状态(所有设备通用参数)
"status": "on", // 开关状态(on/off)
"online": true, // 在线状态(true/false)
"lastActiveTime": "2026-08-18 10:00:00" // 最后活动时间(必传)
},
"params": { // 设备专属参数(根据设备类型不同,参数不同)
"temperature": 26, // 空调专属
"mode": "cool", // 空调专属
"brightness": 80 // 灯光专属
},
"timestamp": 1758067201000, // 消息发送时间戳(毫秒,必传)
"clientId": "device_123456", // 发送端客户端ID(设备的Client ID,必传)
"qos": 0 // 消息服务质量(可选,推荐0)
}
2.5.3 异常消息格式(设备→APP / 后端)
用于设备向 APP、后端上报异常状态,核心包含设备 ID、异常类型、异常描述等信息,格式如下(直接复制复用):
json
{
"deviceId": "dev_123456", // 设备唯一ID(必传)
"alertType": "power_failure", // 异常类型(power_failure:断电,fault:故障,network_error:网络异常)
"alertDesc": "设备断电,请检查电源", // 异常描述(必传)
"timestamp": 1758067202000, // 消息发送时间戳(毫秒,必传)
"clientId": "device_123456", // 发送端客户端ID(设备的Client ID,必传)
"qos": 1 // 消息服务质量(必传,推荐1)
}
2.6 开发环境确认(多终端适配前提)
确认开发环境已就绪,确保后续 MQTT 集成和实时通信功能开发完成后,能快速进行多终端测试,重点确认鸿蒙开发环境的网络权限和后台保活权限配置,避免测试时出现权限报错:
- Flutter 环境:Flutter 3.16.0+,已配置 Flutter for OpenHarmony 插件,mqtt_client 代码生成工具正常;
- 鸿蒙开发环境:DevEco Studio 4.0+,已配置鸿蒙 SDK(API 10+),已添加「网络权限」「后台保活权限」(后续模块会详细讲解配置方法);
- MQTT Broker:已部署 MQTT Broker(推荐 EMQ X,开源免费,支持鸿蒙适配),确认 Broker 地址、端口(默认 1883,WebSocket 端口 8083)可访问;
- 简易部署:如果没有后端部署的 Broker,可下载 EMQ X 本地部署(官网:https://www.emqx.io/),安装后直接启动,默认地址为 tcp://127.0.0.1:1883;
- 测试设备:模拟器(Mate 80 Pro Max)、OpenHarmony 6.0 + 手机、DAYU200 开发板(已刷入鸿蒙系统,已连接网络);
- 调试工具:开启 Flutter DevTools(监控 MQTT 连接状态、消息收发)、开启 logger 日志打印(监控 MQTT 操作)、MQTTX(可选,可视化调试 MQTT 消息收发,官网:https://mqttx.app/);
- MQTTX 使用:安装后创建客户端,输入 Broker 地址、端口、Client ID,即可订阅 / 发布消息,用于调试 APP 与 Broker 的通信;
- 测试设备:至少准备 1 台智能家居设备(或设备模拟器),确保设备能连接到 MQTT Broker,正常接收 / 发送消息。
2.7 代码规范确认(工程化要求)
延续 Day3-Day7 制定的代码规范,Day8 所有开发严格遵循,同时新增「MQTT 代码规范」,确保代码可维护、可复用,避免后续迭代出现问题:
原有规范(无需修改,继续遵循)
- 文件夹命名:小写 + 下划线(如 core/mqtt);
- 类命名:大驼峰(如 MqttManager、MqttControlMessage);
- 方法 / 变量命名:小驼峰(如 connectMqtt、mqttConnectionState);
- 常量命名:全大写 + 下划线(如 MQTT_BROKER_ADDRESS);
- 分层架构:严格区分数据层、业务层、UI 层,MQTT 相关代码统一放在 core/mqtt 文件夹下。
新增规范(Day8 重点遵循)
- MQTT 相关代码统一放在core/mqtt文件夹下,分为工具类、消息模型、配置类,实现模块化管理;
- 文件夹结构:core/mqtt/manager(工具类)、core/mqtt/models(消息模型)、core/mqtt/utils(辅助工具);
- MQTT 工具类采用单例模式,避免多实例同时连接导致的冲突;
- 所有 MQTT 操作(连接、发布、订阅)都添加 try-catch 捕获异常,避免通信失败导致 APP 闪退;
- MQTT 消息解析逻辑单独封装,与工具类解耦,便于后续修改消息格式;
- MQTT 连接状态、消息收发状态统一由业务层管理,UI 层仅通过状态监听更新界面,不直接操作 MQTT 工具类;
- 所有 MQTT 相关常量(如 Broker 地址、端口、主题前缀)统一放在 constants 文件夹下,便于统一修改;
- 代码注释:每个类、每个方法都添加注释,说明功能、参数、返回值,核心代码逐行添加注释,便于后续维护和他人阅读;
- 异常处理:所有 MQTT 异常都需捕获,并给出明确的错误日志和用户提示,避免 “静默失败”。
三、逐步骤详细开发(核心部分,每一步到位)
做好前置准备后,我们开始正式开发,Day8 的开发分为7 个核心模块,按「MQTT 基础集成→MQTT 工具类封装→消息模型封装与解析→业务层逻辑封装→双向同步实现→UI 层适配→OpenHarmony 专属适配」的顺序逐模块开发,每个模块分步骤拆解,每一步都讲清楚细节、问题和解决方案,所有代码可直接复制复用,跟着步骤走就能落地。
模块 1:MQTT 基础集成与全局配置(数据层基础)
本模块是 Day8 的开发基础,核心完成MQTT 的全局初始化、配置类定义、依赖注入,实现 MQTT Broker 连接参数的统一管理,避免硬编码,同时解决鸿蒙系统中的连接参数适配问题,为后续 MQTT 工具类封装奠定基础。
步骤 1:创建 MQTT 配置类(统一管理连接参数)
在 core/constants 文件夹下创建 MQTT 相关常量类,统一管理 Broker 地址、端口、QoS、心跳包等参数,便于后续统一修改,同时区分开发 / 测试 / 生产环境,直接复制代码即可,无需修改(后续根据自己的 Broker 地址调整即可)。
文件路径:lib/core/constants/mqtt_constants.dart
dart
import 'package:mqtt_client/mqtt_client.dart';
// MQTT全局配置常量(区分环境,适配鸿蒙多终端,直接复用)
class MqttConstants {
// -------------------------- 环境配置(根据实际情况修改)--------------------------
// 当前环境(development:开发,test:测试,production:生产)
static const String currentEnv = 'development';
// 开发环境MQTT Broker配置(重点修改这里的Broker地址)
static const String _devBroker = 'tcp://192.168.1.100:1883'; // Broker地址(TCP协议,本地部署填自己的IP)
static const String _devBrokerWs = 'ws://192.168.1.100:8083/mqtt'; // WebSocket协议(鸿蒙开发板适配)
static const int _devPort = 1883; // TCP端口(默认1883)
static const int _devWsPort = 8083; // WebSocket端口(默认8083)
// 测试环境MQTT Broker配置(按需修改)
static const String _testBroker = 'tcp://test.mqtt.example.com:1883';
static const String _testBrokerWs = 'ws://test.mqtt.example.com:8083/mqtt';
static const int _testPort = 1883;
static const int _testWsPort = 8083;
// 生产环境MQTT Broker配置(按需修改)
static const String _prodBroker = 'tcp://mqtt.example.com:1883';
static const String _prodBrokerWs = 'ws://mqtt.example.com:8083/mqtt';
static const int _prodPort = 1883;
static const int _prodWsPort = 8083;
// -------------------------- 通用配置(所有环境共用,无需修改)--------------------------
// 主题前缀(与2.4节主题设计规范一致)
static const String topicPrefix = '/smart_home';
// QoS服务质量(推荐1,至少一次送达)
static const MqttQos defaultQos = MqttQos.atLeastOnce;
// 心跳包时间(秒),默认60秒,鸿蒙设备建议缩短至30秒,避免连接被断开
static const int keepAlivePeriod = 30;
// 重连间隔(秒),连接失败后,每隔5秒重新连接
static const int reconnectInterval = 5;
// 最大重连次数(默认10次,超过后停止重连,给出提示)
static const int maxReconnectCount = 10;
// 消息超时时间(秒),超过后视为消息发送失败
static const int messageTimeout = 10;
// 客户端ID前缀(APP客户端ID,拼接设备唯一标识,确保唯一)
static const String appClientIdPrefix = 'smart_home_app_';
// -------------------------- 动态获取当前环境配置(无需修改)--------------------------
// 获取当前环境的MQTT Broker地址(根据设备类型选择TCP或WebSocket)
static String get brokerAddress {
switch (currentEnv) {
case 'test':
return _testBroker;
case 'production':
return _prodBroker;
default:
return _devBroker;
}
}
// 获取当前环境的MQTT WebSocket地址(鸿蒙开发板适配)
static String get brokerWsAddress {
switch (currentEnv) {
case 'test':
return _testBrokerWs;
case 'production':
return _prodBrokerWs;
default:
return _devBrokerWs;
}
}
// 获取当前环境的MQTT端口
static int get port {
switch (currentEnv) {
case 'test':
return _testPort;
case 'production':
return _prodPort;
default:
return _devPort;
}
}
// 获取当前环境的MQTT WebSocket端口
static int get wsPort {
switch (currentEnv) {
case 'test':
return _testWsPort;
case 'production':
return _prodWsPort;
default:
return _devWsPort;
}
}
// -------------------------- 主题生成工具方法(无需修改,直接调用)--------------------------
// 生成设备控制主题(APP→设备)
static String generateControlTopic(String deviceId) {
return '$topicPrefix/device/$deviceId/control';
}
// 生成设备状态上报主题(设备→APP/后端)
static String generateStateTopic(String deviceId) {
return '$topicPrefix/device/$deviceId/state';
}
// 生成设备异常通知主题(设备→APP/后端)
static String generateAlertTopic(String deviceId) {
return '$topicPrefix/device/$deviceId/alert';
}
// 生成全局广播主题(后端→所有APP/设备)
static String generateGlobalBroadcastTopic() {
return '$topicPrefix/global/broadcast';
}
}
// MQTT连接状态枚举(用于UI层展示和业务层判断,无需修改)
enum MqttConnectionState {
disconnected('已断开连接'), // 未连接/连接断开
connecting('正在连接...'), // 正在连接
connected('已连接'), // 连接成功
reconnecting('正在重连...'), // 正在重连
connectionFailed('连接失败'); // 连接失败
// 状态描述(用于UI展示,比如顶部提示)
final String description;
const MqttConnectionState(this.description);
}
代码详解(重点看这里,理解后便于后续调用):
- 环境区分:通过 currentEnv 控制当前环境,开发、测试、生产环境的 Broker 地址分开配置,后续切换环境只需修改 currentEnv,不用修改所有相关代码;
- 协议适配:同时提供 TCP 和 WebSocket 协议地址,TCP 用于手机、模拟器,WebSocket 用于鸿蒙开发板(部分开发板 TCP 连接不稳定);
- 通用配置:统一管理 QoS、心跳包、重连间隔等参数,后续可根据实际测试结果调整(比如鸿蒙开发板心跳包可缩短至 20 秒);
- 主题生成工具:封装了 4 个主题生成方法,根据设备 ID 自动生成对应主题,避免手动拼接主题出现错误(比如少写 “/”、写错设备 ID);
- 连接状态枚举:定义了 5 种 MQTT 连接状态,后续 UI 层可通过该枚举展示连接状态(如顶部提示 “正在连接...”),业务层可通过该枚举判断逻辑(如连接成功后再订阅主题)。
步骤 2:创建 MQTT 消息模型(映射消息格式)
根据 2.5 节设计的消息格式,创建对应的 MQTT 消息模型,实现 JSON 解析与序列化,关联 Day3 的设备模型,确保消息与设备数据的无缝映射,同时封装消息生成工具方法,简化消息创建流程,所有代码直接复制即可。
2.1 创建控制消息模型(APP→设备)
用于封装 APP 向设备发送的控制指令,关联 Day3 的 BaseDevice 模型,提供消息生成、验证、JSON 解析方法,直接复制代码:
文件路径:lib/core/mqtt/models/mqtt_control_message.dart
dart
import 'package:json_annotation/json_annotation.dart';
import 'package:smart_home_flutter/data/models/device_model.dart';
import 'package:smart_home_flutter/core/constants/mqtt_constants.dart';
import 'package:crypto/crypto.dart';
import 'dart:convert';
// 生成JSON解析代码的注解(必须添加)
part 'mqtt_control_message.g.dart';
// MQTT控制消息模型(APP→设备,直接复用)
@JsonSerializable()
class MqttControlMessage {
// 设备唯一ID(必传)
@JsonKey(name: 'deviceId', required: true)
final String deviceId;
// 控制参数(根据设备类型动态变化,比如空调的温度、灯光的亮度)
@JsonKey(name: 'controlParams', required: true)
final Map<String, dynamic> controlParams;
// 消息发送时间戳(毫秒,用于消息去重)
@JsonKey(name: 'timestamp', required: true)
final int timestamp;
// 发送端客户端ID(APP的Client ID,必传)
@JsonKey(name: 'clientId', required: true)
final String clientId;
// 消息服务质量(必传,推荐1)
@JsonKey(name: 'qos', required: true)
final int qos;
// 构造函数(私有,外部通过fromDeviceAndParams方法创建消息,避免参数遗漏)
MqttControlMessage._({
required this.deviceId,
required this.controlParams,
required this.timestamp,
required this.clientId,
required this.qos,
});
// 从JSON解析消息(用于接收设备响应时解析消息)
factory MqttControlMessage.fromJson(Map<String, dynamic> json) =>
_$MqttControlMessageFromJson(json);
// 将消息转为JSON(用于发送消息时序列化)
Map<String, dynamic> toJson() => _$MqttControlMessageToJson(this);
// 新增:从设备和控制参数生成控制消息(简化消息创建,外部直接调用)
static MqttControlMessage fromDeviceAndParams({
required BaseDevice device,
required Map<String, dynamic> controlParams,
required String clientId,
}) {
return MqttControlMessage._(
deviceId: device.deviceId,
controlParams: controlParams,
timestamp: DateTime.now().millisecondsSinceEpoch, // 当前时间戳
clientId: clientId,
qos: MqttConstants.defaultQos.value, // 复用默认QoS(1)
);
}
// 新增:生成消息唯一标识(用于消息去重,避免重复执行控制指令)
String get messageId {
// 拼接deviceId、timestamp、clientId生成唯一标识,通过MD5加密确保唯一性
final String uniqueStr = '$deviceId-$timestamp-$clientId';
return md5.convert(utf8.encode(uniqueStr)).toString();
}
// 新增:验证消息合法性(检查必填参数,避免发送无效消息)
bool isValid() {
return deviceId.isNotEmpty &&
controlParams.isNotEmpty &&
timestamp > 0 &&
clientId.isNotEmpty &&
(qos == 0 || qos == 1 || qos == 2); // QoS只能是0、1、2
}
}
2.2 创建状态消息模型(设备→APP / 后端)
用于封装设备向 APP、后端上报的状态消息,关联 Day3 的 BaseDevice 模型,提供消息生成、JSON 解析、转为设备模型的方法,直接复制代码:
文件路径:lib/core/mqtt/models/mqtt_state_message.dart
dart
import 'package:json_annotation/json_annotation.dart';
import 'package:smart_home_flutter/data/models/device_model.dart';
import 'package:smart_home_flutter/core/constants/mqtt_constants.dart';
// 生成JSON解析代码的注解(必须添加)
part 'mqtt_state_message.g.dart';
// 设备状态通用参数(对应消息中的state字段,所有设备共用)
@JsonSerializable()
class DeviceStateParams {
// 设备开关状态(on/off,与后端、设备保持一致)
@JsonKey(name: 'status', required: true)
final String status;
// 设备在线状态(true:在线,false:离线)
@JsonKey(name: 'online', required: true)
final bool online;
// 最后活动时间(格式:yyyy-MM-dd HH:mm:ss)
@JsonKey(name: 'lastActiveTime', required: true)
final String lastActiveTime;
// 构造函数
DeviceStateParams({
required this.status,
required this.online,
required this.lastActiveTime,
});
// 从JSON解析
factory DeviceStateParams.fromJson(Map<String, dynamic> json) =>
_$DeviceStateParamsFromJson(json);
// 转为JSON
Map<String, dynamic> toJson() => _$DeviceStateParamsToJson(this);
// 转为DeviceStatusEnum(关联Day3的设备模型,用于更新本地设备状态)
DeviceStatusEnum get toDeviceStatusEnum {
return status == 'on' ? DeviceStatusEnum.on : DeviceStatusEnum.off;
}
}
// MQTT状态消息模型(设备→APP/后端,直接复用)
@JsonSerializable()
class MqttStateMessage {
// 设备唯一ID(必传)
@JsonKey(name: 'deviceId', required: true)
final String deviceId;
// 设备当前状态(通用参数,所有设备共用)
@JsonKey(name: 'state', required: true)
final DeviceStateParams state;
// 设备专属参数(根据设备类型动态变化)
@JsonKey(name: 'params', required: true)
final Map<String, dynamic> params;
// 消息发送时间戳(毫秒)
@JsonKey(name: 'timestamp', required: true)
final int timestamp;
// 发送端客户端ID(设备的Client ID,必传)
@JsonKey(name: 'clientId', required: true)
final String clientId;
// 消息服务质量(可选,推荐0)
@JsonKey(name: 'qos', required: true)
final int qos;
// 构造函数(私有,外部通过fromDevice方法创建)
MqttStateMessage._({
required this.deviceId,
required this.state,
required this.params,
required this.timestamp,
required this.clientId,
required this.qos,
});
// 从JSON解析消息(APP接收设备状态消息时调用)
factory MqttStateMessage.fromJson(Map<String, dynamic> json) =>
_$MqttStateMessageFromJson(json);
// 将消息转为JSON(设备端上报状态时调用,APP端可用于模拟)
Map<String, dynamic> toJson() => _$MqttStateMessageToJson(this);
// 新增:从设备生成状态消息(设备端可用,APP端可用于模拟设备上报)
static MqttStateMessage fromDevice({
required BaseDevice device,
required String clientId,
}) {
// 构建通用状态参数(所有设备共用)
final DeviceStateParams stateParams = DeviceStateParams(
status: device.status.name, // 转为字符串(on/off)
online: device.online,
lastActiveTime: device.lastActiveTime,
);
// 构建设备专属参数(根据设备类型动态获取,关联Day3的子类模型)
Map<String, dynamic> deviceParams = {};
if (device is AirConditionerDevice) {
// 空调专属参数
deviceParams = {
'temperature': device.temperature,
'mode': device.mode,
'fanSpeed': device.fanSpeed,
};
} else if (device is LightDevice) {
// 灯光专属参数
deviceParams = {
'brightness': device.brightness,
'color': device.color,
};
} else if (device is CurtainDevice) {
// 窗帘专属参数
deviceParams = {
'position': device.position,
'speed': device.speed,
};
}
return MqttStateMessage._(
deviceId: device.deviceId,
state: stateParams,
params: deviceParams,
timestamp: DateTime.now().millisecondsSinceEpoch,
clientId: clientId,
qos: 0, // 状态消息默认QoS 0,节省带宽
);
}
// 新增:转为BaseDevice(关联本地设备模型,用于更新本地数据库)
BaseDevice toBaseDevice(BaseDevice existingDevice) {
// 复用已有设备的信息(如名称、房间、图标),仅更新状态和专属参数
if (existingDevice is AirConditionerDevice) {
return AirConditionerDevice(
deviceId: existingDevice.deviceId,
name: existingDevice.name,
type: existingDevice.type,
status: state.toDeviceStatusEnum, // 更新开关状态
online: state.online, // 更新在线状态
room: existingDevice.room,
lastActiveTime: state.lastActiveTime, // 更新最后活动时间
iconUrl: existingDevice.iconUrl,
temperature: params['temperature'] ?? existingDevice.temperature, // 更新温度
mode: params['mode'] ?? existingDevice.mode, // 更新模式
fanSpeed: params['fanSpeed'] ?? existingDevice.fanSpeed, // 更新风速
);
} else if (existingDevice is LightDevice) {
return LightDevice(
deviceId: existingDevice.deviceId,
name: existingDevice.name,
type: existingDevice.type,
status: state.toDeviceStatusEnum,
online: state.online,
room: existingDevice.room,
lastActiveTime: state.lastActiveTime,
iconUrl: existingDevice.iconUrl,
brightness: params['brightness'] ?? existingDevice.brightness, // 更新亮度
color: params['color'] ?? existingDevice.color, // 更新颜色
);
} else if (existingDevice is CurtainDevice) {
return CurtainDevice(
deviceId: existingDevice.deviceId,
name: existingDevice.name,
type: existingDevice.type,
status: state.toDeviceStatusEnum,
online: state.online,
room: existingDevice.room,
lastActiveTime: state.lastActiveTime,
iconUrl: existingDevice.iconUrl,
position: params['position'] ?? existingDevice.position, // 更新开合度
speed: params['speed'] ?? existingDevice.speed, // 更新速度
);
} else {
// 其他设备类型,直接返回原有设备(仅更新状态)
return BaseDevice(
deviceId: existingDevice.deviceId,
name: existingDevice.name,
type: existingDevice.type,
status: state.toDeviceStatusEnum,
online: state.online,
room: existingDevice.room,
lastActiveTime: state.lastActiveTime,
iconUrl: existingDevice.iconUrl,
);
}
}
}
2.3 创建异常消息模型(设备→APP / 后端)
用于封装设备向 APP、后端上报的异常消息,提供消息生成、JSON 解析、异常类型判断方法,直接复制代码:
文件路径:lib/core/mqtt/models/mqtt_alert_message.dart
dart
import 'package:json_annotation/json_annotation.dart';
import 'package:smart_home_flutter/core/constants/mqtt_constants.dart';
// 生成JSON解析代码的注解(必须添加)
part 'mqtt_alert_message.g.dart';
// 设备异常类型枚举(与消息中的alertType对应,统一管理)
enum DeviceAlertType {
powerFailure('power_failure', '设备断电'), // 断电
fault('fault', '设备故障'), // 故障
networkError('network_error', '网络异常'), // 网络异常
lowPower('low_power', '低电量'), // 低电量(电池供电设备)
other('other', '其他异常'); // 其他异常
// 异常类型标识(与消息中的alertType一致)
final String type;
// 异常类型描述(用于UI展示)
final String desc;
const DeviceAlertType(this.type, this.desc);
// 从字符串解析异常类型(消息解析时调用)
static DeviceAlertType fromString(String type) {
return DeviceAlertType.values.firstWhere(
(e) => e.type == type,
orElse: () => DeviceAlertType.other,
);
}
}
// MQTT异常消息模型(设备→APP/后端,直接复用)
@JsonSerializable()
class MqttAlertMessage {
// 设备唯一ID(必传)
@JsonKey(name: 'deviceId', required: true)
final String deviceId;
// 异常类型(与DeviceAlertType对应)
@JsonKey(name: 'alertType', required: true)
final String alertType;
// 异常描述(用于UI展示,如“设备断电,请检查电源”)
@JsonKey(name: 'alertDesc', required: true)
final String alertDesc;
// 消息发送时间戳(毫秒)
@JsonKey(name: 'timestamp', required: true)
final int timestamp;
// 发送端客户端ID(设备的Client ID,必传)
@JsonKey(name: 'clientId', required: true)
final String clientId;
// 消息服务质量(必传,推荐1,确保异常消息不丢失)
@JsonKey(name: 'qos', required: true)
final int qos;
// 构造函数(私有,外部通过fromParams方法创建)
MqttAlertMessage._({
required this.deviceId,
required this.alertType,
required this.alertDesc,
required this.timestamp,
required this.clientId,
required this.qos,
});
// 从JSON解析消息(APP接收异常消息时调用)
factory MqttAlertMessage.fromJson(Map<String, dynamic> json) =>
_$MqttAlertMessageFromJson(json);
// 将消息转为JSON(设备端上报异常时调用)
Map<String, dynamic> toJson() => _$MqttAlertMessageToJson(this);更多推荐



所有评论(0)