欢迎加入开源鸿蒙跨平台社区:https://openharmonycrossplatform.csdn.net

在这里插入图片描述

前言

Dart 语言天生支持异步编程(Future, Stream, async/await),这使得它非常适合 UI 开发。然而,标准库 dart:async 提供的是最基础的原语。当你面对复杂的异步场景时,比如:

  • “我需要合并三个 Stream,无论谁来了数据都处理。”
  • “我要把一个 Stream 切分成块,但不想手动写 transformer。”
  • “我想缓存 Future 的结果,防止重复网络请求。”

这时候,async package 就登场了。它是由 Dart 团队维护的官方扩展库,提供了大量实用的工具类、集合操作符和 Stream 辅助函数,填补了标准库在复杂业务场景下的空白。

对于 OpenHarmony 开发,由于鸿蒙应用的界面更新高度依赖异步事件驱动(如系统回调、硬件传感器数据),熟练使用 async 库能让你的代码逻辑更加清晰、健壮。

一、核心功能概览

async 库的功能非常零碎但实用,主要可以分为以下几类:

  1. Future Extensions: CancelableOperation, AsyncCache, FutureGroup
  2. Stream Extensions: StreamGroup, StreamQueue, SubscriptionStream
  3. Utility Classes: Result (类似 Rust 的 Result), RestartableTimer

基础原语

扩展

扩展

扩展

应用

dart:async 标准库

package:async

Future 增强 (Result, Cache)

Stream 复杂操作 (Group, Queue)

流程控制 (Cancelable, Splitter)

鸿蒙应用逻辑

二、集成与用法详解

2.1 添加依赖

dependencies:
  async: ^2.13.0

2.2 彻底解决 Future 取消难题:CancelableOperation

Dart 的原生 Future 是不可取消的。一旦你 await future,就必须等待它完成或报错。即使 UI 已经销毁了,网络请求回来后 setState 依然会报错。

CancelableOperation 包装了一个 Future,允许你中途取消回调。

import 'package:async/async.dart';

void main() async {
  var completer = CancelableCompleter<String>(onCancel: () {
    print('操作被取消了,清理资源...');
  });

  // 模拟耗时任务
  Future.delayed(Duration(seconds: 3), () {
    if (!completer.isCanceled) {
      completer.complete('任务完成');
    }
  });

  var operation = completer.operation;

  operation.value.then((val) => print('结果: $val'));

  // 1秒后取消
  await Future.delayed(Duration(seconds: 1));
  print('正在取消...');
  await operation.cancel(); 
  // 输出: 
  // 正在取消...
  // 操作被取消了,清理资源...
  // ("结果: 任务完成" 永远不会输出)
}

在 Flutter 页面 dispose 时,取消所有正在进行的 CancelableOperation 是最佳实践。

在这里插入图片描述

2.3 优雅的缓存:AsyncCache

不想引入复杂的数据库,只想在内存里缓存一下网络请求?AsyncCache 是最轻量的选择。

final _usersCache = AsyncCache<List<String>>(const Duration(minutes: 5));

Future<List<String>> getUsers() async {
  // 如果缓存有效,直接返回缓存
  // 否则执行 fetchUsers(),并缓存结果 5 分钟
  return _usersCache.fetch(() => fetchFromApi());
}

Future<List<String>> fetchFromApi() async {
  print('调用真实 API');
  return ['张三', '李四'];
}

这对于鸿蒙手表或车机等网络环境不稳定的设备特别有用,能显著减少不必要的请求。

在这里插入图片描述

2.4 Stream 的瑞士军刀

1. StreamGroup (合并流)

你想同时监听 蓝牙状态变化、网络状态变化、和用户点击事件?

var group = StreamGroup<String>();
group.add(bluetoothStream);
group.add(networkStream);
group.close(); // 当添加完毕后关闭

// 这里会收到所有子流发来的数据
group.stream.listen((event) => print('收到事件: $event'));

在这里插入图片描述

2. StreamQueue (拉取式消费)

通常 Stream 是“推”模型(Push)。但有时我们需要“拉”模型(Pull),比如解析协议头时:先读 4 个字节,判断类型,再读 n 个字节。

var events = StreamQueue<int>(sourceStream);

// 像操作迭代器一样操作流
var first = await events.next; 
var header = await events.take(4); // 等待并获取接下来的4个
var rest = await events.rest.toList(); // 获取剩余所有

三、OpenHarmony 适配实战:Result 类型处理

在 OpenHarmony 原生开发(ArkTS)中,很多 API 可能返回错误码。在 Dart 层,传统的 try-catch 写起来比较臃肿。package:async 提供了 Result 类型,将“成功值”和“异常”统一封装为一个对象,便于传递。

3.1 场景:封装鸿蒙系统能力

假设我们调用一个不稳定的鸿蒙原生方法。

import 'package:async/async.dart';
import 'package:flutter/services.dart';

class OhosSystemApi {
  static const platform = MethodChannel('ohos.system');

  // 将 try-catch 封装在底层,上层拿到的是 Result 对象
  static Future<Result<String>> getDeviceInfo() async {
    try {
      final info = await platform.invokeMethod('getDeviceInfo');
      return Result.value(info);
    } catch (e, stack) {
      return Result.error(e, stack);
    }
  }
}

// 业务层调用
void showInfo() async {
  var result = await OhosSystemApi.getDeviceInfo();
  
  if (result.isValue) {
    print('设备信息: ${result.asValue!.value}');
  } else {
    print('获取失败: ${result.asError!.error}');
    // 还可以选择是否重新抛出
    // result.asError!.complete(completer); 
  }
}

这种模式让错误处理变成了显式的逻辑分支,而不是跳跃的异常流,对于构建高稳定性的鸿蒙工业 APP 很有帮助。

在这里插入图片描述

四、高级进阶:StreamSplitter

有时候我们有一个单订阅的 Stream(比如来自 HTTP Response 的 bytes 流),但我们需要多处监听(一处用于写文件,一处用于计算 MD5)。直接 listen 两次会报错。

虽然可以用 asBroadcastStream,但 StreamSplitter 更强大,它支持创建任意数量的副本,并在所有副本关闭后才关闭源流。

var splitter = StreamSplitter(sourceStream);

var stream1 = splitter.split();
var stream2 = splitter.split();

// 两个流互不干扰,数据相同
stream1.listen((data) => writeToFile(data));
stream2.listen((data) => calculateMd5(data));

splitter.close(); // 允许流开始流动

注意:在处理大文件流时要小心,StreamSplitter 可能会在内存中缓冲数据以等待慢速的订阅者,可能导致内存占用增加。

五、总结

package:async 是那种“你可能没听过,但一旦用了就离不开”的库。它补充了 Dart 标准库在异步控制流上的不足。

对于 OpenHarmony 开发者:

  • CancelableOperation 解决页面销毁后的 setState 异常。
  • AsyncCache 优化弱网环境下的数据体验。
  • Result 封装跨端调用的不确定性。
  • StreamGroup 聚合来自不同鸿蒙子系统(位置、传感器、网络)的事件流。

它不需要任何原生适配,是纯 Dart 逻辑,因此在鸿蒙、Android、iOS 上的表现完全一致,值得加入你的标准依赖库列表。

六、完整实战示例

import 'dart:async';
import 'package:async/async.dart';

// 模拟一个不稳定的网络请求
Future<String> fetchUser(int id) async {
  await Future.delayed(Duration(milliseconds: 500));
  if (id < 0) throw Exception('无效 ID');
  return '用户_$id';
}

void main() async {
  // 1. AsyncCache: 避免短时间内重复请求
  // 比如鸿蒙应用中获取设备信息的接口,没必要每次点按钮都调底层
  final cache = AsyncCache<String>(Duration(seconds: 5));
  
  print('第一次调用...');
  print(await cache.fetch(() => fetchUser(1))); // 执行并缓存

  print('第二次调用 (走缓存)...');
  print(await cache.fetch(() => fetchUser(1))); // 直接返回缓存,不等待

  // 2. StreamGroup: 合并多个事件源
  // 比如同时监听触摸屏点击和实体按键事件
  final touchStream = Stream.periodic(Duration(seconds: 1), (i) => '触摸_$i').take(3);
  final keyStream = Stream.periodic(Duration(seconds: 2), (i) => '按键_$i').take(2);
  
  final inputMerged = StreamGroup.merge([touchStream, keyStream]);
  await for (var event in inputMerged) {
    print('输入事件: $event');
  }

  // 3. Result: 安全处理错误,不让异常中断 UI 渲染流程
  print('开始错误处理演示...');
  final result = await Result.capture(fetchUser(-1));
  
  if (result.isError) {
    print('安全捕获错误: ${result.asError!.error}');
  } else {
    print('成功: ${result.asValue!.value}');
  }
}

在这里插入图片描述

Logo

作为“人工智能6S店”的官方数字引擎,为AI开发者与企业提供一个覆盖软硬件全栈、一站式门户。

更多推荐