Flutter+OpenHarmony 智能家居开发 Day8|MQTT 实时通信 + 设备状态双向同步全流程

欢迎加入开源鸿蒙跨平台社区: https://openharmonycrossplatform.csdn.net

大家好~ 咱们接着前 7 天的开发节奏,继续推进智能家居 APP 的开发!经过 Day7 的实操,我们已经搞定了 ObjectBox 本地持久化和离线设备管理,终于解决了 “断网可用” 的痛点 —— 现在 APP 就算没网,也能正常查看、筛选已保存的设备。但新的问题马上就来了,不知道大家有没有发现:设备状态没法实时同步。

举个很直观的例子:你在客厅用手机 APP 打开了灯光,卧室的平板端 APP 还是显示灯光关闭;如果设备突然断电、离线,APP 也不会及时提醒,只能手动刷新或者等后端定时推送。这显然不符合智能家居的核心体验,所以 Day8 我们就专门解决这个问题,核心就是集成 MQTT 协议,实现设备与 APP、APP 与后端的实时双向通信,打通 “本地 - 设备 - 后端” 的状态同步链路,让设备操作、状态变化能够毫秒级反馈,同时结合 Day7 的本地持久化能力,实现 “实时同步 + 离线兜底” 的双重保障。

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输)是一种轻量级、低带宽、低延迟的发布 / 订阅模式通信协议,非常适合物联网(IoT)场景 —— 智能家居设备大多带宽有限、算力较低,MQTT 的 “轻量特性” 能完美适配,同时其发布 / 订阅模式可以轻松实现 “一对多”“多对多” 的实时通信(如一个设备状态更新,多端 APP 同时接收),这也是它成为智能家居实时通信首选协议的核心原因。

本文全程适配 CSDN 发布规范,所有代码块用 CSDN 兼容格式(``` 标注语言),无需修改直接复制就能显示高亮,延续前 7 天「逐步骤拆解 + 逐行代码解析 + 全场景踩坑解决」的风格,拒绝空谈理论,全程围绕 “实际开发落地” 展开:从 MQTT 协议基础、依赖集成、工具类封装,到 MQTT 连接管理、消息发布 / 订阅、设备状态双向同步,再到鸿蒙多终端适配、异常处理、高可用优化,每一步都包含「为什么做、怎么做、可能出什么问题、怎么解决」,所有代码均可直接复制复用,兼顾新手友好性和实际开发实用性。

本文适合 Flutter+OpenHarmony 跨平台开发学习者、智能家居 APP 开发者,尤其适合需要实现设备实时通信的开发者。无论你是刚跟随系列教程入门,还是有一定的物联网开发经验,都能从本文中获取完整的 MQTT 集成方案、状态同步逻辑,以及贴合鸿蒙生态的适配技巧。话不多说,我们正式开启 Day8 的详细开发之旅!

一、Day8 核心目标(明确方向,不走弯路)

Day8 的开发基于 Day3-Day7 已完成的项目架构,核心是在原有数据层、业务层中集成 MQTT 实时通信能力,衔接 Day7 的本地持久化逻辑,实现设备状态的双向实时同步,所有开发工作围绕以下核心目标展开,确保不偏离 “实时性、稳定性、兼容性” 的核心诉求:

  1. 理解 MQTT 协议核心概念(发布 / 订阅、主题、客户端、 Broker),掌握智能家居场景下的 MQTT 主题设计规范;
  2. 集成 MQTT 核心依赖(mqtt_client),完成基础配置,解决鸿蒙系统下的依赖兼容、编译报错问题;
  3. 封装 MQTT 全局工具类(单例模式),实现连接、断开、重连、发布消息、订阅主题、取消订阅等核心功能;
  4. 设计智能家居专属 MQTT 主题体系,规范设备控制、状态上报、异常通知等场景的主题格式,确保通信有序;
  5. 实现 MQTT 消息格式封装与解析,兼容后端、设备的消息协议,关联 Day3 定义的设备模型,实现消息与设备数据的无缝映射;
  6. 封装业务层 MQTT 逻辑,实现「MQTT 连接状态管理」,支持自动连接、断网重连、后台保活(适配鸿蒙多终端);
  7. 实现设备状态双向同步:
    • 正向同步(APP→设备):APP 发送控制指令(如开灯)→ MQTT 发布消息 → 设备接收指令并执行 → 设备上报执行结果;
    • 反向同步(设备→APP):设备主动上报状态(如温度变化、离线)→ MQTT 订阅消息 → APP 接收消息并更新本地数据库 + UI;
  8. 衔接 Day7 的 ObjectBox 本地持久化,实现「实时消息落地本地」,确保断网后重新联网时,未接收的消息能正常同步、状态不丢失;
  9. 处理 MQTT 通信全场景异常:连接失败、消息发送失败、消息解析失败、重连失败、设备无响应等,给出友好提示并兜底;
  10. 完成 OpenHarmony 专属适配:解决 DAYU200 开发板 MQTT 连接稳定性、鸿蒙手机后台保活权限、平板端消息接收延迟等问题;
  11. 优化 MQTT 通信性能:实现消息防抖、重复消息过滤、心跳包参数优化,确保 100 + 台设备同时通信无卡顿、无消息丢失;
  12. 实现多终端测试验证:在模拟器、OpenHarmony 手机、DAYU200 开发板上验证 MQTT 连接、消息收发、状态同步全场景,确保实时性和稳定性;
  13. 规范代码结构,将 MQTT 逻辑与原有网络逻辑、本地数据库逻辑解耦,为后续 Day9 的批量控制、场景联动奠定基础。

二、前置准备(衔接前文,避免脱节)

Day8 的开发紧密依赖 Day3-Day7 已完成的项目架构,核心是集成 MQTT 实时通信能力,衔接 Day7 的本地持久化逻辑,复用 Day3 的设备模型、Day6 的搜索筛选逻辑,因此在开始开发前,需先确认前置环境和已有代码是否就绪,避免开发过程中出现衔接问题。

2.1 已有架构确认(重点衔接)

我们先回顾 Day3-Day7 已完成的核心代码和架构,确保所有依赖和基础组件都能支撑 Day8 的 MQTT 开发:

  • 数据层:已完成 BaseDevice 及子类数据模型(空调 / 灯光 / 窗帘)、HttpClient 网络工具、DeviceRepository 仓库(网络请求 + 本地 CRUD)、ObjectBox 本地数据库工具类,可直接复用设备模型和本地持久化能力;
  • 业务层:已封装 DeviceProvider 状态管理,管理设备列表、控制状态、搜索筛选状态,支持乐观更新、并发控制,可扩展用于管理 MQTT 连接状态;
  • UI 层:已完成设备列表、设备卡片、搜索框、筛选栏等组件,支持多终端布局适配,可扩展用于显示 MQTT 连接状态、实时消息提示;
  • 工具类:已封装防抖工具、全局异常处理器、日志工具、设备类型判断工具、依赖注入工具(getIt),可直接复用;
  • 鸿蒙适配:已配置网络权限、存储权限、多终端触控适配、鸿蒙原生桥接基础,可在此基础上扩展 MQTT 所需的后台保活权限;
  • 核心能力:已实现设备网络请求、本地持久化、离线管理,Day8 只需在此基础上添加 MQTT 实时通信,实现 “实时 + 离线” 双重保障。

2.2 依赖检查与补充(核心:MQTT 适配)

Day8 的核心新增依赖是「MQTT 客户端依赖」,我们选用主流的mqtt_client(支持 Flutter 跨平台,已完成鸿蒙适配),同时需要确认已有依赖是否正确配置,避免出现依赖冲突导致的编译报错。

2.2.1 新增 MQTT 核心依赖

在 pubspec.yaml 中添加 mqtt_client 相关依赖,重点注意 OpenHarmony 适配版本,确保依赖版本与 Flutter、鸿蒙 SDK 版本兼容,直接复制以下配置到你的 pubspec.yaml 中即可:

yaml

dependencies:
  flutter:
    sdk: flutter
  # 原有依赖(Day3-Day7,无需修改)
  dio: ^5.4.0 # 网络请求框架
  json_annotation: ^4.8.1 # JSON解析注解
  provider: ^6.1.1 # 状态管理
  get_it: ^7.2.0 # 依赖注入
  logger: ^1.1.0 # 日志工具
  device_info_plus: ^9.1.0 # 设备信息获取(判断鸿蒙系统)
  flutter_windowmanager: ^0.2.0 # 窗口管理(适配多终端)
  permission_handler: ^11.0.1 # 动态权限申请(兼容鸿蒙)
  shared_preferences: ^2.2.2 # 轻量存储(用于保存同步状态)
  objectbox: ^2.0.0 # ObjectBox核心库
  objectbox_flutter_libs: ^2.0.0 # ObjectBox Flutter插件(含鸿蒙适配)
  connectivity_plus: ^5.0.2 # 网络状态判断(Day7新增,用于MQTT重连)
  
  # 新增:MQTT核心依赖(适配OpenHarmony,直接复制)
  mqtt_client: ^10.0.0 # MQTT客户端核心库(支持鸿蒙)
  mqtt_shared: ^2.0.0 # MQTT共享订阅(可选,用于多端同步)
  crypto: ^3.0.3 # 加密工具(用于MQTT客户端ID加密、消息加密)

dev_dependencies:
  flutter_test:
    sdk: flutter
  # 原有开发依赖(无需修改)
  json_serializable: ^6.7.1 # JSON解析代码生成
  build_runner: ^2.4.6 # 代码生成工具
  flutter_lints: ^2.0.0 # 代码规范检查
  objectbox_generator: ^2.0.0 # ObjectBox实体代码生成
2.2.2 执行依赖安装命令

在项目根目录执行以下命令,确保所有依赖(原有 + 新增)都能正常加载,重点注意 mqtt_client 的鸿蒙编译依赖是否下载成功,命令直接复制执行即可:

bash

运行

flutter pub get

安装完成后,可在项目的.pub-cache 文件夹中查看 mqtt_client 的版本,确认是否为 ^10.0.0(鸿蒙适配版本);如果安装失败,先执行flutter clean清除缓存,再重新执行flutter pub get

2.2.3 可能出现的问题与解决方案(高频踩坑)

这一步很多同学会遇到鸿蒙适配相关的报错,这里提前整理好所有高频问题,对照排查即可,不用浪费时间找解决方案:

表格

问题场景 根因 解决方案
mqtt_client 依赖安装失败,提示 “OpenHarmony 平台不支持” mqtt_client 版本过低,未适配 OpenHarmony 系统 确认依赖版本为 ^10.0.0(官方适配鸿蒙的最低版本),执行flutter clean清除缓存后重新执行flutter pub get;若仍失败,从 OpenHarmony 社区下载 mqtt_client 鸿蒙适配源码手动集成
执行 pub get 后,提示 “crypto 依赖冲突” 其他依赖(如 dio)也引入了 crypto,版本不兼容 在 pubspec.yaml 中指定 crypto 版本为 ^3.0.3,与 mqtt_client 要求的版本一致;若仍冲突,执行flutter pub deps查看依赖树,排除冲突依赖
鸿蒙开发板编译失败,提示 “找不到 mqtt_client 相关库” mqtt_client 的鸿蒙编译产物未正确生成 1. 确认 Flutter for OpenHarmony 插件已更新至最新版本;2. 重新执行flutter pub get后,执行flutter build ohos生成鸿蒙编译产物;3. 检查 DevEco Studio 的依赖配置,确保引入了 mqtt_client 的鸿蒙库
与 connectivity_plus 冲突,提示 “网络状态监听异常” 两个依赖同时监听网络状态,导致端口占用 封装统一的网络状态监听工具类,复用 connectivity_plus 的监听逻辑,让 MQTT 工具类通过该工具类获取网络状态,避免重复监听
执行代码生成命令时,提示 “mqtt_client 相关类未找到” 依赖未安装成功,或未重新生成代码 1. 确认 mqtt_client 依赖安装成功;2. 重新执行flutter pub run build_runner build --delete-conflicting-outputs

2.3 MQTT 基础概念前置(必懂,避免开发踩坑)

在开始开发前,必须先掌握 MQTT 协议的核心概念,尤其是智能家居场景下的应用方式,否则后续主题设计、消息收发会出现逻辑混乱,这里用最通俗的语言讲解,不用死记硬背,理解即可:

  1. MQTT Broker(消息代理):核心中转节点,相当于 “消息邮局”——APP 发送消息先给 Broker,设备发送消息也先给 Broker,Broker 再把消息转发给所有需要接收的终端。智能家居场景中,通常由后端部署 Broker(推荐 EMQ X、Mosquitto,开源免费,支持鸿蒙适配),APP、设备均作为客户端连接到 Broker。
  2. MQTT Client(客户端):发起连接、发布消息、订阅消息的终端,本文中 Flutter APP 就是一个客户端,每个智能家居设备(空调、灯光)也是一个客户端;每个客户端必须有唯一的 Client ID,Broker 通过 Client ID 区分不同终端。
  3. 主题(Topic):消息的分类标识,类似 “文件夹路径”,比如 “/smart_home/device/123/control”,客户端通过订阅同一主题接收消息,通过发布到指定主题发送消息。比如 APP 要控制设备 123,就往该设备的控制主题发消息,设备 123 订阅自己的控制主题,就能收到指令。
  4. 发布(Publish):客户端向 Broker 发送消息的操作,需指定主题和消息内容,比如 APP 向设备 123 的控制主题发布 “开灯” 指令,就是发布操作。
  5. 订阅(Subscribe):客户端向 Broker 申请接收某一主题消息的操作,比如 APP 订阅设备 123 的状态主题,设备 123 上报状态时,APP 就能实时收到。
  6. QoS(服务质量):消息传输的可靠性等级,智能家居场景推荐这样搭配:
    • QoS 1(至少一次送达):用于控制指令(如开灯、调节温度),确保指令不丢失;
    • QoS 0(最多一次送达):用于非关键消息(如设备状态上报),节省带宽;
  7. 客户端 ID(Client ID):每个客户端的唯一标识,必须保证不重复(比如 APP 的 Client ID 可以是 “app_设备唯一标识”,设备的 Client ID 可以是 “device_设备 ID”),否则会导致连接冲突、消息发送错误。
  8. 心跳包(Keep Alive):连接保活机制,客户端定期向 Broker 发送心跳消息,告诉 Broker “我还在线”;如果 Broker 长时间未收到心跳,会断开连接;反之,客户端长时间未收到 Broker 的响应,也会触发重连。
  9. 重连机制:客户端与 Broker 断开连接后,自动重新发起连接的逻辑,是保证 MQTT 通信稳定性的核心,必须处理断网、Broker 重启、网络波动等场景。

2.4 主题设计规范(核心,确保通信有序)

主题设计是 MQTT 通信的核心,若主题格式混乱,会导致消息收发错误、多设备干扰等问题,后续修改起来非常麻烦。结合智能家居场景,我们设计统一的主题规范,所有主题均以 “/smart_home” 为根路径,清晰区分不同设备、不同操作的消息,支持多终端、多设备同时通信,直接复用这套规范即可,不用自己设计。

核心设计原则

根路径→模块→设备 ID→操作类型,比如 “/smart_home/device/ 设备 ID/control”,简洁清晰,便于维护和扩展。

2.4.1 设备控制主题(APP→设备)

用于 APP 向设备发送控制指令(如开灯、调节温度),主题格式(直接复用):

text

/smart_home/device/{deviceId}/control

参数说明:

  • {deviceId}:设备唯一 ID(与后端、本地数据库中的 deviceId 一致),比如 “dev_123456”;
  • 示例:/smart_home/device/dev_123456/control(向 ID 为 dev_123456 的设备发送控制指令)。
2.4.2 设备状态上报主题(设备→APP / 后端)

用于设备向 APP、后端上报自身状态(如开启 / 关闭、温度变化、在线 / 离线),主题格式(直接复用):

text

/smart_home/device/{deviceId}/state

参数说明:

  • {deviceId}:设备唯一 ID;
  • 示例:/smart_home/device/dev_123456/state(ID 为 dev_123456 的设备上报状态)。
2.4.3 设备异常通知主题(设备→APP / 后端)

用于设备向 APP、后端上报异常状态(如故障、断电、网络异常),主题格式(直接复用):

text

/smart_home/device/{deviceId}/alert

参数说明:

  • {deviceId}:设备唯一 ID;
  • 示例:/smart_home/device/dev_123456/alert(ID 为 dev_123456 的设备上报异常)。
2.4.4 全局广播主题(后端→所有 APP / 设备)

用于后端向所有 APP、设备发送全局通知(如系统升级、批量控制指令),主题格式(直接复用):

text

/smart_home/global/broadcast
2.4.5 主题设计注意事项(必看)
  1. 主题区分大小写(如 “/control” 与 “/Control” 是两个不同主题),开发时必须严格遵循规范,不能写错;
  2. 设备 ID 必须唯一,避免多个设备使用同一主题,导致消息干扰;
  3. 主题路径不宜过长(建议不超过 5 级),避免影响消息传输效率;
  4. APP 需订阅所有已绑定设备的「状态上报主题」和「异常通知主题」,以及全局广播主题;
  5. 设备只需订阅自身的「控制主题」,发布自身的「状态上报主题」和「异常通知主题」;
  6. 所有主题统一由常量类管理(后续步骤会封装),避免手动拼接主题出现错误。

2.5 消息格式设计(核心,兼容设备与后端)

MQTT 消息内容采用 JSON 格式(轻量、易解析,兼容设备和后端),结合 Day3 定义的设备模型,设计统一的消息格式,分为「控制消息」「状态消息」「异常消息」三类,确保 APP、设备、后端能正常解析,直接复用这套格式,不用修改。

2.5.1 控制消息格式(APP→设备)

用于 APP 向设备发送控制指令,核心包含设备 ID、控制参数、时间戳等信息,格式如下(直接复制复用):

json

{
  "deviceId": "dev_123456", // 设备唯一ID(必传)
  "controlParams": { // 控制参数(根据设备类型不同,参数不同)
    "status": "on", // 设备开关状态(所有设备通用)
    "temperature": 26, // 空调专属参数(可选)
    "brightness": 80, // 灯光专属参数(可选)
    "fanSpeed": "high" // 空调/窗帘专属参数(可选)
  },
  "timestamp": 1758067200000, // 消息发送时间戳(毫秒,必传)
  "clientId": "app_flutter_789", // 发送端客户端ID(APP的Client ID,必传)
  "qos": 1 // 消息服务质量(必传,推荐1)
}

说明:

  • controlParams 根据设备类型动态变化,比如灯光设备可传 brightness(亮度)、color(颜色),窗帘设备可传 position(开合度);
  • timestamp 用于消息去重、排序,避免重复执行控制指令;
  • clientId 用于设备识别发送端(区分 APP、其他设备)。
2.5.2 状态消息格式(设备→APP / 后端)

用于设备向 APP、后端上报自身状态,核心包含设备 ID、当前状态、参数、时间戳等信息,格式如下(直接复制复用):

json

{
  "deviceId": "dev_123456", // 设备唯一ID(必传)
  "state": { // 设备当前状态(所有设备通用参数)
    "status": "on", // 开关状态(on/off)
    "online": true, // 在线状态(true/false)
    "lastActiveTime": "2026-08-18 10:00:00" // 最后活动时间(必传)
  },
  "params": { // 设备专属参数(根据设备类型不同,参数不同)
    "temperature": 26, // 空调专属
    "mode": "cool", // 空调专属
    "brightness": 80 // 灯光专属
  },
  "timestamp": 1758067201000, // 消息发送时间戳(毫秒,必传)
  "clientId": "device_123456", // 发送端客户端ID(设备的Client ID,必传)
  "qos": 0 // 消息服务质量(可选,推荐0)
}
2.5.3 异常消息格式(设备→APP / 后端)

用于设备向 APP、后端上报异常状态,核心包含设备 ID、异常类型、异常描述等信息,格式如下(直接复制复用):

json

{
  "deviceId": "dev_123456", // 设备唯一ID(必传)
  "alertType": "power_failure", // 异常类型(power_failure:断电,fault:故障,network_error:网络异常)
  "alertDesc": "设备断电,请检查电源", // 异常描述(必传)
  "timestamp": 1758067202000, // 消息发送时间戳(毫秒,必传)
  "clientId": "device_123456", // 发送端客户端ID(设备的Client ID,必传)
  "qos": 1 // 消息服务质量(必传,推荐1)
}

2.6 开发环境确认(多终端适配前提)

确认开发环境已就绪,确保后续 MQTT 集成和实时通信功能开发完成后,能快速进行多终端测试,重点确认鸿蒙开发环境的网络权限和后台保活权限配置,避免测试时出现权限报错:

  1. Flutter 环境:Flutter 3.16.0+,已配置 Flutter for OpenHarmony 插件,mqtt_client 代码生成工具正常;
  2. 鸿蒙开发环境:DevEco Studio 4.0+,已配置鸿蒙 SDK(API 10+),已添加「网络权限」「后台保活权限」(后续模块会详细讲解配置方法);
  3. MQTT Broker:已部署 MQTT Broker(推荐 EMQ X,开源免费,支持鸿蒙适配),确认 Broker 地址、端口(默认 1883,WebSocket 端口 8083)可访问;
    • 简易部署:如果没有后端部署的 Broker,可下载 EMQ X 本地部署(官网:https://www.emqx.io/),安装后直接启动,默认地址为 tcp://127.0.0.1:1883;
  4. 测试设备:模拟器(Mate 80 Pro Max)、OpenHarmony 6.0 + 手机、DAYU200 开发板(已刷入鸿蒙系统,已连接网络);
  5. 调试工具:开启 Flutter DevTools(监控 MQTT 连接状态、消息收发)、开启 logger 日志打印(监控 MQTT 操作)、MQTTX(可选,可视化调试 MQTT 消息收发,官网:https://mqttx.app/);
    • MQTTX 使用:安装后创建客户端,输入 Broker 地址、端口、Client ID,即可订阅 / 发布消息,用于调试 APP 与 Broker 的通信;
  6. 测试设备:至少准备 1 台智能家居设备(或设备模拟器),确保设备能连接到 MQTT Broker,正常接收 / 发送消息。

2.7 代码规范确认(工程化要求)

延续 Day3-Day7 制定的代码规范,Day8 所有开发严格遵循,同时新增「MQTT 代码规范」,确保代码可维护、可复用,避免后续迭代出现问题:

原有规范(无需修改,继续遵循)
  • 文件夹命名:小写 + 下划线(如 core/mqtt);
  • 类命名:大驼峰(如 MqttManager、MqttControlMessage);
  • 方法 / 变量命名:小驼峰(如 connectMqtt、mqttConnectionState);
  • 常量命名:全大写 + 下划线(如 MQTT_BROKER_ADDRESS);
  • 分层架构:严格区分数据层、业务层、UI 层,MQTT 相关代码统一放在 core/mqtt 文件夹下。
新增规范(Day8 重点遵循)
  1. MQTT 相关代码统一放在core/mqtt文件夹下,分为工具类、消息模型、配置类,实现模块化管理;
    • 文件夹结构:core/mqtt/manager(工具类)、core/mqtt/models(消息模型)、core/mqtt/utils(辅助工具);
  2. MQTT 工具类采用单例模式,避免多实例同时连接导致的冲突;
  3. 所有 MQTT 操作(连接、发布、订阅)都添加 try-catch 捕获异常,避免通信失败导致 APP 闪退;
  4. MQTT 消息解析逻辑单独封装,与工具类解耦,便于后续修改消息格式;
  5. MQTT 连接状态、消息收发状态统一由业务层管理,UI 层仅通过状态监听更新界面,不直接操作 MQTT 工具类;
  6. 所有 MQTT 相关常量(如 Broker 地址、端口、主题前缀)统一放在 constants 文件夹下,便于统一修改;
  7. 代码注释:每个类、每个方法都添加注释,说明功能、参数、返回值,核心代码逐行添加注释,便于后续维护和他人阅读;
  8. 异常处理:所有 MQTT 异常都需捕获,并给出明确的错误日志和用户提示,避免 “静默失败”。

三、逐步骤详细开发(核心部分,每一步到位)

做好前置准备后,我们开始正式开发,Day8 的开发分为7 个核心模块,按「MQTT 基础集成→MQTT 工具类封装→消息模型封装与解析→业务层逻辑封装→双向同步实现→UI 层适配→OpenHarmony 专属适配」的顺序逐模块开发,每个模块分步骤拆解,每一步都讲清楚细节、问题和解决方案,所有代码可直接复制复用,跟着步骤走就能落地。

模块 1:MQTT 基础集成与全局配置(数据层基础)

本模块是 Day8 的开发基础,核心完成MQTT 的全局初始化、配置类定义、依赖注入,实现 MQTT Broker 连接参数的统一管理,避免硬编码,同时解决鸿蒙系统中的连接参数适配问题,为后续 MQTT 工具类封装奠定基础。

步骤 1:创建 MQTT 配置类(统一管理连接参数)

在 core/constants 文件夹下创建 MQTT 相关常量类,统一管理 Broker 地址、端口、QoS、心跳包等参数,便于后续统一修改,同时区分开发 / 测试 / 生产环境,直接复制代码即可,无需修改(后续根据自己的 Broker 地址调整即可)。

文件路径:lib/core/constants/mqtt_constants.dart

dart

import 'package:mqtt_client/mqtt_client.dart';

// MQTT全局配置常量(区分环境,适配鸿蒙多终端,直接复用)
class MqttConstants {
  // -------------------------- 环境配置(根据实际情况修改)--------------------------
  // 当前环境(development:开发,test:测试,production:生产)
  static const String currentEnv = 'development';

  // 开发环境MQTT Broker配置(重点修改这里的Broker地址)
  static const String _devBroker = 'tcp://192.168.1.100:1883'; // Broker地址(TCP协议,本地部署填自己的IP)
  static const String _devBrokerWs = 'ws://192.168.1.100:8083/mqtt'; // WebSocket协议(鸿蒙开发板适配)
  static const int _devPort = 1883; // TCP端口(默认1883)
  static const int _devWsPort = 8083; // WebSocket端口(默认8083)

  // 测试环境MQTT Broker配置(按需修改)
  static const String _testBroker = 'tcp://test.mqtt.example.com:1883';
  static const String _testBrokerWs = 'ws://test.mqtt.example.com:8083/mqtt';
  static const int _testPort = 1883;
  static const int _testWsPort = 8083;

  // 生产环境MQTT Broker配置(按需修改)
  static const String _prodBroker = 'tcp://mqtt.example.com:1883';
  static const String _prodBrokerWs = 'ws://mqtt.example.com:8083/mqtt';
  static const int _prodPort = 1883;
  static const int _prodWsPort = 8083;

  // -------------------------- 通用配置(所有环境共用,无需修改)--------------------------
  // 主题前缀(与2.4节主题设计规范一致)
  static const String topicPrefix = '/smart_home';

  // QoS服务质量(推荐1,至少一次送达)
  static const MqttQos defaultQos = MqttQos.atLeastOnce;

  // 心跳包时间(秒),默认60秒,鸿蒙设备建议缩短至30秒,避免连接被断开
  static const int keepAlivePeriod = 30;

  // 重连间隔(秒),连接失败后,每隔5秒重新连接
  static const int reconnectInterval = 5;

  // 最大重连次数(默认10次,超过后停止重连,给出提示)
  static const int maxReconnectCount = 10;

  // 消息超时时间(秒),超过后视为消息发送失败
  static const int messageTimeout = 10;

  // 客户端ID前缀(APP客户端ID,拼接设备唯一标识,确保唯一)
  static const String appClientIdPrefix = 'smart_home_app_';

  // -------------------------- 动态获取当前环境配置(无需修改)--------------------------
  // 获取当前环境的MQTT Broker地址(根据设备类型选择TCP或WebSocket)
  static String get brokerAddress {
    switch (currentEnv) {
      case 'test':
        return _testBroker;
      case 'production':
        return _prodBroker;
      default:
        return _devBroker;
    }
  }

  // 获取当前环境的MQTT WebSocket地址(鸿蒙开发板适配)
  static String get brokerWsAddress {
    switch (currentEnv) {
      case 'test':
        return _testBrokerWs;
      case 'production':
        return _prodBrokerWs;
      default:
        return _devBrokerWs;
    }
  }

  // 获取当前环境的MQTT端口
  static int get port {
    switch (currentEnv) {
      case 'test':
        return _testPort;
      case 'production':
        return _prodPort;
      default:
        return _devPort;
    }
  }

  // 获取当前环境的MQTT WebSocket端口
  static int get wsPort {
    switch (currentEnv) {
      case 'test':
        return _testWsPort;
      case 'production':
        return _prodWsPort;
      default:
        return _devWsPort;
    }
  }

  // -------------------------- 主题生成工具方法(无需修改,直接调用)--------------------------
  // 生成设备控制主题(APP→设备)
  static String generateControlTopic(String deviceId) {
    return '$topicPrefix/device/$deviceId/control';
  }

  // 生成设备状态上报主题(设备→APP/后端)
  static String generateStateTopic(String deviceId) {
    return '$topicPrefix/device/$deviceId/state';
  }

  // 生成设备异常通知主题(设备→APP/后端)
  static String generateAlertTopic(String deviceId) {
    return '$topicPrefix/device/$deviceId/alert';
  }

  // 生成全局广播主题(后端→所有APP/设备)
  static String generateGlobalBroadcastTopic() {
    return '$topicPrefix/global/broadcast';
  }
}

// MQTT连接状态枚举(用于UI层展示和业务层判断,无需修改)
enum MqttConnectionState {
  disconnected('已断开连接'), // 未连接/连接断开
  connecting('正在连接...'), // 正在连接
  connected('已连接'), // 连接成功
  reconnecting('正在重连...'), // 正在重连
  connectionFailed('连接失败'); // 连接失败

  // 状态描述(用于UI展示,比如顶部提示)
  final String description;
  const MqttConnectionState(this.description);
}

代码详解(重点看这里,理解后便于后续调用)

  1. 环境区分:通过 currentEnv 控制当前环境,开发、测试、生产环境的 Broker 地址分开配置,后续切换环境只需修改 currentEnv,不用修改所有相关代码;
  2. 协议适配:同时提供 TCP 和 WebSocket 协议地址,TCP 用于手机、模拟器,WebSocket 用于鸿蒙开发板(部分开发板 TCP 连接不稳定);
  3. 通用配置:统一管理 QoS、心跳包、重连间隔等参数,后续可根据实际测试结果调整(比如鸿蒙开发板心跳包可缩短至 20 秒);
  4. 主题生成工具:封装了 4 个主题生成方法,根据设备 ID 自动生成对应主题,避免手动拼接主题出现错误(比如少写 “/”、写错设备 ID);
  5. 连接状态枚举:定义了 5 种 MQTT 连接状态,后续 UI 层可通过该枚举展示连接状态(如顶部提示 “正在连接...”),业务层可通过该枚举判断逻辑(如连接成功后再订阅主题)。
步骤 2:创建 MQTT 消息模型(映射消息格式)

根据 2.5 节设计的消息格式,创建对应的 MQTT 消息模型,实现 JSON 解析与序列化,关联 Day3 的设备模型,确保消息与设备数据的无缝映射,同时封装消息生成工具方法,简化消息创建流程,所有代码直接复制即可。

2.1 创建控制消息模型(APP→设备)

用于封装 APP 向设备发送的控制指令,关联 Day3 的 BaseDevice 模型,提供消息生成、验证、JSON 解析方法,直接复制代码:

文件路径:lib/core/mqtt/models/mqtt_control_message.dart

dart

import 'package:json_annotation/json_annotation.dart';
import 'package:smart_home_flutter/data/models/device_model.dart';
import 'package:smart_home_flutter/core/constants/mqtt_constants.dart';
import 'package:crypto/crypto.dart';
import 'dart:convert';

// 生成JSON解析代码的注解(必须添加)
part 'mqtt_control_message.g.dart';

// MQTT控制消息模型(APP→设备,直接复用)
@JsonSerializable()
class MqttControlMessage {
  // 设备唯一ID(必传)
  @JsonKey(name: 'deviceId', required: true)
  final String deviceId;

  // 控制参数(根据设备类型动态变化,比如空调的温度、灯光的亮度)
  @JsonKey(name: 'controlParams', required: true)
  final Map<String, dynamic> controlParams;

  // 消息发送时间戳(毫秒,用于消息去重)
  @JsonKey(name: 'timestamp', required: true)
  final int timestamp;

  // 发送端客户端ID(APP的Client ID,必传)
  @JsonKey(name: 'clientId', required: true)
  final String clientId;

  // 消息服务质量(必传,推荐1)
  @JsonKey(name: 'qos', required: true)
  final int qos;

  // 构造函数(私有,外部通过fromDeviceAndParams方法创建消息,避免参数遗漏)
  MqttControlMessage._({
    required this.deviceId,
    required this.controlParams,
    required this.timestamp,
    required this.clientId,
    required this.qos,
  });

  // 从JSON解析消息(用于接收设备响应时解析消息)
  factory MqttControlMessage.fromJson(Map<String, dynamic> json) =>
      _$MqttControlMessageFromJson(json);

  // 将消息转为JSON(用于发送消息时序列化)
  Map<String, dynamic> toJson() => _$MqttControlMessageToJson(this);

  // 新增:从设备和控制参数生成控制消息(简化消息创建,外部直接调用)
  static MqttControlMessage fromDeviceAndParams({
    required BaseDevice device,
    required Map<String, dynamic> controlParams,
    required String clientId,
  }) {
    return MqttControlMessage._(
      deviceId: device.deviceId,
      controlParams: controlParams,
      timestamp: DateTime.now().millisecondsSinceEpoch, // 当前时间戳
      clientId: clientId,
      qos: MqttConstants.defaultQos.value, // 复用默认QoS(1)
    );
  }

  // 新增:生成消息唯一标识(用于消息去重,避免重复执行控制指令)
  String get messageId {
    // 拼接deviceId、timestamp、clientId生成唯一标识,通过MD5加密确保唯一性
    final String uniqueStr = '$deviceId-$timestamp-$clientId';
    return md5.convert(utf8.encode(uniqueStr)).toString();
  }

  // 新增:验证消息合法性(检查必填参数,避免发送无效消息)
  bool isValid() {
    return deviceId.isNotEmpty &&
        controlParams.isNotEmpty &&
        timestamp > 0 &&
        clientId.isNotEmpty &&
        (qos == 0 || qos == 1 || qos == 2); // QoS只能是0、1、2
  }
}
2.2 创建状态消息模型(设备→APP / 后端)

用于封装设备向 APP、后端上报的状态消息,关联 Day3 的 BaseDevice 模型,提供消息生成、JSON 解析、转为设备模型的方法,直接复制代码:

文件路径:lib/core/mqtt/models/mqtt_state_message.dart

dart

import 'package:json_annotation/json_annotation.dart';
import 'package:smart_home_flutter/data/models/device_model.dart';
import 'package:smart_home_flutter/core/constants/mqtt_constants.dart';

// 生成JSON解析代码的注解(必须添加)
part 'mqtt_state_message.g.dart';

// 设备状态通用参数(对应消息中的state字段,所有设备共用)
@JsonSerializable()
class DeviceStateParams {
  // 设备开关状态(on/off,与后端、设备保持一致)
  @JsonKey(name: 'status', required: true)
  final String status;

  // 设备在线状态(true:在线,false:离线)
  @JsonKey(name: 'online', required: true)
  final bool online;

  // 最后活动时间(格式:yyyy-MM-dd HH:mm:ss)
  @JsonKey(name: 'lastActiveTime', required: true)
  final String lastActiveTime;

  // 构造函数
  DeviceStateParams({
    required this.status,
    required this.online,
    required this.lastActiveTime,
  });

  // 从JSON解析
  factory DeviceStateParams.fromJson(Map<String, dynamic> json) =>
      _$DeviceStateParamsFromJson(json);

  // 转为JSON
  Map<String, dynamic> toJson() => _$DeviceStateParamsToJson(this);

  // 转为DeviceStatusEnum(关联Day3的设备模型,用于更新本地设备状态)
  DeviceStatusEnum get toDeviceStatusEnum {
    return status == 'on' ? DeviceStatusEnum.on : DeviceStatusEnum.off;
  }
}

// MQTT状态消息模型(设备→APP/后端,直接复用)
@JsonSerializable()
class MqttStateMessage {
  // 设备唯一ID(必传)
  @JsonKey(name: 'deviceId', required: true)
  final String deviceId;

  // 设备当前状态(通用参数,所有设备共用)
  @JsonKey(name: 'state', required: true)
  final DeviceStateParams state;

  // 设备专属参数(根据设备类型动态变化)
  @JsonKey(name: 'params', required: true)
  final Map<String, dynamic> params;

  // 消息发送时间戳(毫秒)
  @JsonKey(name: 'timestamp', required: true)
  final int timestamp;

  // 发送端客户端ID(设备的Client ID,必传)
  @JsonKey(name: 'clientId', required: true)
  final String clientId;

  // 消息服务质量(可选,推荐0)
  @JsonKey(name: 'qos', required: true)
  final int qos;

  // 构造函数(私有,外部通过fromDevice方法创建)
  MqttStateMessage._({
    required this.deviceId,
    required this.state,
    required this.params,
    required this.timestamp,
    required this.clientId,
    required this.qos,
  });

  // 从JSON解析消息(APP接收设备状态消息时调用)
  factory MqttStateMessage.fromJson(Map<String, dynamic> json) =>
      _$MqttStateMessageFromJson(json);

  // 将消息转为JSON(设备端上报状态时调用,APP端可用于模拟)
  Map<String, dynamic> toJson() => _$MqttStateMessageToJson(this);

  // 新增:从设备生成状态消息(设备端可用,APP端可用于模拟设备上报)
  static MqttStateMessage fromDevice({
    required BaseDevice device,
    required String clientId,
  }) {
    // 构建通用状态参数(所有设备共用)
    final DeviceStateParams stateParams = DeviceStateParams(
      status: device.status.name, // 转为字符串(on/off)
      online: device.online,
      lastActiveTime: device.lastActiveTime,
    );

    // 构建设备专属参数(根据设备类型动态获取,关联Day3的子类模型)
    Map<String, dynamic> deviceParams = {};
    if (device is AirConditionerDevice) {
      // 空调专属参数
      deviceParams = {
        'temperature': device.temperature,
        'mode': device.mode,
        'fanSpeed': device.fanSpeed,
      };
    } else if (device is LightDevice) {
      // 灯光专属参数
      deviceParams = {
        'brightness': device.brightness,
        'color': device.color,
      };
    } else if (device is CurtainDevice) {
      // 窗帘专属参数
      deviceParams = {
        'position': device.position,
        'speed': device.speed,
      };
    }

    return MqttStateMessage._(
      deviceId: device.deviceId,
      state: stateParams,
      params: deviceParams,
      timestamp: DateTime.now().millisecondsSinceEpoch,
      clientId: clientId,
      qos: 0, // 状态消息默认QoS 0,节省带宽
    );
  }

  // 新增:转为BaseDevice(关联本地设备模型,用于更新本地数据库)
  BaseDevice toBaseDevice(BaseDevice existingDevice) {
    // 复用已有设备的信息(如名称、房间、图标),仅更新状态和专属参数
    if (existingDevice is AirConditionerDevice) {
      return AirConditionerDevice(
        deviceId: existingDevice.deviceId,
        name: existingDevice.name,
        type: existingDevice.type,
        status: state.toDeviceStatusEnum, // 更新开关状态
        online: state.online, // 更新在线状态
        room: existingDevice.room,
        lastActiveTime: state.lastActiveTime, // 更新最后活动时间
        iconUrl: existingDevice.iconUrl,
        temperature: params['temperature'] ?? existingDevice.temperature, // 更新温度
        mode: params['mode'] ?? existingDevice.mode, // 更新模式
        fanSpeed: params['fanSpeed'] ?? existingDevice.fanSpeed, // 更新风速
      );
    } else if (existingDevice is LightDevice) {
      return LightDevice(
        deviceId: existingDevice.deviceId,
        name: existingDevice.name,
        type: existingDevice.type,
        status: state.toDeviceStatusEnum,
        online: state.online,
        room: existingDevice.room,
        lastActiveTime: state.lastActiveTime,
        iconUrl: existingDevice.iconUrl,
        brightness: params['brightness'] ?? existingDevice.brightness, // 更新亮度
        color: params['color'] ?? existingDevice.color, // 更新颜色
      );
    } else if (existingDevice is CurtainDevice) {
      return CurtainDevice(
        deviceId: existingDevice.deviceId,
        name: existingDevice.name,
        type: existingDevice.type,
        status: state.toDeviceStatusEnum,
        online: state.online,
        room: existingDevice.room,
        lastActiveTime: state.lastActiveTime,
        iconUrl: existingDevice.iconUrl,
        position: params['position'] ?? existingDevice.position, // 更新开合度
        speed: params['speed'] ?? existingDevice.speed, // 更新速度
      );
    } else {
      // 其他设备类型,直接返回原有设备(仅更新状态)
      return BaseDevice(
        deviceId: existingDevice.deviceId,
        name: existingDevice.name,
        type: existingDevice.type,
        status: state.toDeviceStatusEnum,
        online: state.online,
        room: existingDevice.room,
        lastActiveTime: state.lastActiveTime,
        iconUrl: existingDevice.iconUrl,
      );
    }
  }
}
2.3 创建异常消息模型(设备→APP / 后端)

用于封装设备向 APP、后端上报的异常消息,提供消息生成、JSON 解析、异常类型判断方法,直接复制代码:

文件路径:lib/core/mqtt/models/mqtt_alert_message.dart

dart

import 'package:json_annotation/json_annotation.dart';
import 'package:smart_home_flutter/core/constants/mqtt_constants.dart';

// 生成JSON解析代码的注解(必须添加)
part 'mqtt_alert_message.g.dart';

// 设备异常类型枚举(与消息中的alertType对应,统一管理)
enum DeviceAlertType {
  powerFailure('power_failure', '设备断电'), // 断电
  fault('fault', '设备故障'), // 故障
  networkError('network_error', '网络异常'), // 网络异常
  lowPower('low_power', '低电量'), // 低电量(电池供电设备)
  other('other', '其他异常'); // 其他异常

  // 异常类型标识(与消息中的alertType一致)
  final String type;
  // 异常类型描述(用于UI展示)
  final String desc;

  const DeviceAlertType(this.type, this.desc);

  // 从字符串解析异常类型(消息解析时调用)
  static DeviceAlertType fromString(String type) {
    return DeviceAlertType.values.firstWhere(
      (e) => e.type == type,
      orElse: () => DeviceAlertType.other,
    );
  }
}

// MQTT异常消息模型(设备→APP/后端,直接复用)
@JsonSerializable()
class MqttAlertMessage {
  // 设备唯一ID(必传)
  @JsonKey(name: 'deviceId', required: true)
  final String deviceId;

  // 异常类型(与DeviceAlertType对应)
  @JsonKey(name: 'alertType', required: true)
  final String alertType;

  // 异常描述(用于UI展示,如“设备断电,请检查电源”)
  @JsonKey(name: 'alertDesc', required: true)
  final String alertDesc;

  // 消息发送时间戳(毫秒)
  @JsonKey(name: 'timestamp', required: true)
  final int timestamp;

  // 发送端客户端ID(设备的Client ID,必传)
  @JsonKey(name: 'clientId', required: true)
  final String clientId;

  // 消息服务质量(必传,推荐1,确保异常消息不丢失)
  @JsonKey(name: 'qos', required: true)
  final int qos;

  // 构造函数(私有,外部通过fromParams方法创建)
  MqttAlertMessage._({
    required this.deviceId,
    required this.alertType,
    required this.alertDesc,
    required this.timestamp,
    required this.clientId,
    required this.qos,
  });

  // 从JSON解析消息(APP接收异常消息时调用)
  factory MqttAlertMessage.fromJson(Map<String, dynamic> json) =>
      _$MqttAlertMessageFromJson(json);

  // 将消息转为JSON(设备端上报异常时调用)
  Map<String, dynamic> toJson() => _$MqttAlertMessageToJson(this);
Logo

作为“人工智能6S店”的官方数字引擎,为AI开发者与企业提供一个覆盖软硬件全栈、一站式门户。

更多推荐