鸿蒙 Worker和宿主线程的通信
本文详细介绍了ArkTS中Worker线程的通信机制,主要包含两种交互模式:异步消息传递和同步方法调用。Worker线程具有长生命周期、状态保持和双向通信能力,适用于复杂任务处理。文章通过具体代码示例展示了Worker与宿主线程的交互实现,包括消息队列机制、异步等待处理以及同步调用宿主方法的完整流程。技术要点包括postMessage/onmessage异步通信、registerGlobalCal
本文同步发表于我的微信公众号,微信搜索 程语新视界 即可关注,每个工作日都有文章更新
一、Worker 线程通信
1. Worker 与 TaskPool 的差异
| 特性 | Worker | TaskPool |
|---|---|---|
| 生命周期 | 长时间存在,可重复使用 | 短时任务,自动销毁 |
| 数量限制 | 最多64个Worker线程 | 无明确数量限制,受系统资源约束 |
| 使用场景 | 复杂长时任务,需要状态保持 | 独立短时任务,无状态 |
| 通信方式 | 双向消息传递,支持同步调用 | 单向结果返回,支持进度通知 |
2. 通信的核心
-
双向消息传递:宿主线程与Worker线程可相互发送消息
-
同步方法调用:Worker可直接调用宿主线程中注册的方法
-
长时连接:保持连接状态,支持多次交互
-
状态保持:Worker可维护内部状态,执行多个相关任务
二、Worker 消息通信
1. 通信原理
-
消息队列:基于事件驱动的消息传递机制
-
异步处理:非阻塞式通信,不影响各自线程执行
-
序列化传输:数据自动序列化,支持基本类型和可序列化对象
2. 示例
步骤1:创建 Worker 线程处理逻辑
// DataProcessor.ets
import { ErrorEvent, MessageEvents, ThreadWorkerGlobalScope, worker } from '@kit.ArkTS';
const workerPort: ThreadWorkerGlobalScope = worker.workerPort;
// Worker接收宿主线程的消息,做相应的处理
workerPort.onmessage = (e: MessageEvents): void => {
const requestData = e.data;
if (requestData === 'start_processing') {
// 执行数据处理
const result = processData();
workerPort.postMessage('processing_completed');
} else if (requestData === 'get_status') {
// 返回状态信息
workerPort.postMessage('worker_running');
} else if (requestData === 'calculate_sum') {
// 执行计算任务
const numbers = [1, 2, 3, 4, 5];
const sum = numbers.reduce((a, b) => a + b, 0);
workerPort.postMessage(`sum_result:${sum}`);
} else {
workerPort.postMessage('unknown_command');
}
};
function processData(): string {
// 模拟数据处理
let result = '';
for (let i = 0; i < 1000; i++) {
result += i.toString();
}
return result;
}
步骤2:宿主线程管理与通信
// MainPage.ets
import { worker, MessageEvents } from '@kit.ArkTS';
@Entry
@Component
struct MainPage {
@State pageTitle: string = 'Worker通信演示';
@State workerStatus: string = '未启动';
@State lastMessage: string = '无消息';
private workerInstance: worker.ThreadWorker | null = null;
build() {
Column() {
Text(this.pageTitle)
.fontSize(24)
.fontWeight(FontWeight.Bold)
.margin({ bottom: 10 })
Text(`Worker状态: ${this.workerStatus}`)
.fontSize(16)
.fontColor(Color.Blue)
.margin({ bottom: 10 })
Text(`最后消息: ${this.lastMessage}`)
.fontSize(14)
.margin({ bottom: 20 })
Button('启动Worker')
.fontSize(16)
.padding(8)
.backgroundColor(Color.Green)
.onClick(() => this.startWorker())
.margin({ bottom: 10 })
Button('发送处理请求')
.fontSize(16)
.padding(8)
.backgroundColor(Color.Blue)
.onClick(() => this.sendProcessRequest())
.margin({ bottom: 10 })
Button('请求状态')
.fontSize(16)
.padding(8)
.backgroundColor(Color.Orange)
.onClick(() => this.requestStatus())
.margin({ bottom: 10 })
Button('停止Worker')
.fontSize(16)
.padding(8)
.backgroundColor(Color.Red)
.onClick(() => this.stopWorker())
}
.padding(20)
.width('100%')
}
private startWorker(): void {
try {
this.workerInstance = new worker.ThreadWorker("entry/ets/workers/DataProcessor.ets");
// 设置消息接收回调
this.workerInstance.onmessage = (e: MessageEvents) => {
const response = e.data;
this.lastMessage = `收到: ${response}`;
console.info("主线程收到Worker消息: " + response);
};
// 设置错误处理
this.workerInstance.onerror = (error) => {
this.workerStatus = '错误状态';
this.lastMessage = `错误: ${error.message}`;
console.error("Worker错误: " + error.message);
};
// 设置退出回调
this.workerInstance.onexit = (code: number) => {
this.workerStatus = '已退出';
this.lastMessage = `退出代码: ${code}`;
console.info("Worker已退出,代码: " + code);
};
this.workerStatus = '运行中';
this.lastMessage = 'Worker启动成功';
} catch (error) {
this.workerStatus = '启动失败';
this.lastMessage = `启动错误: ${error}`;
}
}
private sendProcessRequest(): void {
if (this.workerInstance) {
this.workerInstance.postMessage('start_processing');
this.lastMessage = '已发送处理请求';
} else {
this.lastMessage = 'Worker未启动';
}
}
private requestStatus(): void {
if (this.workerInstance) {
this.workerInstance.postMessage('get_status');
this.lastMessage = '已发送状态请求';
} else {
this.lastMessage = 'Worker未启动';
}
}
private stopWorker(): void {
if (this.workerInstance) {
this.workerInstance.terminate();
this.workerInstance = null;
this.workerStatus = '已停止';
this.lastMessage = 'Worker已终止';
}
}
}
3. 异步等待机制实现
// 异步工具函数
function createDelay(ms: number): Promise<void> {
return new Promise<void>((resolve) => {
setTimeout(() => {
resolve();
}, ms);
});
}
// 带等待的Worker通信
async function executeWorkerWithWait(): Promise<void> {
const workerInst = new worker.ThreadWorker("entry/ets/workers/DataProcessor.ets");
let responseReceived = false;
let workerTerminated = false;
let finalResult: string = '';
workerInst.onexit = () => {
workerTerminated = true;
};
workerInst.onmessage = (e) => {
finalResult = e.data;
responseReceived = true;
console.info("Worker返回结果: " + finalResult);
};
// 发送计算请求
workerInst.postMessage('calculate_sum');
// 等待响应
while (!responseReceived) {
await createDelay(100);
}
// 终止Worker
workerInst.terminate();
// 等待Worker完全退出
while (!workerTerminated) {
await createDelay(100);
}
console.info("Worker任务完成,结果: " + finalResult);
}
三、Worker 同步调用宿主线程接口
1. 同步调用原理
-
方法注册:在宿主线程中注册对象和方法供Worker调用
-
同步执行:Worker线程阻塞等待宿主线程方法执行完成
-
结果返回:方法返回值直接返回给Worker线程
-
异常传递:宿主线程方法异常会传递到Worker线程
2. 示例
步骤1:在宿主线程注册可调用对象
// HostServices.ets
import { worker, MessageEvents } from '@kit.ArkTS';
// 在宿主线程中实现的服务类
class HostServiceProvider {
private dataCache: Map<string, any> = new Map();
public getDataInfo(key: string): string {
console.info("宿主线程: 获取数据信息 " + key);
if (this.dataCache.has(key)) {
return `缓存数据: ${this.dataCache.get(key)}`;
} else {
const newData = `生成的数据_${key}_${Date.now()}`;
this.dataCache.set(key, newData);
return `新数据: ${newData}`;
}
}
public validateInput(input: string): boolean {
console.info("宿主线程: 验证输入 " + input);
return input.length > 0 && input.length < 100;
}
public getCurrentTime(): string {
const now = new Date();
return `当前时间: ${now.toLocaleString()}`;
}
// 创建单例实例
static sharedInstance: HostServiceProvider = new HostServiceProvider();
}
// 主页面组件
@Entry
@Component
struct ServiceDemoPage {
@State pageTitle: string = 'Worker同步调用演示';
@State callLogs: string[] = [];
private workerInst: worker.ThreadWorker | null = null;
build() {
Column() {
Text(this.pageTitle)
.fontSize(24)
.fontWeight(FontWeight.Bold)
.margin({ bottom: 20 })
Button('启动服务Worker')
.fontSize(16)
.padding(10)
.backgroundColor(Color.Green)
.onClick(() => this.launchServiceWorker())
.margin({ bottom: 10 })
Button('测试数据获取')
.fontSize(16)
.padding(10)
.backgroundColor(Color.Blue)
.onClick(() => this.testDataRetrieval())
.margin({ bottom: 10 })
Button('测试输入验证')
.fontSize(16)
.padding(10)
.backgroundColor(Color.Orange)
.onClick(() => this.testInputValidation())
.margin({ bottom: 10 })
// 显示调用日志
List({ space: 5 }) {
ForEach(this.callLogs, (log: string) => {
ListItem() {
Text(log)
.fontSize(12)
.fontColor(Color.Gray)
}
})
}
.layoutWeight(1)
.width('100%')
}
.padding(20)
.width('100%')
.height('100%')
}
private launchServiceWorker(): void {
try {
this.workerInst = new worker.ThreadWorker("entry/ets/workers/ServiceWorker.ets");
// 在Worker中注册宿主线程的服务对象
this.workerInst.registerGlobalCallObject("hostServices", HostServiceProvider.sharedInstance);
// 设置消息接收回调
this.workerInst.onmessage = (e: MessageEvents) => {
const response = e.data;
this.addLog(`Worker响应: ${response}`);
};
this.addLog('服务Worker启动成功');
} catch (error) {
this.addLog(`Worker启动失败: ${error}`);
}
}
private testDataRetrieval(): void {
if (this.workerInst) {
this.workerInst.postMessage({ action: 'fetch_data', key: 'user_profile' });
this.addLog('发送数据获取请求');
}
}
private testInputValidation(): void {
if (this.workerInst) {
this.workerInst.postMessage({ action: 'validate_input', text: 'test_input' });
this.addLog('发送输入验证请求');
}
}
private addLog(log: string): void {
const timeStamp = new Date().toLocaleTimeString();
this.callLogs = [...this.callLogs, `[${timeStamp}] ${log}`];
// 保持日志数量合理
if (this.callLogs.length > 20) {
this.callLogs = this.callLogs.slice(1);
}
}
}
步骤2:Worker线程调用宿主线程方法
// ServiceWorker.ets
import { ErrorEvent, MessageEvents, ThreadWorkerGlobalScope, worker } from '@kit.ArkTS';
const workerPort: ThreadWorkerGlobalScope = worker.workerPort;
workerPort.onmessage = async (e: MessageEvents) => {
const request = e.data;
try {
if (request.action === 'fetch_data') {
await handleDataFetch(request.key);
} else if (request.action === 'validate_input') {
await handleInputValidation(request.text);
} else if (request.action === 'get_time') {
await handleTimeRequest();
} else {
workerPort.postMessage(`未知操作: ${request.action}`);
}
} catch (error) {
console.error(`Worker处理错误: ${error}`);
workerPort.postMessage(`处理失败: ${error.message}`);
}
};
// 处理数据获取
async function handleDataFetch(key: string): Promise<void> {
try {
console.info(`Worker: 开始获取数据 ${key}`);
// 同步调用宿主线程的数据获取方法
const result = workerPort.callGlobalCallObjectMethod("hostServices", "getDataInfo", key) as string;
console.info(`Worker: 数据获取结果 ${result}`);
workerPort.postMessage(`数据获取成功: ${result}`);
} catch (error) {
console.error(`数据获取失败: ${error}`);
workerPort.postMessage(`数据获取失败: ${error.message}`);
}
}
// 处理输入验证
async function handleInputValidation(inputText: string): Promise<void> {
try {
console.info(`Worker: 验证输入 ${inputText}`);
// 同步调用宿主线程的验证方法
const isValid = workerPort.callGlobalCallObjectMethod("hostServices", "validateInput", inputText) as boolean;
console.info(`Worker: 输入验证结果 ${isValid}`);
workerPort.postMessage(`输入验证: ${isValid ? '有效' : '无效'}`);
} catch (error) {
console.error(`输入验证失败: ${error}`);
workerPort.postMessage(`输入验证失败: ${error.message}`);
}
}
// 处理时间请求
async function handleTimeRequest(): Promise<void> {
try {
console.info('Worker: 获取当前时间');
// 同步调用宿主线程的时间获取方法
const currentTime = workerPort.callGlobalCallObjectMethod("hostServices", "getCurrentTime") as string;
console.info(`Worker: 时间获取结果 ${currentTime}`);
workerPort.postMessage(`时间信息: ${currentTime}`);
} catch (error) {
console.error(`时间获取失败: ${error}`);
workerPort.postMessage(`时间获取失败: ${error.message}`);
}
}
四、使用场景
| 场景特征 | 即时消息通信 | 同步方法调用 |
|---|---|---|
| 交互模式 | 异步,事件驱动 | 同步,方法调用 |
| 执行方式 | 非阻塞,继续执行 | 阻塞,等待结果 |
| 适用场景 | 状态通知、进度报告 | 数据查询、配置获取 |
| 复杂度 | 中等,需要消息协议 | 简单,直接方法调用 |
| 性能影响 | 较小,异步处理 | 较大,线程阻塞 |
五、总结
Worker 线程通信的核心:
-
双向通信:支持宿主线程与Worker线程的相互通信
-
同步调用:Worker可直接调用宿主线程中注册的方法
-
状态保持:Worker可长时间运行,维护内部状态
-
资源复用:一个Worker可处理多个相关任务
技术要点:
-
使用
postMessage()和onmessage实现异步消息通信 -
使用
registerGlobalCallObject()和callGlobalCallObjectMethod()实现同步方法调用 -
合理管理Worker生命周期,及时销毁不再使用的Worker
-
设计清晰的消息协议,确保通信的可靠性和可维护性
通过这两种通信机制,Worker能够与宿主线程进行灵活高效的交互,满足复杂应用场景。
更多推荐


所有评论(0)