欢迎加入开源鸿蒙跨平台社区:https://openharmonycrossplatform.csdn.net

在这里插入图片描述

前言

在 Dart 中,异步操作(Future)通常是并发执行的。如果你在一个 for 循环里发起了 100 个网络请求:

for (var url in urls) {
  fetch(url); // 瞬间发出100个请求
}

这会导致什么?

  1. 服务器爆炸:可能触发 API速率限制(429 Too Many Requests)。
  2. 客户端OOM:瞬间创建过多的 Socket 连接和 Buffer。
  3. UI 卡顿:大量的 Event Loop 任务阻塞。

我们需要一种机制来限制并发数,或者让任务串行执行

queue package 就是这样一个轻量级的任务队列管理库。它允许你控制同时运行的 Future 数量。

对于 OpenHarmony 应用,特别是在处理文件批量上传、数据库批量写入或图片批量处理等场景,queue 是保护系统资源的防波堤。

一、核心功能

queue 的核心类是 Queue。它维护一个待执行的任务列表,并根据设定的 parallel(并发数)来调度执行。

  1. Serial Queue (parallel: 1): 严格串行,一个接一个。
  2. Concurrent Queue (parallel: N): 最多同时跑 N 个。
  3. Task Management: 添加任务,取消剩余任务(部分支持)。

槽位 1

槽位 2

等待

完成

调度下一个

多个任务 1..100

Queue

运行中

运行中

挂起任务

二、集成与用法详解

2.1 添加依赖

dependencies:
  queue: ^3.4.0

2.2 基础用法

import 'package:queue/queue.dart';

void main() async {
  // 1. 创建队列,限制并行数为 2
  final queue = Queue(parallel: 2);

  // 2. 添加任务
  for (int i = 0; i < 5; i++) {
    queue.add(() async {
      print('开始任务 $i');
      await Future.delayed(Duration(seconds: 1));
      print('结束任务 $i');
    });
  }

  // 3. 等待所有任务完成
  await queue.onComplete;
  print('全部完成!');
}

输出会是:同时有两个 Start,1秒后两个 End,然后接下两个…

在这里插入图片描述

2.3 获取返回值

queue.add 会返回一个 Future,你可以等待单个任务的结果。

final resultFuture = queue.add(() async {
  return await http.get(...);
});
print(await resultFuture);

在这里插入图片描述

三、OpenHarmony 适配与实战:批量文件上传

在鸿蒙 App 中,用户选择了 50 张照片要上传到服务器。如果我们并发 50 个上传任务,可能会因为内存占用过高被系统杀掉,或者导致网络拥塞传输极慢。

3.1 实现并发控制上传器

import 'package:queue/queue.dart';

class UploadManager {
  // 限制同时上传 3 张图片
  final _uploadQueue = Queue(parallel: 3);
  
  Future<void> uploadImages(List<String> filePaths) async {
    final futures = <Future>[];

    for (final path in filePaths) {
      // 将任务加入队列
      final future = _uploadQueue.add(() => _doUpload(path));
      futures.add(future);
    }

    // 等待全部完成
    await Future.wait(futures);
  }

  Future<String> _doUpload(String path) async {
    print('正在上传 $path...');
    // 模拟耗时
    await Future.delayed(Duration(seconds: 2));
    return 'url_for_$path';
  }
  
  void cancelAll() {
    _uploadQueue.cancel(); // 取消尚未开始的任务
  }
}

使用 queue 后,无论用户一次选了多少图,系统最多只维护 3 个上传连接,既快又稳。

在这里插入图片描述

3.2 数据库写入保护

SQLite(尤其是 sqflite)通常只支持单线程写入。如果多个并发 Future 试图同时写库,可能会遇到锁竞争。
虽然数据库驱动通常有内部锁,但在业务层使用 Queue(parallel: 1) 强制串行化写入任务,可以从逻辑上避免竞争条件,确保写入顺序。

final dbQueue = Queue(parallel: 1);

Future<void> logAction(String action) {
  return dbQueue.add(() async {
    await db.insert('logs', {'action': action});
  });
}

在这里插入图片描述

四、进阶:与 Stream 结合

如果你有一个源源不断的任务流(比如实时处理每一帧与服务器通信),可以将 Queue 包装在 Stream 的 listen 回调中。

stream.listen((data) {
  queue.add(() => process(data));
});

但要注意,如果生产者的速度远快于消费者的速度(Queue 处理不过来),queue 内部的待处理列表会无限堆积,最终导致 OOM。这种情况下你需要通过 Backpressure(背压)机制来丢弃任务或暂停生产者。

五、总结

queue 是 Dart 异步编程中被低估的工具。它用不到 100 行代码解决了复杂的并发调度问题。

对于 OpenHarmony 开发者:

  • 网络优化:用 queue 限制 API 并发,避免拥塞。
  • 资源保护:控制文件 IO 和内存密集型任务的并发度,防止应用崩溃。

它非常适合作为应用基础架构的一部分(BaseRepository, NetworkManager)。

最佳实践

  1. 按场景创建 Queue:不要全局共用一个 Queue,应该为网络、DB、文件各自创建 Queue。
  2. 设置合理的 parallel:网络请求通常 4-6 个,IO 操作通常 1-2 个。
  3. 处理异常queue.add 返回的 Future 可能会抛出异常,记得在 await 时 catch 它,否则可能中断整个流程。

六、完整实战示例

import 'package:queue/queue.dart';

class DownloadManager {
  // 限制同时只能有 2 个下载任务,避免带宽占满
  final _queue = Queue(parallel: 2);
  
  Future<void> downloadFiles(List<String> urls) async {
    final futures = <Future>[];
    
    for (var url in urls) {
      // queue.add 返回的是任务闭包的返回值 (Future)
      // 这里的 catchError 保证单个失败不影响整体等待
      final task = _queue.add(() => _startDownload(url)).catchError((e) {
        print('❌ 下载失败 $url: $e');
      });
      futures.add(task);
    }

    // 等待所有下载结束
    await Future.wait(futures);
    print('所有文件处理完毕');
  }

  Future<String> _startDownload(String url) async {
    final activeCount = _queue.running;
    final pendingCount = _queue.pending;
    print('⬇️ 开始下载: $url (当前进行中: $activeCount, 等待中: $pendingCount)');
    
    // 模拟不同大小文件的下载时间
    await Future.delayed(Duration(milliseconds: 500)); 
    
    if (url.contains('error')) throw Exception('404 Not Found');

    print('✅ 下载完成: $url');
    return '/storage/$url';
  }
}

void main() async {
  final manager = DownloadManager();
  // 模拟一批文件,其中包含一个坏链接
  final urls = ['file1.zip', 'file2.zip', 'file_error.zip', 'file3.zip', 'file4.zip'];
  
  await manager.downloadFiles(urls);
}

在这里插入图片描述

Logo

作为“人工智能6S店”的官方数字引擎,为AI开发者与企业提供一个覆盖软硬件全栈、一站式门户。

更多推荐