Flutter for OpenHarmony:queue 异步任务队列管理(并发控制与任务调度) 深度解析与鸿蒙适配指南
摘要 本文介绍了如何在Dart中使用queue包管理异步任务并发,特别适用于OpenHarmony应用开发。该包通过Queue类实现任务队列控制,支持串行(parallel:1)和并行(parallel:N)两种模式,有效解决以下问题: 服务器过载:限制API请求并发数,避免429错误 资源保护:防止大量并发任务导致内存溢出(OOM)或UI卡顿 数据一致性:确保数据库写入等操作的顺序执行 文章详细
欢迎加入开源鸿蒙跨平台社区:https://openharmonycrossplatform.csdn.net

前言
在 Dart 中,异步操作(Future)通常是并发执行的。如果你在一个 for 循环里发起了 100 个网络请求:
for (var url in urls) {
fetch(url); // 瞬间发出100个请求
}
这会导致什么?
- 服务器爆炸:可能触发 API速率限制(429 Too Many Requests)。
- 客户端OOM:瞬间创建过多的 Socket 连接和 Buffer。
- UI 卡顿:大量的 Event Loop 任务阻塞。
我们需要一种机制来限制并发数,或者让任务串行执行。
queue package 就是这样一个轻量级的任务队列管理库。它允许你控制同时运行的 Future 数量。
对于 OpenHarmony 应用,特别是在处理文件批量上传、数据库批量写入或图片批量处理等场景,queue 是保护系统资源的防波堤。
一、核心功能
queue 的核心类是 Queue。它维护一个待执行的任务列表,并根据设定的 parallel(并发数)来调度执行。
- Serial Queue (parallel: 1): 严格串行,一个接一个。
- Concurrent Queue (parallel: N): 最多同时跑 N 个。
- Task Management: 添加任务,取消剩余任务(部分支持)。
二、集成与用法详解
2.1 添加依赖
dependencies:
queue: ^3.4.0
2.2 基础用法
import 'package:queue/queue.dart';
void main() async {
// 1. 创建队列,限制并行数为 2
final queue = Queue(parallel: 2);
// 2. 添加任务
for (int i = 0; i < 5; i++) {
queue.add(() async {
print('开始任务 $i');
await Future.delayed(Duration(seconds: 1));
print('结束任务 $i');
});
}
// 3. 等待所有任务完成
await queue.onComplete;
print('全部完成!');
}
输出会是:同时有两个 Start,1秒后两个 End,然后接下两个…

2.3 获取返回值
queue.add 会返回一个 Future,你可以等待单个任务的结果。
final resultFuture = queue.add(() async {
return await http.get(...);
});
print(await resultFuture);

三、OpenHarmony 适配与实战:批量文件上传
在鸿蒙 App 中,用户选择了 50 张照片要上传到服务器。如果我们并发 50 个上传任务,可能会因为内存占用过高被系统杀掉,或者导致网络拥塞传输极慢。
3.1 实现并发控制上传器
import 'package:queue/queue.dart';
class UploadManager {
// 限制同时上传 3 张图片
final _uploadQueue = Queue(parallel: 3);
Future<void> uploadImages(List<String> filePaths) async {
final futures = <Future>[];
for (final path in filePaths) {
// 将任务加入队列
final future = _uploadQueue.add(() => _doUpload(path));
futures.add(future);
}
// 等待全部完成
await Future.wait(futures);
}
Future<String> _doUpload(String path) async {
print('正在上传 $path...');
// 模拟耗时
await Future.delayed(Duration(seconds: 2));
return 'url_for_$path';
}
void cancelAll() {
_uploadQueue.cancel(); // 取消尚未开始的任务
}
}
使用 queue 后,无论用户一次选了多少图,系统最多只维护 3 个上传连接,既快又稳。

3.2 数据库写入保护
SQLite(尤其是 sqflite)通常只支持单线程写入。如果多个并发 Future 试图同时写库,可能会遇到锁竞争。
虽然数据库驱动通常有内部锁,但在业务层使用 Queue(parallel: 1) 强制串行化写入任务,可以从逻辑上避免竞争条件,确保写入顺序。
final dbQueue = Queue(parallel: 1);
Future<void> logAction(String action) {
return dbQueue.add(() async {
await db.insert('logs', {'action': action});
});
}

四、进阶:与 Stream 结合
如果你有一个源源不断的任务流(比如实时处理每一帧与服务器通信),可以将 Queue 包装在 Stream 的 listen 回调中。
stream.listen((data) {
queue.add(() => process(data));
});
但要注意,如果生产者的速度远快于消费者的速度(Queue 处理不过来),queue 内部的待处理列表会无限堆积,最终导致 OOM。这种情况下你需要通过 Backpressure(背压)机制来丢弃任务或暂停生产者。
五、总结
queue 是 Dart 异步编程中被低估的工具。它用不到 100 行代码解决了复杂的并发调度问题。
对于 OpenHarmony 开发者:
- 网络优化:用 queue 限制 API 并发,避免拥塞。
- 资源保护:控制文件 IO 和内存密集型任务的并发度,防止应用崩溃。
它非常适合作为应用基础架构的一部分(BaseRepository, NetworkManager)。
最佳实践:
- 按场景创建 Queue:不要全局共用一个 Queue,应该为网络、DB、文件各自创建 Queue。
- 设置合理的 parallel:网络请求通常 4-6 个,IO 操作通常 1-2 个。
- 处理异常:
queue.add返回的 Future 可能会抛出异常,记得在 await 时 catch 它,否则可能中断整个流程。
六、完整实战示例
import 'package:queue/queue.dart';
class DownloadManager {
// 限制同时只能有 2 个下载任务,避免带宽占满
final _queue = Queue(parallel: 2);
Future<void> downloadFiles(List<String> urls) async {
final futures = <Future>[];
for (var url in urls) {
// queue.add 返回的是任务闭包的返回值 (Future)
// 这里的 catchError 保证单个失败不影响整体等待
final task = _queue.add(() => _startDownload(url)).catchError((e) {
print('❌ 下载失败 $url: $e');
});
futures.add(task);
}
// 等待所有下载结束
await Future.wait(futures);
print('所有文件处理完毕');
}
Future<String> _startDownload(String url) async {
final activeCount = _queue.running;
final pendingCount = _queue.pending;
print('⬇️ 开始下载: $url (当前进行中: $activeCount, 等待中: $pendingCount)');
// 模拟不同大小文件的下载时间
await Future.delayed(Duration(milliseconds: 500));
if (url.contains('error')) throw Exception('404 Not Found');
print('✅ 下载完成: $url');
return '/storage/$url';
}
}
void main() async {
final manager = DownloadManager();
// 模拟一批文件,其中包含一个坏链接
final urls = ['file1.zip', 'file2.zip', 'file_error.zip', 'file3.zip', 'file4.zip'];
await manager.downloadFiles(urls);
}

更多推荐


所有评论(0)