我是兰瓶Coding,一枚刚踏入鸿蒙领域的转型小白,原是移动开发中级,如下是我学习笔记《零基础学鸿蒙》,若对你所有帮助,还请不吝啬的给个大大的赞~

前言

我吃过不少“单机当牛马”的亏:一台手机,既要抓网络、又要做重算,列表还要丝滑滚,结果就是——UI 抽风耗电飙升用户差评。后来我把鸿蒙的分布式家族认真抠了个底儿掉:DeviceManager 负责“认识谁能干活”、DistributedScheduler 顶着“把谁叫过来干”、再加上“跨端协同计算”的回路接上,整套调度就从“单机苦力”升到“多机协作”。这篇,我按你给的路径排兵布阵:任务分发 → 设备发现 → 跨端协同计算,中间穿插可直接改抄的 ArkTS/ArkUI 代码,力求能跑、好拆、好维护
  我会尽量说人话,但专业细节一个都不藏。偶尔吐槽两句,也是我半夜掉过的坑、交过的学费。走起。

一、开场白:先把“调度”说清楚

“分布式任务调度”在端上是什么?一句话:把一份工作拆成“可描述的任务”,并且把它交给“最合适的设备”去执行。合适的标准可以是:CPU/GPU 能力、网络、是否充电、是否闲置、是否同账号可信、是否在同一局域网,等等。
  而在鸿蒙语境里,两件事是地基

  • 设备发现与可信关系:靠 DeviceManager 获取本地/可信设备、监听上下线、做认证。它是“我有谁能用”的来源。
  • 跨端启动/调用/迁移:分布式任务调度子系统(DistributedScheduler,DMS)的职责是“把远端组件拉起来、让它干活”。从开发者视角,常见就是跨设备启动 Ability / 连接会话 / 迁移等,属于“把执行权交出去”的那只手。

小结:DeviceManager 决定“人选”,DistributedScheduler 决定“调度动作”。配合一条数据通路(比如分布式数据对象、跨端 Ability 连接、或你自建的网络 RPC),就能闭环。

二、任务分发:从“任务模型”到“把人叫来干活”

2.1 先定任务模型(Task Model)

所有调度系统第一步,都是把任务定义“可传输、可落地、可重试”。一个轻量的 ArkTS 版 Task 接口:

// task/model.ts
export type TaskType = 'TEXT_SUMMARY' | 'THUMBNAIL' | 'VEC_SEARCH'

export interface Task {
  id: string               // 全局唯一ID(雪花、UUID都行)
  type: TaskType
  payload: any             // 任务输入(JSON可序列化)
  priority?: number        // 0-100
  deadlineMs?: number      // 软截止
  retry?: { times: number; backoffMs: number }
}

export interface TaskResult {
  id: string
  ok: boolean
  output?: any
  error?: string
  execDeviceId?: string    // 哪台设备干的活
  durationMs?: number
}

为什么要这么“俗套”

  • 可序列化:跨设备传输必须 JSON 化。
  • 可重试:移动网络真会“抽风”。
  • 可观测:谁执行、耗时多少、失败原因,都是线上排障的命根子。

2.2 分发器(Dispatcher):把“可执行的 Want”装出来

在鸿蒙上把“远端能力拉起来”,从 ArkTS 开发者视角,核心动作其实是构造一个含远端 deviceId 的 Want,然后 startAbility(或创建跨端连接会话),DMS 在下层完成“调度”。远端可以是 UIAbilityServiceExtensionAbility(做计算更合适)。重点是 Want 中携带 deviceId/bundle/abilityName 等信息:这一步属于“分发动作”的具体落点。DMS 的开源实现描述了它“跨设备组件管理(启动/迁移/绑定)”的职责边界。

// dispatcher/remote.ts
// 简化:把任务封到 Want 里,发给远端ServiceExtension执行
interface RemoteTarget {
  deviceId: string
  bundleName: string
  abilityName: string // ServiceExtensionAbility 名
}

function buildTaskWant(target: RemoteTarget, task: Task): Want {
  return {
    deviceId: target.deviceId,
    bundleName: target.bundleName,
    abilityName: target.abilityName,
    parameters: { task: JSON.stringify(task) },
  } as Want
}

export async function dispatchToRemote(ctx: UIAbilityContext, target: RemoteTarget, task: Task) {
  const want = buildTaskWant(target, task)
  // 分布式调度:跨端启动远端能力
  await ctx.startAbility(want) // 由 DMS 完成远端拉起
  // 数据回路你可以选择:1) ability 连接会话收发文本;2) 分布式数据对象;3) 自建RPC
}

说明:上面用了“带 deviceId 的 Want + startAbility”作为分发落地方式,这符合“分布式调度跨设备启动组件”的范式;具体 API 命名/版本以官方文档为准,但概念不变分发动作=构造远端 Want + 启动/连接

2.3 会话通道:如何把结果拿回来?

通道有三条常用路:

  1. ability 连接会话:HarmonyOS 提供了跨设备 UIAbility 会话管理接口(abilityConnectionManager),可以在会话内进行文本消息传递(API 18+,同账号协同拉起)。做请求-响应非常顺手。
  2. 分布式数据对象(DDO):把结果写入某个共享 DataObject,另一端订阅变更即可,适合多对多协同
  3. 自建网络 RPC:走 HTTP/WebSocket/自研协议,伸缩自如,但要自己打理安全与重试。

一个“能力会话”风格的最小骨架(伪代码,聚焦思路)

// channel/session.ts
import abilityConnMgr from '@ohos.distributed-abilityconnectionmanager' // 名称以文档为准

export async function requestResponseOnce(target: RemoteTarget, payload: string): Promise<string> {
  // 1) 创建会话(同账号&组网前提)
  const sessionId = await abilityConnMgr.createAbilityConnectionSession({ serviceName: 'dms-compute' })
  // 2) 连接到远端 Ability
  await abilityConnMgr.connectAbility(sessionId, {
    deviceId: target.deviceId,
    bundleName: target.bundleName,
    abilityName: target.abilityName
  })
  // 3) 发送请求
  await abilityConnMgr.send(sessionId, payload)
  // 4) 等待响应(可加超时)
  const resp = await abilityConnMgr.receive(sessionId, { timeoutMs: 8000 })
  // 5) 断开 & 释放
  await abilityConnMgr.disconnectAbility(sessionId)
  await abilityConnMgr.closeAbilityConnectionSession(sessionId)
  return resp
}

要点:官方文档明确该模块用于创建协同会话并跨设备拉起/连接 UIAbility,并进行文本传递(API 18+)。命名可能会有小差异,但“会话→连接→消息→释放”这条链路是固定的。


三、设备发现:把“谁能干活”摸清楚

没有设备清单和可信关系,谈什么调度?DeviceManager 就是我们的人事部:发现、认证、监听、查询可信设备。官方文档(ArkTS API)写得清清楚楚:你可以注册上下线监听、发现周边不可信设备、发起认证、拿可信列表、拿本机信息,这一切都在分布式管理服务(Distributed Service Kit)里。

3.1 一个实用的 DeviceManager 封装

// device/dm.ts
import { distributedDeviceManager as ddm } from '@kit.DistributedServiceKit' // 按当前SDK导入

type Device = { deviceId: string; deviceName: string; deviceType: string }

export class DeviceDirectory {
  private dm!: ddm.DeviceManager
  private devices: Device[] = []

  async init(bundleName: string) {
    this.dm = ddm.createDeviceManager(bundleName)
    // 上下线监听
    this.dm.on('deviceStateChange', (data) => {
      // data 里会有设备id/状态;正式代码里做diff并刷新缓存
      this.refreshTrusted()
    })
    await this.refreshTrusted()
  }

  async refreshTrusted() {
    const list = this.dm.getTrustedDeviceListSync() // 同步拿可信列表
    this.devices = list.map(it => ({
      deviceId: it.deviceId,
      deviceName: it.deviceName,
      deviceType: String(it.deviceType)
    }))
  }

  // 发现周边设备(不可信→可发起认证流程)
  startDiscovery() {
    this.dm.startDeviceDiscovery({ subscribeId: 9527 }) // 订阅ID自定义
  }
  stopDiscovery() { this.dm.stopDeviceDiscovery(9527) }

  // 认证(用户侧会有配对/确认)
  async authenticate(deviceId: string) {
    await this.dm.authenticateDevice({ deviceId, authType: 1 /* 示例 */ })
    await this.refreshTrusted()
  }

  getTrusted(): Device[] { return this.devices }
}

关键事实:DeviceManager 提供发现不可信设备、认证、查询可信列表、监听上下线等核心能力,是“跨端协同”的入场券。

3.2 把“人选”挑出来:调度选择策略

有了清单,还得挑人。最小可用的“选择器”:

// device/pick.ts
type ScoreRule = (d: Device) => number

export function pickBest(devs: Device[], rules: ScoreRule[]): Device | null {
  let best: Device | null = null
  let bestScore = -1
  for (const d of devs) {
    const s = rules.reduce((acc, r) => acc + r(d), 0)
    if (s > bestScore) { best = d; bestScore = s }
  }
  return best
}

规则示例:

  • 同账户 & 同一局域网(这通常由系统协同前提保证)+50;
  • 平板/电视优先(CPU/GPU 更强)+20;
  • 设备名包含“office”(自定义偏好)+10;
  • 最近 5 分钟无任务 +10;
  • 电量 > 60 且在充电 +10。

别忘了:失败兜底要回落到本地执行,或者切到“第二人选”。这才是真正“健壮”的调度。


四、跨端协同计算:把“算力”拎到对的设备上

4.1 远端执行的基本套路

远端我们准备一个 ServiceExtensionAbility(名字就叫 ComputeServiceExtAbility 吧),被拉起后从 Want 里把任务 JSON 拿出来,执行,并通过会话/数据对象/网络把结果回传。

(远端)执行器骨架

// service/ComputeServiceExtAbility.ets
import { AbilityConstant, ServiceExtensionAbility } from '@ohos.app.ability'

export default class ComputeServiceExtAbility extends ServiceExtensionAbility {
  onStart(want: Want) {
    // 取任务
    const taskJson = want?.parameters?.task as string
    if (!taskJson) { console.error('No task payload'); return }
    const task: Task = JSON.parse(taskJson)
    // 异步执行
    this.runTask(task).then(res => {
      // 1) 写入分布式数据对象;或
      // 2) 通过会话发送回执;或
      // 3) 发HTTP回源
      console.info('task done', res.id)
    }).catch(e => console.error(e))
  }

  private async runTask(task: Task): Promise<TaskResult> {
    const t0 = Date.now()
    try {
      let output: any
      switch (task.type) {
        case 'TEXT_SUMMARY':
          output = summarize(task.payload.text); break
        case 'THUMBNAIL':
          output = await genThumb(task.payload.uri); break
        case 'VEC_SEARCH':
          output = await vecSearch(task.payload.query); break
        default:
          throw new Error('unknown task type')
      }
      return { id: task.id, ok: true, output, execDeviceId: getLocalDeviceId(), durationMs: Date.now()-t0 }
    } catch (e) {
      return { id: task.id, ok: false, error: String(e), execDeviceId: getLocalDeviceId(), durationMs: Date.now()-t0 }
    }
  }
}

这段落地到哪?让 DMS(DistributedScheduler) 把它“跨端启动”,你这边只需给 Want 填上 deviceId/bundle/abilityName 即可。它属于 DMS “远端组件启动/管理”的适用面。

4.2 结果回传:我更推荐“会话通道”或“DataObject”

方案 A:ability 连接会话(文本消息)
  官方文档指出可创建跨设备协同会话并进行文本消息传输;适合“请求-响应”。服务端拿到 sessionIdsend(),客户端 receive(),流程清爽。

方案 B:分布式数据对象(DDO)
  建立一个 TaskResult 的共享对象(或集合),服务端写、客户端订阅;多设备协同/掉线重连更友好。

4.3 调度策略:并发度、回退与幂等

  • 并发度:一次只派 N 个任务到远端,避免把对端打爆;对端也做小型队列。
  • 回退:远端 8 秒不回,就在本地跑(或换下一台)。
  • 幂等Task.id 作为幂等键;对端若收到重复任务,直接返回“已有结果”。
  • 观测:记录 execDeviceId/耗时/失败码,下一次“挑人”可以更聪明。

把“三板斧”写进代码(主端发起)

// orchestrator/runner.ts
import { dispatchToRemote } from '../dispatcher/remote'
import { requestResponseOnce } from '../channel/session'
import { DeviceDirectory } from '../device/dm'
import { pickBest } from '../device/pick'

export class Orchestrator {
  constructor(private ctx: UIAbilityContext, private dm: DeviceDirectory) {}

  async execute(task: Task): Promise<TaskResult> {
    const devs = this.dm.getTrusted()
    const best = pickBest(devs, [/* 你的评分规则 */])
    if (!best) return this.runLocally(task, 'NO_DEVICE')

    try {
      await dispatchToRemote(this.ctx, {
        deviceId: best.deviceId,
        bundleName: 'com.example.compute',
        abilityName: 'ComputeServiceExtAbility'
      }, task)

      // 等待回执(会话通道的简单形态)
      const resp = await withTimeout(
        requestResponseOnce({ deviceId: best.deviceId, bundleName: 'com.example.compute', abilityName: 'ComputeServiceExtAbility' },
                            JSON.stringify({ type: 'FETCH_RESULT', id: task.id })), 
        8000, 'remote-timeout'
      )
      const res: TaskResult = JSON.parse(resp)
      return res
    } catch (e) {
      // 回退本地
      return this.runLocally(task, String(e))
    }
  }

  private async runLocally(task: Task, reason: string): Promise<TaskResult> {
    const t0 = Date.now()
    try {
      // 本地执行(同步或TaskPool)
      const output = await localExec(task)
      return { id: task.id, ok: true, output, execDeviceId: 'LOCAL', durationMs: Date.now()-t0 }
    } catch (e) {
      return { id: task.id, ok: false, error: `${reason}:${String(e)}`, execDeviceId: 'LOCAL', durationMs: Date.now()-t0 }
    }
  }
}

五、把“任务分发 → 设备发现 → 协同计算”串成可跑 Demo

5.1 UI:一个“分布式计算面板”

// pages/DistComputePanel.ets
@Entry
@Component
struct DistComputePanel {
  @State text: string = ''
  @State logs: string[] = []
  private dm = new DeviceDirectory()
  private oc!: Orchestrator

  aboutToAppear() {
    this.dm.init(getBundleName()).then(() => this.log('DeviceManager ready'))
    this.oc = new Orchestrator(getContext(this), this.dm)
  }
  private log(s: string) { this.logs = [s, ...this.logs].slice(0, 30) }

  build() {
    Column({ space: 12 }) {
      Row() {
        Button('发现设备').onClick(() => { this.dm.startDiscovery(); this.log('Start discovery') })
        Button('停止发现').onClick(() => { this.dm.stopDiscovery(); this.log('Stop discovery') })
        Blank()
        Button('刷新可信').onClick(() => { this.dm.refreshTrusted(); this.log('Refresh trusted') })
      }.padding(12)

      // 可信设备列表
      List() {
        ForEach(this.dm.getTrusted(), (d) => ListItem() {
          Row() {
            Text(`${d.deviceName} (${d.deviceType})`).fontWeight(FontWeight.Medium)
            Blank()
            Button('认证').onClick(async () => { await this.dm.authenticate(d.deviceId); this.log(`认证:${d.deviceName}`) })
          }.padding(8)
        }, d => d.deviceId)
      }.height(160)

      TextArea({ placeholder: '贴点文本来个跨端摘要?' }).height(120).onChange(v => this.text = v)

      Row({ space: 12 }) {
        Button('分布式摘要').onClick(async () => {
          const task: Task = {
            id: `${Date.now()}`,
            type: 'TEXT_SUMMARY',
            payload: { text: this.text },
            retry: { times: 1, backoffMs: 400 }
          }
          const res = await this.oc.execute(task)
          if (res.ok) this.log(`${res.execDeviceId} 用时 ${res.durationMs}ms`)
          else this.log(`${res.error}`)
        })
      }.padding(12)

      Text('Logs').fontWeight(FontWeight.Bold)
      List() {
        ForEach(this.logs, (s, i) => ListItem() { Text(s).fontSize(12) }, (s, i) => String(i))
      }.height(160)
    }.padding(12).height('100%')
  }
}

体验走向:已经建立可信关系的平板/电视优先“吃活”,手机 UI 丝滑不抖;一旦远端不回,秒回落本机。这就是分布式调度给用户“无感”的那点魔法


六、工程抛光:权限、安全、可观测

  • 权限与前置条件

    • 跨设备协同通常要求同一华为账号登录、蓝牙/同一网络环境可发现,并在系统侧建立可信关系。这属于分布式管理/协同的通用前提。
    • API 版本差异:ability 连接会话属于较新的公开能力(API 18+),用前确认当前 SDK/API Level(文档中心有“版本标识”与系统能力说明)。
  • 安全

    • 只传必要数据,避免隐私越权;
    • 对端结果要签名/验参(哪怕是同账号设备);
    • 远端能力拉起要白名单控制(bundle/abilityName),拒绝“野设备”。
  • 可观测

    • 对每个任务记录:派发时间、派往设备、回执耗时、失败码;
    • 建一个健康面板(成功率、P95 时延、回退率、设备得分),调度策略靠数据说话。

七、跨端协同计算的“老江湖诀窍”

  1. 先做本地可跑,再加远端:把“执行器”做成同一接口,远端失败时直接落到本地。
  2. 任务切碎:小任务拼成批;短平快、可并行、好重试。
  3. 传指纹不传大对象:让远端按 URI 拉取素材;结果只回摘要或产物地址
  4. 会话通道文本化:文本(JSON)最稳,复杂二进制放到对象存储/共享数据再引用。
  5. 设备画像要动态:把成功率、耗时写回“分数”,让“挑人”进化。

八、常见坑位(我都中招过)

  • “我 startAbility 了,远端没起来?”

    • 看看 deviceId/bundle/abilityName 拼得对不对;
    • 设备是否同账号且已建立可信
    • 某些能力仅支持特定 API 级别,对照文档版本标识。
  • “能发现设备,但拿不到可信列表?”

    • 发现是不可信的第一步,还需认证才能进可信池;认证流程通常伴随系统 UI/确认。
  • “会话能连上,但收不到回应?”

    • 确认双方使用同一服务标识、消息格式吻合;
    • 记得释放会话,否则资源泄漏、后续连不上。

九、把方案“产品化”:三种落地档位

  • 轻量版:手机把重算丢给平板;回执失败就本地算。代价小、收益立竿见影
  • 协同版:手机分片下发到平板 + 电视双端并行;结果合并(Map-Reduce 味儿)。吞吐翻倍
  • 平台版:做“设备调度服务”(可含云端)+ 动态权重 + 灰度试验;跨团队/跨产品复用

十、附录:极简“文本摘要”示例的端端契约

请求(Task.payload):

{ "text": "请对这段文本做 120 字摘要……" }

响应(TaskResult.output):

{
  "summary": "……",
  "confidence": 0.87,
  "device": "HUAWEI PAD 11 2025"
}

端到端对齐“契约”,就不用在会上各执一词地吵“你们返回的是啥”。


结语:把“多设备”当你的“天然线程池”

我现在写端上复杂功能,脑子已自带一个小人儿:**这活儿,一定要手机干吗?**如果不是,那就交给“隔壁壮实的家伙”(平板/电视/笔记本)。
  分布式调度不是魔法,它就是工程化地把“谁干什么、在哪儿干、干完怎么算账”说清楚。DeviceManager 负责认识“人”、DistributedScheduler 负责指挥“干”、会话/数据对象负责“说话传账”。当你的项目从“单机苦力”升级为“团队协作”,你会明显感觉到:UI 更稳、功耗更友好、用户更快乐
  下次需求评审,产品说“这段超重计算能不能不卡?”你就笑笑:
“不用卡。分出去干。”


参考与进一步阅读

  • 分布式设备管理(@ohos.distributedDeviceManager / Distributed Service Kit):官方 ArkTS API,覆盖设备发现/认证/可信列表/本机信息/上下线监听等核心能力。
  • 文档中心 & API 版本说明:查看系统能力(SysCap)API 等级是否支持元服务/卡片等信息,做版本适配。
  • 分布式任务调度(DMS)开源仓 / 能力说明:底层跨设备组件管理(启动/迁移/绑定)职责与功能概览。
  • ability 连接会话(AbilityConnectionManager)创建跨设备会话并传递文本消息(API 18+),便于“请求-响应”式协同。
  • 分布式数据对象(DataObject):端端共享数据、订阅变更,适合“多对多协同”。

注:示例中的模块导入名/函数名以你项目所用 HarmonyOS SDK 版本为准;概念与链路保持稳定(设备发现→分发启动→通道回执)。在接入前先对照当前 API 文档页的版本标识系统能力提示,避免“在不支持的 API Level 上用新功能”的尴尬。

(未完待续)

Logo

作为“人工智能6S店”的官方数字引擎,为AI开发者与企业提供一个覆盖软硬件全栈、一站式门户。

更多推荐