【高心星出品】

多线程批量插入数据

概述

应用中的每个进程都会有一个主线程,主线程主要承担执行UI绘制操作、管理ArkTS引擎实例的创建和销毁、分发和处理事件、管理Ability生命周期等职责,具体可参见线程模型概述。在主线程中执行耗时操作将会引起UI绘制卡顿,因此,开发应用时应当尽量避免将耗时的操作放在主线程中执行。ArkTS提供了多线程并发能力,多线程并发允许在同一时间段内同时执行多段代码,本文介绍如何利用多线程解决密集型文件和数据库读写时造成主线程阻塞的问题。

实现原理

在密集型读写操作时,由于系统会进行大量任务分发和数据拷贝,这两项任务均会阻塞主线程,系统提供了TaskPool和Sendable避免阻塞。

其中,任务池(TaskPool)旨在为应用程序构建多线程运行环境,它具有易用性,并且可以避免对于主线程的占用;Sendable对象则提供了并发实例间高效的通信效率,凭借其引用传递的能力,在多并发实例的数据交互等场景中可避免传统通信方式的效率低下问题,从而进一步提升系统在密集型读写这类复杂场景下的性能表现,为系统的高效稳定运行提供有力支持。

使用TaskPool进行读写

本章介绍使用TaskPool进行读写的方案,以及讨论其对于性能的提升。

实现原理

任务池(TaskPool)作用是为应用程序提供一个多线程的运行环境,降低整体资源的消耗、提高系统的整体性能,且开发者无需关心线程实例的生命周期。

在这里插入图片描述

TaskPool在执行密集型I/O读写方面具有以下优势:

  1. 自动任务分发:当调用Taskpool执行密集型读写时,其线程池会自动将任务分发到子线程完成,无需人工干预,实现了高效的任务分配流程。
  2. 不阻塞主线程:通过在子线程完成任务,有效避免了对主线程的阻塞,确保主线程能够继续执行其他操作,维持系统整体的流畅运行。
  3. 资源节约:TaskPool本身通过系统统一线程管理,结合动态调度及负载均衡算法可节约系统资源,这也为执行密集型读写提供了更优的系统资源环境,保障操作的顺利进行。

TaskPool文件读写

开发步骤

  1. 封装write()函数,使用@Concurrent进行装饰,执行的并发函数需要使用该装饰器修饰,否则无法通过相关校验。

    @Concurrent
    function writeFile(fd: number[], content: string, times: number) {
      for (let i: number = 0; i < times; i++) {
        fileIo.write(fd[i], content);
      }
    }
    

    代码逻辑走读:

    1. 函数 writeFile被定义为并发函数,这意味着它可以在不等待其他操作完成的情况下执行多个操作。
    2. 函数接受三个参数:fd是一个数字数组,表示多个文件描述符;content是一个字符串,表示要写入的内容;times是一个数字,表示要写入的次数。
    3. 使用 for循环遍历 times次,每次循环中调用 fileIo.write方法,将 content写入 fd数组中的当前索引 i对应的文件描述符。
    4. 由于函数使用了 @Concurrent装饰器,因此在执行时可能会并发地写入多个文件描述符,这取决于具体的并发实现。
  2. 封装read()函数,同样地,也使用@Concurrent进行装饰。在读文件时使用循环读取,这也是大文件读取时常用的方法,使用Array存储一个大文件中的信息。

    @Concurrent
    function readFile(fd: number[], path: string, fileName: string, times: number): Array<Array<ArrayBuffer>> {
      let result: Array<Array<ArrayBuffer>> = [];
      for (let i = 0; i < times; i++) {
        let buffSize: number = 4096;
        let state = fileIo.statSync(path + fileName + JSON.stringify(i) + CommonConstants.FILE_SUFFIX);
        if (state.size === 0){
          return result;
        }
        let buffer: ArrayBuffer = new ArrayBuffer(Math.min(buffSize, state.size));
        let off: number = 0;
        let len: number = fileIo.readSync(fd[i], buffer, { offset: off, length: buffSize });
        let readLen: number = 0;
        let bufferList: Array<ArrayBuffer> = [];
        while (len > 0) {
          readLen += len;
          bufferList.push(buffer);
          off = off + len;
          if ((state.size - readLen) < buffSize) {
            buffSize = state.size - readLen;
          }
          len = fileIo.readSync(fd[i], buffer, { offset: off, length: buffSize });
        }
        result.push(bufferList);
      }
      return result;
    }
    

    代码逻辑走读:

    1. 函数定义与参数说明
      • @Concurrent装饰器表示该函数可以并发执行。
      • fd是一个数字数组,用于文件描述符。
      • path是文件路径的前缀。
      • fileName是文件名。
      • times是读取文件的次数。
    2. 初始化结果数组
      • 创建一个空的结果数组result,用于存储读取到的缓冲区数组。
    3. 循环读取文件
      • 使用for循环,根据times参数进行迭代,每次迭代对应一次文件读取操作。
      • 定义buffSize为4096字节,表示每次读取的缓冲区大小。
      • 使用fileIo.statSync获取文件状态,检查文件大小。如果文件大小为0,直接返回空结果数组。
    4. 创建缓冲区并读取文件
      • 创建一个ArrayBuffer作为缓冲区,大小为buffSize或文件实际大小,取两者中的较小值。
      • 初始化读取长度readLen为0,用于记录已读取的字节数。
      • 初始化一个空的bufferList数组,用于存储每个读取操作生成的缓冲区。
    5. 读取文件内容
      • 使用while循环,只要读取长度len大于0,就持续读取文件。
      • 将每次读取的长度累加到readLen,并将缓冲区添加到bufferList
      • 更新偏移量off,并根据文件剩余大小调整缓冲区大小buffSize
      • 再次调用fileIo.readSync进行读取操作,直到读取完文件或达到缓冲区大小限制。
    6. 存储结果并返回
      • bufferList添加到结果数组result
      • 循环结束后,返回包含所有缓冲区数组的结果数组。
  3. 使用taskpool.execute()执行任务时,传入的第一个参数是调用的函数名,其余参数则是该函数的参数。

    async write(): Promise<void> {
      await taskpool.execute(writeFile, this.fd, this.content, this.times);
      // ...
      return;
    }
    
    async read(): Promise<number> {
      let value = await taskpool.execute(readFile, this.fd, this.path, this.fileName,
        this.times) as object as Array<Array<ArrayBuffer>>;
      // ...
      return value.length;
    }
    

    代码逻辑走读:

    1. async write(): Promise<void>定义了一个异步函数 write,该函数没有返回值。
    2. await taskpool.execute(writeFile, this.fd, this.content, this.times);使用 taskpool执行 writeFile操作,将 this.content写入 this.fd指定的文件描述符,操作 this.times次。
    3. async read(): Promise<number>定义了一个异步函数 read,该函数返回一个数字类型的值。
    4. let value = await taskpool.execute(readFile, this.fd, this.path, this.fileName, this.times) as object as Array<Array<ArrayBuffer>>;使用 taskpool执行 readFile操作,从 this.paththis.fileName指定的文件中读取数据,并将结果强制转换为 Array<Array<ArrayBuffer>>类型,赋值给 value
    5. return value.length;返回读取数据的长度。

TaskPool关系型数据库读写

开发步骤

  1. 封装关系型数据库的写数据库的函数,使用@Concurrent进行装饰。

    @Concurrent
    async function insert(context: common.UIAbilityContext, valueBucket: Array<relationalStore.ValuesBucket>,
      config: relationalStore.StoreConfig) {
      const store = await relationalStore.getRdbStore(context, config);
      store.batchInsert('EMPLOYEE', valueBucket);
    }
    

    代码逻辑走读:

    1. 函数定义与装饰器
      • 定义了一个名为 insert的异步函数,该函数接受三个参数:context(UI能力上下文)、valueBucket(包含多个数据记录的数组)和 config(数据库配置)。
      • 使用了 @Concurrent装饰器,确保函数在并发环境下安全执行。
    2. 获取数据库实例
      • 通过 relationalStore.getRdbStore(context, config)获取关系型数据库实例 store,用于后续的数据操作。
    3. 批量插入数据
      • 调用 store.batchInsert('EMPLOYEE', valueBucket)方法,将 valueBucket中的数据批量插入到名为 'EMPLOYEE'的表中。
  2. 封装数据库读出的函数,使用getRow()和goToNextRow()循环读出符合查询条件的数据,这里读出所有数据。

    @Concurrent
    async function read(context: common.UIAbilityContext, config: relationalStore.StoreConfig) {
      const store = await relationalStore.getRdbStore(context, config);
      const predicates = new relationalStore.RdbPredicates('EMPLOYEE');
      const resultSet = store.querySync(predicates);
      let ValuesBucketArray: ValuesBucket[] = [];
      if (resultSet.rowCount === 0) {
        return ValuesBucketArray;
      }
      resultSet.goToFirstRow();
      do {
        const ValuesBucket = resultSet.getRow() as ValuesBucket;
        ValuesBucketArray.push(ValuesBucket);
      } while (resultSet.goToNextRow());
      resultSet.close();
      return ValuesBucketArray;
    }
    

    代码逻辑走读:

    1. 获取数据库连接
      • 使用 relationalStore.getRdbStore方法获取数据库连接,参数为 contextconfig
    2. 执行查询操作
      • 创建 RdbPredicates对象,指定查询的表名为 'EMPLOYEE'
      • 调用 store.querySync方法执行同步查询,返回结果集 resultSet
    3. 处理查询结果
      • 初始化一个空的 ValuesBucketArray数组。
      • 检查 resultSet.rowCount是否为 0,如果是,直接返回空数组。
      • 调用 resultSet.goToFirstRow方法将结果集指针移动到第一行。
      • 使用do-while循环遍历结果集中的每一行:
        • 使用 resultSet.getRow方法获取当前行的数据,并将其转换为 ValuesBucket对象。
        • ValuesBucket对象添加到 ValuesBucketArray数组中。
        • 调用 resultSet.goToNextRow方法将结果集指针移动到下一行。
      • 循环结束后,调用 resultSet.close方法关闭结果集。
    4. 返回结果
      • 返回包含所有行数据的 ValuesBucketArray数组。
  3. taskpool.execute()执行任务。

    async insertRDB(): Promise<void> {
      await taskpool.execute(insert, this.context, this.valueBucketArray, STORE_CONFIG);
      return;
    }
    

    代码逻辑走读:

    1. 函数定义:定义了一个名为 insertRDB的异步函数,返回类型为 Promise<void>,表示该函数不会返回任何值。
    2. 等待任务执行:使用 await关键字调用 taskpool.execute方法,执行插入操作。insert是待执行的任务,this.contextthis.valueBucketArray是传递给任务的参数,STORE_CONFIG是存储配置信息。
    3. 返回结果:函数执行完毕后,返回 undefined,因为返回类型为 void

使用Sendable进一步提升性能

在上一章节中介绍了如何使用TaskPool进行读写,解决了密集型读写场景下任务分发的的问题,但是实际开发中还面临密集的数据传递问题,系统提供了@Sendable进行解决,本章将介绍如何在TaskPool基础上使用@Sendable。

实现原理

为了实现Sendable数据在不同并发实例间的引用传递,Sendable共享对象会分配在共享堆中,以实现跨并发实例的内存共享。

共享堆(SharedHeap)是进程级别的堆空间,与虚拟机本地堆(LocalHeap)不同的是,LocalHeap只能被单个并发实例访问,而SharedHeap可以被所有线程访问。一个Sendable共享对象的跨线程行为是引用传递。因此,Sendable可能被多个并发实例引用,判断Sendable共享对象是否存活,取决于所有并发实例的对象是否存在对此Sendable共享对象的引用,更多原理请见Sendable的实现原理

在这里插入图片描述

在密集型I/O处理场景中,文件读写会涉及大量数据的传输,而数据库读写则通常被封装成class进行传递,Sendable用引用代替拷贝,可以有效的降低序列化时间,从而提升性能,Sendable主要可以解决两个场景的问题:

  • 跨并发实例传输大数据(例如可能达到100KB以上的数据)。
  • 跨并发实例传递带方法的class实例对象。

文件读写大数据使用@Sendable传输

开发步骤

  1. 在使用@Sendable进行文件写入操作时,首先需要定义Sendable对象存放写入数据,然后封装TaskPool函数传入。

    @Sendable
    class Content {
      content: string;
    
      constructor(content: string) {
        this.content = content;
      }
    }
    

    代码逻辑走读:

    1. 定义了一个装饰器 @Sendable,用于标记 Content类可以被发送。
    2. 定义了一个类 Content,其中包含一个字符串类型的属性 content
    3. 类的构造函数接受一个字符串参数 content,并将其赋值给类的属性 this.content
    @Concurrent
    function writeFile(fd: number[], content: Content, times: number) {
      for (let i: number = 0; i < times; i++) {
        fileIo.write(fd[i], content.content);
      }
    }
    

    代码逻辑走读:

    1. 函数 writeFile被定义为并发函数,这意味着它可以在不等待其他操作完成的情况下执行多个操作。
    2. 函数接收三个参数:fd(文件描述符数组)、content(内容对象)和 times(写入次数)。
    3. 使用 for循环,循环次数为 times,用于遍历文件描述符数组 fd
    4. 在每次循环中,调用 fileIo.write方法,将 content.content写入当前循环的文件描述符 fd[i]
    5. 循环结束后,函数完成所有指定的写入操作。
  2. 接下来进行读取操作,封装TaskPool函数,需要使用collections.Array代替Array,collections.ArrayBuffer代替ArrayBuffer接收结果值,在使用Sendable时,需要注意该数据类型Sendable是否支持。

    @Concurrent
    function readFile(fd: number[], path: string, fileName: string,
      times: number): collections.Array<collections.Array<collections.ArrayBuffer>> {
      let result: collections.Array<collections.Array<collections.ArrayBuffer>> =
        new collections.Array<collections.Array<collections.ArrayBuffer>>();
      for (let i = 0; i < times; i++) {
        let buffSize: number = 4096;
        let state = fileIo.statSync(path + fileName + JSON.stringify(i) + CommonConstants.FILE_SUFFIX);
        if (state.size === 0) {
          return result;
        }
        let buffer: collections.ArrayBuffer = new collections.ArrayBuffer(Math.min(buffSize, state.size));
        let off: number = 0;
        let len: number = fileIo.readSync(fd[i], buffer as ArrayBuffer, { offset: off, length: buffSize });
        let readLen: number = 0;
        let bufferList: collections.Array<collections.ArrayBuffer> = new collections.Array<collections.ArrayBuffer>();
        while (len > 0) {
          readLen += len;
          bufferList.push(buffer);
          off = off + len;
          if ((state.size - readLen) < buffSize) {
            buffSize = state.size - readLen;
          }
          len = fileIo.readSync(fd[i], buffer as ArrayBuffer, { offset: off, length: buffSize });
        }
        result.push(bufferList);
      }
      return result;
    }
    

    代码逻辑走读:

    1. 初始化结果数组
      • 创建一个空的结果数组result,用于存储每次读取的缓冲区数组。
    2. 循环读取文件
      • 使用for循环,根据times参数执行多次读取操作。
      • 每次循环中,初始化缓冲区大小为4096字节,并获取文件状态信息。
      • 如果文件大小为0,直接返回当前结果数组并结束函数。
    3. 读取文件内容
      • 创建一个新的缓冲区buffer,大小为4096字节或文件剩余大小,取较小值。
      • 使用fileIo.readSync方法从文件中读取数据到缓冲区,读取长度为buffSize
      • 初始化读取长度readLen为0,用于累计已读取的字节数。
      • 创建一个空的缓冲区列表bufferList,用于存储每次读取的缓冲区。
    4. 处理读取数据
      • 使用while循环,只要读取长度len大于0,就继续读取。
      • 将读取的缓冲区添加到bufferList中,更新偏移量off和累计读取长度readLen
      • 如果文件剩余大小小于缓冲区大小,调整缓冲区大小为文件剩余大小。
      • 再次调用fileIo.readSync方法读取数据,更新len
    5. 存储结果
      • bufferList添加到结果数组result中。
    6. 返回结果
      • 循环结束后,返回结果数组result,包含所有读取的缓冲区。
  3. taskpool.execute()执行任务。

    async write(): Promise<void> {
      await taskpool.execute(writeFile, this.fd, this.content, this.times);
      // ...
      return;
    }
    
    async read(): Promise<number> {
      let value = await taskpool.execute(readFile, this.fd, this.path, this.fileName,
        this.times) as collections.Array<collections.Array<collections.ArrayBuffer>>;
      // ...
      return value.length;
    }
    

    代码逻辑走读:

    1. 异步函数定义:定义了两个异步函数 writeread,这意味着这两个函数可以并发执行,不会阻塞主线程。
    2. 文件写入操作
      • write函数首先调用 taskpool.execute方法,传入 writeFile函数、文件描述符 this.fd、内容 this.content和次数 this.times作为参数。
      • await关键字用于等待 taskpool.execute完成 writeFile操作后再继续执行后续代码。
    3. 文件读取操作
      • read函数调用 taskpool.execute方法,传入 readFile函数、文件描述符 this.fd、路径 this.path、文件名 this.fileName和次数 this.times作为参数。
      • await关键字用于等待 taskpool.execute完成 readFile操作后再继续执行后续代码。
      • 读取的数据存储在 value变量中,并通过 as collections.Array<collections.Array<collections.ArrayBuffer>>进行类型断言,确保数据类型正确。
    4. 返回值
      • write函数在操作完成后返回 void
      • read函数返回读取数据的长度 value.length

关系型数据库读写使用@Sendable

开发步骤

  1. 在关系型数据库写入操作时,同样需要封装写入数据使用@Sendable进行装饰,传入TaskPool函数中。

    @Sendable
    class SharedValuesBucket {
      NAME: string;
      AGE: number;
      SALARY: number;
    
      constructor(NAME: string, AGE: number, SALARY: number) {
        this.NAME = NAME;
        this.AGE = AGE;
        this.SALARY = SALARY;
      }
    }
    

    代码逻辑走读:

    1. 类定义:定义了一个名为 SharedValuesBucket的类,该类用于封装和管理共享值。
    2. 属性声明:在类中声明了三个属性:NAMEAGESALARY,分别用于存储字符串、数字和数字类型的值。
    3. 构造函数:定义了一个构造函数,该函数接收三个参数(分别对应于类的三个属性),并将这些参数的值赋给类的实例属性。
    @Concurrent
    async function insert(context: common.UIAbilityContext, valueBucket: Array<SharedValuesBucket>,
      config: relationalStore.StoreConfig) {
      const store = await relationalStore.getRdbStore(context, config);
      store.batchInsert('EMPLOYEE', valueBucket as object as Array<relationalStore.ValuesBucket>);
    }
    

    代码逻辑走读:

    1. 异步函数定义:定义了一个名为 insert的异步函数,该函数接收三个参数:context(应用上下文)、valueBucket(数据条目数组)和 config(数据库配置)。
    2. 获取数据库连接:使用 relationalStore.getRdbStore方法,通过传入应用上下文和数据库配置,异步获取数据库连接对象 store
    3. 批量插入数据:调用 store.batchInsert方法,将数据条目数组 valueBucket插入到名为 'EMPLOYEE'的表中。这里假设 valueBucket已经被正确格式化为数据库可接受的格式。
  2. 在读取时,关系型数据库模块提供了getSendableRow()可以直接获取当前行数据的Sendable形式。

    @Concurrent
    async function read(context: common.UIAbilityContext, config: relationalStore.StoreConfig) {
      const store = await relationalStore.getRdbStore(context, config);
      const predicates = new relationalStore.RdbPredicates('EMPLOYEE');
      const resultSet = store.querySync(predicates);
      let ValuesBucketArray: sendableRelationalStore.ValuesBucket[] = [];
      if (resultSet.rowCount === 0) {
        return ValuesBucketArray;
      }
      resultSet.goToFirstRow();
      do {
        const ValuesBucket = resultSet.getSendableRow();
        ValuesBucketArray.push(ValuesBucket);
      } while (resultSet.goToNextRow());
      resultSet.close();
      return ValuesBucketArray;
    }
    

    代码逻辑走读:

    1. 获取数据库连接
      • 使用 relationalStore.getRdbStore方法获取数据库连接,参数包括 contextconfig
    2. 执行查询操作
      • 创建 RdbPredicates对象,指定查询的表名为 'EMPLOYEE'
      • 调用 store.querySync方法执行同步查询,返回 resultSet
    3. 处理查询结果
      • 初始化一个空数组 ValuesBucketArray,用于存储查询结果。
      • 检查 resultSet.rowCount是否为 0,如果是,则直接返回空数组。
      • 调用 resultSet.goToFirstRow方法将结果集指针移动到第一行。
      • 使用do-while循环遍历结果集中的每一行:
        • 调用 resultSet.getSendableRow方法获取当前行的可发送数据。
        • 将获取到的数据添加到 ValuesBucketArray中。
        • 调用 resultSet.goToNextRow方法将结果集指针移动到下一行。
      • 循环结束后,调用 resultSet.close方法关闭结果集。
    4. 返回结果
      • 返回包含所有查询结果的 ValuesBucketArray
  3. taskpool.execute()执行任务。

    async insertRDB(): Promise<void> {
      await taskpool.execute(insert, this.context, this.valueBucketArray, STORE_CONFIG);
      return;
    }
    
    async readRDB(): Promise<number> {
      let value = await taskpool.execute(read, this.context, STORE_CONFIG) as sendableRelationalStore.ValuesBucket[];
      return value.length;
    }
    

    代码逻辑走读:

    1. async insertRDB(): Promise<void>定义了一个异步函数insertRDB,该函数没有返回值。
    2. await taskpool.execute(insert, this.context, this.valueBucketArray, STORE_CONFIG);使用taskpool执行插入操作,等待操作完成。
    3. return;函数执行完毕,返回undefined
    4. async readRDB(): Promise<number>定义了一个异步函数readRDB,该函数返回一个数字类型的值。
    5. let value = await taskpool.execute(read, this.context, STORE_CONFIG) as sendableRelationalStore.ValuesBucket[];使用taskpool执行读取操作,并将结果转换为sendableRelationalStore.ValuesBucket数组。
Logo

作为“人工智能6S店”的官方数字引擎,为AI开发者与企业提供一个覆盖软硬件全栈、一站式门户。

更多推荐