都 2025 了,还在 callback 地狱里自我感动?ArkTS 异步与并发真要这样写吗?
《零基础学鸿蒙》摘要:本文由移动开发者转型鸿蒙的亲历者撰写,聚焦ArkTS异步编程实战。从踩坑经历出发,指出async/await不等于异步优化,需结合TaskPool线程池和EventHandler事件循环。文章详解Promise封装回调API、三段式async/await最佳实践、并行任务调度技巧,并重点介绍如何用TaskPool分流CPU密集型任务。提供可运行代码示例,涵盖HTTP请求封装、
我是兰瓶Coding,一枚刚踏入鸿蒙领域的转型小白,原是移动开发中级,如下是我学习笔记《零基础学鸿蒙》,若对你所有帮助,还请不吝啬的给个大大的赞~
前言
我承认,第一眼看到 ArkTS 的时候,我把“异步”三个字往脑门上一贴,条件反射就是 async/await 一顿乱敲。直到一个周五晚上,我的 Demo 在列表滚动时卡成 PPT,后台统计一跑 CPU 飙到 120%,手机风扇(哦对,手机没风扇)仿佛在冲我叹气,我才老实下来:鸿蒙里“异步”不只是语法糖,它涉及任务模型、线程池(TaskPool)、事件循环(EventHandler)以及你如何调度并发。
本文我不讲空话,直接沿着你给的大纲来——异步 API → Promise 与 async/await → 并发任务调度——但每一段都上手可跑的例子。还会把 TaskPool、AsyncFunction、EventHandler 这三个“关键字”拆开揉碎,让它们在实际工程里各司其职。
先立个 flag:整文拒绝 AI 腔,只有我(一个长期被 deadline 教育过的鸿蒙全栈)夜半踩坑后写下的实话实说。有调侃、有复盘、有代码。你照着抄也能跑,抄了还能快。
一、前言|为什么“会用 async/await”≠“会写异步”
说说我那次栽跟头。一个看似“朴素”的统计页面:
- 列表展示 5 千条待办(离线缓存);
- 点击“统计”按钮,做去重、关键词聚类、权重计算;
- 顺手把结果写回本地数据库,再上传云端。
我天真地在 onClick 里来一句:
await crunchBigData(this.todos); // 超大数组,CPU 重型运算
结果 UI 主线程被我锁死了。async/await 不是“后台线程”的代名词,它只是把异步逻辑写得像同步。要是 await 的那个 Promise 里恰好做了重活(纯 JS 计算),仍然在当前线程跑。
这时就轮到鸿蒙的任务池(TaskPool)大显身手,把重活丢到后台执行;细粒度调度、时序编排交给EventHandler;而 async/await 负责拼出通顺的流程与明确的错误边界。三者合体,才是一套“可维护、不卡顿、能扩展”的异步并发解。
二、异步 API 一图观(ArkTS 视角)
我习惯先画一张“脑内地图”,把名词放到位:
- 回调(Callback):最原始的异步接口风格,常见于某些系统能力;优点轻量,缺点“回调地狱”“错误传递难”。
- Promise:状态有限(pending/fulfilled/rejected),可链式调用;配合
then/catch/finally。 - async/await:Promise 的语法糖,写法像同步;异常用 try/catch。
- TaskPool(@ohos.taskpool):把重 CPU 的纯函数任务放进系统线程池跑;和 Web Worker 思路类似,但用起来更轻。
- EventHandler(@ohos.eventhandler):创建一个事件循环(Runner)与任务队列;可定时、可串行化、可在“专属线程”跑轻中型任务。
- 定时/延迟:
handler.postTask(fn, delay)比随手setTimeout更工程化(且在 ArkTS 场景更贴边)。 - AsyncFunction:动态生成异步函数的黑科技(JS 原生能力),在“可配置脚本”“插件”场景下特别好用;在 TaskPool 里执行,既灵活又安全(相对而言)。
把它们放在“正确的位置”:UI 线程只做轻渲染 + IO 启动;CPU 重活进 TaskPool;轻中型串行/定时任务用 EventHandler;流程 glue 用 async/await。
这不是教条,是我踩过坑后总结出来的经验曲线。
三、Promise 与 async/await:甜不甜取决于你“收口”的姿势
3.1 用 Promise 包一层:把回调式 API 拉平
鸿蒙里有些系统能力仍提供回调式接口。我的习惯是第一时间做 Promise 化封装。下面以 HTTP 请求(@ohos.net.http)为例,给出一个非常“可抄可改”的写法:
// http/client.ts
import http from '@ohos.net.http';
export interface ApiResp<T> { code: number; msg: string; data: T }
export class HttpClient {
constructor(private base: string) {}
get<T>(path: string, headers: Record<string, string> = {}): Promise<T> {
const req = http.createHttp();
return new Promise<T>((resolve, reject) => {
req.request(this.base + path, { method: http.RequestMethod.GET, header: headers })
.then(res => {
if (res.responseCode !== 200) return reject(new Error(`HTTP ${res.responseCode}`));
let body: ApiResp<T>;
try {
body = JSON.parse(String(res.result));
} catch (e) {
return reject(new Error('JSON parse error'));
}
if (body.code !== 0) return reject(new Error(body.msg || 'Biz error'));
resolve(body.data);
})
.catch(reject)
.finally(() => req.destroy());
});
}
}
以后在页面里,你就可以优雅地:
const api = new HttpClient('https://api.example.com');
try {
const profile = await api.get<{ name: string; vip: boolean }>('/me');
this.userName = profile.name;
} catch (e) {
this.toast('网络开小差了:' + (e as Error).message);
}
要点:
- 捕获解析错误,别把“网络成功但 JSON 失败”当成“网络失败”。
- 业务态与协议态分离:HTTP 200 不代表业务成功;
code !== 0也要走reject。 - 资源回收:
finally里req.destroy(),形成肌肉记忆。
3.2 async/await 的“收口设计”:三段式
凡是交互触发的异步流程,我都有“三段式”套路:准备 → 核心 → 善后。
async doSyncAll() {
this.loading = true; // 准备
try {
const local = await this.repo.list();
const payload = await this.buildSyncPayload(local); // 可能是 CPU 重
const result = await this.api.post('/sync', payload);
await this.repo.merge(result.updated);
this.toast('同步完成 ✅');
} catch (e) {
this.toast('同步失败:' + (e as Error).message);
} finally {
this.loading = false; // 善后必执行
}
}
这段和平时写没差,但关键在“可能很重”的那步:buildSyncPayload 就是适合丢 TaskPool 的候选。我们先按普通写法跑通,再做“重活下沉”,这是我在中大型项目里保持“增量可控”的策略。
3.3 并行与串行:真的不要“一股脑儿 await All”
当你要并行请求多个接口,或同时读写多份数据,不要条件反射地:
// 慢且串行
const a = await getA();
const b = await getB();
const c = await getC();
更高效的写法是:
const [a, b, c] = await Promise.all([getA(), getB(), getC()]);
但!如果有依赖关系,就别硬上 all。把依赖明确地串起来,它更“可读可测”。
3.4 超时控制与取消语义
ArkTS 没有内建的 Promise 超时,你可以做一个小工具:
export async function withTimeout<T>(p: Promise<T>, ms: number, tag = 'timeout'): Promise<T> {
let timer: number;
const t = new Promise<never>((_, reject) => {
// @ts-ignore 延迟的计时器
timer = setTimeout(() => reject(new Error(tag)), ms) as unknown as number;
});
try {
return await Promise.race([p, t]);
} finally {
clearTimeout(timer);
}
}
取消呢?HTTP 有 req.destroy();自定义任务你可以引入 AbortController 思路(传入 token,任务里主动检查并早停)。这点等我们写 TaskPool 例子时再演示。
四、TaskPool:把“重活”丢给后台,UI 线程只做漂亮的自己
4.1 为什么是 TaskPool
结论先说:在鸿蒙 NEXT 的 ArkTS 里,要把 CPU 密集型任务、长循环、复杂聚合运算放到 TaskPool。这跟 Web Worker 的哲学一致——只不过 TaskPool 的使用门槛更低。在多数情况下,你就把纯函数丢过去,带上参数,它跑完给你结果。
4.2 基本用法:taskpool.Task + taskpool.execute
// worker/heavy.ts
import taskpool from '@ohos.taskpool';
// 一个“纯函数”风格:输入->输出,便于序列化
export function topNWords(text: string, n: number): Array<[string, number]> {
const freq = new Map<string, number>();
const words = text.toLowerCase().split(/[^a-zA-Z0-9_\u4e00-\u9fa5]+/).filter(Boolean);
for (let w of words) freq.set(w, (freq.get(w) || 0) + 1);
return Array.from(freq.entries()).sort((a, b) => b[1] - a[1]).slice(0, n);
}
// 页面里调用
import { topNWords } from '../worker/heavy';
import taskpool from '@ohos.taskpool';
async function analyzeInPool(text: string) {
const task = new taskpool.Task(topNWords, text, 20);
const res = await taskpool.execute(task);
return res as Array<[string, number]>;
}
要点:
- 纯函数更稳:无外部闭包依赖(序列化困难),更易被 TaskPool 执行器“搬到另一边”。
- 参数与返回值建议使用可序列化类型(字符串、数值、对象、数组、ArrayBuffer),避免“带引用”的复杂实例。
taskpool.execute返回 Promise,异常会 reject,照常 try/catch。
4.3 一个真·CPU 重活示例:找素数(顺便演示取消)
// worker/primes.ts
export interface PrimeRequest {
limit: number;
abortFlag?: { value: boolean }; // 手工的“取消标志”
}
export function sievePrimes(req: PrimeRequest): number[] {
const { limit, abortFlag } = req;
const isPrime = new Array<boolean>(limit + 1).fill(true);
isPrime[0] = isPrime[1] = false;
for (let p = 2; p * p <= limit; p++) {
if (abortFlag?.value) { // 轮询取消
// 抛出一个特殊错误,外层可识别为“被取消”
throw new Error('ABORTED');
}
if (!isPrime[p]) continue;
for (let m = p * p; m <= limit; m += p) {
isPrime[m] = false;
}
}
const res: number[] = [];
for (let i = 2; i <= limit; i++) if (isPrime[i]) res.push(i);
return res;
}
// 页面调用 primes
import taskpool from '@ohos.taskpool';
import { sievePrimes, PrimeRequest } from '../worker/primes';
class PrimePageVM {
abortFlag = { value: false };
running = false;
async run(limit: number) {
this.abortFlag.value = false;
this.running = true;
try {
const task = new taskpool.Task(sievePrimes, { limit, abortFlag: this.abortFlag } as PrimeRequest);
const primes = await taskpool.execute(task);
this.show(`找到 ${primes.length} 个素数`);
} catch (e) {
if ((e as Error).message === 'ABORTED') {
this.show('用户取消了任务');
} else {
this.show('计算失败:' + (e as Error).message);
}
} finally {
this.running = false;
}
}
cancel() {
this.abortFlag.value = true; // 轮询式取消
}
}
经验小记:
- 真想做“硬取消”,你需要任务里遵守取消协议(轮询或检查 token);不遵守就没法停。
- 细颗粒度轮询:把内层循环分段,每 N 次检查一次
abortFlag,既不太耗性能,也不至于“很久才响应取消”。 - 避免把巨大的对象来回搬运:结果集尽量“轻”,必要时只返回统计指标(例如 top-10),后续再按需分页请求。
4.4 把 TaskPool 封装成“项目级工具”
我常用下面这个“小外衣”,让 TaskPool 用起来像“只换了个 await”:
// worker/pool.ts
import taskpool from '@ohos.taskpool';
export async function runInPool<T extends (...args: any[]) => any>(
fn: T,
...args: Parameters<T>
): Promise<ReturnType<T>> {
const task = new taskpool.Task(fn, ...args);
return taskpool.execute(task) as Promise<ReturnType<T>>;
}
以后就可以:
import { runInPool } from '../worker/pool';
import { topNWords } from '../worker/heavy';
const top = await runInPool(topNWords, bigText, 50);
好处:
- 屏蔽 Task 创建/执行细节;
- 便于统一加日志、超时、埋点;
- 将来如果要在某些平台切换到 Worker,也只改封装不改业务。
五、EventHandler:你需要一个“轻调度器”,而不是到处 setTimeout
5.1 EventRunner + EventHandler 是什么
@ohos.eventhandler 提供了 EventRunner(事件循环)和 EventHandler(向该循环投递任务)的组合。一个 Runner 通常绑定一个线程,你可以在其上串行地执行任务、安排延迟任务、实现“消息队列式”的小调度。
5.2 基础用法:创建 Runner 与投递任务
// sched/scheduler.ts
import eventHandler from '@ohos.eventhandler';
export class Scheduler {
private runner = eventHandler.createEventRunner('bg-runner');
private handler = new eventHandler.EventHandler(this.runner);
post(task: () => void, delay = 0): void {
this.handler.postTask(task, delay);
}
// 周期性任务(简单实现)
every(task: () => void, interval: number): () => void {
let stopped = false;
const tick = () => {
if (stopped) return;
task();
this.handler.postTask(tick, interval);
};
this.handler.postTask(tick, interval);
return () => { stopped = true; };
}
}
你可以把这个 Scheduler 作为 服务/Store 的内部成员,比如日志上报、缓存刷盘、轻量计算等都丢到它上面跑。优点是串行:天然线程安全(相对而言),避免到处 setTimeout 带来的散乱与难测。
5.3 把 EventHandler 与 Promise 打通:postAsync
// sched/asyncPost.ts
import eventHandler from '@ohos.eventhandler';
export function createAsyncScheduler() {
const runner = eventHandler.createEventRunner('async-runner');
const handler = new eventHandler.EventHandler(runner);
function postAsync<T>(fn: () => T | Promise<T>, delay = 0): Promise<T> {
return new Promise<T>((resolve, reject) => {
handler.postTask(async () => {
try {
resolve(await fn());
} catch (e) {
reject(e);
}
}, delay);
});
}
return { postAsync };
}
用法:
const { postAsync } = createAsyncScheduler();
const value = await postAsync(() => computeLightWeight()); // 串行执行
适用场景:
- 需要顺序一致性(同一 Runner 上,先发先到,先到先执行);
- 任务比较“轻”(IO glue / 小型计算),不值得为它专门开 TaskPool;
- 需要与主流程隔离(避免 UI 线程被卡),但又希望有“顺序保障”。
六、AsyncFunction:动态拼装异步逻辑,交给 TaskPool 安全落地
有些业务需要动态逻辑:后端下发一个“计算脚本”或“策略表达式”,端侧执行。ArkTS 是 TS 超集,本质上运行在 JS 引擎上,所以你可以用原生的 AsyncFunction 构造器动态创建异步函数。
小心使用、注意安全!生产上应有“可信脚本白名单”“沙箱参数”“只读上下文”等限制。以下示例偏工程演示。
// dynamic/loader.ts
export const AsyncFunction = Object.getPrototypeOf(async function () {}).constructor as
new (...args: string[]) => (...args: any[]) => Promise<any>;
export interface ScriptSpec {
args: string[]; // ['text', 'n']
body: string; // 'const arr = text.split(/\\s+/); return arr.slice(0, n);'
}
export function buildAsync(spec: ScriptSpec) {
const fn = new AsyncFunction(...spec.args, spec.body);
return fn; // 异步函数
}
配合 TaskPool:
// dynamic/runner.ts
import taskpool from '@ohos.taskpool';
import { buildAsync, ScriptSpec } from './loader';
export async function runScriptInPool(spec: ScriptSpec, ...params: any[]) {
const fn = buildAsync(spec); // 异步函数
const task = new taskpool.Task(fn, ...params);
return taskpool.execute(task); // 由 TaskPool 调度
}
这样策略就可配置了。比如你要做“文本评分策略 AB 测试”,后端下发 body,端上直接执行,同时又不阻塞 UI。
提醒:不要把敏感对象暴露给脚本,只传入 data 与受控 helper;必要时做执行时间上限与结果结构校验。
七、并发任务调度:别把“多线程”当灵丹妙药
7.1 限流(并发度控制):写个轻量 Semaphore
最常见的坑:一口气启动 100 个并发下载,然后你就看着内存飙升、失败重试排队、用户骂娘。解决它,用 信号量:
// sched/semaphore.ts
export class Semaphore {
private q: Array<() => void> = [];
private used = 0;
constructor(private limit: number) {}
async acquire(): Promise<void> {
if (this.used < this.limit) {
this.used++;
return;
}
return new Promise<void>(resolve => this.q.push(() => {
this.used++;
resolve();
}));
}
release(): void {
this.used--;
const next = this.q.shift();
if (next) next();
}
async use<T>(fn: () => Promise<T>): Promise<T> {
await this.acquire();
try {
return await fn();
} finally {
this.release();
}
}
}
用法:
const sem = new Semaphore(4); // 最多 4 个并发
const results = await Promise.all(files.map(f => sem.use(() => uploadFile(f))));
要点:
- 系统有 TaskPool,不代表你“想开几个就开几个”;合理的并发度往往更快、更稳。
- UI 线程的资源(比如可见列表项)也有限,限流能避免突刺。
7.2 任务编排:队列 + 去重(防抖)
有些任务天然会被频繁触发(例如搜索实时联想、批量同步的增量触发)。写个“队列 + 去重”的小调度器很值当。
// sched/dedupQueue.ts
type Key = string;
export class DedupQueue<T> {
private pend = new Map<Key, Promise<T>>();
constructor(private runner: (key: Key) => Promise<T>) {}
run(key: Key): Promise<T> {
if (!this.pend.has(key)) {
const p = this.runner(key).finally(() => this.pend.delete(key));
this.pend.set(key, p);
}
return this.pend.get(key)!;
}
}
场景:短时间内对同一个资源发起多次“刷新”,只保留一个在飞的任务,节约带宽与算力。
7.3 组合技:TaskPool + EventHandler + 限流
来个“综合菜”:分批把大数组丢给 TaskPool 计算,EventHandler 负责节奏,Semaphore 把并发度卡住。
import { runInPool } from '../worker/pool';
import { topNWords } from '../worker/heavy';
import { Semaphore } from './semaphore';
import eventHandler from '@ohos.eventhandler';
export async function batchAnalyze(documents: string[]) {
const sem = new Semaphore(3);
const runner = eventHandler.createEventRunner('batch');
const handler = new eventHandler.EventHandler(runner);
const results: Array<Array<[string, number]>> = [];
let i = 0;
return await new Promise<typeof results>((resolve, reject) => {
const tick = () => {
if (i >= documents.length) return resolve(results);
// 每tick 启动至多 3 个任务
const chunk = documents.slice(i, i + 3);
i += 3;
Promise.all(chunk.map(doc => sem.use(() => runInPool(topNWords, doc, 10))))
.then(arr => { results.push(...arr); handler.postTask(tick, 0); })
.catch(reject);
};
handler.postTask(tick, 0);
});
}
说明:
- 这里
EventHandler让批处理分帧启动,避免一次性把所有 Promise 建起来。 Semaphore确保每帧最多 3 个在飞。TaskPool真正承担 CPU 负载。- UI 仍然流畅,因为所有重活都移走了。
八、把它落到 ArkUI 页面:一个能跑的“文本分析器”小样
接下来上 ArkUI 组件,把上面的能力拼在一个页面里(结构化省略若干边角逻辑,重点在异步与并发)。
// pages/AnalyzerPage.ets
import taskpool from '@ohos.taskpool';
import { runInPool } from '../worker/pool';
import { topNWords } from '../worker/heavy';
import { withTimeout } from '../utils/timeout';
import { Semaphore } from '../sched/semaphore';
@Entry
@Component
struct AnalyzerPage {
@State text: string = ''
@State top: Array<[string, number]> = []
@State loading: boolean = false
@State logs: string[] = []
private log(s: string) { this.logs = [s, ...this.logs].slice(0, 20) }
build() {
Column({ space: 12 }) {
Text('ArkTS 文本分析器(TaskPool 并发示例)')
.fontSize(20).fontWeight(FontWeight.Bold).margin({ top: 12 })
TextArea({ placeholder: '粘贴一大段文本...' })
.height(160)
.onChange(v => this.text = v)
Row({ space: 12 }) {
Button(this.loading ? '分析中…' : '分析 Top50')
.enabled(!this.loading)
.onClick(() => this.analyzeNow(50))
Button('随机噪声 + 并发分批')
.onClick(() => this.analyzeBatches())
}
if (this.top.length > 0) {
List() {
ForEach(this.top, item => {
ListItem() {
Row({ space: 12 }) {
Text(item[0]).width('50%')
Text(String(item[1])).width('50%').textAlign(TextAlign.End)
}.width('100%').padding(8)
}
}, item => item[0])
}.height(240)
}
Text('Logs').fontWeight(FontWeight.Bold).margin({ top: 8 })
List() {
ForEach(this.logs, (s, i) => ListItem() { Text(s).fontSize(12) }, (s, i) => i.toString())
}.height(120)
}
.padding(16)
}
private async analyzeNow(n: number) {
if (!this.text) { this.log('请输入文本'); return; }
this.loading = true;
try {
const res = await withTimeout(runInPool(topNWords, this.text, n), 8000, '分析超时');
this.top = res;
this.log(`完成:Top${n} 统计`)
} catch (e) {
this.log('失败:' + (e as Error).message)
} finally {
this.loading = false;
}
}
private async analyzeBatches() {
const docs = new Array(12).fill(0).map((_, i) => this.text + ' ' + this.makeNoise(i));
const sem = new Semaphore(3);
this.log('开始分批并发分析(并发=3)');
const all: Array<Array<[string, number]>> = [];
for (let i = 0; i < docs.length; i += 3) {
const batch = docs.slice(i, i + 3);
const res = await Promise.all(batch.map(d => sem.use(() => runInPool(topNWords, d, 20))));
all.push(...res);
this.log(`完成批次 #${(i/3)+1}`);
}
// 随便合并一下:取每批第一项
this.top = all.flat().slice(0, 50);
this.log('全部完成 ✅');
}
private makeNoise(seed: number): string {
const base = ['ArkTS', 'TaskPool', 'EventHandler', 'AsyncFunction', 'Promise', 'await'];
return new Array(2000).fill(0).map((_, i) => base[(i + seed) % base.length]).join(' ');
}
}
你会看到:
- 点击“分析 Top50”,数据会迅速给出,UI 不会卡。
- 点击“并发分批”,日志会按批次滚动打印,整个过程平滑。
- 即使输入超大文本,TaskPool 也能兜住 CPU 压力。
九、工程化套路:从“能跑”到“好维护”
9.1 模块划分:把“异步的角色”写清楚
- api/:Promise 化的系统能力封装(http、文件、相册、蓝牙等)。
- worker/:TaskPool 纯函数库,每个文件尽量无外部闭包依赖。
- sched/:调度器(Semaphore、队列、EventHandler 包装)。
- utils/:withTimeout、重试器(指数退避)、节流/防抖。
- pages/:ArkUI 组件,只关心“业务流程 + UI 状态”,不关心底层如何调度。
9.2 错误边界与兜底
- 所有 await 外围套 try/catch,catch 里只做“复述 + 埋点”,别吞掉错误。
- Promise.all 用时要小心:一个任务挂全挂。对“尽可能完成”的场景,考虑
Promise.allSettled。 - TaskPool 的异常会被正常抛回,别忘了日志(携带任务名、参数摘要、耗时)。
9.3 重试策略(指数退避)
export async function retry<T>(fn: () => Promise<T>, times = 3, base = 300): Promise<T> {
let lastErr: any;
for (let i = 0; i < times; i++) {
try { return await fn(); }
catch (e) {
lastErr = e;
await new Promise(r => setTimeout(r, base * 2 ** i));
}
}
throw lastErr;
}
和 withTimeout 混搭:
await retry(() => withTimeout(api.get('/heavy'), 5000), 2, 400);
9.4 观测与压测
- 打点:TaskPool 任务开始/结束、耗时、内存峰值(可借助系统日志 + 业务统计);
- 压测:用批量生成数据在模拟器/真机上跑,记录冷启动/首屏/交互卡顿;
- 回归测试:对“调度相关 bug”写回归用例(例如“并发=3 时永远不会超过 3 个在飞”)。
十、把“异步/并发”讲成人话:几个常见误区
-
误区:
await就是“后台线程”。
更正:不是。它只是 Promise 的语法糖。CPU 重活仍会卡住当前线程,需要 TaskPool。 -
误区:并发越多越快。
更正:资源竞争 + 上下文切换的代价很实在。合理限流往往更快。 -
误区:EventHandler 只是“高级 setTimeout”。
更正:它提供绑定线程的事件循环,天然串行与顺序保障,适合“轻中型任务编排”。 -
误区:动态脚本必不安全。
更正:风险确实大,但通过 AsyncFunction + 受控上下文 + TaskPool + 超时,你能把风险装进小盒子里。 -
误区:一次性把 10 万条数据都交给 TaskPool。
更正:TaskPool 强,并不等于你能“无脑塞”。分批 + 限流 + 合并才专业。
十一、把“异步 API”到“并发调度”串起来:一个端到端小闭环
来个更贴近实战的“同步流程”——“离线优先的多段上传”(比如把本地离线的 5000 条操作日志合并后上报)。
- 准备:从 RDB 拉出 5000 条记录(IO),然后TaskPool 做“合并 + 去重 + 分桶”(CPU)。
- 分批上传:用 Semaphore(4) 控制并发度 4,分批
Promise.all上传;每批完成后 EventHandlerpostTask把“进度”投递回 UI。 - 出错/重试:每个上传用
retry(withTimeout(...)),最多重试 2 次。 - 收尾:全部成功后标记本地“已上报”;失败则只记录失败桶,等待下一次增量重试。
关键代码骨架:
async function syncAll() {
this.loading = true;
try {
const logs = await this.repo.listPendingLogs(); // IO
const buckets = await runInPool(mergeAndBucket, logs); // CPU: TaskPool
const sem = new Semaphore(4);
let done = 0;
for (const bucket of buckets) {
await Promise.all(bucket.map(item => sem.use(() =>
retry(() => withTimeout(this.api.post('/upload', item), 4000), 2, 300)
)));
done += bucket.length;
this.uiRunner.postTask(() => this.progress = done / total, 0); // EventHandler 投递回 UI
}
await this.repo.markUploaded(buckets.flat());
this.toast('上传完成 ✅');
} catch (e) {
this.toast('上传失败:' + (e as Error).message);
} finally {
this.loading = false;
}
}
这段逻辑是“课代表范本”:IO、CPU、调度、UI 更新各就各位,谁也不抢谁饭碗。
十二、更多“细节的火候”:序列化、数据结构、拷贝成本
- 序列化成本:TaskPool 参数/返回值要过边界,结构越扁平越好;能用
Array<number>就别传“复杂 class”。 - 二进制数据:大块
ArrayBuffer建议分片;必要时仅传“指纹/摘要”做判等。 - 对象拷贝:跨线程边界一般是“复制语义”,避免把大对象在多个任务间来回搬运。
- 临界区:在 EventHandler 的“串行保证”下可以少用锁;多线程共享态尽量改为“消息传递 + 不可变数据”。
十三、常见问题清单(踩坑复盘)
- TaskPool 里引用了外侧闭包变量 → 某些情况下不可序列化,任务直接失败。
改法:纯函数 + 参数传入。 - EventHandler 里跑了重活 → 它也会卡自己所属线程。
改法:EventHandler 跑“轻中型、需要顺序”的活,重活进 TaskPool。 - Promise.all 晚上线上炸了 → 单个失败导致整批失败。
改法:allSettled+ 失败重试,或批次更细。 - 取消语义形同虚设 → 任务内部不检查标志。
改法:建立“取消协议”,循环里轮询;IO 用 API 的取消能力。 - 日志缺失 → 出事找不到定位点。
改法:统一封装runInPool/withTimeout/retry,在封装层打点。
十四、把“知识点”翻成“面经答法”
Q:ArkTS 里 async/await 与 TaskPool 的关系?
A:async/await 是语法层,不改变线程;TaskPool 是运行时层,把 CPU 密集任务丢到后台线程池跑。二者互补,一个负责“流程可读”,一个负责“负载隔离”。
Q:EventHandler 与 TaskPool 的边界?
A:EventHandler 负责轻中型任务的串行调度/定时;TaskPool 负责计算重活。你可以用 EventHandler 分帧分批投递 TaskPool 任务,实现“稳中求快”。
Q:AsyncFunction 有啥用?
A:动态脚本/策略化场景的解法。配合 TaskPool、超时、只读上下文,可以安全地做“端上动态计算”。
十五、一个“可抽走复用”的工具集锦(即拿即用)
// utils/timeout.ts
export async function withTimeout<T>(p: Promise<T>, ms: number, tag = 'timeout'): Promise<T> { /* 同上 */ }
// utils/retry.ts
export async function retry<T>(fn: () => Promise<T>, times = 3, base = 300): Promise<T> { /* 同上 */ }
// worker/pool.ts
import taskpool from '@ohos.taskpool';
export async function runInPool<T extends (...args: any[]) => any>(fn: T, ...args: Parameters<T>): Promise<ReturnType<T>> {
const task = new taskpool.Task(fn, ...args);
return taskpool.execute(task) as Promise<ReturnType<T>>;
}
// sched/semaphore.ts
export class Semaphore { /* 同上 */ }
// sched/scheduler.ts
import eventHandler from '@ohos.eventhandler';
export class Scheduler { /* 同上 */ }
// dynamic/loader.ts
export const AsyncFunction = Object.getPrototypeOf(async function(){}).constructor as
new (...args: string[]) => (...args: any[]) => Promise<any>;
export function buildAsync(spec: { args: string[], body: string }) { return new AsyncFunction(...spec.args, spec.body); }
十六、收个尾:把它当“日常工程”,而不是“黑魔法”
回到文章最开始那个卡成 PPT 的页面。如今它的实现,不过是这些规则的自然结果:
- 主流程
async/await,每段都有错误边界; - 重计算
runInPool(TaskPool),参数扁平、结果轻巧; - 批处理
EventHandler分帧调度 +Semaphore限流; - 失败走
retry(withTimeout),进度用 handler 往 UI 线程 post; - 全程打点、日志清晰,出了事第二天我还能记得昨晚怎么改的(关键)。
异步与并发,不是某个“独门绝技”,而是你每天写代码时的一种“摆放秩序”。当你把“谁做什么、什么时候做、做不完怎么办、做快了怎么限制”都想清楚,ArkTS 的这套工具就会在你手里变得顺手、优雅,甚至……有点好玩。
下次有人问你:“ArkTS 的异步/并发怎么写才不崩?”——不妨把这篇给 TA,然后反问一句:
“都 2025 了,你还在主线程里跑素数筛吗?”
…
(未完待续)
更多推荐


所有评论(0)