前言

鸿蒙的 UI 跑在主线程上,这一点和 Android、iOS 一样。只要你在主线程做了耗时操作——大数据解析、图片处理、复杂计算——界面就开始掉帧。用户滑动列表卡一下,点开详情页顿一下,体验直接打折。

HarmonyOS 提供了两套多线程方案:TaskPoolWorker。很多开发者分不清该用哪个,这篇把两者的定位讲清楚,再给两个实战案例。

TaskPool 和 Worker 的定位

A clean, minimalist Notion-style comparison table

TaskPool 适合"一次性任务"。你把一个函数扔进去,它在线程池里找个空闲线程跑完,把结果返回。不用管线程创建和销毁,框架全包了。

Worker 适合"长期驻留的后台线程"。你创建一个 Worker,它有自己的生命周期,可以反复和主线程通信。适合需要持续处理数据的场景,比如实时数据流处理、WebSocket 长连接的数据解析。

简单说:任务跑完就走用 TaskPool,任务需要长期存在用 Worker。

90% 的场景 TaskPool 就够用。

@Concurrent 注解的规则

TaskPool 和 Worker 都需要用 @Concurrent 装饰器标记函数。这个注解告诉编译器:这个函数会在别的线程执行。

有几条硬性限制:

  • 函数必须是普通函数(不能是箭头函数、不能是类方法)
  • 函数体里不能引用外部变量(闭包不行)
  • 参数和返回值必须是可序列化的(基本类型、ArrayBuffer、Sendable 对象)
  • 不能用 @Sendable 之外的自定义类
// 正确 ✓
@Concurrent
function processData(data: ArrayBuffer): number[] {
  // 只能在这里使用 data 参数,不能引用外部变量
  const result: number[] = []
  // ... 处理逻辑
  return result
}

// 错误 ✗ 引用了外部变量
let globalConfig = { threshold: 0.5 }
@Concurrent
function processData(data: number[]): number[] {
  return data.filter(v => v > globalConfig.threshold) // 编译报错
}

A technical diagram in Notion aesthetic showing Ty

这些限制看着烦,但理解了就好:函数会被序列化到另一个线程执行,它必须自给自足。

TaskPool 实战:图片批量压缩

场景:用户选了 20 张图片要上传,需要先压缩到 500KB 以下。在主线程做会卡死 UI,用 TaskPool 并行处理。

先定义压缩函数:

import { image } from '@kit.ImageKit'

@Concurrent
function compressImage(buffer: ArrayBuffer, maxWidth: number, quality: number): ArrayBuffer {
  // 解码
  const pixelMap = image.createPixelMapFromSurface(buffer)
  const info = pixelMap.getImageInfoSync()

  // 计算缩放比例
  const ratio = Math.min(1, maxWidth / info.size.width)
  const newWidth = Math.floor(info.size.width * ratio)
  const newHeight = Math.floor(info.size.height * ratio)

  // 缩放
  pixelMap.scaleSync(newWidth, newHeight)

  // 压缩编码
  const packer = image.createImagePacker()
  const result = packer.packingSync(pixelMap, {
    format: 'image/jpeg',
    quality: quality
  })

  pixelMap.release()
  packer.release()
  return result
}

在 UI 层调用 TaskPool 并行执行:

import { taskpool } from '@kit.ArkTS'

@Component
struct ImageUploader {
  @State selectedImages: ArrayBuffer[] = []
  @State progress: string = '0/0'
  @State isProcessing: boolean = false

  async compressAll() {
    this.isProcessing = true
    const total = this.selectedImages.length
    let completed = 0
    const results: ArrayBuffer[] = []

    // 创建所有任务
    const tasks = this.selectedImages.map(buffer => {
      return taskpool.execute(
        compressImage,
        buffer,
        1080, // maxWidth
        80    // quality
      ) as Promise<ArrayBuffer>
    })

    // 并行执行,逐个收集结果
    for (const task of tasks) {
      try {
        const result = await task
        results.push(result)
        completed++
        this.progress = `${completed}/${total}`
      } catch (err) {
        Logger.error('Compress failed', err)
        completed++
        this.progress = `${completed}/${total}`
      }
    }

    this.isProcessing = false
    // results 里就是压缩后的图片数据
    await this.uploadImages(results)
  }

  build() {
    Column({ space: 16 }) {
      Button(this.isProcessing ? `压缩中 ${this.progress}` : '开始压缩')
        .enabled(!this.isProcessing)
        .onClick(() => this.compressAll())
    }
  }
}

A step-by-step flowchart for 'Image Batch Compress

TaskPool 会自动把任务分配到多个线程,充分利用多核 CPU。20 张图片可能同时有 4-6 张在并行压缩,比串行快好几倍。

取消任务

用户点了取消,或者页面退出了,正在跑的任务要能停掉:

// 创建任务时拿到 Task 对象
const task = new taskpool.Task(compressImage, buffer, 1080, 80)
taskpool.execute(task)

// 取消
taskpool.cancel(task)

注意 cancel 只能取消还没开始执行的任务。已经在跑的取消不掉,只能等它跑完。如果确实需要中断正在执行的逻辑,得在函数内部加检查点:

@Concurrent
function longRunningTask(data: number[], signal: boolean[]): number[] {
  const result: number[] = []
  for (let i = 0; i < data.length; i++) {
    // 每处理 100 个检查一下取消信号
    if (i % 100 === 0 && signal[0]) {
      break
    }
    result.push(data[i] * 2)
  }
  return result
}

这里 signal 用数组包一层是因为要满足可序列化的要求,SendableSharedArrayBuffer 也可以实现类似效果。

Worker 实战:数据流实时处理

场景:App 通过蓝牙连接传感器,每秒收到几十条数据,需要做滤波和聚合后再更新 UI。这种持续的数据流处理适合用 Worker。

先创建 Worker 文件 DataProcessor.ets

import { worker } from '@kit.ArkTS'

const workerPort = worker.workerPort

// 滑动窗口,用于滤波
let window: number[] = []
const WINDOW_SIZE = 10

workerPort.onmessage = (event: MessageEvents) => {
  const msg = event.data
  if (msg.type === 'data') {
    const rawValue: number = msg.value

    // 滑动窗口滤波
    window.push(rawValue)
    if (window.length > WINDOW_SIZE) {
      window.shift()
    }

    // 计算窗口均值
    const avg = window.reduce((a, b) => a + b, 0) / window.length

    // 检测异常值(偏离均值超过 3 倍标准差)
    const variance = window.reduce((sum, v) =>
      sum + Math.pow(v - avg, 2), 0) / window.length
    const stdDev = Math.sqrt(variance)
    const isAnomaly = Math.abs(rawValue - avg) > stdDev * 3

    // 回传处理结果
    workerPort.postMessage({
      type: 'processed',
      value: avg,
      raw: rawValue,
      isAnomaly: isAnomaly,
      timestamp: Date.now()
    })
  } else if (msg.type === 'reset') {
    window = []
    workerPort.postMessage({ type: 'reset_done' })
  }
}

// Worker 初始化完成通知
workerPort.postMessage({ type: 'ready' })

主线程里创建和使用 Worker:

import { worker } from '@kit.ArkTS'

@Component
struct SensorDashboard {
  private workerInstance: worker.ThreadWorker | null = null
  @State currentValue: number = 0
  @State isAnomaly: boolean = false
  @State dataHistory: number[] = []

  aboutToAppear() {
    // 创建 Worker
    this.workerInstance = new worker.ThreadWorker(
      'entry/ets/workers/DataProcessor.ets'
    )

    this.workerInstance.onmessage = (event: MessageEvents) => {
      const msg = event.data
      if (msg.type === 'ready') {
        Logger.info('Worker is ready')
      } else if (msg.type === 'processed') {
        this.currentValue = msg.value
        this.isAnomaly = msg.isAnomaly
        this.dataHistory.push(msg.value)
        // 只保留最近 100 个数据点用于图表
        if (this.dataHistory.length > 100) {
          this.dataHistory.shift()
        }
      }
    }
  }

  aboutToDisappear() {
    // 页面销毁时终止 Worker
    this.workerInstance?.terminate()
    this.workerInstance = null
  }

  // 蓝牙数据回调里,把原始数据推给 Worker
  onBluetoothDataReceived(value: number) {
    this.workerInstance?.postMessage({
      type: 'data',
      value: value
    })
  }

  build() {
    Column({ space: 12 }) {
      Text(`当前值: ${this.currentValue.toFixed(2)}`)
        .fontSize(24)
        .fontColor(this.isAnomaly ? '#FF4D4F' : '#333333')

      Text(this.isAnomaly ? '⚠ 检测到异常值' : '数据正常')
        .fontColor(this.isAnomaly ? '#FF4D4F' : '#52C41A')

      // 这里可以放一个 Canvas 折线图,参考第 29 篇
      // LineChart({ data: this.dataHistory, ... })
    }
    .padding(20)
  }
}

Worker 的关键优势在于它一直活着。不像 TaskPool 每次任务都要调度,Worker 创建后就常驻后台,数据来一条处理一条,没有调度开销。对于高频数据流,这个差异很明显。

TaskPool 的任务编排

TaskPool 还有个实用功能——可以控制任务的优先级和执行顺序:

// 设置优先级(HIGH 优先执行)
const highPriorityTask = new taskpool.Task(importantWork, data)
taskpool.execute(highPriorityTask, taskpool.Priority.HIGH)

// Promise.all 并行等待所有任务完成
const results = await Promise.all([
  taskpool.execute(taskA, data1),
  taskpool.execute(taskB, data2),
  taskpool.execute(taskC, data3),
])

// 串行执行(前一个完成再跑下一个)
const result1 = await taskpool.execute(step1, input)
const result2 = await taskpool.execute(step2, result1)
const finalResult = await taskpool.execute(step3, result2)

串行执行虽然不是"并行",但把每步放到 TaskPool 里跑,至少不会阻塞主线程,UI 还是流畅的。

选型的经验

能用 TaskPool 就别用 Worker。 TaskPool 用起来简单,不用管生命周期,线程池自动管理,适合绝大多数场景。

Worker 只在这些场景用: 需要维护内部状态的后台任务、高频数据流处理、需要长连接的后台处理。

别在多线程里操作 UI。 不管是 TaskPool 还是 Worker,都只能处理数据,不能碰 UI 组件。处理完把结果传回主线程,主线程更新 @State 驱动 UI 刷新。

传参尽量用 ArrayBuffer。 大数据量传输时,对象序列化开销不小。能用 ArrayBuffer 传就用 ArrayBuffer,零拷贝,快得多。

多线程这块不难,难的是判断"要不要用"。很多时候你觉得需要多线程,其实是因为数据结构没设计好,或者做了不必要的重复计算。先优化主线程逻辑,真卡了再上多线程。

Logo

作为“人工智能6S店”的官方数字引擎,为AI开发者与企业提供一个覆盖软硬件全栈、一站式门户。

更多推荐