我是兰瓶Coding,一枚刚踏入鸿蒙领域的转型小白,原是移动开发中级,如下是我学习笔记《零基础学鸿蒙》,若对你所有帮助,还请不吝啬的给个大大的赞~

前言

直说吧:要把“同一份业务数据”在多台设备上像影子一样跟着走,靠手搓 Socket 和自研协议,很容易把自己绕进沼泽。鸿蒙把这件事做成“工程配置 + 少量 API”的形态:把某些表设为分布式表,按需 sync(),或直接 remoteQuery(),跨设备的数据就能“像本地一样用”。本文按你的大纲来拆:分布式存储架构 → 冲突检测 → 数据一致性,并用 DistributedRdbStore / RdbStore同步回调(等价的 SyncCallback 形态) 把流程跑通。所有代码用 ArkTS,接口依据最新官方指南与 API。(华为开发者)

一、分布式存储架构:把“本地 RDB”升级成“可同步的 RDB”

1)核心角色与能力边界

  • RdbStore:你照常建库、建表、增删改查;只不过当某张表被标记为分布式表后,它具备“可跨设备同步”的资格。相关方法集中在 @kit.ArkDatarelationalStore 模块(过去叫 @ohos.data.relationalStore,新包名以 ArkData 聚合)。(华为开发者)
  • 分布式表:通过 store.setDistributedTables(['Todo','User']) 指定;同步粒度以表为单位。含复合主键(复合键)的表不可设置为分布式表,这是设计约束。(华为开发者)
  • 同步通道:系统在设备认证后建立加密通道,应用层只需发起 sync() 即可完成推/拉/双向同步;也可不落地、直接 remoteQuery() 读取远端数据视图。(华为开发者)
  • 变更订阅store.on('dataChange', type, cb) 既能订阅本地变更,也能订阅来自其它设备的变更(分布式变更);API 12 起还提供“本地详情”级别的订阅类型。(华为开发者)

小抄:一个应用最多同时打开 16 个分布式关系库;单库最多 8 个变更订阅回调。做中大型项目时要留心资源上限。(华为开发者)

2)最小落地:建库、设表、连权限、可同步

// 1) 权限(module.json5)
{
  "requestPermissions": [
    { "name": "ohos.permission.DISTRIBUTED_DATASYNC" }
  ]
}
// 2) 打开 RdbStore & 初始化分布式表
import { relationalStore } from '@kit.ArkData';
import type { UIAbilityContext } from '@kit.AbilityKit';

const CONFIG: relationalStore.StoreConfig = {
  name: 'sample.db',
  securityLevel: relationalStore.SecurityLevel.S1
};

async function openStore(ctx: UIAbilityContext) {
  const store = await relationalStore.getRdbStore(ctx, CONFIG);
  await store.execute(`CREATE TABLE IF NOT EXISTS Todo(
    id TEXT PRIMARY KEY,
    title TEXT NOT NULL,
    done INTEGER DEFAULT 0,
    updated_at INTEGER NOT NULL
  )`);
  await store.setDistributedTables(['Todo']);              // 标记为分布式表
  return store;
}
// 3) 订阅分布式变更(来自其它设备)
store.on('dataChange',
  // 不同版本有不同的订阅类型枚举;此处示意“订阅远端/分布式变更”
  relationalStore.SubscribeType.SUBSCRIBE_TYPE_REMOTE,
  (devices: string[]) => {
    console.info('remote changes from -> ' + JSON.stringify(devices));
    // 这里可以触发 UI 刷新或增量拉取
  }
);
// 4) 发起同步(双向)
const preds = new relationalStore.RdbPredicates('Todo');   // 可加 where 条件限制同步范围
await store.sync(relationalStore.SyncMode.PUSH_PULL, preds, (results) => {
  // 等价“SyncCallback”:返回 [deviceId, code] 数组
  console.info('sync results -> ' + JSON.stringify(results));
});

上面涉及的 setDistributedTables / sync / on('dataChange') / remoteQuery 都在官方“关系型数据库跨设备数据同步”与 RdbStore 接口页中清晰定义。(华为开发者)


二、冲突检测:从“插入冲突”到“并发更新”,怎么稳住?

分布式同步的两个常见冲突面:

  1. 主键冲突(插入):两台设备分别插入了相同主键(例如同一个业务 ID);
  2. 并发更新(修改):两台设备对同一行在不同时间更新了不同字段值

1)检测思路与事件入口

  • 在同步回调里看结果sync() 的回调会给出“目标设备 → 返回码”列表,便于统计哪台设备失败,需要重试或人工干预。(华为开发者)
  • 在数据变更订阅里做校验:订阅分布式变更后,本地可针对冲突行做额外校验(例如根据 id 做一次查询比对)。API 12 还支持“本地详情”型订阅,用于更精细的差异分析。(知乎专栏)

2)解决策略(工程可落地)

重点:分布式表不支持复合主键,所以别指望“设备 ID + 业务 ID 组成联合主键”来回避冲突。把分歧装进业务列(如 updated_atlast_writerversion),在应用层合并。(华为开发者)

  • LWW(最后写入优先):在表中维护 updated_at(毫秒时间戳)与 last_writer(设备或用户);入库冲突时比时间戳,晚者胜。

    • 实现点子:本地写之前就把 updated_at = Date.now();接到远端变更后,如果远端行 updated_at 更大,则覆盖本地。
  • 字段级合并(Merge by Field):对“可并列更新”的字段(例如待办 donetitle),可定义“只要有一个为真就为真”“拼接标签去重”等幂等合并函数

  • 数据库级冲突分流(API 18+):新版本提供 batchInsertWithConflictResolution(...),把“插入冲突时 REPLACE/IGNORE”这类策略交给底层执行(具体枚举以官方接口为准)。(华为开发者)

3)样例:一次“安全插入 + 并发合并”的写法

type Todo = { id: string; title: string; done: 0|1; updated_at: number; last_writer: string }

async function upsertTodo(store: relationalStore.RdbStore, row: Todo) {
  // 方式 A:走 SQL 的 ON CONFLICT(底层 SQLite 支持)
  await store.execute(`INSERT INTO Todo(id,title,done,updated_at,last_writer)
                       VALUES(?,?,?,?,?)
                       ON CONFLICT(id) DO UPDATE SET
                         title=excluded.title,
                         done=excluded.done,
                         updated_at=excluded.updated_at,
                         last_writer=excluded.last_writer`,
    [row.id, row.title, row.done, row.updated_at, row.last_writer]);
  // 方式 B:API 18+ 可调用 batchInsertWithConflictResolution(按文档枚举传参)。:contentReference[oaicite:11]{index=11}
}

小建议:把合并策略封成纯函数并做单测,订阅到分布式变更或 sync() 结束后统一走一遍“合并 + 幂等写回”。这比在各处零散地 if-else 更稳。


三、数据一致性:不是“强一致”,而是“按需拉 + 事件感知 + 最终一致”

官方定位很明确:RDB 的跨设备同步是按需触发的能力(你调 sync() 或开启自动策略),配合变更订阅来感知其它设备写入,从而在体验上达成“最终一致”。再加上一个“随时直读对端”的后门:remoteQuery()。(华为开发者)

1)三条“拿数据”的路径

  • 直接读远端(不落地)remoteQuery(deviceId, table, predicates, columns) 用于“只看一下对端表”的场景,比如“看看另一台设备的播放列表”。(华为开发者)
  • 双向同步后本地读sync(PUSH_PULL, preds, cb) 先把两边对齐,再走普通 query/executeSql。(华为开发者)
  • 事件驱动的增量拉取:订阅 dataChange,一旦别的设备改了本表,就筛选条件做一次增量 PULLremoteQuery(),仅处理“可能受影响”的那部分数据。(华为开发者)

2)可观测与长链路:别做“黑箱同步”

RdbStore 还提供一些同步与统计相关的事件(例如自动同步进度 on('autoSyncProgress')、性能统计 on('statistics') 等,视版本而定),配合 sync() 的“设备返回码列表”,你可以把“开始→进度→结束→异常”做成链路埋点与 UI 提示。(华为开发者)

3)端到端样例:带“最终一致”保障的小流水线

// 订阅分布式变更 → 触发增量拉
store.on('dataChange',
  relationalStore.SubscribeType.SUBSCRIBE_TYPE_REMOTE,
  async (_devices: string[]) => {
    const preds = new relationalStore.RdbPredicates('Todo')
      .greaterThan('updated_at', lastSyncedAt);  // 只拉需要的
    // 也可以直接 remoteQuery 某台设备
    await store.sync(relationalStore.SyncMode.PULL, preds, (results) => {
      console.info('pull after remote change -> ' + JSON.stringify(results));
    });
    lastSyncedAt = Date.now();
  }
);

// 主动双向对齐(如用户点击“同步”按钮)
async function fullSync() {
  const preds = new relationalStore.RdbPredicates('Todo'); // 全表或带条件
  await store.sync(relationalStore.SyncMode.PUSH_PULL, preds, (r) => {
    console.info('full sync done -> ' + JSON.stringify(r));
  });
}

4)事务 & 幂等:一致性的工程抓手

  • 把“合并 + 写回”放进事务beginTransaction/commit/rollback 或新版 createTransaction(),避免半途失败留下“幽灵数据”。(华为开发者)
  • 幂等更新:按主键 id 合并,二次执行不会损坏结果;必要时加“版本号 + 时间戳”双保险。
  • 失败重试:对 sync() 的失败设备做指数退避;对 remoteQuery() 的网络错误做友好降级。

四、完整示例:多设备 Todo 的“建—写—订阅—同步—合并”

// 0) 打开库 + 分布式表(同上 openStore)
// 1) 写入一条并触发同步
async function addTodo(store: relationalStore.RdbStore, title: string, deviceId: string) {
  const row = {
    id: `${Date.now()}_${Math.random().toString(16).slice(2)}`,
    title, done: 0,
    updated_at: Date.now(),
    last_writer: deviceId
  };
  await upsertTodo(store, row); // 上文的幂等 upsert
  const preds = new relationalStore.RdbPredicates('Todo').equalTo('id', row.id);
  await store.sync(relationalStore.SyncMode.PUSH, preds, (ret) => {
    console.info('push new row -> ' + JSON.stringify(ret));
  });
}

// 2) 订阅来自远端的变更,做一次“LWW 合并”
store.on('dataChange',
  relationalStore.SubscribeType.SUBSCRIBE_TYPE_REMOTE,
  async () => {
    // 简化:把 “updated_at > lastSyncedAt” 的记录拉到本地
    const preds = new relationalStore.RdbPredicates('Todo')
      .greaterThan('updated_at', lastSyncedAt);
    await store.sync(relationalStore.SyncMode.PULL, preds, () => {});
    lastSyncedAt = Date.now();
});

上述模式把“写本地 → 细粒度 PUSH”与“订阅远端变更 → 增量 PULL”结合起来,体验上很接近“实时”,成本却远小于自研链路。接口出处见官方指南与 API。(华为开发者)


五、实战清单 & 踩坑提示(真的能少掉很多坑)

  • 表设计

    • 分布式表不能用复合主键;必要时把设备信息放进业务列(last_writer),不要拼联合主键。(华为开发者)
    • 预留 updated_atversion 等列,给合并算法留抓手。
  • 同步与查询

    • “看一眼就走”的场景尽量用 remoteQuery();需要本地可离线则 sync()。(华为开发者)
    • 同步范围用 RdbPredicates 精确控制(按时间、按用户、按分区)。
  • 回调与可观测

    • sync()设备返回码打点;结合 on('autoSyncProgress')on('statistics') 做进度与耗时观测(按你所用 API 版本提供的事件为准)。(华为开发者)
  • 资源上限

    • 打开库 ≤ 16 个;变更订阅回调 ≤ 8 个;超限会直接失败。(华为开发者)
  • 权限与用户授权

    • 记得申请并在首启时向用户弹窗授权 ohos.permission.DISTRIBUTED_DATASYNC,跨设备同步不生效。c(华为开发者)—

六、把“DistributedRdbStore / SyncCallback”放到位

  • 历史资料里常见 DistributedRdbStore 的说法;在当前 ArkData 形态下,你实际拿到的是 relationalStore.RdbStore,通过 setDistributedTables/sync/remoteQuery 实现“分布式能力”。c(华为开发者)
  • SyncCallback”在接口形态上体现为 sync(mode, preds, (results) => {...}) 的回调(数组元素是 [deviceId, code]),另配 on('dataChange', ...) 事件流转。你也可以把两者统一封装成自己的 SyncCallback 接口,便于工程内复用与测试。c(华为开发者)—

七、收束:一致性的“组合拳”

分布式 RDB 的核心心法,不是去追“强一致”,而是把同步触发(sync)+ 远端直查(remoteQuery)+ 变更订阅(dataChange)+ 幂等合并(LWW/字段合并/冲突策略)打成组合拳。它让你在“最终一致”的工程边界里,做出接近实时的用户体验,同时能量化每一步(有回调、有事件、有统计)。下次当你犹豫“要不要自建同步服务”时,不妨反问自己:**有一套现成的分布式 RDB,为什么不拿来就用?**😉

参考(官方为主)

  • 关系型数据库跨设备数据同步:架构、能力、接口(setDistributedTables/sync/on/remoteQuery/约束)。c(华为开发者)
  • RdbStore 接口参考sync/remoteQuery/setDistributedTables/订阅事件/统计/事务等(含 12+、18+ 增强项)。c(华为开发者)
  • 订阅类型新增说明(API 12):本地详情订阅类型。c(知乎专栏)要不要我把上面的样例整理成可跑的 Demo 页面(带“选择设备、双向同步、冲突调解可视化”)?点个头,我就把工程骨架和封装都给你备好~ 🚀

(未完待续)

Logo

作为“人工智能6S店”的官方数字引擎,为AI开发者与企业提供一个覆盖软硬件全栈、一站式门户。

更多推荐