本文同步发表于我的微信公众号,微信搜索 程语新视界 即可关注,每个工作日都有文章更新

一、Worker 线程通信

1. Worker 与 TaskPool 的差异

特性 Worker TaskPool
生命周期 长时间存在,可重复使用 短时任务,自动销毁
数量限制 最多64个Worker线程 无明确数量限制,受系统资源约束
使用场景 复杂长时任务,需要状态保持 独立短时任务,无状态
通信方式 双向消息传递,支持同步调用 单向结果返回,支持进度通知

2. 通信的核心

  • 双向消息传递:宿主线程与Worker线程可相互发送消息

  • 同步方法调用:Worker可直接调用宿主线程中注册的方法

  • 长时连接:保持连接状态,支持多次交互

  • 状态保持:Worker可维护内部状态,执行多个相关任务

二、Worker 消息通信

1. 通信原理

  • 消息队列:基于事件驱动的消息传递机制

  • 异步处理:非阻塞式通信,不影响各自线程执行

  • 序列化传输:数据自动序列化,支持基本类型和可序列化对象

2. 示例

步骤1:创建 Worker 线程处理逻辑
// DataProcessor.ets
import { ErrorEvent, MessageEvents, ThreadWorkerGlobalScope, worker } from '@kit.ArkTS';

const workerPort: ThreadWorkerGlobalScope = worker.workerPort;

// Worker接收宿主线程的消息,做相应的处理
workerPort.onmessage = (e: MessageEvents): void => {
  const requestData = e.data;
  
  if (requestData === 'start_processing') {
    // 执行数据处理
    const result = processData();
    workerPort.postMessage('processing_completed');
    
  } else if (requestData === 'get_status') {
    // 返回状态信息
    workerPort.postMessage('worker_running');
    
  } else if (requestData === 'calculate_sum') {
    // 执行计算任务
    const numbers = [1, 2, 3, 4, 5];
    const sum = numbers.reduce((a, b) => a + b, 0);
    workerPort.postMessage(`sum_result:${sum}`);
    
  } else {
    workerPort.postMessage('unknown_command');
  }
};

function processData(): string {
  // 模拟数据处理
  let result = '';
  for (let i = 0; i < 1000; i++) {
    result += i.toString();
  }
  return result;
}
步骤2:宿主线程管理与通信
// MainPage.ets
import { worker, MessageEvents } from '@kit.ArkTS';

@Entry
@Component
struct MainPage {
  @State pageTitle: string = 'Worker通信演示';
  @State workerStatus: string = '未启动';
  @State lastMessage: string = '无消息';
  
  private workerInstance: worker.ThreadWorker | null = null;

  build() {
    Column() {
      Text(this.pageTitle)
        .fontSize(24)
        .fontWeight(FontWeight.Bold)
        .margin({ bottom: 10 })
      
      Text(`Worker状态: ${this.workerStatus}`)
        .fontSize(16)
        .fontColor(Color.Blue)
        .margin({ bottom: 10 })
      
      Text(`最后消息: ${this.lastMessage}`)
        .fontSize(14)
        .margin({ bottom: 20 })
      
      Button('启动Worker')
        .fontSize(16)
        .padding(8)
        .backgroundColor(Color.Green)
        .onClick(() => this.startWorker())
        .margin({ bottom: 10 })
      
      Button('发送处理请求')
        .fontSize(16)
        .padding(8)
        .backgroundColor(Color.Blue)
        .onClick(() => this.sendProcessRequest())
        .margin({ bottom: 10 })
      
      Button('请求状态')
        .fontSize(16)
        .padding(8)
        .backgroundColor(Color.Orange)
        .onClick(() => this.requestStatus())
        .margin({ bottom: 10 })
      
      Button('停止Worker')
        .fontSize(16)
        .padding(8)
        .backgroundColor(Color.Red)
        .onClick(() => this.stopWorker())
    }
    .padding(20)
    .width('100%')
  }

  private startWorker(): void {
    try {
      this.workerInstance = new worker.ThreadWorker("entry/ets/workers/DataProcessor.ets");
      
      // 设置消息接收回调
      this.workerInstance.onmessage = (e: MessageEvents) => {
        const response = e.data;
        this.lastMessage = `收到: ${response}`;
        console.info("主线程收到Worker消息: " + response);
      };
      
      // 设置错误处理
      this.workerInstance.onerror = (error) => {
        this.workerStatus = '错误状态';
        this.lastMessage = `错误: ${error.message}`;
        console.error("Worker错误: " + error.message);
      };
      
      // 设置退出回调
      this.workerInstance.onexit = (code: number) => {
        this.workerStatus = '已退出';
        this.lastMessage = `退出代码: ${code}`;
        console.info("Worker已退出,代码: " + code);
      };
      
      this.workerStatus = '运行中';
      this.lastMessage = 'Worker启动成功';
      
    } catch (error) {
      this.workerStatus = '启动失败';
      this.lastMessage = `启动错误: ${error}`;
    }
  }

  private sendProcessRequest(): void {
    if (this.workerInstance) {
      this.workerInstance.postMessage('start_processing');
      this.lastMessage = '已发送处理请求';
    } else {
      this.lastMessage = 'Worker未启动';
    }
  }

  private requestStatus(): void {
    if (this.workerInstance) {
      this.workerInstance.postMessage('get_status');
      this.lastMessage = '已发送状态请求';
    } else {
      this.lastMessage = 'Worker未启动';
    }
  }

  private stopWorker(): void {
    if (this.workerInstance) {
      this.workerInstance.terminate();
      this.workerInstance = null;
      this.workerStatus = '已停止';
      this.lastMessage = 'Worker已终止';
    }
  }
}

3. 异步等待机制实现

// 异步工具函数
function createDelay(ms: number): Promise<void> {
  return new Promise<void>((resolve) => {
    setTimeout(() => {
      resolve();
    }, ms);
  });
}

// 带等待的Worker通信
async function executeWorkerWithWait(): Promise<void> {
  const workerInst = new worker.ThreadWorker("entry/ets/workers/DataProcessor.ets");
  let responseReceived = false;
  let workerTerminated = false;
  let finalResult: string = '';

  workerInst.onexit = () => {
    workerTerminated = true;
  };

  workerInst.onmessage = (e) => {
    finalResult = e.data;
    responseReceived = true;
    console.info("Worker返回结果: " + finalResult);
  };

  // 发送计算请求
  workerInst.postMessage('calculate_sum');

  // 等待响应
  while (!responseReceived) {
    await createDelay(100);
  }

  // 终止Worker
  workerInst.terminate();

  // 等待Worker完全退出
  while (!workerTerminated) {
    await createDelay(100);
  }

  console.info("Worker任务完成,结果: " + finalResult);
}

三、Worker 同步调用宿主线程接口

1. 同步调用原理

  • 方法注册:在宿主线程中注册对象和方法供Worker调用

  • 同步执行:Worker线程阻塞等待宿主线程方法执行完成

  • 结果返回:方法返回值直接返回给Worker线程

  • 异常传递:宿主线程方法异常会传递到Worker线程

2. 示例

步骤1:在宿主线程注册可调用对象
// HostServices.ets
import { worker, MessageEvents } from '@kit.ArkTS';

// 在宿主线程中实现的服务类
class HostServiceProvider {
  private dataCache: Map<string, any> = new Map();
  
  public getDataInfo(key: string): string {
    console.info("宿主线程: 获取数据信息 " + key);
    
    if (this.dataCache.has(key)) {
      return `缓存数据: ${this.dataCache.get(key)}`;
    } else {
      const newData = `生成的数据_${key}_${Date.now()}`;
      this.dataCache.set(key, newData);
      return `新数据: ${newData}`;
    }
  }
  
  public validateInput(input: string): boolean {
    console.info("宿主线程: 验证输入 " + input);
    return input.length > 0 && input.length < 100;
  }
  
  public getCurrentTime(): string {
    const now = new Date();
    return `当前时间: ${now.toLocaleString()}`;
  }

  // 创建单例实例
  static sharedInstance: HostServiceProvider = new HostServiceProvider();
}

// 主页面组件
@Entry
@Component
struct ServiceDemoPage {
  @State pageTitle: string = 'Worker同步调用演示';
  @State callLogs: string[] = [];
  
  private workerInst: worker.ThreadWorker | null = null;

  build() {
    Column() {
      Text(this.pageTitle)
        .fontSize(24)
        .fontWeight(FontWeight.Bold)
        .margin({ bottom: 20 })
      
      Button('启动服务Worker')
        .fontSize(16)
        .padding(10)
        .backgroundColor(Color.Green)
        .onClick(() => this.launchServiceWorker())
        .margin({ bottom: 10 })
      
      Button('测试数据获取')
        .fontSize(16)
        .padding(10)
        .backgroundColor(Color.Blue)
        .onClick(() => this.testDataRetrieval())
        .margin({ bottom: 10 })
      
      Button('测试输入验证')
        .fontSize(16)
        .padding(10)
        .backgroundColor(Color.Orange)
        .onClick(() => this.testInputValidation())
        .margin({ bottom: 10 })
      
      // 显示调用日志
      List({ space: 5 }) {
        ForEach(this.callLogs, (log: string) => {
          ListItem() {
            Text(log)
              .fontSize(12)
              .fontColor(Color.Gray)
          }
        })
      }
      .layoutWeight(1)
      .width('100%')
    }
    .padding(20)
    .width('100%')
    .height('100%')
  }

  private launchServiceWorker(): void {
    try {
      this.workerInst = new worker.ThreadWorker("entry/ets/workers/ServiceWorker.ets");
      
      // 在Worker中注册宿主线程的服务对象
      this.workerInst.registerGlobalCallObject("hostServices", HostServiceProvider.sharedInstance);
      
      // 设置消息接收回调
      this.workerInst.onmessage = (e: MessageEvents) => {
        const response = e.data;
        this.addLog(`Worker响应: ${response}`);
      };
      
      this.addLog('服务Worker启动成功');
      
    } catch (error) {
      this.addLog(`Worker启动失败: ${error}`);
    }
  }

  private testDataRetrieval(): void {
    if (this.workerInst) {
      this.workerInst.postMessage({ action: 'fetch_data', key: 'user_profile' });
      this.addLog('发送数据获取请求');
    }
  }

  private testInputValidation(): void {
    if (this.workerInst) {
      this.workerInst.postMessage({ action: 'validate_input', text: 'test_input' });
      this.addLog('发送输入验证请求');
    }
  }

  private addLog(log: string): void {
    const timeStamp = new Date().toLocaleTimeString();
    this.callLogs = [...this.callLogs, `[${timeStamp}] ${log}`];
    
    // 保持日志数量合理
    if (this.callLogs.length > 20) {
      this.callLogs = this.callLogs.slice(1);
    }
  }
}
步骤2:Worker线程调用宿主线程方法
// ServiceWorker.ets
import { ErrorEvent, MessageEvents, ThreadWorkerGlobalScope, worker } from '@kit.ArkTS';

const workerPort: ThreadWorkerGlobalScope = worker.workerPort;

workerPort.onmessage = async (e: MessageEvents) => {
  const request = e.data;
  
  try {
    if (request.action === 'fetch_data') {
      await handleDataFetch(request.key);
    } else if (request.action === 'validate_input') {
      await handleInputValidation(request.text);
    } else if (request.action === 'get_time') {
      await handleTimeRequest();
    } else {
      workerPort.postMessage(`未知操作: ${request.action}`);
    }
  } catch (error) {
    console.error(`Worker处理错误: ${error}`);
    workerPort.postMessage(`处理失败: ${error.message}`);
  }
};

// 处理数据获取
async function handleDataFetch(key: string): Promise<void> {
  try {
    console.info(`Worker: 开始获取数据 ${key}`);
    
    // 同步调用宿主线程的数据获取方法
    const result = workerPort.callGlobalCallObjectMethod("hostServices", "getDataInfo", key) as string;
    
    console.info(`Worker: 数据获取结果 ${result}`);
    workerPort.postMessage(`数据获取成功: ${result}`);
    
  } catch (error) {
    console.error(`数据获取失败: ${error}`);
    workerPort.postMessage(`数据获取失败: ${error.message}`);
  }
}

// 处理输入验证
async function handleInputValidation(inputText: string): Promise<void> {
  try {
    console.info(`Worker: 验证输入 ${inputText}`);
    
    // 同步调用宿主线程的验证方法
    const isValid = workerPort.callGlobalCallObjectMethod("hostServices", "validateInput", inputText) as boolean;
    
    console.info(`Worker: 输入验证结果 ${isValid}`);
    workerPort.postMessage(`输入验证: ${isValid ? '有效' : '无效'}`);
    
  } catch (error) {
    console.error(`输入验证失败: ${error}`);
    workerPort.postMessage(`输入验证失败: ${error.message}`);
  }
}

// 处理时间请求
async function handleTimeRequest(): Promise<void> {
  try {
    console.info('Worker: 获取当前时间');
    
    // 同步调用宿主线程的时间获取方法
    const currentTime = workerPort.callGlobalCallObjectMethod("hostServices", "getCurrentTime") as string;
    
    console.info(`Worker: 时间获取结果 ${currentTime}`);
    workerPort.postMessage(`时间信息: ${currentTime}`);
    
  } catch (error) {
    console.error(`时间获取失败: ${error}`);
    workerPort.postMessage(`时间获取失败: ${error.message}`);
  }
}

四、使用场景

场景特征 即时消息通信 同步方法调用
交互模式 异步,事件驱动 同步,方法调用
执行方式 非阻塞,继续执行 阻塞,等待结果
适用场景 状态通知、进度报告 数据查询、配置获取
复杂度 中等,需要消息协议 简单,直接方法调用
性能影响 较小,异步处理 较大,线程阻塞

五、总结

Worker 线程通信的核心:

  1. 双向通信:支持宿主线程与Worker线程的相互通信

  2. 同步调用:Worker可直接调用宿主线程中注册的方法

  3. 状态保持:Worker可长时间运行,维护内部状态

  4. 资源复用:一个Worker可处理多个相关任务

技术要点:

  • 使用 postMessage() 和 onmessage 实现异步消息通信

  • 使用 registerGlobalCallObject() 和 callGlobalCallObjectMethod() 实现同步方法调用

  • 合理管理Worker生命周期,及时销毁不再使用的Worker

  • 设计清晰的消息协议,确保通信的可靠性和可维护性

通过这两种通信机制,Worker能够与宿主线程进行灵活高效的交互,满足复杂应用场景。

Logo

作为“人工智能6S店”的官方数字引擎,为AI开发者与企业提供一个覆盖软硬件全栈、一站式门户。

更多推荐