鸿蒙原生开发:实现数据库驱动的文件断点续传与文件管理

在这里插入图片描述

在鸿蒙(HarmonyOS)原生应用开发中,文件下载功能是众多场景的核心需求,而断点续传技术能有效解决网络中断、应用重启导致的下载进度丢失问题,大幅提升用户体验。本文将详细讲解如何基于鸿蒙原生 API,结合关系型数据库(RelationalStore)设计双表结构,实现文件断点续传与全生命周期管理,打造一套规范、可扩展的下载管理体系。

一、核心设计理念

本次方案的核心在于数据分层存储状态联动管理

  1. 双表结构设计:通过FILE_ITEM表存储文件基础信息(名称、大小、类型等),DOWNLOAD_TASK表存储下载任务状态(进度、路径、状态等),利用fileId建立关联,实现数据解耦;
  2. 文件状态独立管理:在FILE_ITEM表新增status字段,记录文件整体状态(待下载、下载中、已完成等),与任务状态联动更新,确保数据一致性;
  3. 断点续传实现:基于 HTTP 的Range请求头分块下载文件,结合本地文件校验和数据库状态持久化,支持断点恢复;
  4. 可视化管理:构建文件列表页面,实时展示文件状态与下载进度,提供完整的操作交互能力。

二、数据模型与数据库设计

1. 核心数据模型定义

首先定义标准化的数据模型,明确文件与任务的属性及关联关系:

// 下载状态枚举
export enum DownloadStatus {
  INIT='init',//初始化状态,任务尚未开始
  WAITING = 'waiting', // 等待下载状态
  DOWNLOADING = 'downloading', // 正在下载状态
  PAUSED = 'paused', // 已暂停状态
  COMPLETED = 'completed', // 下载完成状态
  FAILED = 'failed', // 下载失败状态
  DELETED='deleted', //任务已删除状态
  CANCEL='cancel',  //任务已取消状态
}

// 文件信息接口,描述待下载文件的基本属性
export interface IFileItem {
  fileId?: string;         // 文件唯一标识,用于关联下载任务
  fileName?: string;       // 文件名,用于本地存储和显示
  type?: string;           // 文件类型(如pdf、mp4等),用于分类管理
  size?: number;           // 文件总大小(字节),用于计算下载进度
  status?: string;         // 文件状态(关联DownloadStatus),同步文件的下载状态
  url?:string,             // 文件下载地址,指向远程资源位置
  localPath?: string;      // 文件本地存储路径,指定文件保存位置
  updateTime?: string,     // 最后更新时间,记录文件状态的更新时间
}

// 下载任务接口,描述下载任务的详细信息
export interface IDlTask {
  taskId: string;          // 任务唯一标识,每个下载任务的唯一ID
  fileId: string;          // 关联的文件ID,建立任务与文件的关联关系
  fileName?: string;       // 文件名,冗余存储便于快速访问
  localPath?: string;      // 本地沙箱存储路径,指定文件保存的具体位置
  url: string;             // 文件原始下载地址,远程资源的URL
  totalSize?: number;      // 文件总大小(字节),用于进度计算
  downloadedSize?: number; // 已下载大小(字节),断点续传的核心数据
  progress?: number;       // 下载进度(百分比),便于UI展示
  type?: string;           // 文件类型,用于分类处理不同类型的文件
  status?: string;         // 任务状态(关联DownloadStatus),标识任务当前状态
  speed?: string;          // 下载速度(B/s、KB/s等),用于展示下载速率
  createTime?: string;     // 任务创建时间,记录任务创建的时间点
  lastUpdateTime?: string; // 任务最后更新时间,记录任务状态最后更新的时间
}

2. 数据库工具类实现

基于鸿蒙relationalStore封装数据库工具类,实现FILE_ITEMDOWNLOAD_TASK双表的 CRUD 及关联查询,为文件和下载任务管理提供数据支撑:

在这里插入图片描述

/**
 * 数据库工具类(ArkTS / HarmonyOS RDB)
 * 提供下载任务的持久化存储能力,是断点续传的核心支撑
 * 采用单例模式,确保全局使用同一个数据库实例
 */
import relationalStore from '@ohos.data.relationalStore'
import { IFileItem, IDlTask } from './Interface'

export class RelationalDbUtil {
  public TABLE_FILE_ITEM = 'FILE_ITEM'       // 文件信息表名
  public TABLE_DOWNLOAD_TASK = 'DOWNLOAD_TASK' // 下载任务表名
  private store?: relationalStore.RdbStore   // 数据库实例

  /**
   * 初始化数据库并创建表结构
   * @param context 应用上下文(UIAbilityContext或ApplicationContext)
   * @param dbName 数据库名称,默认 `device_media.db`
   */
  async init(context: Context, dbName: string = 'device_media.db'): Promise<void> {
    // 配置数据库参数
    const config: relationalStore.StoreConfig = {
      name: dbName,         // 数据库文件名
      securityLevel: relationalStore.SecurityLevel.S1 // 安全级别S1(低级别加密)
    }
    // 获取数据库实例,这是鸿蒙RDB的标准初始化方式
    this.store = await relationalStore.getRdbStore(context, config)
    // 开启外键约束,确保表之间的关联完整性
    await this.store.executeSql('PRAGMA foreign_keys = ON')
    // 创建必要的数据表结构
    await this.createTables()
  }

  /**
   * 判断数据库是否就绪
   * @returns 数据库是否初始化完成
   */
  isReady(): boolean {
    return !!this.store
  }

  /**
   * 创建数据表与索引
   * 包含文件信息表和下载任务表,建立表之间的关联关系
   */
  private async createTables(): Promise<void> {
    if (!this.store) {
      return
    }

    // 创建文件信息表,存储文件的基本信息
    await this.store.executeSql(
      `CREATE TABLE IF NOT EXISTS ${this.TABLE_FILE_ITEM} (
        fileId TEXT PRIMARY KEY,        -- 文件唯一标识,主键
        fileName TEXT,                  -- 文件名
        type TEXT,                      -- 文件类型
        size INTEGER,                   -- 文件大小(字节)
        status TEXT,                    -- 文件状态
        url TEXT,                       -- 下载地址
        localPath TEXT,                 -- 本地存储路径
        updateTime TEXT                 -- 最后更新时间
      )`
    )

    // 创建下载任务表,存储下载任务的详细信息
    await this.store.executeSql(
      `CREATE TABLE IF NOT EXISTS ${this.TABLE_DOWNLOAD_TASK} (
        taskId TEXT PRIMARY KEY,        -- 任务唯一标识,主键
        fileId TEXT,                    -- 关联的文件ID,外键
        fileName TEXT,                  -- 文件名(冗余存储)
        type TEXT,                      -- 文件类型(冗余存储)
        localPath TEXT,                 -- 本地存储路径
        url TEXT,                       -- 下载地址
        totalSize INTEGER,              -- 文件总大小
        downloadedSize INTEGER,         -- 已下载大小(断点续传关键)
        progress INTEGER,               -- 下载进度(百分比)
        status TEXT,                    -- 任务状态
        speed TEXT,                     -- 下载速度
        createTime TEXT,                -- 创建时间
        lastUpdateTime TEXT,            -- 最后更新时间
        -- 外键约束,确保数据完整性
        FOREIGN KEY(fileId) REFERENCES ${this.TABLE_FILE_ITEM}(fileId)
      )`
    )

    // 创建索引提升查询性能,特别是通过fileId查询下载任务时
    await this.store.executeSql(`CREATE INDEX IF NOT EXISTS idx_download_task_fileId ON ${this.TABLE_DOWNLOAD_TASK}(fileId)`)
  }

  /**
   * 插入记录到指定表
   * @param table 表名
   * @param values 字段数据
   * @returns 插入行的 rowId
   */
  async insert(table: string,
    values: Record<string, string | number> | IFileItem | IDlTask): Promise<number> {
    if (!this.store) {
      throw new Error('RdbStore not initialized')
    }
    // 鸿蒙RDB的标准插入操作
    return await this.store.insert(table, values as relationalStore.ValuesBucket)
  }

  /**
   * 按主键更新整条记录
   * @param table 表名
   * @param keyColumn 主键列名
   * @param keyValue 主键值
   * @param values 更新的字段数据
   * @returns 影响的行数
   */
  async updateByKey(table: string, keyColumn: string, keyValue: string | number,
    values: Record<string, string | number> | IFileItem | IDlTask): Promise<number> {
    if (!this.store) {
      throw new Error('RdbStore not initialized')
    }
    // 创建查询条件,指定主键值
    const predicates = new relationalStore.RdbPredicates(table)
    predicates.equalTo(keyColumn, keyValue)
    // 执行更新操作
    return await this.store.update(values as relationalStore.ValuesBucket, predicates)
  }

  /**
   * 按主键更新单个字段
   * @param table 表名
   * @param keyColumn 主键列名
   * @param keyValue 主键值
   * @param fieldName 目标字段名
   * @param fieldValue 目标字段值
   * @returns 影响的行数
   */
  async updateField(table: string, keyColumn: string, keyValue: string | number, fieldName: string,
    fieldValue: string | number): Promise<number> {
    if (!this.store) {
      throw new Error('RdbStore not initialized')
    }
    // 创建查询条件
    const predicates = new relationalStore.RdbPredicates(table)
    predicates.equalTo(keyColumn, keyValue)
    // 构建更新数据对象
    const values: Record<string, string | number> = {}
    values[fieldName] = fieldValue
    // 执行字段更新
    return await this.store.update(values as relationalStore.ValuesBucket, predicates)
  }

  /**
   * 按主键删除记录
   * @param table 表名
   * @param keyColumn 主键列名
   * @param keyValue 主键值
   * @returns 影响的行数
   */
  async deleteByKey(table: string, keyColumn: string, keyValue: string | number): Promise<number> {
    if (!this.store) {
      throw new Error('RdbStore not initialized')
    }
    // 创建删除条件
    const predicates = new relationalStore.RdbPredicates(table)
    predicates.equalTo(keyColumn, keyValue)
    // 执行删除操作
    return await this.store.delete(predicates)
  }

  /**
   * 查询全表数据
   * @param table 表名
   * @param columns 需要返回的列列表,为空时返回所有列
   * @returns ResultSet 游标,包含查询结果
   */
  async queryAll(table: string, columns: Array<string> = []): Promise<relationalStore.ResultSet> {
    if (!this.store) {
      throw new Error('RdbStore not initialized')
    }
    // 创建查询条件(无条件,查询全部)
    const predicates = new relationalStore.RdbPredicates(table)
    // 执行查询
    return await this.store.query(predicates, columns)
  }

  /**
   * 按主键查询单条记录
   * @param table 表名
   * @param keyColumn 主键列名
   * @param keyValue 主键值
   * @param columns 返回列列表
   * @returns ResultSet 游标,包含查询结果
   */
  async queryByKey(table: string, keyColumn: string, keyValue: string | number,
    columns: Array<string> = []): Promise<relationalStore.ResultSet> {
    if (!this.store) {
      throw new Error('RdbStore not initialized')
    }
    // 创建查询条件
    const predicates = new relationalStore.RdbPredicates(table)
    predicates.equalTo(keyColumn, keyValue)
    // 执行查询
    return await this.store.query(predicates, columns)
  }
}

// 导出单例实例,确保全局使用同一个数据库工具实例
export const dbUtil = new RelationalDbUtil()

关键设计说明

  • 采用单例模式确保全局数据库实例的唯一性
  • 设计两个关联表:文件信息表和下载任务表,通过外键建立关联
  • 特别关注downloadedSize字段的存储,这是断点续传的核心数据
  • 提供完整的 CRUD 操作接口,支持单字段更新和整行更新
  • 创建索引提升查询性能,特别是针对常用查询条件

3. 事件通知工具类(EventHubUtils.ts)

该模块封装鸿蒙的 EventHub 事件中心,提供全局统一的事件通知机制,用于在下载过程中实时传递状态变化和进度更新,实现 UI 与业务逻辑的解耦。

import { common } from '@kit.AbilityKit';

export class EventHubUtils {
  // 静态变量存储事件中心实例,确保全局唯一性
  private static mEventHub: common.EventHub | null = null;

  /**
   * 获取事件通知实例
   * 确保全局使用同一个EventHub实例,解决不同window中获取的eventhub不是同一个的问题
   * @returns 事件中心实例
   */
  public static getEventHub(){
    // 懒加载模式,首次调用时初始化
    if(!EventHubUtils.mEventHub){
      // 获取应用上下文,确保事件中心的全局唯一性
      let context = getContext() as common.UIAbilityContext;
      EventHubUtils.mEventHub = context.eventHub;
      console.log("EventHubUtils", "EventIns mEventHub done !");
    }
    return EventHubUtils.mEventHub;
  }
}

关键设计说明

  • 静态方法确保全局使用同一个 EventHub 实例
  • 解决鸿蒙中不同窗口可能获取不同 EventHub 实例的问题
  • 为下载状态变化、进度更新提供统一的事件分发机制

4. 辅助工具类(tool.ts)

提供通用的工具函数,包括时间格式化 convertIsoToCustomFormat 和防抖DebounceUtil等辅助功能,为核心业务逻辑提供支持。

/**
 * 将ISO时间格式转换为自定义格式
 * @param isoTime ISO时间字符串(可选),支持带时区和毫秒的格式
 * @param format 自定义格式模板(可选,默认:'YYYY/MM/D/DD HH:mm:ss')
 * 支持的占位符:
 * - YYYY: 4位年份(如:2025)
 * - MM: 2位月份(01-12)
 * - M: 1位/2位月份(1-12)
 * - DD: 2位日期(01-31)
 * - D: 1位/2位日期(1-31)
 * - HH: 2位小时(00-23)
 * - H: 1位/2位小时(0-23)
 * - mm: 2位分钟(00-59)
 * - m: 1位/2位分钟(0-59)
 * - ss: 2位秒数(00-59)
 * - s: 1位/2位秒数(0-59)
 * @returns 格式化后的时间字符串,转换失败返回空字符串
 */
export function convertIsoToCustomFormat(
  isoTime: string | undefined,
  format: string = 'YYYY/M/D HH:mm:ss'
): string {
  if (!isoTime) {
    return '';
  }

  try {
    // 处理ISO字符串(兼容带时区和毫秒的格式)
    const timeWithoutMs = isoTime.split('.')[0];
    const date = new Date(timeWithoutMs);

    // 验证日期有效性
    if (isNaN(date.getTime())) {
      throw new Error('无效的时间格式');
    }

    // 获取各时间部分
    const year = date.getFullYear().toString();
    const month = (date.getMonth() + 1).toString(); // 月份从0开始,需+1
    const day = date.getDate().toString();
    const hours = date.getHours().toString();
    const minutes = date.getMinutes().toString();
    const seconds = date.getSeconds().toString();

    // 定义格式映射规则
    const formatMap: Record<string, string> = {
      'YYYY': year,
      'MM': month.padStart(2, '0'),
      'M': month,
      'DD': day.padStart(2, '0'),
      'D': day,
      'HH': hours.padStart(2, '0'),
      'H': hours,
      'mm': minutes.padStart(2, '0'),
      'm': minutes,
      'ss': seconds.padStart(2, '0'),
      's': seconds
    };

    // 替换格式占位符
    return format.replace(/YYYY|MM|M|DD|D|HH|H|mm|m|ss|s/g, match => {
      return formatMap[match] || match;
    });

  } catch (error) {
    console.error('时间转换失败:', error);
    return '';
  }
}


export function formatHexString(data: Uint8Array, addSpace: string=' ',isReverse:boolean=false): string {
  if (!data || data.length < 1)
    return '';
  if(isReverse){
    data=data.reverse()
  }

  let sb: string = '';
  for (let i: number = 0; i < data.length; i++) {
    let hex: String = (data[i] & 0xFF).toString(16).toUpperCase();
    if (hex.length == 1) {
      hex = '0' + hex;
    }
    sb = sb + hex;
    if (addSpace&&i!=data.length-1)
      sb = sb + addSpace;
  }
  return sb;
}

export function getCurrentDateTime(): string {
  const currentIsoTime = new Date().toISOString();
  return convertIsoToCustomFormat(currentIsoTime, 'YYYY-M-D HH:mm:ss');
}


export class DebounceUtil {
  private static instance: DebounceUtil;
  private static methodsMap: Map<string, number> = new Map();

  private constructor() {
  }

  public static getInstance(): DebounceUtil {
    if (!DebounceUtil.instance) {
      DebounceUtil.instance = new DebounceUtil();
    }
    return DebounceUtil.instance;
  }

  public isDebounced( milliseconds: number=1000,methodName: string='方法',): boolean {
    const now = Date.now();
    const lastExecutionTime = DebounceUtil.methodsMap.get(methodName);

    if (lastExecutionTime && (now - lastExecutionTime) < milliseconds) {
      console.info(`${methodName}方法在指定时间${milliseconds}内已执行过`)
      return true; // 方法在指定时间内已执行过
    } else {
      DebounceUtil.methodsMap.set(methodName, now);
      return false; // 方法在指定时间内未执行过
    }
  }
}

export const Debounce = DebounceUtil.getInstance()

5. 核心下载工具类(DownloadTool.ts)

这是断点续传功能的核心实现模块,负责下载任务的管理、HTTP 请求处理、文件写入、断点续传逻辑实现等。采用单例模式确保全局任务管理的一致性。

import { IDlTask, DownloadStatus, IFileItem } from './Interface';
import { dbUtil } from './RelationalDbUtil';
import { getCurrentDateTime } from './tool';
import relationalStore from '@ohos.data.relationalStore';
import { EventHubUtils } from './EventHubUtils';
import { fileIo as fs } from '@kit.CoreFileKit';
import http from '@ohos.net.http';

// 单例模式实现,确保全局唯一下载管理器
export class DownloadTool {
  private static instance: DownloadTool;
  public isDownloading: boolean = false; // 是否正在执行下载(避免并发冲突)
  private downloadQueue: string[] = []; // 存储taskId的下载队列,按顺序执行
  static downloadingList: Set<number | string> = new Set(); // 正在下载的任务ID集合
  static DownloadTaskMap: Map<string, http.HttpRequest> = new Map(); // 下载任务与HTTP请求的映射

  // 私有构造函数,确保只能通过getInstance获取实例
  private constructor() {
  }

  /**
   * 获取单例实例
   * @returns DownloadTool单例对象
   */
  public static getInstance(): DownloadTool {
    if (!DownloadTool.instance) {
      DownloadTool.instance = new DownloadTool();
    }
    return DownloadTool.instance;
  }

  /**
   * 添加下载任务到队列
   * @param file 文件信息对象,包含下载所需的基本信息
   */
  async addTask(file: IFileItem) {
    try {
      // 检查是否已存在相同fileId的任务,避免重复下载
      const resultSet = await dbUtil.queryByKey(
        dbUtil.TABLE_DOWNLOAD_TASK,
        'fileId',
        file.fileId || ''
      );

      if (resultSet.rowCount == 0) {
        // 不存在则创建新任务,生成必要的任务信息
        const localPath = `${getContext().filesDir}/${file.fileName}`; // 构建本地存储路径
        const taskId = getCurrentDateTime(); // 使用当前时间作为任务ID
        const task: IDlTask = {
          taskId: taskId,
          fileId: file.fileId || '',
          fileName: file.fileName,
          localPath: localPath,
          url: file.url  || '',
          totalSize: file.size,
          downloadedSize: 0, // 初始已下载大小为0
          progress: 0,       // 初始进度为0%
          type: file.type,
          status: DownloadStatus.WAITING, // 初始状态为等待
          speed: '',
          createTime: new Date().toISOString(),
          lastUpdateTime: new Date().toISOString(),
        };

        // 将任务信息插入数据库,持久化存储
        const rowId = await dbUtil.insert(dbUtil.TABLE_DOWNLOAD_TASK, task);
        console.log(`数据插入成功,rowId: ${rowId}`, JSON.stringify(task));

        // 将任务ID添加到下载队列
        this.downloadQueue.push(taskId);
      } else if (resultSet.goToFirstRow()) {
        // 如果任务已存在,获取taskId并加入队列(用于恢复下载)
        const taskId = resultSet.getString(resultSet.getColumnIndex('taskId'));
        const status = resultSet.getString(resultSet.getColumnIndex('status'));

        // 只有未完成的任务才加入队列,避免重复处理已完成任务
        if (status !== DownloadStatus.COMPLETED && !this.downloadQueue.includes(taskId)) {
          this.downloadQueue.push(taskId);
        } else {
          console.warn('文件已存在下载队列或已完成');
        }
      }
      resultSet.close(); // 关闭ResultSet,释放资源

      // 如果当前没有正在下载的任务,立即开始下载队列中的任务
      if (!this.isDownloading) {
        this.starDownload();
      }
    } catch (e) {
      console.log(e, '插入任务失败');
    }
  }

  /**
   * 开始下载队列中的任务
   * 按顺序处理队列中的任务,一次只处理一个
   */
  async starDownload() {
    // 检查是否已有任务在下载或队列为空
    if (this.isDownloading || this.downloadQueue.length === 0) {
      return;
    }

    // 标记开始下载,避免并发执行
    this.isDownloading = true;

    // 循环处理队列中的任务,直到队列为空
    while (this.downloadQueue.length > 0) {
      const taskId = this.downloadQueue[0]; // 获取队列头部的任务ID
      try {
        // 从数据库获取任务的完整信息
        const resultSet = await dbUtil.queryByKey(
          dbUtil.TABLE_DOWNLOAD_TASK,
          'taskId',
          taskId
        );

        if (resultSet.rowCount > 0 && resultSet.goToFirstRow()) {
          // 将ResultSet转换为任务对象
          const task = this.buildTaskFromResultSet(resultSet);
          // 执行具体的下载逻辑
          await this.executeDownload(task);
        }

        resultSet.close(); // 关闭ResultSet
      } catch (error) {
        console.error('下载过程出错:', error);
        // 下载失败时暂停所有后续任务,避免连续失败
        await this.pauseAllTasks();
      } finally {
        // 从队列中移除当前任务,无论成功或失败
        if (this.downloadQueue.length > 0) {
          this.downloadQueue.shift();
        }
      }
    }

    // 所有任务处理完成,重置下载状态
    this.isDownloading = false;
  }

  /**
   * 从ResultSet构建任务对象
   * 将数据库查询结果转换为IDlTask对象
   * @param resultSet 数据库查询结果集
   * @returns IDlTask对象
   */
  private buildTaskFromResultSet(resultSet: relationalStore.ResultSet): IDlTask {
    return {
      taskId: resultSet.getString(resultSet.getColumnIndex('taskId')),
      fileId: resultSet.getString(resultSet.getColumnIndex('fileId')),
      fileName: resultSet.getString(resultSet.getColumnIndex('fileName')),
      localPath: resultSet.getString(resultSet.getColumnIndex('localPath')),
      url: resultSet.getString(resultSet.getColumnIndex('url')),
      totalSize: resultSet.getLong(resultSet.getColumnIndex('totalSize')),
      downloadedSize: resultSet.getLong(resultSet.getColumnIndex('downloadedSize')),
      progress: resultSet.getLong(resultSet.getColumnIndex('progress')),
      type: resultSet.getString(resultSet.getColumnIndex('type')),
      status: resultSet.getString(resultSet.getColumnIndex('status')),
      speed: resultSet.getString(resultSet.getColumnIndex('speed')),
      createTime: resultSet.getString(resultSet.getColumnIndex('createTime')),
      lastUpdateTime: resultSet.getString(resultSet.getColumnIndex('lastUpdateTime')),
    };
  }

  /**
   * 执行下载任务(断点续传核心实现)
   * @param task 下载任务对象
   * @returns Promise<void>
   */
  private async executeDownload(task: IDlTask): Promise<void> {
    return new Promise(async (resolve, reject) => {
      console.log(task.url, 'task.url');
      
      let file: fs.File | undefined = undefined; // 文件操作对象
      let httpRequest: http.HttpRequest | undefined = undefined; // HTTP请求对象
      
      // 速度计算相关变量
      let lastUpdateTime = Date.now(); // 上次更新时间
      let lastDownloadedSize = 0;     // 上次下载大小
      
      try {
        // 1. 检查本地文件是否存在,获取已下载大小(断点续传关键步骤1)
        let downloadedSize = 0;
        
        try {
          // 检查文件是否存在,获取文件信息
          let fileInfo: fs.Stat = await fs.statSync(task.localPath);
          downloadedSize = fileInfo.size; // 获取已下载的文件大小
          console.log(`文件已存在,已下载大小:${downloadedSize}`);

          // 如果文件已完整下载,直接标记为完成状态
          if (task.totalSize !== undefined && downloadedSize === task.totalSize && task.totalSize > 0) {
            task.status = DownloadStatus.COMPLETED;
            task.progress = 100;
            task.downloadedSize = task.totalSize;
            task.lastUpdateTime = new Date().toISOString();
            await this.updateTaskInDb(task); // 更新数据库状态
            EventHubUtils.getEventHub().emit('download_status_change', task); // 发送状态变更事件
            resolve(); // 完成Promise
            return;
          }
          
          // 打开文件,准备追加写入(断点续传关键步骤2)
          // 使用APPEND模式,确保新数据追加到文件末尾
          file = fs.openSync(task.localPath, fs.OpenMode.READ_WRITE | fs.OpenMode.APPEND);
        } catch (error) {
          // 文件不存在,创建新文件
          console.log(`创建新文件:${task.localPath}`);
          file = fs.openSync(task.localPath, fs.OpenMode.CREATE | fs.OpenMode.WRITE_ONLY);
        }
        
        if (!file) {
          throw new Error('无法打开文件');
        }
        
        // 2. 创建HTTP请求对象
        httpRequest = http.createHttp();
        // 将请求对象存入映射表,便于后续控制(如暂停)
        DownloadTool.DownloadTaskMap.set(task.taskId, httpRequest);
        
        // 3. 获取文件总大小(如果未知)
        if (task.totalSize === undefined || task.totalSize <= 0) {
          try {
            // 发送HEAD请求获取文件信息,不下载实际内容
            const headResponse = await httpRequest.request(task.url, {
              method: http.RequestMethod.HEAD
            });
            
            if (headResponse.responseCode === 200) {
              // 获取响应头中的Content-Length字段,获取文件总大小
              const headers = headResponse.header as Record<string, string>;
              const contentLength = headers['content-length'];
              if (contentLength) {
                task.totalSize = Number(contentLength);
                console.log(`获取文件总大小:${task.totalSize}`);
              }
            }
          } catch (error) {
            console.error('获取文件大小失败:', error);
            // 继续执行,总大小未知时也可以下载
          }
        }
        
        // 4. 开始下载流程
        task.status = DownloadStatus.DOWNLOADING; // 更新任务状态
        task.lastUpdateTime = new Date().toISOString();
        DownloadTool.downloadingList.add(task.taskId); // 添加到正在下载列表
        await this.updateTaskInDb(task); // 更新数据库状态
        EventHubUtils.getEventHub().emit('download_status_change', task); // 发送状态变更事件
        
        // 5. 实现分块下载逻辑(断点续传核心步骤3)
        const CHUNK_SIZE = 1024 * 1024 * 1; // 每次下载1MB,避免一次性下载过大数据
        let totalDownloaded = downloadedSize; // 累计已下载大小,初始为文件已存在的大小
        let isDownloadComplete = false; // 下载完成标志
        
        while (!isDownloadComplete) {
          // 计算当前块的起始和结束位置
          const start = totalDownloaded;
          // 结束位置为起始位置+块大小,或文件总大小-1(如果已接近末尾)
          const end = task.totalSize ? Math.min(start + CHUNK_SIZE - 1, task.totalSize - 1) : start + CHUNK_SIZE - 1;
          
          // 构建Range请求头(断点续传核心步骤4)
          // Range头格式:bytes=start-end,告诉服务器从指定位置开始传输数据
          const rangeHeader = task.totalSize ? `bytes=${start}-${end}` : ``;
          
          // 构建HTTP请求选项
          const requestOptions: http.HttpRequestOptions = {
            method: http.RequestMethod.GET, // GET方法获取数据
            header: {
              'Range': rangeHeader  // 指定Range请求头,实现断点续传
            },
            expectDataType: http.HttpDataType.ARRAY_BUFFER // 期望返回二进制数据
          };
          
          console.log(`下载块:${start}-${end},Range:${rangeHeader}`);
          
          // 发送HTTP请求获取数据块
          const response = await httpRequest.request(task.url, requestOptions);
          
          // 检查响应状态码
          // 200: 成功获取完整内容(首次下载)
          // 206: 成功获取部分内容(断点续传)
          if (response.responseCode === 200 || response.responseCode === 206) {
            // 获取响应数据
            const buffer = response.result as ArrayBuffer;
            
            if (buffer.byteLength > 0) {
              // 将数据写入文件(断点续传核心步骤5)
              fs.writeSync(file.fd, buffer);
              // 更新累计下载大小
              totalDownloaded += buffer.byteLength;
              
              // 更新任务信息
              task.downloadedSize = totalDownloaded;
              // 计算下载进度(百分比)
              if (task.totalSize !== undefined && task.totalSize > 0) {
                task.progress = Math.round((totalDownloaded / task.totalSize) * 100);
              }
              task.lastUpdateTime = new Date().toISOString();
              
              // 计算下载速度
              const currentTime = Date.now();
              const timeDiff = currentTime - lastUpdateTime; // 时间差(毫秒)
              const sizeDiff = totalDownloaded - lastDownloadedSize; // 下载大小差
              
              if (timeDiff > 0) {
                // 计算每秒下载字节数
                const bytesPerSecond = (sizeDiff / timeDiff) * 1000;
                task.speed = bytesPerSecond.toString(); // 保存原始速度值
                
                // 更新时间和大小记录,用于下次计算
                lastUpdateTime = currentTime;
                lastDownloadedSize = totalDownloaded;
              }
              
              // 检查是否下载完成
              if (task.totalSize !== undefined && totalDownloaded === task.totalSize) {
                // 下载完成,更新任务状态
                task.status = DownloadStatus.COMPLETED;
                task.progress = 100;
                await this.updateTaskInDb(task); // 更新数据库
                EventHubUtils.getEventHub().emit('download_status_change', task); // 发送完成事件
                isDownloadComplete = true; // 标记下载完成
                resolve(); // 完成Promise
              } else {
                // 更新进度到数据库
                await this.updateProgressInDb(task);
                // 发送进度更新事件
                this.emitProgress(task);
                
                // 检查是否达到文件末尾(对于未知大小的文件)
                if (buffer.byteLength < CHUNK_SIZE) {
                  // 数据块小于设定大小,说明已到文件末尾
                  task.status = DownloadStatus.COMPLETED;
                  task.progress = 100;
                  task.totalSize = totalDownloaded; // 更新文件总大小
                  await this.updateTaskInDb(task); // 更新数据库
                  EventHubUtils.getEventHub().emit('download_status_change', task); // 发送完成事件
                  isDownloadComplete = true; // 标记下载完成
                  resolve(); // 完成Promise
                }
              }
            } else {
              // 没有数据返回,视为下载完成
              task.status = DownloadStatus.COMPLETED;
              task.progress = 100;
              task.totalSize = totalDownloaded;
              await this.updateTaskInDb(task);
              EventHubUtils.getEventHub().emit('download_status_change', task);
              isDownloadComplete = true;
              resolve();
            }
          } else {
            // 响应状态码错误,抛出异常
            throw new Error(`下载失败,响应码:${response.responseCode}`);
          }
        }
        
      } catch (error) {
        // 捕获下载过程中的异常
        console.error('下载失败:', error);
        // 下载失败时标记为暂停状态,以便后续可以继续下载
        task.status = DownloadStatus.PAUSED;
        task.lastUpdateTime = new Date().toISOString();
        await this.updateTaskInDb(task); // 更新数据库状态
        EventHubUtils.getEventHub().emit('download_status_change', task); // 发送状态变更事件
        
        reject(error); // 拒绝Promise
      } finally {
        // 清理资源,无论成功或失败都执行
        if (file) {
          fs.closeSync(file); // 关闭文件,确保数据写入完成
        }
        if (httpRequest) {
          httpRequest.destroy(); // 销毁HTTP请求对象,释放资源
        }
        // 从映射表和下载列表中移除
        DownloadTool.DownloadTaskMap.delete(task.taskId);
        DownloadTool.downloadingList.delete(task.taskId);
      }
    });
  }

  /**
   * 更新任务进度到数据库
   * 只更新进度相关字段,提高更新效率
   * @param task 下载任务对象
   */
  private async updateProgressInDb(task: IDlTask): Promise<void> {
    console.log(JSON.stringify(task.progress),'更新任务进度到数据库')
    try {
      // 分别更新各个进度相关字段,避免整行更新的性能开销
      await dbUtil.updateField(dbUtil.TABLE_DOWNLOAD_TASK, 'taskId', task.taskId, 'progress', task.progress || 0);
      await dbUtil.updateField(dbUtil.TABLE_DOWNLOAD_TASK, 'taskId', task.taskId, 'downloadedSize',
        task.downloadedSize || 0);
      // 格式化速度显示
      await dbUtil.updateField(dbUtil.TABLE_DOWNLOAD_TASK, 'taskId', task.taskId, 'speed', this.formatSpeed(task.speed||0) || '0');
      await dbUtil.updateField(dbUtil.TABLE_DOWNLOAD_TASK, 'taskId', task.taskId, 'lastUpdateTime',
        task.lastUpdateTime || '');
    } catch (error) {
      console.error('更新进度失败:', error);
    }
  }

  /**
   * 更新整个任务信息到数据库
   * 用于状态变更等需要更新多个字段的场景
   * @param task 下载任务对象
   */
  private async updateTaskInDb(task: IDlTask): Promise<void> {
    try {
      console.log(task.fileId, 'task.fileId')
      // 更新整个任务记录
      await dbUtil.updateByKey(dbUtil.TABLE_DOWNLOAD_TASK, 'taskId', task.taskId, task);
      // 同时更新关联的文件状态,保持数据一致性
      await dbUtil.updateField(dbUtil.TABLE_FILE_ITEM, 'fileId', task.fileId || '', 'status', task.status || '');
      // 发送文件更新事件
      EventHubUtils.getEventHub().emit('updateFiles');
    } catch (error) {
      console.error('更新任务失败:', error);
    }
  }

  /**
   * 格式化下载速度显示
   * 将字节数转换为易读的格式(B/s、KB/s、MB/s)
   * @param bytesPerSecond 每秒下载字节数
   * @returns 格式化的速度字符串
   */
  private formatSpeed(bytesPerSecond: number|string): string {
    bytesPerSecond=Number(bytesPerSecond)
    // 根据大小选择合适的单位
    if (bytesPerSecond < 1024) {
      return `${bytesPerSecond} B/s`;
    } else if (bytesPerSecond < 1024 * 1024) {
      return `${(bytesPerSecond / 1024).toFixed(1)} KB/s`;
    } else {
      return `${(bytesPerSecond / (1024 * 1024)).toFixed(1)} MB/s`;
    }
  }

  /**
   * 发送进度更新事件
   * 通知UI更新下载进度
   * @param task 下载任务对象
   */
  private emitProgress(task: IDlTask): void {
    EventHubUtils.getEventHub().emit('download_progress_update', task);
  }

  /**
   * 暂停指定任务
   * @param taskId 任务ID
   */
  public async pauseTask(taskId: string): Promise<void> {
    // 获取对应的HTTP请求对象
    const httpRequest: http.HttpRequest | undefined = DownloadTool.DownloadTaskMap.get(taskId);
    if (httpRequest) {
      // 销毁HTTP请求,终止下载
      httpRequest.destroy();
      // 从映射表和下载列表中移除
      DownloadTool.DownloadTaskMap.delete(taskId);
      DownloadTool.downloadingList.delete(taskId);
    }

    // 更新任务状态为暂停
    const task = await this.getTaskById(taskId);
    if (task) {
      task.status = DownloadStatus.PAUSED;
      await this.updateTaskInDb(task); // 更新数据库
      EventHubUtils.getEventHub().emit('download_status_change', task); // 发送状态变更事件
    }
  }

  /**
   * 恢复指定任务
   * @param taskId 任务ID
   */
  public async resumeTask(taskId: string): Promise<void> {
    // 获取任务信息
    const task = await this.getTaskById(taskId);
    if (task) {
      // 将任务状态改为等待
      task.status = DownloadStatus.WAITING;
      task.lastUpdateTime = new Date().toISOString();
      await this.updateTaskInDb(task); // 更新数据库
      EventHubUtils.getEventHub().emit('download_status_change', task); // 发送状态变更事件
      
      // 将任务添加到队列的合适位置(第二个位置,优先执行)
      const index = this.downloadQueue.indexOf(taskId);
      if (index > -1) {
        this.downloadQueue.splice(index, 1); // 移除已存在的任务ID
      }
      // 插入到队列的第二个位置,确保当前任务完成后立即执行
      if (this.downloadQueue.length > 0) {
        this.downloadQueue.splice(1, 0, taskId);
      } else {
        this.downloadQueue.push(taskId);
      }
      // 如果当前没有任务在下载,立即开始
      if (!this.isDownloading) {
        this.starDownload();
      }
    }
  }

  /**
   * 暂停所有任务
   * 用于错误处理或用户操作
   */
  public async pauseAllTasks(): Promise<void> {
    // 1. 暂停当前正在下载的任务
    for (const taskId of DownloadTool.downloadingList) {
      const stringTaskId = taskId.toString();
      await this.pauseTask(stringTaskId);
    }
    
    // 2. 暂停队列中的所有任务
    const queueCopy = [...this.downloadQueue]; // 创建队列副本,避免遍历时修改原队列
    for (const taskId of queueCopy) {
      const task = await this.getTaskById(taskId);
      if (task) {
        task.status = DownloadStatus.PAUSED;
        await this.updateTaskInDb(task); // 更新数据库状态
        EventHubUtils.getEventHub().emit('download_status_change', task); // 发送状态变更事件
      }
    }
    
    // 3. 清空下载队列
    this.downloadQueue = [];
  }

  /**
   * 取消指定任务
   * @param taskId 任务ID
   */
  public async cancelTask(taskId: string): Promise<void> {
    // 获取HTTP请求对象
    const httpRequest: http.HttpRequest | undefined = DownloadTool.DownloadTaskMap.get(taskId);
    if (httpRequest) {
      // 销毁请求
      httpRequest.destroy();
      // 清理映射关系
      DownloadTool.DownloadTaskMap.delete(taskId);
      DownloadTool.downloadingList.delete(taskId);
    }

    // 更新任务状态为取消
    const task = await this.getTaskById(taskId);
    if (task) {
      task.status = DownloadStatus.CANCEL;
      await this.updateTaskInDb(task); // 更新数据库
      EventHubUtils.getEventHub().emit('download_status_change', task); // 发送状态变更事件

      // 从队列中移除
      const index = this.downloadQueue.indexOf(taskId);
      if (index > -1) {
        this.downloadQueue.splice(index, 1);
      }
    }
  }

  /**
   * 根据taskId获取任务详情
   * @param taskId 任务ID
   * @returns IDlTask对象或null
   */
  private async getTaskById(taskId: string): Promise<IDlTask | null> {
    try {
      // 从数据库查询任务信息
      const resultSet = await dbUtil.queryByKey(dbUtil.TABLE_DOWNLOAD_TASK, 'taskId', taskId);
      if (resultSet.rowCount > 0 && resultSet.goToFirstRow()) {
        const task = this.buildTaskFromResultSet(resultSet);
        resultSet.close(); // 关闭ResultSet
        return task;
      }
      resultSet.close(); // 关闭ResultSet
    } catch (error) {
      console.error('获取任务失败:', error);
    }
    return null;
  }

  /**
   * 批量删除任务
   * @param taskIds 任务ID数组
   */
  public async batchDeleteTasks(taskIds: string[]): Promise<void> {
    if (!taskIds || taskIds.length === 0) {
      return;
    }

    // 遍历删除每个任务
    for (const taskId of taskIds) {
      const task = await this.getTaskById(taskId);
      if (!task) {
        continue;
      }

      // 删除本地文件
      try {
        fs.unlinkSync(task.localPath); // 删除文件
      } catch (err) {
        console.error(`删除文件失败:${task.fileName}`, err);
      }

      // 从数据库删除任务记录
      await dbUtil.deleteByKey(dbUtil.TABLE_DOWNLOAD_TASK, 'taskId', taskId);

      // 更新文件状态为初始
      if (task.fileId) {
        await dbUtil.updateField(dbUtil.TABLE_FILE_ITEM, 'fileId', task.fileId, 'status', 'init');
      }

      // 清理映射关系
      DownloadTool.DownloadTaskMap.delete(taskId);

      // 从队列中移除
      const index = this.downloadQueue.indexOf(taskId);
      if (index > -1) {
        this.downloadQueue.splice(index, 1);
      }
    }
    // 发送文件更新事件
    EventHubUtils.getEventHub().emit("updateFiles");
  }

  /**
   * 处理应用销毁
   * 在应用关闭时将所有未完成任务标记为暂停
   */
  public async handleAppDestroy(): Promise<void> {
    // 查询所有下载任务
    const resultSet = await dbUtil.queryAll(dbUtil.TABLE_DOWNLOAD_TASK);
    if (resultSet.rowCount > 0) {
      resultSet.goToFirstRow();
      // 遍历所有任务
      do {
        const task = this.buildTaskFromResultSet(resultSet);
        // 将未完成的任务标记为暂停
        if (task.status !== DownloadStatus.COMPLETED) {
          task.status = DownloadStatus.PAUSED;
          task.lastUpdateTime = new Date().toISOString();
          await this.updateTaskInDb(task); // 更新数据库状态
        }
      } while (resultSet.goToNextRow()); // 移动到下一行
    }
    resultSet.close(); // 关闭ResultSet

    // 清空下载中集合
    DownloadTool.downloadingList.clear();
  }
}

// 导出单例实例,供外部使用
export const dlTool = DownloadTool.getInstance();

关键设计说明

  • 采用单例模式确保全局任务管理的一致性

  • 实现了完整的断点续传逻辑:

    • 检查本地文件获取已下载大小
    • 使用 Range 请求头请求后续数据
    • 以追加模式写入文件
    • 实时保存下载进度到数据库
  • 支持任务队列管理,按顺序执行下载任务

  • 提供完整的任务控制功能:暂停、恢复、取消、删除

  • 实现下载速度计算和进度更新

  • 完善的错误处理和资源清理机制

断点续传核心原理详解

1. 断点续传的技术基础

断点续传主要依赖于 HTTP 协议的Range 请求头文件追加写入技术:

  • Range 请求头:允许客户端指定需要获取的字节范围,格式为Range: bytes=start-end,服务器支持时会返回状态码 206(Partial Content)和对应的数据块
  • 文件追加写入:在本地文件已存在的情况下,以追加模式打开文件,将新下载的数据写入文件末尾

2. 断点续传的实现步骤

  1. 记录已下载大小

    • 在下载过程中实时记录已下载的字节数(downloadedSize
    • 将该值存储到数据库中,确保应用重启后不丢失
  2. 恢复下载时的处理

    • 检查本地文件是否存在,通过fs.statSync获取文件大小
    • 从数据库读取已记录的下载进度,确认断点位置
  3. 构造 Range 请求

    • 根据断点位置构造 Range 请求头,指定从断点位置开始下载
    • 服务器返回从断点位置开始的数据块
  4. 追加写入文件

    • 以追加模式打开文件(fs.OpenMode.APPEND
    • 将新下载的数据块写入文件末尾
  5. 进度同步

    • 实时更新下载进度到数据库
    • 提供暂停、恢复、取消等操作的状态管理

3. 断点续传的关键技术点

  • 数据持久化:通过数据库存储下载状态和进度,确保断点信息不丢失
  • HTTP 分块请求:利用 Range 头实现分块下载,每次请求固定大小的数据块
  • 文件操作模式:正确使用文件打开模式(追加模式)确保数据不被覆盖
  • 状态管理:维护完整的任务状态机,支持任务的暂停、恢复、取消等操作

使用方法

1. 初始化数据库

在应用启动时初始化数据库,通常在 UIAbility 的onCreate生命周期方法中:

import  {dbUtil} from '../db/RelationalDbUtil'

async  onCreate(want: Want, launchParam: AbilityConstant.LaunchParam) {
   try {
     // 传入应用上下文,默认数据库名为 'device_media.db'
     await dbUtil.init(this.context)
     // 可以自定义数据库名称
     // await dbUtil.init(this.context, 'my_custom.db')
     console.log('数据库初始化成功')
   } catch (error) {
     console.error('数据库初始化失败:', error)
   }
 }

2. 添加下载任务

创建文件信息对象并添加到下载队列

import { dlTool } from './DownloadTool';
import { IFileItem } from './Interface';

// 创建文件信息对象
const fileItem: IFileItem = {
  fileId: 'file123',                 // 文件唯一标识
  fileName: 'document.pdf',          // 文件名
  url: 'https://example.com/document.pdf', // 下载地址
  size: 1024 * 1024 * 5,             // 文件大小(5MB)
  type: 'pdf'                        // 文件类型
};

// 添加到下载队列
dlTool.addTask(fileItem);

3. 监听下载状态和进度

import { EventHubUtils } from './EventHubUtils';
import { IDlTask } from './Interface';

// 获取事件中心实例
const eventHub = EventHubUtils.getEventHub();

// 监听状态变化事件
eventHub.on('download_status_change', (task: IDlTask) => {
  console.log(`任务状态变化:${task.fileName} - ${task.status}`);
  
  // 根据状态更新UI
  switch(task.status) {
    case 'downloading':
      console.log(`开始下载:${task.fileName}`);
      break;
    case 'completed':
      console.log(`下载完成:${task.fileName}`);
      break;
    case 'paused':
      console.log(`下载暂停:${task.fileName}`);
      break;
    case 'failed':
      console.log(`下载失败:${task.fileName}`);
      break;
  }
});

// 监听进度更新事件
eventHub.on('download_progress_update', (task: IDlTask) => {
  console.log(`下载进度:${task.fileName} - ${task.progress}%,速度:${task.speed}`);
  // 更新进度条UI
});

// 监听文件列表更新事件
eventHub.on('updateFiles', () => {
  console.log('文件列表需要更新');
  // 重新加载文件列表
});

4. 控制下载任务

提供暂停、恢复、取消等操作:

import { dlTool } from './DownloadTool';

// 暂停指定任务
async function pauseDownload(taskId: string) {
  await dlTool.pauseTask(taskId);
}

// 恢复指定任务
async function resumeDownload(taskId: string) {
  await dlTool.resumeTask(taskId);
}

// 暂停所有任务
async function pauseAllDownloads() {
  await dlTool.pauseAllTasks();
}

// 取消指定任务
async function cancelDownload(taskId: string) {
  await dlTool.cancelTask(taskId);
}

// 批量删除任务
async function deleteTasks(taskIds: string[]) {
  await dlTool.batchDeleteTasks(taskIds);
}

5. 应用退出处理

// 在UIAbility的onDestroy生命周期方法中
async function onAppDestroy() {
  await dlTool.handleAppDestroy();
  console.log('应用退出,已保存下载状态');
}

总结

本方案完整实现了鸿蒙原生应用中的断点续传功能,核心特点包括:

  1. 完整的断点续传机制:基于 HTTP Range 请求和文件追加写入实现
  2. 数据持久化:通过关系型数据库存储下载状态和进度
  3. 任务管理:支持任务队列、暂停、恢复、取消等操作
  4. 状态通知:通过 EventHub 实现实时状态和进度通知
  5. 资源管理:完善的资源清理和错误处理机制

该实现适用于需要大文件下载的鸿蒙应用,能够有效提升用户体验,特别是在网络不稳定的环境下,避免重复下载和数据丢失。

Logo

作为“人工智能6S店”的官方数字引擎,为AI开发者与企业提供一个覆盖软硬件全栈、一站式门户。

更多推荐