一张表,怎么同时活在两台设备里?”——分布式数据库在多设备同步中的应用
本文介绍了鸿蒙系统分布式数据存储的核心机制与应用实践。首先从架构层面解析了RdbStore与分布式表的关系,通过setDistributedTables()标记可同步表,并利用sync()和remoteQuery()实现跨设备数据同步。其次探讨了冲突处理策略,包括LWW(最后写入优先)和字段级合并等方法应对主键冲突和并发更新问题。最后阐述了鸿蒙分布式数据"最终一致性"的实现方式
我是兰瓶Coding,一枚刚踏入鸿蒙领域的转型小白,原是移动开发中级,如下是我学习笔记《零基础学鸿蒙》,若对你所有帮助,还请不吝啬的给个大大的赞~
前言
直说吧:要把“同一份业务数据”在多台设备上像影子一样跟着走,靠手搓 Socket 和自研协议,很容易把自己绕进沼泽。鸿蒙把这件事做成“工程配置 + 少量 API”的形态:把某些表设为分布式表,按需 sync(),或直接 remoteQuery(),跨设备的数据就能“像本地一样用”。本文按你的大纲来拆:分布式存储架构 → 冲突检测 → 数据一致性,并用 DistributedRdbStore / RdbStore 与 同步回调(等价的 SyncCallback 形态) 把流程跑通。所有代码用 ArkTS,接口依据最新官方指南与 API。(华为开发者)
一、分布式存储架构:把“本地 RDB”升级成“可同步的 RDB”
1)核心角色与能力边界
- RdbStore:你照常建库、建表、增删改查;只不过当某张表被标记为分布式表后,它具备“可跨设备同步”的资格。相关方法集中在
@kit.ArkData的relationalStore模块(过去叫@ohos.data.relationalStore,新包名以 ArkData 聚合)。(华为开发者) - 分布式表:通过
store.setDistributedTables(['Todo','User'])指定;同步粒度以表为单位。含复合主键(复合键)的表不可设置为分布式表,这是设计约束。(华为开发者) - 同步通道:系统在设备认证后建立加密通道,应用层只需发起
sync()即可完成推/拉/双向同步;也可不落地、直接remoteQuery()读取远端数据视图。(华为开发者) - 变更订阅:
store.on('dataChange', type, cb)既能订阅本地变更,也能订阅来自其它设备的变更(分布式变更);API 12 起还提供“本地详情”级别的订阅类型。(华为开发者)
小抄:一个应用最多同时打开 16 个分布式关系库;单库最多 8 个变更订阅回调。做中大型项目时要留心资源上限。(华为开发者)
2)最小落地:建库、设表、连权限、可同步
// 1) 权限(module.json5)
{
"requestPermissions": [
{ "name": "ohos.permission.DISTRIBUTED_DATASYNC" }
]
}
// 2) 打开 RdbStore & 初始化分布式表
import { relationalStore } from '@kit.ArkData';
import type { UIAbilityContext } from '@kit.AbilityKit';
const CONFIG: relationalStore.StoreConfig = {
name: 'sample.db',
securityLevel: relationalStore.SecurityLevel.S1
};
async function openStore(ctx: UIAbilityContext) {
const store = await relationalStore.getRdbStore(ctx, CONFIG);
await store.execute(`CREATE TABLE IF NOT EXISTS Todo(
id TEXT PRIMARY KEY,
title TEXT NOT NULL,
done INTEGER DEFAULT 0,
updated_at INTEGER NOT NULL
)`);
await store.setDistributedTables(['Todo']); // 标记为分布式表
return store;
}
// 3) 订阅分布式变更(来自其它设备)
store.on('dataChange',
// 不同版本有不同的订阅类型枚举;此处示意“订阅远端/分布式变更”
relationalStore.SubscribeType.SUBSCRIBE_TYPE_REMOTE,
(devices: string[]) => {
console.info('remote changes from -> ' + JSON.stringify(devices));
// 这里可以触发 UI 刷新或增量拉取
}
);
// 4) 发起同步(双向)
const preds = new relationalStore.RdbPredicates('Todo'); // 可加 where 条件限制同步范围
await store.sync(relationalStore.SyncMode.PUSH_PULL, preds, (results) => {
// 等价“SyncCallback”:返回 [deviceId, code] 数组
console.info('sync results -> ' + JSON.stringify(results));
});
上面涉及的
setDistributedTables / sync / on('dataChange') / remoteQuery都在官方“关系型数据库跨设备数据同步”与 RdbStore 接口页中清晰定义。(华为开发者)
二、冲突检测:从“插入冲突”到“并发更新”,怎么稳住?
分布式同步的两个常见冲突面:
- 主键冲突(插入):两台设备分别插入了相同主键(例如同一个业务 ID);
- 并发更新(修改):两台设备对同一行在不同时间更新了不同字段值。
1)检测思路与事件入口
- 在同步回调里看结果:
sync()的回调会给出“目标设备 → 返回码”列表,便于统计哪台设备失败,需要重试或人工干预。(华为开发者) - 在数据变更订阅里做校验:订阅分布式变更后,本地可针对冲突行做额外校验(例如根据
id做一次查询比对)。API 12 还支持“本地详情”型订阅,用于更精细的差异分析。(知乎专栏)
2)解决策略(工程可落地)
重点:分布式表不支持复合主键,所以别指望“设备 ID + 业务 ID 组成联合主键”来回避冲突。把分歧装进业务列(如
updated_at、last_writer、version),在应用层合并。(华为开发者)
-
LWW(最后写入优先):在表中维护
updated_at(毫秒时间戳)与last_writer(设备或用户);入库冲突时比时间戳,晚者胜。- 实现点子:本地写之前就把
updated_at = Date.now();接到远端变更后,如果远端行updated_at更大,则覆盖本地。
- 实现点子:本地写之前就把
-
字段级合并(Merge by Field):对“可并列更新”的字段(例如待办
done与title),可定义“只要有一个为真就为真”“拼接标签去重”等幂等合并函数。 -
数据库级冲突分流(API 18+):新版本提供
batchInsertWithConflictResolution(...),把“插入冲突时 REPLACE/IGNORE”这类策略交给底层执行(具体枚举以官方接口为准)。(华为开发者)
3)样例:一次“安全插入 + 并发合并”的写法
type Todo = { id: string; title: string; done: 0|1; updated_at: number; last_writer: string }
async function upsertTodo(store: relationalStore.RdbStore, row: Todo) {
// 方式 A:走 SQL 的 ON CONFLICT(底层 SQLite 支持)
await store.execute(`INSERT INTO Todo(id,title,done,updated_at,last_writer)
VALUES(?,?,?,?,?)
ON CONFLICT(id) DO UPDATE SET
title=excluded.title,
done=excluded.done,
updated_at=excluded.updated_at,
last_writer=excluded.last_writer`,
[row.id, row.title, row.done, row.updated_at, row.last_writer]);
// 方式 B:API 18+ 可调用 batchInsertWithConflictResolution(按文档枚举传参)。:contentReference[oaicite:11]{index=11}
}
小建议:把合并策略封成纯函数并做单测,订阅到分布式变更或
sync()结束后统一走一遍“合并 + 幂等写回”。这比在各处零散地 if-else 更稳。
三、数据一致性:不是“强一致”,而是“按需拉 + 事件感知 + 最终一致”
官方定位很明确:RDB 的跨设备同步是按需触发的能力(你调 sync() 或开启自动策略),配合变更订阅来感知其它设备写入,从而在体验上达成“最终一致”。再加上一个“随时直读对端”的后门:remoteQuery()。(华为开发者)
1)三条“拿数据”的路径
- 直接读远端(不落地):
remoteQuery(deviceId, table, predicates, columns)用于“只看一下对端表”的场景,比如“看看另一台设备的播放列表”。(华为开发者) - 双向同步后本地读:
sync(PUSH_PULL, preds, cb)先把两边对齐,再走普通query/executeSql。(华为开发者) - 事件驱动的增量拉取:订阅
dataChange,一旦别的设备改了本表,就筛选条件做一次增量PULL或remoteQuery(),仅处理“可能受影响”的那部分数据。(华为开发者)
2)可观测与长链路:别做“黑箱同步”
RdbStore 还提供一些同步与统计相关的事件(例如自动同步进度 on('autoSyncProgress')、性能统计 on('statistics') 等,视版本而定),配合 sync() 的“设备返回码列表”,你可以把“开始→进度→结束→异常”做成链路埋点与 UI 提示。(华为开发者)
3)端到端样例:带“最终一致”保障的小流水线
// 订阅分布式变更 → 触发增量拉
store.on('dataChange',
relationalStore.SubscribeType.SUBSCRIBE_TYPE_REMOTE,
async (_devices: string[]) => {
const preds = new relationalStore.RdbPredicates('Todo')
.greaterThan('updated_at', lastSyncedAt); // 只拉需要的
// 也可以直接 remoteQuery 某台设备
await store.sync(relationalStore.SyncMode.PULL, preds, (results) => {
console.info('pull after remote change -> ' + JSON.stringify(results));
});
lastSyncedAt = Date.now();
}
);
// 主动双向对齐(如用户点击“同步”按钮)
async function fullSync() {
const preds = new relationalStore.RdbPredicates('Todo'); // 全表或带条件
await store.sync(relationalStore.SyncMode.PUSH_PULL, preds, (r) => {
console.info('full sync done -> ' + JSON.stringify(r));
});
}
4)事务 & 幂等:一致性的工程抓手
- 把“合并 + 写回”放进事务:
beginTransaction/commit/rollback或新版createTransaction(),避免半途失败留下“幽灵数据”。(华为开发者) - 幂等更新:按主键
id合并,二次执行不会损坏结果;必要时加“版本号 + 时间戳”双保险。 - 失败重试:对
sync()的失败设备做指数退避;对remoteQuery()的网络错误做友好降级。
四、完整示例:多设备 Todo 的“建—写—订阅—同步—合并”
// 0) 打开库 + 分布式表(同上 openStore)
// 1) 写入一条并触发同步
async function addTodo(store: relationalStore.RdbStore, title: string, deviceId: string) {
const row = {
id: `${Date.now()}_${Math.random().toString(16).slice(2)}`,
title, done: 0,
updated_at: Date.now(),
last_writer: deviceId
};
await upsertTodo(store, row); // 上文的幂等 upsert
const preds = new relationalStore.RdbPredicates('Todo').equalTo('id', row.id);
await store.sync(relationalStore.SyncMode.PUSH, preds, (ret) => {
console.info('push new row -> ' + JSON.stringify(ret));
});
}
// 2) 订阅来自远端的变更,做一次“LWW 合并”
store.on('dataChange',
relationalStore.SubscribeType.SUBSCRIBE_TYPE_REMOTE,
async () => {
// 简化:把 “updated_at > lastSyncedAt” 的记录拉到本地
const preds = new relationalStore.RdbPredicates('Todo')
.greaterThan('updated_at', lastSyncedAt);
await store.sync(relationalStore.SyncMode.PULL, preds, () => {});
lastSyncedAt = Date.now();
});
上述模式把“写本地 → 细粒度 PUSH”与“订阅远端变更 → 增量 PULL”结合起来,体验上很接近“实时”,成本却远小于自研链路。接口出处见官方指南与 API。(华为开发者)
五、实战清单 & 踩坑提示(真的能少掉很多坑)
-
表设计
- 分布式表不能用复合主键;必要时把设备信息放进业务列(
last_writer),不要拼联合主键。(华为开发者) - 预留
updated_at、version等列,给合并算法留抓手。
- 分布式表不能用复合主键;必要时把设备信息放进业务列(
-
同步与查询
- “看一眼就走”的场景尽量用
remoteQuery();需要本地可离线则sync()。(华为开发者) - 同步范围用
RdbPredicates精确控制(按时间、按用户、按分区)。
- “看一眼就走”的场景尽量用
-
回调与可观测
- 把
sync()的设备返回码打点;结合on('autoSyncProgress')、on('statistics')做进度与耗时观测(按你所用 API 版本提供的事件为准)。(华为开发者)
- 把
-
资源上限
- 打开库 ≤ 16 个;变更订阅回调 ≤ 8 个;超限会直接失败。(华为开发者)
-
权限与用户授权
- 记得申请并在首启时向用户弹窗授权
ohos.permission.DISTRIBUTED_DATASYNC,跨设备同步不生效。c(华为开发者)—
- 记得申请并在首启时向用户弹窗授权
六、把“DistributedRdbStore / SyncCallback”放到位
- 历史资料里常见 DistributedRdbStore 的说法;在当前 ArkData 形态下,你实际拿到的是
relationalStore.RdbStore,通过setDistributedTables/sync/remoteQuery实现“分布式能力”。c(华为开发者) - “SyncCallback”在接口形态上体现为
sync(mode, preds, (results) => {...})的回调(数组元素是[deviceId, code]),另配on('dataChange', ...)事件流转。你也可以把两者统一封装成自己的SyncCallback接口,便于工程内复用与测试。c(华为开发者)—
七、收束:一致性的“组合拳”
分布式 RDB 的核心心法,不是去追“强一致”,而是把同步触发(sync)+ 远端直查(remoteQuery)+ 变更订阅(dataChange)+ 幂等合并(LWW/字段合并/冲突策略)打成组合拳。它让你在“最终一致”的工程边界里,做出接近实时的用户体验,同时能量化每一步(有回调、有事件、有统计)。下次当你犹豫“要不要自建同步服务”时,不妨反问自己:**有一套现成的分布式 RDB,为什么不拿来就用?**😉
参考(官方为主)
- 关系型数据库跨设备数据同步:架构、能力、接口(
setDistributedTables/sync/on/remoteQuery/约束)。c(华为开发者) - RdbStore 接口参考:
sync/remoteQuery/setDistributedTables/订阅事件/统计/事务等(含 12+、18+ 增强项)。c(华为开发者) - 订阅类型新增说明(API 12):本地详情订阅类型。c(知乎专栏)要不要我把上面的样例整理成可跑的 Demo 页面(带“选择设备、双向同步、冲突调解可视化”)?点个头,我就把工程骨架和封装都给你备好~ 🚀
…
(未完待续)
更多推荐





所有评论(0)