鸿蒙 EventBus 与 Message 通信机制详解

适用版本:HarmonyOS NEXT / API 12+
语言:ArkTS


一、为什么需要事件通信?

在鸿蒙应用开发中,组件之间、页面之间、线程之间经常需要传递数据或通知状态变化。直接持有引用会造成强耦合,因此需要专门的通信机制。

鸿蒙提供了两套主流方案:

机制 核心 API 适用场景
EventBus(事件总线) @ohos.events.emitter / context.eventHub 同进程内组件、页面之间解耦通信
Message(消息传递) Worker postMessage / TaskPool 主线程与子线程之间的跨线程通信

二、EventBus:进程内事件总线

2.1 Emitter —— 全局事件总线

emitter 是鸿蒙提供的内置事件总线,基于发布-订阅模式,订阅者和发布者完全解耦。

引入方式

import { emitter } from '@kit.BasicServicesKit';
订阅事件
// 定义事件 ID(number 类型)
const LOGIN_SUCCESS_EVENT: emitter.InnerEvent = {
  eventId: 1001,
};

// 订阅(持久监听)
emitter.on(LOGIN_SUCCESS_EVENT, (data: emitter.EventData) => {
  const userId: string = data.data?.['userId'] as string ?? '';
  console.info(`收到登录成功事件,userId: ${userId}`);
});

// 订阅(只接收一次,自动取消)
emitter.once(LOGIN_SUCCESS_EVENT, (data: emitter.EventData) => {
  console.info('只接收一次的登录事件');
});
发布事件
// 发布事件并携带数据
emitter.emit(LOGIN_SUCCESS_EVENT, {
  data: {
    userId: 'user_123',
    nickname: '张三',
  }
});
取消订阅
// 取消对某个事件的所有订阅
emitter.off(1001);
带优先级发布
const event: emitter.InnerEvent = {
  eventId: 1002,
  priority: emitter.EventPriority.HIGH, // HIGH / LOW / IMMEDIATE / IDLE
};

emitter.emit(event, { data: { msg: '高优先级消息' } });

2.2 EventHub —— UIAbility 内部事件总线

EventHub 绑定在 UIAbilityContext 上,事件生命周期随 UIAbility 结束而结束,更适合 UIAbility 内部的页面间通信。

// 在 UIAbility 中发布
import { UIAbility, Want, AbilityConstant } from '@kit.AbilityKit';

export default class EntryAbility extends UIAbility {
  onForeground() {
    // 发布事件
    this.context.eventHub.emit('userLogout', { reason: '主动退出' });
  }
}
// 在页面中订阅
import { common } from '@kit.AbilityKit';

@Entry
@ComponentV2
struct ProfilePage {
  private context = getContext(this) as common.UIAbilityContext;

  aboutToAppear(): void {
    this.context.eventHub.on('userLogout', (data: Record<string, string>) => {
      console.info(`用户退出,原因:${data.reason}`);
    });
  }

  aboutToDisappear(): void {
    // 页面销毁时一定要取消订阅!
    this.context.eventHub.off('userLogout');
  }
}

2.3 自定义 EventBus(TypeScript 实现)

项目中也可以封装一个轻量 EventBus 单例:

type EventCallback = (data: object) => void;

class EventBus {
  private static instance: EventBus;
  private listeners: Map<string, EventCallback[]> = new Map();

  static getInstance(): EventBus {
    if (!EventBus.instance) {
      EventBus.instance = new EventBus();
    }
    return EventBus.instance;
  }

  on(event: string, callback: EventCallback): void {
    if (!this.listeners.has(event)) {
      this.listeners.set(event, []);
    }
    this.listeners.get(event)!.push(callback);
  }

  off(event: string, callback?: EventCallback): void {
    if (!callback) {
      this.listeners.delete(event);
      return;
    }
    const cbs = this.listeners.get(event) ?? [];
    this.listeners.set(event, cbs.filter(cb => cb !== callback));
  }

  emit(event: string, data: object = {}): void {
    const cbs = this.listeners.get(event) ?? [];
    cbs.forEach(cb => cb(data));
  }
}

export const eventBus = EventBus.getInstance();

使用:

// 订阅
eventBus.on('refresh', (data) => {
  console.info('刷新列表', JSON.stringify(data));
});

// 发布
eventBus.emit('refresh', { tabId: 'recommend' });

// 取消
eventBus.off('refresh');

三、Message:跨线程消息传递

Message 机制用于主线程与子线程之间通信,核心是 WorkerTaskPoolpostMessage / sendData

重要: ArkTS 中不同线程有各自独立的内存空间,不能共享对象引用,只能通过消息序列化传递数据。

3.1 Worker + postMessage

Worker 适合需要长期驻留、双向通信的子线程场景(如持续的 WebSocket 连接、音视频处理等)。

目录结构

entry/src/main/ets/
├── pages/
│   └── Index.ets          ← 主线程页面
└── workers/
    └── MyWorker.ets       ← Worker 子线程

Worker 子线程(MyWorker.ets)

import { worker, MessageEvents, ErrorEvent } from '@kit.ArkTS';

// 获取当前 Worker 的通信端口
const workerPort: worker.ThreadWorkerGlobalScope = worker.workerPort;

// 监听主线程消息
workerPort.onmessage = (event: MessageEvents) => {
  const data = event.data as Record<string, string>;
  console.info(`[Worker] 收到主线程消息: ${JSON.stringify(data)}`);

  // 处理耗时任务...
  const result = heavyTask(data.input);

  // 回传结果给主线程
  workerPort.postMessage({ result, status: 'success' });
};

workerPort.onerror = (err: ErrorEvent) => {
  console.error(`[Worker] 错误: ${err.message}`);
};

function heavyTask(input: string): string {
  // 模拟耗时计算
  return input.toUpperCase();
}

主线程(Index.ets)

import { worker, MessageEvents } from '@kit.ArkTS';

@Entry
@ComponentV2
struct Index {
  private myWorker: worker.ThreadWorker | null = null;
  @Local result: string = '';

  aboutToAppear(): void {
    // 创建 Worker
    this.myWorker = new worker.ThreadWorker('entry/ets/workers/MyWorker.ets');

    // 监听 Worker 回传的消息
    this.myWorker.onmessage = (event: MessageEvents) => {
      const data = event.data as Record<string, string>;
      this.result = data.result;
      console.info(`[主线程] Worker 回传: ${this.result}`);
    };
  }

  aboutToDisappear(): void {
    // 页面销毁时终止 Worker,释放资源
    this.myWorker?.terminate();
  }

  build() {
    Column({ space: 16 }) {
      Text(`处理结果: ${this.result}`)
      Button('发送任务给 Worker')
        .onClick(() => {
          // 向 Worker 发送消息
          this.myWorker?.postMessage({ input: 'hello harmony' });
        })
    }
    .width('100%')
    .height('100%')
    .justifyContent(FlexAlign.Center)
  }
}

3.2 TaskPool + sendData(推荐)

TaskPool 是更现代的方案,适合一次性并发任务,由系统自动管理线程池,开发者无需手动创建/销毁线程。

import { taskpool } from '@kit.ArkTS';

// 定义任务函数(必须是顶层函数或静态方法,不能是箭头函数)
@Concurrent
function processData(input: string): string {
  // 在子线程中执行
  return input.split('').reverse().join('');
}

// 主线程调用
async function runTask(): Promise<void> {
  const task = new taskpool.Task(processData, 'hello');
  
  // 等待任务完成并获取结果
  const result: string = await taskpool.execute(task) as string;
  console.info(`任务结果: ${result}`); // 输出: olleh
}

TaskPool 发送中间消息(sendData)

当任务需要在执行过程中不断向主线程汇报进度时:

// 子线程任务
@Concurrent
function downloadFile(url: string): void {
  for (let i = 0; i <= 100; i += 10) {
    // 向主线程发送进度
    taskpool.Task.sendData(i);
    // 模拟耗时
  }
}

// 主线程
const task = new taskpool.Task(downloadFile, 'https://example.com/file.zip');

// 注册进度回调
task.onReceiveData((progress: number) => {
  console.info(`下载进度: ${progress}%`);
});

taskpool.execute(task);

四、EventBus vs Message 核心区别

对比维度 EventBus(emitter / EventHub) Message(Worker / TaskPool)
通信范围 同一线程(主线程)内的组件/页面间 主线程 ↔ 子线程(跨线程)
数据共享 可直接传递对象引用 数据需要可序列化,不能传引用
生命周期 需手动取消订阅,否则内存泄漏 Worker 需手动 terminate,TaskPool 自动管理
使用场景 页面刷新、登录状态同步、Tab切换通知 图片处理、加密运算、大数据解析等耗时任务
是否阻塞 UI 不阻塞(同线程但异步回调) 不阻塞(任务在子线程执行)
线程安全 单线程,无需考虑 需注意数据竞争,ArkTS 通过隔离内存保证安全
复杂度 低,几行代码即可使用 较高,需要额外的 Worker 文件

五、选型建议

需要通信?
    │
    ├── 是否涉及耗时计算(>16ms 会卡 UI)?
    │       ├── 是 → 使用 TaskPool(一次性任务)或 Worker(持续任务)
    │       └── 否 → 继续往下
    │
    ├── 是否跨 UIAbility?
    │       ├── 是 → 使用 emitter(全局)
    │       └── 否 → 同一 UIAbility 内
    │
    └── 是否需要随 UIAbility 自动清理?
            ├── 是 → 使用 EventHub(context.eventHub)
            └── 否 → 使用 emitter 或自定义 EventBus

典型场景对照

场景 推荐方案
用户登录后刷新首页 emitter.emit
购物车数量更新通知各页面 emitter 或自定义 EventBus
退出登录清理页面状态 context.eventHub.emit
大图压缩/格式转换 TaskPool
实时音视频数据处理 Worker
WebSocket 长连接管理 Worker(常驻子线程)
批量数据加密上传 TaskPool(并发多任务)

六、常见坑点

EventBus 篇

坑 1:忘记取消订阅导致内存泄漏

// ❌ 错误:aboutToDisappear 中没有取消订阅
aboutToAppear(): void {
  emitter.on({ eventId: 1001 }, () => { ... });
}

// ✅ 正确:配对 off
aboutToAppear(): void {
  emitter.on({ eventId: 1001 }, this.onEvent);
}
aboutToDisappear(): void {
  emitter.off(1001);
}

坑 2:eventId 冲突

全局使用 emitter 时,不同模块可能使用相同的 eventId,建议统一在常量文件中定义:

// events/EventIds.ts
export const EventIds = {
  LOGIN_SUCCESS: 1001,
  CART_UPDATE: 1002,
  REFRESH_HOME: 1003,
};

Message 篇

坑 3:Worker 中不能使用 UI 相关 API

Worker 线程没有 UI 上下文,不能调用 routerpromptActionAppStorage 等 API。

坑 4:传递不可序列化的对象

// ❌ 错误:传递了含函数的对象
worker.postMessage({ callback: () => {} }); // 报错

// ✅ 正确:只传可序列化的纯数据
worker.postMessage({ userId: '123', action: 'refresh' });

坑 5:@Concurrent 函数不能捕获外部变量

const prefix = 'hello'; // 外部变量

// ❌ 错误:@Concurrent 函数不能闭包捕获外部变量
@Concurrent
function badTask(): string {
  return prefix + ' world'; // 编译报错
}

// ✅ 正确:通过参数传入
@Concurrent
function goodTask(prefix: string): string {
  return prefix + ' world';
}

taskpool.execute(new taskpool.Task(goodTask, prefix));

七、总结

  • EventBus(emitter / EventHub) 本质是主线程内的发布-订阅模式,解决的是组件解耦问题,代码简单,但要注意及时取消订阅。
  • Message(Worker / TaskPool) 本质是跨线程通信,解决的是耗时任务不阻塞 UI 的问题,数据需要可序列化。

两者并不互斥,实际项目中常常配合使用:Worker 在子线程完成计算后,通过 postMessage 回传结果,主线程再通过 emitter 通知相关组件更新 UI。

Logo

作为“人工智能6S店”的官方数字引擎,为AI开发者与企业提供一个覆盖软硬件全栈、一站式门户。

更多推荐