ArkTS 多线程并发实战:TaskPool 与 Worker 深度对比,打造高性能鸿蒙应用

适用版本:HarmonyOS NEXT / API 12+
核心主题:TaskPool、Worker、并发编程、性能优化
难度等级:⭐⭐⭐(中高级)


前言:单线程的瓶颈

在鸿蒙应用开发中,默认情况下所有代码都运行在主线程(UI 线程)中。这意味着:当你执行一个耗时 500ms 的图片压缩操作,或者解析一个 10MB 的 JSON 文件时,整个 UI 会冻结,用户体验直线下降

HarmonyOS NEXT 提供了两套并发方案:

  • TaskPool:轻量级任务池,适合短时批量任务
  • Worker:独立 Worker 线程,适合长时任务和复杂通信

很多开发者不清楚该选哪个,本文通过完整的实战代码和性能对比,帮你彻底搞清楚这两者的区别和适用场景。


一、ArkTS 并发模型基础

在深入之前,先理解 ArkTS 的并发隔离模型:

┌─────────────────────────────────────────┐
│              主线程 (UI Thread)          │
│   ArkUI 渲染 / 事件处理 / 业务逻辑      │
├──────────────────┬──────────────────────┤
│   TaskPool       │   Worker 线程         │
│   (系统管理)     │   (开发者管理)        │
│   短时任务       │   长时/复杂任务       │
└──────────────────┴──────────────────────┘

关键特性:ArkTS 采用内存隔离模型,线程间通过消息传递(Structured Clone)通信,不共享内存(除 SharedArrayBuffer 外)。


二、TaskPool 实战

2.1 基本用法

TaskPool 最简单的使用方式是通过 @Concurrent 装饰器标注可在线程池中执行的函数:

// utils/imageProcessor.ets
import { taskpool } from '@kit.ArkTS';

// ⚠️ 必须用 @Concurrent 标注,否则运行时报错
@Concurrent
function compressImage(buffer: ArrayBuffer, quality: number): ArrayBuffer {
  // 模拟耗时压缩操作(实际中调用图片处理库)
  const result = new ArrayBuffer(Math.floor(buffer.byteLength * quality));
  const src = new Uint8Array(buffer);
  const dst = new Uint8Array(result);
  for (let i = 0; i < dst.length; i++) {
    dst[i] = src[i];
  }
  return result;
}

// 调用方
export async function processImage(rawBuffer: ArrayBuffer): Promise<ArrayBuffer> {
  const task = new taskpool.Task(compressImage, rawBuffer, 0.7);
  const result = await taskpool.execute(task) as ArrayBuffer;
  return result;
}

在 UI 组件中使用:

// pages/ImagePage.ets
import { processImage } from '../utils/imageProcessor';

@Entry
@Component
struct ImagePage {
  @State status: string = '等待处理';
  @State resultSize: number = 0;

  private async handleCompress() {
    this.status = '压缩中...';
    
    // 模拟原始图片数据(实际从媒体库读取)
    const rawBuffer = new ArrayBuffer(1024 * 1024 * 5); // 5MB
    
    const startTime = Date.now();
    const compressed = await processImage(rawBuffer);
    const elapsed = Date.now() - startTime;
    
    this.resultSize = compressed.byteLength;
    this.status = `完成!耗时 ${elapsed}ms,大小: ${(this.resultSize / 1024).toFixed(1)}KB`;
  }

  build() {
    Column({ space: 20 }) {
      Text(this.status).fontSize(16).fontColor('#333')
      Text(`压缩后大小: ${(this.resultSize / 1024).toFixed(1)} KB`)
        .fontSize(14)
        .fontColor('#666')
      Button('开始压缩')
        .onClick(() => this.handleCompress())
        .backgroundColor('#007DFF')
        .fontColor(Color.White)
        .borderRadius(8)
    }
    .width('100%')
    .padding(24)
    .justifyContent(FlexAlign.Center)
  }
}

2.2 TaskPool 批量并发

TaskPool 真正的威力在于批量任务并发执行

// utils/batchProcessor.ets
import { taskpool } from '@kit.ArkTS';

@Concurrent
function parseJsonChunk(chunk: string): object[] {
  try {
    return JSON.parse(chunk) as object[];
  } catch {
    return [];
  }
}

// 将大 JSON 分块并发解析
export async function parseLargeJson(rawJson: string): Promise<object[]> {
  const chunkSize = Math.ceil(rawJson.length / 4); // 分成4块
  const tasks: taskpool.Task[] = [];
  
  // 按行分割避免破坏JSON结构(实际业务中需更精细处理)
  const lines = rawJson.split('\n');
  const linesPerChunk = Math.ceil(lines.length / 4);
  
  for (let i = 0; i < 4; i++) {
    const chunk = lines.slice(i * linesPerChunk, (i + 1) * linesPerChunk).join('\n');
    if (chunk.trim()) {
      tasks.push(new taskpool.Task(parseJsonChunk, chunk));
    }
  }
  
  // 并发执行所有任务
  const results = await Promise.all(
    tasks.map(task => taskpool.execute(task) as Promise<object[]>)
  );
  
  return results.flat();
}

2.3 TaskPool 优先级控制

import { taskpool } from '@kit.ArkTS';

@Concurrent
function heavyComputation(n: number): number {
  let result = 0;
  for (let i = 0; i < n; i++) {
    result += Math.sqrt(i);
  }
  return result;
}

// 高优先级任务(用户交互触发)
const highPriorityTask = new taskpool.Task(heavyComputation, 1000000);
await taskpool.execute(highPriorityTask, taskpool.Priority.HIGH);

// 后台低优先级任务(预加载)
const bgTask = new taskpool.Task(heavyComputation, 5000000);
taskpool.execute(bgTask, taskpool.Priority.LOW); // 不 await,后台运行

三、Worker 实战

Worker 适合需要长时间运行复杂双向通信的场景,比如:WebSocket 长连接处理、持续的音频解码、文件实时监控。

3.1 创建 Worker 线程文件

📁 文件必须放在 workers/ 目录下,且在 build-profile.json5 中配置

// workers/DataWorker.ets
import { worker, MessageEvents, ErrorEvent } from '@kit.ArkTS';

const workerPort = worker.workerPort;

interface ProcessRequest {
  type: 'PROCESS_DATA' | 'CANCEL' | 'STATUS';
  data?: number[];
  taskId?: string;
}

interface ProcessResponse {
  type: 'RESULT' | 'PROGRESS' | 'ERROR' | 'STATUS';
  result?: number;
  progress?: number;
  error?: string;
  taskId?: string;
  status?: string;
}

// 监听主线程消息
workerPort.onmessage = (event: MessageEvents) => {
  const req = event.data as ProcessRequest;
  
  switch (req.type) {
    case 'PROCESS_DATA':
      processData(req.data ?? [], req.taskId ?? '');
      break;
    case 'STATUS':
      const response: ProcessResponse = { type: 'STATUS', status: 'alive' };
      workerPort.postMessage(response);
      break;
    default:
      break;
  }
};

function processData(data: number[], taskId: string) {
  const total = data.length;
  let sum = 0;
  
  for (let i = 0; i < total; i++) {
    // 模拟复杂计算
    sum += Math.pow(data[i], 2) / (i + 1);
    
    // 每处理 20% 上报一次进度
    if (i % Math.floor(total / 5) === 0) {
      const progress = Math.floor((i / total) * 100);
      const progressMsg: ProcessResponse = { 
        type: 'PROGRESS', 
        progress, 
        taskId 
      };
      workerPort.postMessage(progressMsg);
    }
  }
  
  // 发送最终结果
  const resultMsg: ProcessResponse = { 
    type: 'RESULT', 
    result: sum, 
    taskId 
  };
  workerPort.postMessage(resultMsg);
}

workerPort.onmessageerror = (event: MessageEvents) => {
  console.error('Worker message error:', JSON.stringify(event));
};

3.2 主线程使用 Worker

// utils/workerManager.ets
import { worker, MessageEvents } from '@kit.ArkTS';

export class DataWorkerManager {
  private workerInstance: worker.ThreadWorker | null = null;
  private taskCallbacks: Map<string, {
    resolve: (value: number) => void;
    reject: (reason: string) => void;
    onProgress?: (progress: number) => void;
  }> = new Map();

  init() {
    if (this.workerInstance) return;
    
    this.workerInstance = new worker.ThreadWorker('entry/ets/workers/DataWorker.ets');
    
    this.workerInstance.onmessage = (event: MessageEvents) => {
      const msg = event.data as { type: string; result?: number; progress?: number; taskId?: string; error?: string };
      
      if (msg.taskId && this.taskCallbacks.has(msg.taskId)) {
        const callbacks = this.taskCallbacks.get(msg.taskId)!;
        
        switch (msg.type) {
          case 'RESULT':
            callbacks.resolve(msg.result ?? 0);
            this.taskCallbacks.delete(msg.taskId);
            break;
          case 'PROGRESS':
            callbacks.onProgress?.(msg.progress ?? 0);
            break;
          case 'ERROR':
            callbacks.reject(msg.error ?? 'Unknown error');
            this.taskCallbacks.delete(msg.taskId);
            break;
        }
      }
    };
    
    this.workerInstance.onerror = (err: ErrorEvent) => {
      console.error('Worker error:', err.message);
    };
  }

  processData(
    data: number[],
    onProgress?: (progress: number) => void
  ): Promise<number> {
    return new Promise((resolve, reject) => {
      if (!this.workerInstance) {
        reject('Worker not initialized');
        return;
      }
      
      const taskId = `task_${Date.now()}_${Math.random().toString(36).slice(2)}`;
      this.taskCallbacks.set(taskId, { resolve, reject, onProgress });
      
      this.workerInstance.postMessage({ 
        type: 'PROCESS_DATA', 
        data, 
        taskId 
      });
    });
  }

  terminate() {
    this.workerInstance?.terminate();
    this.workerInstance = null;
  }
}

3.3 在页面中集成 Worker

// pages/WorkerDemoPage.ets
import { DataWorkerManager } from '../utils/workerManager';

@Entry
@Component
struct WorkerDemoPage {
  private workerManager = new DataWorkerManager();
  @State progress: number = 0;
  @State result: string = '';
  @State isProcessing: boolean = false;

  aboutToAppear() {
    this.workerManager.init();
  }

  aboutToDisappear() {
    this.workerManager.terminate();
  }

  private async startProcessing() {
    this.isProcessing = true;
    this.progress = 0;
    this.result = '';
    
    // 生成 10 万个随机数
    const data = Array.from({ length: 100000 }, () => Math.random() * 100);
    
    try {
      const finalResult = await this.workerManager.processData(
        data,
        (p) => { this.progress = p; } // 进度回调
      );
      this.result = `计算结果: ${finalResult.toFixed(4)}`;
    } catch (e) {
      this.result = `错误: ${e}`;
    } finally {
      this.isProcessing = false;
      this.progress = 100;
    }
  }

  build() {
    Column({ space: 16 }) {
      Text('Worker 并发计算演示')
        .fontSize(20)
        .fontWeight(FontWeight.Bold)
      
      Progress({ value: this.progress, total: 100, type: ProgressType.Linear })
        .width('90%')
        .color('#007DFF')
      
      Text(`进度: ${this.progress}%`)
        .fontSize(14)
        .fontColor('#666')
      
      if (this.result) {
        Text(this.result)
          .fontSize(16)
          .fontColor('#2ECC71')
          .fontWeight(FontWeight.Medium)
      }
      
      Button(this.isProcessing ? '处理中...' : '开始计算 10 万数据')
        .enabled(!this.isProcessing)
        .onClick(() => this.startProcessing())
        .backgroundColor(this.isProcessing ? '#CCCCCC' : '#007DFF')
        .fontColor(Color.White)
        .borderRadius(8)
        .width('80%')
    }
    .width('100%')
    .height('100%')
    .justifyContent(FlexAlign.Center)
    .padding(24)
  }
}

四、TaskPool vs Worker 对比总结

维度 TaskPool Worker
线程管理 系统自动管理线程池 开发者手动创建/销毁
通信方式 返回 Promise 双向消息传递
适用场景 短时、批量、独立任务 长时、有状态、需进度反馈
内存开销 低(线程复用) 较高(独立线程)
代码复杂度 低(一两行调用) 中(需处理消息协议)
任务取消 支持 cancel() 需自定义 cancel 消息
数据传输 Structured Clone Structured Clone
最大并发数 系统动态调整 无硬性限制(建议 ≤8)

选型决策树

是否需要实时进度反馈?
    ├─ 是 → Worker
    └─ 否 → 任务时间是否 > 3 秒?
                ├─ 是 → Worker
                └─ 否 → 是否需要批量并发?
                            ├─ 是 → TaskPool
                            └─ 否 → TaskPool(简单场景)

五、性能优化建议

5.1 避免频繁传输大对象

// ❌ 错误:每次都传完整数组
@Concurrent
function badPractice(hugeArray: number[]): number {
  return hugeArray.reduce((a, b) => a + b, 0);
}

// ✅ 正确:使用 SharedArrayBuffer 或分块处理
@Concurrent
function goodPractice(buffer: SharedArrayBuffer, length: number): number {
  const view = new Float64Array(buffer, 0, length);
  let sum = 0;
  for (let i = 0; i < length; i++) {
    sum += view[i];
  }
  return sum;
}

5.2 TaskPool 任务分组

import { taskpool } from '@kit.ArkTS';

// 使用 TaskGroup 确保相关任务一起调度
const group = new taskpool.TaskGroup();
group.addTask(new taskpool.Task(task1, param1));
group.addTask(new taskpool.Task(task2, param2));
group.addTask(new taskpool.Task(task3, param3));

// 等待所有任务完成
const results = await taskpool.execute(group) as unknown[];

5.3 Worker 池化复用

在高频调用场景中,避免每次都创建/销毁 Worker,应使用单例或对象池:

// 应用级 Worker 单例(在 EntryAbility 中初始化)
export class AppWorkerPool {
  private static instance: DataWorkerManager;
  
  static getInstance(): DataWorkerManager {
    if (!AppWorkerPool.instance) {
      AppWorkerPool.instance = new DataWorkerManager();
      AppWorkerPool.instance.init();
    }
    return AppWorkerPool.instance;
  }
}

六、常见踩坑

问题 原因 解决方案
@Concurrent 函数中无法使用闭包变量 隔离执行环境 通过参数传入所有依赖
Worker 文件路径报错 路径写法错误 格式固定为 entry/ets/workers/XXX.ets
传输数据后原对象变为空 Transferable 对象被转移 使用 SharedArrayBuffer 或克隆后传输
TaskPool 函数调用其他模块报错 不支持导入外部模块 将依赖逻辑内联到 @Concurrent 函数中
Worker 消息堆积导致内存增长 没有及时处理消息 加入消息队列或背压机制

总结

  • TaskPool 是鸿蒙并发的"快餐方案":一行 @Concurrent,几行 Promise 调用,搞定 90% 的耗时任务
  • Worker 是"重型武器":适合需要持续运行、复杂状态管理、进度上报的场景
  • 不要在主线程做任何耗时超过 16ms 的操作(1帧时间),否则会掉帧
  • 线程间通信代价不低,避免频繁传递大对象,优先使用 SharedArrayBuffer

掌握这两套工具,你的鸿蒙应用性能将上一个台阶。代码已在 HarmonyOS NEXT API 12 上验证,可直接运行。


如果这篇文章对你有帮助,欢迎点赞收藏!有问题可以在评论区交流 👇

Logo

作为“人工智能6S店”的官方数字引擎,为AI开发者与企业提供一个覆盖软硬件全栈、一站式门户。

更多推荐