在鸿蒙(HarmonyOS)的 ArkTS 并发编程中,TaskPool 和 Worker 是实现多线程的两大核心机制。由于 ArkTS 采用了 Actor 内存隔离模型,线程之间不存在共享内存,因此线程间的数据传递与通信是开发中的关键环节。

以下是关于 TaskPool 与 Worker 的线程通信与数据共享机制的详细解析:

一、 核心通信机制:从“深拷贝”到“转移控制权”

无论是 TaskPool 还是 Worker,默认情况下跨线程传递普通 JS 对象时,系统会采用标准的结构化克隆算法(Structured Clone),即深拷贝。这种方式安全但存在性能开销。为了优化大数据传输,两者均提供了以下优化机制:

  1. 转移控制权(Transferable Objects)
    对于 ArrayBuffer 等支持转移的对象,可以在传输时直接转移其内存控制权给目标线程。转移后,原宿主线程中的该对象将失效不可用,但这种方式避免了数据拷贝,极大提升了传输效率。

    • TaskPool:通过 taskpool.setTransferList 接口设置。
    • Worker:通过 postMessage 的 transfer 参数列表设置。
  2. Sendable 对象共享(零拷贝)
    对于需要在多线程间频繁交互且无需隔离的复杂对象,可以使用 @Sendable 装饰器。被标记为 Sendable 的类,其实例会分配在共享堆(SharedHeap)中,跨线程传递时采用引用传递而非拷贝。

    • TaskPool:支持直接传递 Sendable 对象。
    • Worker:需要使用专门的 postMessageWithSharedSendable 接口进行共享传输。
    • 注意:Sendable 具有“传染性”,其关联属性也必须是 Sendable 类型,且为了保证线程安全,通常需要配合 AsyncLock(异步锁)来保护并发修改。
1、 转移控制权(Transferable Objects)

当需要跨线程传递 ArrayBuffer 等海量二进制数据(如图像像素、音视频流)时,使用转移控制权可以实现“零拷贝”传输,但代价是原线程将失去对该内存的访问权。

1. TaskPool 中的转移传输
在 TaskPool 中,默认传递 ArrayBuffer 时就会采用转移方式。如果确实需要保留原数据,可以通过 setTransferList([]) 强制切换为拷贝模式。

// 定义并发任务函数
@Concurrent
function adjustImageValue(arrayBuffer: ArrayBuffer): ArrayBuffer {
    // 在子线程中对 arrayBuffer 进行耗时操作
    return arrayBuffer;  // 返回值默认也是转移回主线程
}

// 在主线程中执行任务
let task = new taskpool.Task(adjustImageValue, largeBuffer);
// 【注意】如果不调用此方法,默认就是转移模式。
// 如果希望原线程保留 largeBuffer 副本,则传入空数组强制拷贝:
// task.setTransferList([]); 

taskpool.execute(task).then((result) => {
    // 此时 largeBuffer 已不可用(byteLength 为 0),数据已转移至 result 中
});

2. Worker 中的转移传输
在 Worker 通信中,通过在 postMessage 的第二个参数中指定要转移的对象列表来实现。

// 主线程向 Worker 转移内存
const bigBuffer = new ArrayBuffer(100 * 1024 * 1024); // 100MB 数据
workerInstance.postMessage(bigBuffer, [bigBuffer]); // 第二个参数指定转移对象

// 转移后,主线程的 bigBuffer 将失效,Worker 接收后可直接使用

2、 Sendable 对象共享(零拷贝引用传递)

对于需要在多线程间频繁交互、且需要保持状态的复杂对象,使用 @Sendable 装饰器可以将其分配在共享堆(SharedHeap)中,跨线程传递时仅传递引用。

1. 定义 Sendable 类与并发任务

// 必须使用 @Sendable 装饰器,且属性需显式初始化
@Sendable
class DownloadTaskInfo {
    url: string = '';
    progress: number = 0;
    
    constructor(url: string) {
        this.url = url;
    }
}

// 并发任务中接收 Sendable 对象
@Concurrent
function startDownload(taskInfo: DownloadTaskInfo) {
    // 子线程直接修改共享对象的进度
    taskInfo.progress = 50; 
    return taskInfo;
}

2. TaskPool 传递 Sendable 对象
TaskPool 执行时,直接传入 Sendable 对象实例即可实现共享传输。

const taskInfo = new DownloadTaskInfo('https://example.com/video.mp4');
let task = new taskpool.Task(startDownload, taskInfo);
taskpool.execute(task).then((res) => {
    // 主线程与子线程操作的是同一块内存,res === taskInfo
    console.info(`下载进度: ${res.progress}`); 
});

3. Worker 传递 Sendable 对象
Worker 必须使用专用的 postMessageWithSharedSendable 方法,才能触发共享传输(若使用普通的 postMessage 则会退化为深拷贝)。

// 主线程向 Worker 发送共享对象
const taskInfo = new DownloadTaskInfo('https://example.com/image.png');
workerInstance.postMessageWithSharedSendable(taskInfo);

// Worker 内部接收
// entry/ets/workers/Worker.ets
workerPort.onmessage = (e: worker.MessageEvents) => {
    const info = e.data as DownloadTaskInfo;
    info.progress = 100; // 修改共享数据
};

二、 TaskPool 的通信特点

TaskPool 是系统级的任务池,适合处理短时、独立、无状态的耗时任务(如 JSON 解析、图片压缩)。

  • 通信方式:采用“提交任务 -> 获取 Promise 结果”的单向异步模式。主线程通过 taskpool.execute() 提交带有 @Concurrent 装饰器的函数及参数,任务执行完毕后通过 Promise 将结果序列化返回主线程。
  • 数据限制:传递的参数和返回的结果必须是可序列化的对象。由于每次执行都是全新的上下文,TaskPool 无法在多次任务间保持状态。
  • 适用场景:高并发的小任务调度,支持任务优先级(HIGH/NORMAL/LOW)和任务组(TaskGroup)管理。
1. 基础通信:提交任务与 Promise 结果返回

TaskPool 最基础的用法是将耗时计算封装在带有 @Concurrent 装饰器的函数中,通过 execute() 提交后,主线程通过 await 或 .then() 获取序列化后的结果。

import { taskpool } from '@kit.ArkTS';

// 1. 定义并发任务函数(必须使用 @Concurrent 装饰器)
@Concurrent
function parseLargeJson(jsonStr: string): object {
    // 在子线程中执行耗时解析,不阻塞 UI
    return JSON.parse(jsonStr);
}

// 2. 主线程提交任务并接收结果
async function loadData() {
    const jsonData = '{"name": "HarmonyOS", "version": 5}';
    const task = new taskpool.Task(parseLargeJson, jsonData);
    
    try {
        // 通过 Promise 获取子线程返回的结果
        const result = await taskpool.execute(task) as object;
        console.info('解析结果:', JSON.stringify(result));
    } catch (error) {
        console.error('任务执行失败:', error);
    }
}
2. 任务优先级调度(HIGH / NORMAL / LOW)

TaskPool 支持根据任务的紧急程度设置优先级,系统底层会动态调度 CPU 核心,优先处理高优先级任务。

import { taskpool } from '@kit.ArkTS';

@Concurrent
function heavyComputation(data: number[]): number {
    // 模拟耗时计算
    return data.reduce((a, b) => a + b, 0);
}

// 设置任务优先级为 HIGH,确保优先被调度执行
const urgentTask = new taskpool.Task(heavyComputation, [1, 2, 3, 4, 5]);
urgentTask.setPriority(taskpool.Priority.HIGH);

taskpool.execute(urgentTask).then((res) => {
    console.info('高优先级任务完成,结果:', res);
});
3. 任务组管理(TaskGroup)

当需要并行处理多个独立任务,并等待所有任务全部完成后再进行下一步操作时,可以使用 TaskGroup(类似于 Promise.all 的并发版本)。

import { taskpool } from '@kit.ArkTS';

@Concurrent
function fetchAndProcessData(id: number): string {
    // 模拟多个独立的数据处理任务
    return `Data_${id}_Processed`;
}

async function batchProcess() {
    // 1. 创建任务组
    const group = new taskpool.TaskGroup();
    
    // 2. 将多个任务添加到任务组中
    for (let i = 0; i < 5; i++) {
        const task = new taskpool.Task(fetchAndProcessData, i);
        group.addTask(task);
    }
    
    // 3. 执行任务组,等待所有任务完成后返回结果数组
    const results = await taskpool.execute(group) as string[];
    console.info('批量处理完成:', results);
}
4. 进阶通信:子线程向主线程持续发送数据(sendData)

虽然 TaskPool 默认是单次返回结果,但对于需要实时反馈进度的场景(如图片批量处理进度),可以通过 sendData 和 onReceiveData 实现子线程向主线程的持续通信。

import { taskpool } from '@kit.ArkTS';

@Concurrent
function processImages(count: number): string[] {
    const results: string[] = [];
    for (let i = 0; i < count; i++) {
        // 模拟耗时操作
        results.push(`Image_${i}`);
        
        // 【关键】在子线程中持续向主线程发送进度数据
        taskpool.Task.sendData(i + 1); 
    }
    return results;
}

// 主线程监听进度并执行任务
const task = new taskpool.Task(processImages, 100);

// 必须在 execute 之前注册数据接收回调
task.onReceiveData((progress: number) => {
    // 回调在宿主线程(UI线程)中执行,可安全更新 UI
    console.info(`当前处理进度: ${progress}%`);
});

taskpool.execute(task).then((finalResult) => {
    console.info('全部处理完成,共:', finalResult.length, '张');
});

三、 Worker 的通信特点

Worker 是独立的工作线程,适合处理长耗时、有状态、需要持续运行的任务(如 WebSocket 长连接、游戏主逻辑、持续的传感器数据采集)。

  • 通信方式:采用基于事件的双向消息传递机制。
    • 主线程 -> Worker:主线程通过 worker.postMessage() 发送消息,Worker 内部通过 onmessage 监听接收。
    • Worker -> 主线程:Worker 通过 workerPort.postMessage() 发送消息,主线程通过监听 Worker 实例的 onmessage 事件接收。
  • 数据限制:同样基于序列化通信,但 Worker 拥有独立的内存堆栈和事件循环,可以在多次消息交互之间持久维护内部状态(变量、对象)。
  • 生命周期:Worker 需要开发者手动创建(new worker.ThreadWorker)和销毁(terminate),且系统对同时运行的 Worker 数量有严格限制(通常最多 64 个),不可滥用。
1. 主线程:创建 Worker 与双向通信

在主线程中,我们通过 new worker.ThreadWorker 显式创建 Worker 实例,并通过监听 onmessage 接收子线程发来的消息。

import { worker } from '@kit.ArkTS';

// 1. 显式创建 Worker 实例(需传入 Worker 脚本路径)
const myWorker = new worker.ThreadWorker('entry/ets/workers/MyWorker.ets');

// 2. 主线程向 Worker 发送消息
myWorker.postMessage({ type: 'START_TASK', payload: { id: 1001 } });

// 3. 监听并接收 Worker 发回的消息
myWorker.onmessage = (e: worker.MessageEvents) => {
    const data = e.data;
    console.info(`收到 Worker 消息: ${data.type}, 进度: ${data.progress}%`);
};

// 4. 生命周期管理:在不需要时手动销毁 Worker,防止内存泄漏
// myWorker.terminate(); 
2. Worker 侧:独立脚本与状态维护

Worker 拥有独立的 JS 执行环境和事件循环,可以维护全局状态,并通过 workerPort 主动向主线程推送消息。

// entry/ets/workers/MyWorker.ets
import { worker } from '@kit.ArkTS';

// Worker 内部的全局状态(独立内存堆栈,多次消息交互间持久存在)
let taskCounter = 0; 

// 监听来自主线程的消息
workerPort.onmessage = (e: worker.MessageEvents) => {
    const msg = e.data;
    
    if (msg.type === 'START_TASK') {
        taskCounter++;
        // 模拟耗时任务后,主动向主线程推送进度
        workerPort.postMessage({ 
            type: 'PROGRESS', 
            progress: 50,
            taskId: msg.payload.id,
            totalTasks: taskCounter 
        });
    }
};
3. 实战:模拟 WebSocket 长连接与实时数据流

Worker 非常适合处理需要持续运行、且需要高频双向通信的场景,例如后台持续采集传感器数据或维持网络长连接。

主线程侧:

const sensorWorker = new worker.ThreadWorker('entry/ets/workers/SensorWorker.ets');

// 持续接收传感器数据并更新 UI 状态
sensorWorker.onmessage = (e: worker.MessageEvents) => {
    const sensorData = e.data;
    // 安全地更新 UI 状态
    this.currentSpeed = sensorData.speed; 
};

// 发送指令:开始采集
sensorWorker.postMessage({ action: 'START_COLLECTING' });

Worker 侧:

// entry/ets/workers/SensorWorker.ets
let timerId: number = -1;

workerPort.onmessage = (e: worker.MessageEvents) => {
    if (e.data.action === 'START_COLLECTING') {
        // 启动周期性数据采集(持续运行状态)
        timerId = setInterval(() => {
            const speed = Math.random() * 100;
            // 实时向主线程推送高频数据
            workerPort.postMessage({ speed: speed });
        }, 100); 
    } else if (e.data.action === 'STOP_COLLECTING') {
        clearInterval(timerId);
    }
};

四、 选型与案例

  1. UI 操作红线:只有主线程(UI 线程)能操作 ArkUI 组件(如更新 @State 变量)。子线程(TaskPool/Worker)中严禁调用 UI 相关 API,否则会直接抛出异常。
  2. 避免频繁交互:在并发场景下,子线程任务应保持相对独立,尽量减少与主线程的数据交互频率。如果必须传递超大对象,优先考虑 Transferable 转移控制权或 Sendable 共享机制,避免深拷贝带来的性能瓶颈。
  3. 决策依据:如果任务是一次性计算且耗时较短(通常 ≤5 秒),优先使用 TaskPool;如果任务需要长期运行、维持内部状态或进行复杂的跨线程双向通信,则选择 Worker。
1、 极致共享:Sendable 与共享模块单例

当多线程需要频繁读写同一个复杂对象时,传统的深拷贝开销极大。通过 @Sendable 装饰器,可以实现跨线程的零拷贝引用传递。

实战场景:跨线程共享状态(单例模式)
利用 'use shared' 指令标记共享模块,并在其中导出 Sendable 对象。结合 AsyncLock(异步锁)保障线程安全,可实现主线程与 TaskPool 线程对同一对象的安全访问。

// SharedModule.ets (共享模块)
'use shared';
import { ArkTSUtils } from '@kit.ArkTS';

@Sendable
class SharedCounter {
    private count_: number = 0;
    private lock_: ArkTSUtils.locks.AsyncLock = new ArkTSUtils.locks.AsyncLock();

    public async increase() {
        await this.lock_.lockAsync(() => { this.count_++; });
    }
    public async getCount(): Promise<number> {
        return this.lock_.lockAsync(() => this.count_);
    }
}
export const sharedCounter = new SharedCounter();
2、 大数据传输:ArrayBuffer 与 SharedArrayBuffer

对于图像像素、音视频流等海量数据,除了使用 Transferable 转移所有权外,还可以使用 SharedArrayBuffer 实现真正的底层内存共享。

  • 零拷贝共享SharedArrayBuffer 内部的 Native 内存支持跨并发实例直接共享。
  • 防竞争机制:由于多个线程可同时访问同一块内存,必须配合 Atomics 类(如 Atomics.addAtomics.load)进行原子操作,防止数据竞争(Data Race)。
3、 长时任务处理:TaskPool.LongTask

虽然 TaskPool 适合短时任务,但对于执行周期长、但不频繁阻塞线程的任务(如定期传感器数据采集、Socket 端口监听),官方提供了 TaskPool.LongTask 机制。

  • 核心优势:开发者无需像 Worker 那样手动管理线程的生命周期,避免了线程泛滥。
  • 双向通信:在长时任务内部,可以通过 taskpool.Task.sendData() 不定期将阶段性结果返回给宿主线程,宿主线程通过 onData 事件接收。
  • 避坑指南:长时任务不等于阻塞任务。如果任务需要长时间独占线程(如产线硬件老化压测、游戏主逻辑线程),依然必须使用 Worker。
4、 跨语言并发:NAPI 与 TaskPool 的深度结合

在混合开发中,如果 Native 层的 C++ 计算极其耗时,可以将其封装后放入 ArkTS 的 TaskPool 中调度。

  • Native 对象跨线程传递:当 Native 对象(如 C++ 类的实例)需要传递给 TaskPool 时,需使用 napi_coerce_to_native_binding_object 接口。
  • 生命周期接管:通过绑定 detach(序列化前执行)和 attach(反序列化后执行)回调,安全地将 Native 对象的内存控制权从主线程转移至 TaskPool 的工作线程,处理完毕后再安全传回。

五、 架构演进

鸿蒙的并发模型正在从传统的“共享内存 + 锁”向“Actor 内存隔离 + 显式共享”演进。在实际工程中:

  1. 轻量级计算/解析:首选 TaskPool,享受系统自动调度与优先级管理。
  2. 海量数据处理:使用 SharedArrayBuffer + Atomics,或 Transferable 转移控制权。
  3. 复杂状态共享:使用 @Sendable + 共享模块 + AsyncLock
  4. 长驻后台服务:使用 Worker,但务必在组件销毁时调用 terminate() 释放资源,防止内存泄漏。
Logo

作为“人工智能6S店”的官方数字引擎,为AI开发者与企业提供一个覆盖软硬件全栈、一站式门户。

更多推荐