我是兰瓶Coding,一枚刚踏入鸿蒙领域的转型小白,原是移动开发中级,如下是我学习笔记《零基础学鸿蒙》,若对你所有帮助,还请不吝啬的给个大大的赞~

前言

我承认,第一眼看到 ArkTS 的时候,我把“异步”三个字往脑门上一贴,条件反射就是 async/await 一顿乱敲。直到一个周五晚上,我的 Demo 在列表滚动时卡成 PPT,后台统计一跑 CPU 飙到 120%,手机风扇(哦对,手机没风扇)仿佛在冲我叹气,我才老实下来:鸿蒙里“异步”不只是语法糖,它涉及任务模型、线程池(TaskPool)、事件循环(EventHandler)以及你如何调度并发
  本文我不讲空话,直接沿着你给的大纲来——异步 API → Promise 与 async/await → 并发任务调度——但每一段都上手可跑的例子。还会把 TaskPoolAsyncFunctionEventHandler 这三个“关键字”拆开揉碎,让它们在实际工程里各司其职。
  先立个 flag:整文拒绝 AI 腔,只有我(一个长期被 deadline 教育过的鸿蒙全栈)夜半踩坑后写下的实话实说。有调侃、有复盘、有代码。你照着抄也能跑,抄了还能快。

一、前言|为什么“会用 async/await”≠“会写异步”

说说我那次栽跟头。一个看似“朴素”的统计页面:

  • 列表展示 5 千条待办(离线缓存);
  • 点击“统计”按钮,做去重、关键词聚类、权重计算;
  • 顺手把结果写回本地数据库,再上传云端。

我天真地在 onClick 里来一句:

await crunchBigData(this.todos); // 超大数组,CPU 重型运算

结果 UI 主线程被我锁死了。async/await 不是“后台线程”的代名词,它只是把异步逻辑写得像同步。要是 await 的那个 Promise 里恰好做了重活(纯 JS 计算),仍然在当前线程跑
  这时就轮到鸿蒙的任务池(TaskPool)大显身手,把重活丢到后台执行;细粒度调度、时序编排交给EventHandler;而 async/await 负责拼出通顺的流程与明确的错误边界。三者合体,才是一套“可维护、不卡顿、能扩展”的异步并发解。

二、异步 API 一图观(ArkTS 视角)

我习惯先画一张“脑内地图”,把名词放到位:

  • 回调(Callback):最原始的异步接口风格,常见于某些系统能力;优点轻量,缺点“回调地狱”“错误传递难”。
  • Promise:状态有限(pending/fulfilled/rejected),可链式调用;配合 then/catch/finally
  • async/await:Promise 的语法糖,写法像同步;异常用 try/catch。
  • TaskPool(@ohos.taskpool)把重 CPU 的纯函数任务放进系统线程池跑;和 Web Worker 思路类似,但用起来更轻。
  • EventHandler(@ohos.eventhandler)创建一个事件循环(Runner)与任务队列;可定时、可串行化、可在“专属线程”跑轻中型任务。
  • 定时/延迟handler.postTask(fn, delay) 比随手 setTimeout 更工程化(且在 ArkTS 场景更贴边)。
  • AsyncFunction动态生成异步函数的黑科技(JS 原生能力),在“可配置脚本”“插件”场景下特别好用;在 TaskPool 里执行,既灵活又安全(相对而言)。

把它们放在“正确的位置”:UI 线程只做轻渲染 + IO 启动;CPU 重活进 TaskPool;轻中型串行/定时任务用 EventHandler;流程 glue 用 async/await
  这不是教条,是我踩过坑后总结出来的经验曲线

三、Promise 与 async/await:甜不甜取决于你“收口”的姿势

3.1 用 Promise 包一层:把回调式 API 拉平

鸿蒙里有些系统能力仍提供回调式接口。我的习惯是第一时间做 Promise 化封装。下面以 HTTP 请求(@ohos.net.http)为例,给出一个非常“可抄可改”的写法:

// http/client.ts
import http from '@ohos.net.http';

export interface ApiResp<T> { code: number; msg: string; data: T }

export class HttpClient {
  constructor(private base: string) {}

  get<T>(path: string, headers: Record<string, string> = {}): Promise<T> {
    const req = http.createHttp();
    return new Promise<T>((resolve, reject) => {
      req.request(this.base + path, { method: http.RequestMethod.GET, header: headers })
        .then(res => {
          if (res.responseCode !== 200) return reject(new Error(`HTTP ${res.responseCode}`));
          let body: ApiResp<T>;
          try {
            body = JSON.parse(String(res.result));
          } catch (e) {
            return reject(new Error('JSON parse error'));
          }
          if (body.code !== 0) return reject(new Error(body.msg || 'Biz error'));
          resolve(body.data);
        })
        .catch(reject)
        .finally(() => req.destroy());
    });
  }
}

以后在页面里,你就可以优雅地:

const api = new HttpClient('https://api.example.com');
try {
  const profile = await api.get<{ name: string; vip: boolean }>('/me');
  this.userName = profile.name;
} catch (e) {
  this.toast('网络开小差了:' + (e as Error).message);
}

要点

  • 捕获解析错误,别把“网络成功但 JSON 失败”当成“网络失败”。
  • 业务态与协议态分离:HTTP 200 不代表业务成功;code !== 0 也要走 reject
  • 资源回收finallyreq.destroy(),形成肌肉记忆。

3.2 async/await 的“收口设计”:三段式

凡是交互触发的异步流程,我都有“三段式”套路:准备 → 核心 → 善后

async doSyncAll() {
  this.loading = true; // 准备
  try {
    const local = await this.repo.list();
    const payload = await this.buildSyncPayload(local); // 可能是 CPU 重
    const result = await this.api.post('/sync', payload);
    await this.repo.merge(result.updated);
    this.toast('同步完成 ✅');
  } catch (e) {
    this.toast('同步失败:' + (e as Error).message);
  } finally {
    this.loading = false; // 善后必执行
  }
}

这段和平时写没差,但关键在“可能很重”的那步buildSyncPayload 就是适合丢 TaskPool 的候选。我们先按普通写法跑通,再做“重活下沉”,这是我在中大型项目里保持“增量可控”的策略。

3.3 并行与串行:真的不要“一股脑儿 await All”

当你要并行请求多个接口,或同时读写多份数据,不要条件反射地:

// 慢且串行
const a = await getA();
const b = await getB();
const c = await getC();

更高效的写法是:

const [a, b, c] = await Promise.all([getA(), getB(), getC()]);

但!如果有依赖关系,就别硬上 all。把依赖明确地串起来,它更“可读可测”。

3.4 超时控制与取消语义

ArkTS 没有内建的 Promise 超时,你可以做一个小工具:

export async function withTimeout<T>(p: Promise<T>, ms: number, tag = 'timeout'): Promise<T> {
  let timer: number;
  const t = new Promise<never>((_, reject) => {
    // @ts-ignore 延迟的计时器
    timer = setTimeout(() => reject(new Error(tag)), ms) as unknown as number;
  });
  try {
    return await Promise.race([p, t]);
  } finally {
    clearTimeout(timer);
  }
}

取消呢?HTTP 有 req.destroy();自定义任务你可以引入 AbortController 思路(传入 token,任务里主动检查并早停)。这点等我们写 TaskPool 例子时再演示。

四、TaskPool:把“重活”丢给后台,UI 线程只做漂亮的自己

4.1 为什么是 TaskPool

结论先说:在鸿蒙 NEXT 的 ArkTS 里,要把 CPU 密集型任务、长循环、复杂聚合运算放到 TaskPool。这跟 Web Worker 的哲学一致——只不过 TaskPool 的使用门槛更低。在多数情况下,你就把纯函数丢过去,带上参数,它跑完给你结果。

4.2 基本用法:taskpool.Task + taskpool.execute

// worker/heavy.ts
import taskpool from '@ohos.taskpool';

// 一个“纯函数”风格:输入->输出,便于序列化
export function topNWords(text: string, n: number): Array<[string, number]> {
  const freq = new Map<string, number>();
  const words = text.toLowerCase().split(/[^a-zA-Z0-9_\u4e00-\u9fa5]+/).filter(Boolean);
  for (let w of words) freq.set(w, (freq.get(w) || 0) + 1);
  return Array.from(freq.entries()).sort((a, b) => b[1] - a[1]).slice(0, n);
}

// 页面里调用
import { topNWords } from '../worker/heavy';
import taskpool from '@ohos.taskpool';

async function analyzeInPool(text: string) {
  const task = new taskpool.Task(topNWords, text, 20);
  const res = await taskpool.execute(task);
  return res as Array<[string, number]>;
}

要点

  • 纯函数更稳:无外部闭包依赖(序列化困难),更易被 TaskPool 执行器“搬到另一边”。
  • 参数与返回值建议使用可序列化类型(字符串、数值、对象、数组、ArrayBuffer),避免“带引用”的复杂实例。
  • taskpool.execute 返回 Promise,异常会 reject,照常 try/catch。

4.3 一个真·CPU 重活示例:找素数(顺便演示取消)

// worker/primes.ts
export interface PrimeRequest {
  limit: number;
  abortFlag?: { value: boolean }; // 手工的“取消标志”
}

export function sievePrimes(req: PrimeRequest): number[] {
  const { limit, abortFlag } = req;
  const isPrime = new Array<boolean>(limit + 1).fill(true);
  isPrime[0] = isPrime[1] = false;

  for (let p = 2; p * p <= limit; p++) {
    if (abortFlag?.value) { // 轮询取消
      // 抛出一个特殊错误,外层可识别为“被取消”
      throw new Error('ABORTED');
    }
    if (!isPrime[p]) continue;
    for (let m = p * p; m <= limit; m += p) {
      isPrime[m] = false;
    }
  }
  const res: number[] = [];
  for (let i = 2; i <= limit; i++) if (isPrime[i]) res.push(i);
  return res;
}
// 页面调用 primes
import taskpool from '@ohos.taskpool';
import { sievePrimes, PrimeRequest } from '../worker/primes';

class PrimePageVM {
  abortFlag = { value: false };
  running = false;

  async run(limit: number) {
    this.abortFlag.value = false;
    this.running = true;
    try {
      const task = new taskpool.Task(sievePrimes, { limit, abortFlag: this.abortFlag } as PrimeRequest);
      const primes = await taskpool.execute(task);
      this.show(`找到 ${primes.length} 个素数`);
    } catch (e) {
      if ((e as Error).message === 'ABORTED') {
        this.show('用户取消了任务');
      } else {
        this.show('计算失败:' + (e as Error).message);
      }
    } finally {
      this.running = false;
    }
  }

  cancel() {
    this.abortFlag.value = true; // 轮询式取消
  }
}

经验小记

  • 真想做“硬取消”,你需要任务里遵守取消协议(轮询或检查 token);不遵守就没法停。
  • 细颗粒度轮询:把内层循环分段,每 N 次检查一次 abortFlag,既不太耗性能,也不至于“很久才响应取消”。
  • 避免把巨大的对象来回搬运:结果集尽量“轻”,必要时只返回统计指标(例如 top-10),后续再按需分页请求。

4.4 把 TaskPool 封装成“项目级工具”

我常用下面这个“小外衣”,让 TaskPool 用起来像“只换了个 await”:

// worker/pool.ts
import taskpool from '@ohos.taskpool';

export async function runInPool<T extends (...args: any[]) => any>(
  fn: T,
  ...args: Parameters<T>
): Promise<ReturnType<T>> {
  const task = new taskpool.Task(fn, ...args);
  return taskpool.execute(task) as Promise<ReturnType<T>>;
}

以后就可以:

import { runInPool } from '../worker/pool';
import { topNWords } from '../worker/heavy';

const top = await runInPool(topNWords, bigText, 50);

好处

  • 屏蔽 Task 创建/执行细节;
  • 便于统一加日志、超时、埋点;
  • 将来如果要在某些平台切换到 Worker,也只改封装不改业务。

五、EventHandler:你需要一个“轻调度器”,而不是到处 setTimeout

5.1 EventRunner + EventHandler 是什么

@ohos.eventhandler 提供了 EventRunner(事件循环)和 EventHandler(向该循环投递任务)的组合。一个 Runner 通常绑定一个线程,你可以在其上串行地执行任务、安排延迟任务、实现“消息队列式”的小调度。

5.2 基础用法:创建 Runner 与投递任务

// sched/scheduler.ts
import eventHandler from '@ohos.eventhandler';

export class Scheduler {
  private runner = eventHandler.createEventRunner('bg-runner');
  private handler = new eventHandler.EventHandler(this.runner);

  post(task: () => void, delay = 0): void {
    this.handler.postTask(task, delay);
  }

  // 周期性任务(简单实现)
  every(task: () => void, interval: number): () => void {
    let stopped = false;
    const tick = () => {
      if (stopped) return;
      task();
      this.handler.postTask(tick, interval);
    };
    this.handler.postTask(tick, interval);
    return () => { stopped = true; };
  }
}

你可以把这个 Scheduler 作为 服务/Store 的内部成员,比如日志上报、缓存刷盘、轻量计算等都丢到它上面跑。优点是串行:天然线程安全(相对而言),避免到处 setTimeout 带来的散乱与难测。

5.3 把 EventHandler 与 Promise 打通:postAsync

// sched/asyncPost.ts
import eventHandler from '@ohos.eventhandler';

export function createAsyncScheduler() {
  const runner = eventHandler.createEventRunner('async-runner');
  const handler = new eventHandler.EventHandler(runner);

  function postAsync<T>(fn: () => T | Promise<T>, delay = 0): Promise<T> {
    return new Promise<T>((resolve, reject) => {
      handler.postTask(async () => {
        try {
          resolve(await fn());
        } catch (e) {
          reject(e);
        }
      }, delay);
    });
  }

  return { postAsync };
}

用法:

const { postAsync } = createAsyncScheduler();
const value = await postAsync(() => computeLightWeight()); // 串行执行

适用场景

  • 需要顺序一致性(同一 Runner 上,先发先到,先到先执行);
  • 任务比较“轻”(IO glue / 小型计算),不值得为它专门开 TaskPool
  • 需要与主流程隔离(避免 UI 线程被卡),但又希望有“顺序保障”。

六、AsyncFunction:动态拼装异步逻辑,交给 TaskPool 安全落地

有些业务需要动态逻辑:后端下发一个“计算脚本”或“策略表达式”,端侧执行。ArkTS 是 TS 超集,本质上运行在 JS 引擎上,所以你可以用原生的 AsyncFunction 构造器动态创建异步函数。

小心使用、注意安全!生产上应有“可信脚本白名单”“沙箱参数”“只读上下文”等限制。以下示例偏工程演示。

// dynamic/loader.ts
export const AsyncFunction = Object.getPrototypeOf(async function () {}).constructor as
  new (...args: string[]) => (...args: any[]) => Promise<any>;

export interface ScriptSpec {
  args: string[];   // ['text', 'n']
  body: string;     // 'const arr = text.split(/\\s+/); return arr.slice(0, n);'
}

export function buildAsync(spec: ScriptSpec) {
  const fn = new AsyncFunction(...spec.args, spec.body);
  return fn; // 异步函数
}

配合 TaskPool:

// dynamic/runner.ts
import taskpool from '@ohos.taskpool';
import { buildAsync, ScriptSpec } from './loader';

export async function runScriptInPool(spec: ScriptSpec, ...params: any[]) {
  const fn = buildAsync(spec); // 异步函数
  const task = new taskpool.Task(fn, ...params);
  return taskpool.execute(task); // 由 TaskPool 调度
}

这样策略就可配置了。比如你要做“文本评分策略 AB 测试”,后端下发 body,端上直接执行,同时又不阻塞 UI
  提醒:不要把敏感对象暴露给脚本,只传入 data 与受控 helper;必要时做执行时间上限结果结构校验

七、并发任务调度:别把“多线程”当灵丹妙药

7.1 限流(并发度控制):写个轻量 Semaphore

最常见的坑:一口气启动 100 个并发下载,然后你就看着内存飙升、失败重试排队、用户骂娘。解决它,用 信号量

// sched/semaphore.ts
export class Semaphore {
  private q: Array<() => void> = [];
  private used = 0;
  constructor(private limit: number) {}

  async acquire(): Promise<void> {
    if (this.used < this.limit) {
      this.used++;
      return;
    }
    return new Promise<void>(resolve => this.q.push(() => {
      this.used++;
      resolve();
    }));
  }

  release(): void {
    this.used--;
    const next = this.q.shift();
    if (next) next();
  }

  async use<T>(fn: () => Promise<T>): Promise<T> {
    await this.acquire();
    try {
      return await fn();
    } finally {
      this.release();
    }
  }
}

用法:

const sem = new Semaphore(4); // 最多 4 个并发
const results = await Promise.all(files.map(f => sem.use(() => uploadFile(f))));

要点

  • 系统有 TaskPool,不代表你“想开几个就开几个”;合理的并发度往往更快、更稳。
  • UI 线程的资源(比如可见列表项)也有限,限流能避免突刺。

7.2 任务编排:队列 + 去重(防抖)

有些任务天然会被频繁触发(例如搜索实时联想、批量同步的增量触发)。写个“队列 + 去重”的小调度器很值当。

// sched/dedupQueue.ts
type Key = string;
export class DedupQueue<T> {
  private pend = new Map<Key, Promise<T>>();
  constructor(private runner: (key: Key) => Promise<T>) {}

  run(key: Key): Promise<T> {
    if (!this.pend.has(key)) {
      const p = this.runner(key).finally(() => this.pend.delete(key));
      this.pend.set(key, p);
    }
    return this.pend.get(key)!;
  }
}

场景:短时间内对同一个资源发起多次“刷新”,只保留一个在飞的任务,节约带宽与算力。

7.3 组合技:TaskPool + EventHandler + 限流

来个“综合菜”:分批把大数组丢给 TaskPool 计算EventHandler 负责节奏Semaphore 把并发度卡住

import { runInPool } from '../worker/pool';
import { topNWords } from '../worker/heavy';
import { Semaphore } from './semaphore';
import eventHandler from '@ohos.eventhandler';

export async function batchAnalyze(documents: string[]) {
  const sem = new Semaphore(3);
  const runner = eventHandler.createEventRunner('batch');
  const handler = new eventHandler.EventHandler(runner);

  const results: Array<Array<[string, number]>> = [];
  let i = 0;

  return await new Promise<typeof results>((resolve, reject) => {
    const tick = () => {
      if (i >= documents.length) return resolve(results);
      // 每tick 启动至多 3 个任务
      const chunk = documents.slice(i, i + 3);
      i += 3;

      Promise.all(chunk.map(doc => sem.use(() => runInPool(topNWords, doc, 10))))
        .then(arr => { results.push(...arr); handler.postTask(tick, 0); })
        .catch(reject);
    };
    handler.postTask(tick, 0);
  });
}

说明

  • 这里 EventHandler 让批处理分帧启动,避免一次性把所有 Promise 建起来。
  • Semaphore 确保每帧最多 3 个在飞。
  • TaskPool 真正承担 CPU 负载。
  • UI 仍然流畅,因为所有重活都移走了。

八、把它落到 ArkUI 页面:一个能跑的“文本分析器”小样

接下来上 ArkUI 组件,把上面的能力拼在一个页面里(结构化省略若干边角逻辑,重点在异步与并发)。

// pages/AnalyzerPage.ets
import taskpool from '@ohos.taskpool';
import { runInPool } from '../worker/pool';
import { topNWords } from '../worker/heavy';
import { withTimeout } from '../utils/timeout';
import { Semaphore } from '../sched/semaphore';

@Entry
@Component
struct AnalyzerPage {
  @State text: string = ''
  @State top: Array<[string, number]> = []
  @State loading: boolean = false
  @State logs: string[] = []

  private log(s: string) { this.logs = [s, ...this.logs].slice(0, 20) }

  build() {
    Column({ space: 12 }) {
      Text('ArkTS 文本分析器(TaskPool 并发示例)')
        .fontSize(20).fontWeight(FontWeight.Bold).margin({ top: 12 })

      TextArea({ placeholder: '粘贴一大段文本...' })
        .height(160)
        .onChange(v => this.text = v)

      Row({ space: 12 }) {
        Button(this.loading ? '分析中…' : '分析 Top50')
          .enabled(!this.loading)
          .onClick(() => this.analyzeNow(50))
        Button('随机噪声 + 并发分批')
          .onClick(() => this.analyzeBatches())
      }

      if (this.top.length > 0) {
        List() {
          ForEach(this.top, item => {
            ListItem() {
              Row({ space: 12 }) {
                Text(item[0]).width('50%')
                Text(String(item[1])).width('50%').textAlign(TextAlign.End)
              }.width('100%').padding(8)
            }
          }, item => item[0])
        }.height(240)
      }

      Text('Logs').fontWeight(FontWeight.Bold).margin({ top: 8 })
      List() {
        ForEach(this.logs, (s, i) => ListItem() { Text(s).fontSize(12) }, (s, i) => i.toString())
      }.height(120)
    }
    .padding(16)
  }

  private async analyzeNow(n: number) {
    if (!this.text) { this.log('请输入文本'); return; }
    this.loading = true;
    try {
      const res = await withTimeout(runInPool(topNWords, this.text, n), 8000, '分析超时');
      this.top = res;
      this.log(`完成:Top${n} 统计`)
    } catch (e) {
      this.log('失败:' + (e as Error).message)
    } finally {
      this.loading = false;
    }
  }

  private async analyzeBatches() {
    const docs = new Array(12).fill(0).map((_, i) => this.text + ' ' + this.makeNoise(i));
    const sem = new Semaphore(3);
    this.log('开始分批并发分析(并发=3)');

    const all: Array<Array<[string, number]>> = [];
    for (let i = 0; i < docs.length; i += 3) {
      const batch = docs.slice(i, i + 3);
      const res = await Promise.all(batch.map(d => sem.use(() => runInPool(topNWords, d, 20))));
      all.push(...res);
      this.log(`完成批次 #${(i/3)+1}`);
    }
    // 随便合并一下:取每批第一项
    this.top = all.flat().slice(0, 50);
    this.log('全部完成 ✅');
  }

  private makeNoise(seed: number): string {
    const base = ['ArkTS', 'TaskPool', 'EventHandler', 'AsyncFunction', 'Promise', 'await'];
    return new Array(2000).fill(0).map((_, i) => base[(i + seed) % base.length]).join(' ');
  }
}

你会看到

  • 点击“分析 Top50”,数据会迅速给出,UI 不会卡。
  • 点击“并发分批”,日志会按批次滚动打印,整个过程平滑。
  • 即使输入超大文本,TaskPool 也能兜住 CPU 压力。

九、工程化套路:从“能跑”到“好维护”

9.1 模块划分:把“异步的角色”写清楚

  • api/:Promise 化的系统能力封装(http、文件、相册、蓝牙等)。
  • worker/:TaskPool 纯函数库,每个文件尽量无外部闭包依赖。
  • sched/:调度器(Semaphore、队列、EventHandler 包装)。
  • utils/:withTimeout、重试器(指数退避)、节流/防抖。
  • pages/:ArkUI 组件,只关心“业务流程 + UI 状态”,不关心底层如何调度。

9.2 错误边界与兜底

  • 所有 await 外围套 try/catch,catch 里只做“复述 + 埋点”,别吞掉错误。
  • Promise.all 用时要小心:一个任务挂全挂。对“尽可能完成”的场景,考虑 Promise.allSettled
  • TaskPool 的异常会被正常抛回,别忘了日志(携带任务名、参数摘要、耗时)。

9.3 重试策略(指数退避)

export async function retry<T>(fn: () => Promise<T>, times = 3, base = 300): Promise<T> {
  let lastErr: any;
  for (let i = 0; i < times; i++) {
    try { return await fn(); }
    catch (e) {
      lastErr = e;
      await new Promise(r => setTimeout(r, base * 2 ** i));
    }
  }
  throw lastErr;
}

withTimeout 混搭:

await retry(() => withTimeout(api.get('/heavy'), 5000), 2, 400);

9.4 观测与压测

  • 打点:TaskPool 任务开始/结束、耗时、内存峰值(可借助系统日志 + 业务统计);
  • 压测:用批量生成数据在模拟器/真机上跑,记录冷启动/首屏/交互卡顿;
  • 回归测试:对“调度相关 bug”写回归用例(例如“并发=3 时永远不会超过 3 个在飞”)。

十、把“异步/并发”讲成人话:几个常见误区

  1. 误区await 就是“后台线程”。
    更正:不是。它只是 Promise 的语法糖。CPU 重活仍会卡住当前线程,需要 TaskPool

  2. 误区:并发越多越快。
    更正资源竞争 + 上下文切换的代价很实在。合理限流往往更快。

  3. 误区:EventHandler 只是“高级 setTimeout”。
    更正:它提供绑定线程的事件循环,天然串行与顺序保障,适合“轻中型任务编排”。

  4. 误区:动态脚本必不安全。
    更正:风险确实大,但通过 AsyncFunction + 受控上下文 + TaskPool + 超时,你能把风险装进小盒子里。

  5. 误区:一次性把 10 万条数据都交给 TaskPool。
    更正:TaskPool 强,并不等于你能“无脑塞”。分批 + 限流 + 合并才专业。

十一、把“异步 API”到“并发调度”串起来:一个端到端小闭环

来个更贴近实战的“同步流程”——“离线优先的多段上传”(比如把本地离线的 5000 条操作日志合并后上报)。

  1. 准备:从 RDB 拉出 5000 条记录(IO),然后TaskPool 做“合并 + 去重 + 分桶”(CPU)。
  2. 分批上传:用 Semaphore(4) 控制并发度 4,分批 Promise.all 上传;每批完成后 EventHandler postTask 把“进度”投递回 UI。
  3. 出错/重试:每个上传用 retry(withTimeout(...)),最多重试 2 次。
  4. 收尾:全部成功后标记本地“已上报”;失败则只记录失败桶,等待下一次增量重试。

关键代码骨架:

async function syncAll() {
  this.loading = true;
  try {
    const logs = await this.repo.listPendingLogs();           // IO
    const buckets = await runInPool(mergeAndBucket, logs);     // CPU: TaskPool
    const sem = new Semaphore(4);

    let done = 0;
    for (const bucket of buckets) {
      await Promise.all(bucket.map(item => sem.use(() =>
        retry(() => withTimeout(this.api.post('/upload', item), 4000), 2, 300)
      )));
      done += bucket.length;
      this.uiRunner.postTask(() => this.progress = done / total, 0); // EventHandler 投递回 UI
    }

    await this.repo.markUploaded(buckets.flat());
    this.toast('上传完成 ✅');
  } catch (e) {
    this.toast('上传失败:' + (e as Error).message);
  } finally {
    this.loading = false;
  }
}

这段逻辑是“课代表范本”:IO、CPU、调度、UI 更新各就各位,谁也不抢谁饭碗。

十二、更多“细节的火候”:序列化、数据结构、拷贝成本

  • 序列化成本:TaskPool 参数/返回值要过边界,结构越扁平越好;能用 Array<number> 就别传“复杂 class”。
  • 二进制数据:大块 ArrayBuffer 建议分片;必要时仅传“指纹/摘要”做判等。
  • 对象拷贝:跨线程边界一般是“复制语义”,避免把大对象在多个任务间来回搬运
  • 临界区:在 EventHandler 的“串行保证”下可以少用锁;多线程共享态尽量改为“消息传递 + 不可变数据”。

十三、常见问题清单(踩坑复盘)

  1. TaskPool 里引用了外侧闭包变量 → 某些情况下不可序列化,任务直接失败。
    改法纯函数 + 参数传入
  2. EventHandler 里跑了重活 → 它也会卡自己所属线程。
    改法:EventHandler 跑“轻中型、需要顺序”的活,重活进 TaskPool。
  3. Promise.all 晚上线上炸了 → 单个失败导致整批失败。
    改法allSettled + 失败重试,或批次更细。
  4. 取消语义形同虚设 → 任务内部不检查标志。
    改法:建立“取消协议”,循环里轮询;IO 用 API 的取消能力。
  5. 日志缺失 → 出事找不到定位点。
    改法:统一封装 runInPool/withTimeout/retry,在封装层打点。

十四、把“知识点”翻成“面经答法”

Q:ArkTS 里 async/await 与 TaskPool 的关系?
A:async/await语法层,不改变线程;TaskPool 是运行时层,把 CPU 密集任务丢到后台线程池跑。二者互补,一个负责“流程可读”,一个负责“负载隔离”。

Q:EventHandler 与 TaskPool 的边界?
A:EventHandler 负责轻中型任务的串行调度/定时;TaskPool 负责计算重活。你可以用 EventHandler 分帧分批投递 TaskPool 任务,实现“稳中求快”。

Q:AsyncFunction 有啥用?
A:动态脚本/策略化场景的解法。配合 TaskPool、超时、只读上下文,可以安全地做“端上动态计算”。

十五、一个“可抽走复用”的工具集锦(即拿即用)

// utils/timeout.ts
export async function withTimeout<T>(p: Promise<T>, ms: number, tag = 'timeout'): Promise<T> { /* 同上 */ }

// utils/retry.ts
export async function retry<T>(fn: () => Promise<T>, times = 3, base = 300): Promise<T> { /* 同上 */ }

// worker/pool.ts
import taskpool from '@ohos.taskpool';
export async function runInPool<T extends (...args: any[]) => any>(fn: T, ...args: Parameters<T>): Promise<ReturnType<T>> {
  const task = new taskpool.Task(fn, ...args);
  return taskpool.execute(task) as Promise<ReturnType<T>>;
}

// sched/semaphore.ts
export class Semaphore { /* 同上 */ }

// sched/scheduler.ts
import eventHandler from '@ohos.eventhandler';
export class Scheduler { /* 同上 */ }

// dynamic/loader.ts
export const AsyncFunction = Object.getPrototypeOf(async function(){}).constructor as
  new (...args: string[]) => (...args: any[]) => Promise<any>;
export function buildAsync(spec: { args: string[], body: string }) { return new AsyncFunction(...spec.args, spec.body); }

十六、收个尾:把它当“日常工程”,而不是“黑魔法”

回到文章最开始那个卡成 PPT 的页面。如今它的实现,不过是这些规则的自然结果:

  • 主流程 async/await,每段都有错误边界;
  • 重计算 runInPool(TaskPool),参数扁平、结果轻巧;
  • 批处理 EventHandler 分帧调度 + Semaphore 限流;
  • 失败走 retry(withTimeout),进度用 handler 往 UI 线程 post;
  • 全程打点、日志清晰,出了事第二天我还能记得昨晚怎么改的(关键)。

异步与并发,不是某个“独门绝技”,而是你每天写代码时的一种“摆放秩序”。当你把“谁做什么、什么时候做、做不完怎么办、做快了怎么限制”都想清楚,ArkTS 的这套工具就会在你手里变得顺手、优雅,甚至……有点好玩。

下次有人问你:“ArkTS 的异步/并发怎么写才不崩?”——不妨把这篇给 TA,然后反问一句:
“都 2025 了,你还在主线程里跑素数筛吗?”

(未完待续)

Logo

作为“人工智能6S店”的官方数字引擎,为AI开发者与企业提供一个覆盖软硬件全栈、一站式门户。

更多推荐