Flutter for OpenHarmony:async 异步编程的强力补丁,流处理与集合操作的扩展库(Dart 官方出品) 深度解析与鸿蒙适配指南
Dart 语言天生支持异步编程(FutureStream),这使得它非常适合 UI 开发。然而,标准库dart:async提供的是最基础的原语。“我需要合并三个 Stream,无论谁来了数据都处理。“我要把一个 Stream 切分成块,但不想手动写 transformer。“我想缓存 Future 的结果,防止重复网络请求。这时候,asyncpackage 就登场了。
欢迎加入开源鸿蒙跨平台社区:https://openharmonycrossplatform.csdn.net

前言
Dart 语言天生支持异步编程(Future, Stream, async/await),这使得它非常适合 UI 开发。然而,标准库 dart:async 提供的是最基础的原语。当你面对复杂的异步场景时,比如:
- “我需要合并三个 Stream,无论谁来了数据都处理。”
- “我要把一个 Stream 切分成块,但不想手动写 transformer。”
- “我想缓存 Future 的结果,防止重复网络请求。”
这时候,async package 就登场了。它是由 Dart 团队维护的官方扩展库,提供了大量实用的工具类、集合操作符和 Stream 辅助函数,填补了标准库在复杂业务场景下的空白。
对于 OpenHarmony 开发,由于鸿蒙应用的界面更新高度依赖异步事件驱动(如系统回调、硬件传感器数据),熟练使用 async 库能让你的代码逻辑更加清晰、健壮。
一、核心功能概览
async 库的功能非常零碎但实用,主要可以分为以下几类:
- Future Extensions:
CancelableOperation,AsyncCache,FutureGroup。 - Stream Extensions:
StreamGroup,StreamQueue,SubscriptionStream。 - Utility Classes:
Result(类似 Rust 的 Result),RestartableTimer。
二、集成与用法详解
2.1 添加依赖
dependencies:
async: ^2.13.0
2.2 彻底解决 Future 取消难题:CancelableOperation
Dart 的原生 Future 是不可取消的。一旦你 await future,就必须等待它完成或报错。即使 UI 已经销毁了,网络请求回来后 setState 依然会报错。
CancelableOperation 包装了一个 Future,允许你中途取消回调。
import 'package:async/async.dart';
void main() async {
var completer = CancelableCompleter<String>(onCancel: () {
print('操作被取消了,清理资源...');
});
// 模拟耗时任务
Future.delayed(Duration(seconds: 3), () {
if (!completer.isCanceled) {
completer.complete('任务完成');
}
});
var operation = completer.operation;
operation.value.then((val) => print('结果: $val'));
// 1秒后取消
await Future.delayed(Duration(seconds: 1));
print('正在取消...');
await operation.cancel();
// 输出:
// 正在取消...
// 操作被取消了,清理资源...
// ("结果: 任务完成" 永远不会输出)
}
在 Flutter 页面 dispose 时,取消所有正在进行的 CancelableOperation 是最佳实践。

2.3 优雅的缓存:AsyncCache
不想引入复杂的数据库,只想在内存里缓存一下网络请求?AsyncCache 是最轻量的选择。
final _usersCache = AsyncCache<List<String>>(const Duration(minutes: 5));
Future<List<String>> getUsers() async {
// 如果缓存有效,直接返回缓存
// 否则执行 fetchUsers(),并缓存结果 5 分钟
return _usersCache.fetch(() => fetchFromApi());
}
Future<List<String>> fetchFromApi() async {
print('调用真实 API');
return ['张三', '李四'];
}
这对于鸿蒙手表或车机等网络环境不稳定的设备特别有用,能显著减少不必要的请求。

2.4 Stream 的瑞士军刀
1. StreamGroup (合并流)
你想同时监听 蓝牙状态变化、网络状态变化、和用户点击事件?
var group = StreamGroup<String>();
group.add(bluetoothStream);
group.add(networkStream);
group.close(); // 当添加完毕后关闭
// 这里会收到所有子流发来的数据
group.stream.listen((event) => print('收到事件: $event'));

2. StreamQueue (拉取式消费)
通常 Stream 是“推”模型(Push)。但有时我们需要“拉”模型(Pull),比如解析协议头时:先读 4 个字节,判断类型,再读 n 个字节。
var events = StreamQueue<int>(sourceStream);
// 像操作迭代器一样操作流
var first = await events.next;
var header = await events.take(4); // 等待并获取接下来的4个
var rest = await events.rest.toList(); // 获取剩余所有
三、OpenHarmony 适配实战:Result 类型处理
在 OpenHarmony 原生开发(ArkTS)中,很多 API 可能返回错误码。在 Dart 层,传统的 try-catch 写起来比较臃肿。package:async 提供了 Result 类型,将“成功值”和“异常”统一封装为一个对象,便于传递。
3.1 场景:封装鸿蒙系统能力
假设我们调用一个不稳定的鸿蒙原生方法。
import 'package:async/async.dart';
import 'package:flutter/services.dart';
class OhosSystemApi {
static const platform = MethodChannel('ohos.system');
// 将 try-catch 封装在底层,上层拿到的是 Result 对象
static Future<Result<String>> getDeviceInfo() async {
try {
final info = await platform.invokeMethod('getDeviceInfo');
return Result.value(info);
} catch (e, stack) {
return Result.error(e, stack);
}
}
}
// 业务层调用
void showInfo() async {
var result = await OhosSystemApi.getDeviceInfo();
if (result.isValue) {
print('设备信息: ${result.asValue!.value}');
} else {
print('获取失败: ${result.asError!.error}');
// 还可以选择是否重新抛出
// result.asError!.complete(completer);
}
}
这种模式让错误处理变成了显式的逻辑分支,而不是跳跃的异常流,对于构建高稳定性的鸿蒙工业 APP 很有帮助。

四、高级进阶:StreamSplitter
有时候我们有一个单订阅的 Stream(比如来自 HTTP Response 的 bytes 流),但我们需要多处监听(一处用于写文件,一处用于计算 MD5)。直接 listen 两次会报错。
虽然可以用 asBroadcastStream,但 StreamSplitter 更强大,它支持创建任意数量的副本,并在所有副本关闭后才关闭源流。
var splitter = StreamSplitter(sourceStream);
var stream1 = splitter.split();
var stream2 = splitter.split();
// 两个流互不干扰,数据相同
stream1.listen((data) => writeToFile(data));
stream2.listen((data) => calculateMd5(data));
splitter.close(); // 允许流开始流动
注意:在处理大文件流时要小心,StreamSplitter 可能会在内存中缓冲数据以等待慢速的订阅者,可能导致内存占用增加。
五、总结
package:async 是那种“你可能没听过,但一旦用了就离不开”的库。它补充了 Dart 标准库在异步控制流上的不足。
对于 OpenHarmony 开发者:
- 用
CancelableOperation解决页面销毁后的setState异常。 - 用
AsyncCache优化弱网环境下的数据体验。 - 用
Result封装跨端调用的不确定性。 - 用
StreamGroup聚合来自不同鸿蒙子系统(位置、传感器、网络)的事件流。
它不需要任何原生适配,是纯 Dart 逻辑,因此在鸿蒙、Android、iOS 上的表现完全一致,值得加入你的标准依赖库列表。
六、完整实战示例
import 'dart:async';
import 'package:async/async.dart';
// 模拟一个不稳定的网络请求
Future<String> fetchUser(int id) async {
await Future.delayed(Duration(milliseconds: 500));
if (id < 0) throw Exception('无效 ID');
return '用户_$id';
}
void main() async {
// 1. AsyncCache: 避免短时间内重复请求
// 比如鸿蒙应用中获取设备信息的接口,没必要每次点按钮都调底层
final cache = AsyncCache<String>(Duration(seconds: 5));
print('第一次调用...');
print(await cache.fetch(() => fetchUser(1))); // 执行并缓存
print('第二次调用 (走缓存)...');
print(await cache.fetch(() => fetchUser(1))); // 直接返回缓存,不等待
// 2. StreamGroup: 合并多个事件源
// 比如同时监听触摸屏点击和实体按键事件
final touchStream = Stream.periodic(Duration(seconds: 1), (i) => '触摸_$i').take(3);
final keyStream = Stream.periodic(Duration(seconds: 2), (i) => '按键_$i').take(2);
final inputMerged = StreamGroup.merge([touchStream, keyStream]);
await for (var event in inputMerged) {
print('输入事件: $event');
}
// 3. Result: 安全处理错误,不让异常中断 UI 渲染流程
print('开始错误处理演示...');
final result = await Result.capture(fetchUser(-1));
if (result.isError) {
print('安全捕获错误: ${result.asError!.error}');
} else {
print('成功: ${result.asValue!.value}');
}
}

更多推荐




所有评论(0)