作者​:昇腾实战派
知识地图​:https://blog.csdn.net/Lumos_Lovegood/article/details/161601003

背景概述

在将多模态OCR业务从GPU迁移至昇腾NPU的过程中,由于硬件架构的差异,原有推理链路在性能和资源管理方面出现了新的瓶颈。本文基于一套五层架构的文档解析系统,详细记录了在昇腾NPU环境下的性能调优过程,涵盖推理调用结构重组、并发模型重构以及底层内存分配策略调整。通过一系列优化,最终在单卡单进程场景下实现了显著的性能提升和资源优化。

系统架构概述

该文档解析系统采用流水线架构,核心功能是对PDF或图片进行版面分析,输出包含文字、表格、印章等元素的结构化JSON。系统自顶向下分为五层:

  1. 请求接入与缓存层:基于文件MD5进行缓存拦截,命中则直接返回结果。
  2. 网关调度层:负责任务队列初始化、模型调度与推理上下文准备。
  3. 版面解析与分流层:调用版面检测模型识别元素类型,随后按类别分流——文本类任务路由至PPOCR_V5 DET/REC(小模型),表格与印章类任务路由至GLM_OCR(大模型)。
  4. 模型执行层:基于昇腾NPU和AisBench框架,加载om格式模型执行推理。
  5. 结构化输出层:融合各模型输出结果,生成最终JSON响应。

系统整体架构如下图所示:

在这里插入图片描述

初始性能与资源问题

在747页PDF文档上(单卡单进程)完成迁移后首轮测试,性能对标数据如下:

  • tool(小模型链路):GPU基线287.93s,NPU初始394.34s(慢37%)。
  • vl_table_seal(大模型链路):GPU基线516.12s,NPU初始1225.00s(慢137%)。

同时,通过npu-smi监测发现,NPU显存占用达到8GB且随推理轮次持续增长,而GPU环境下显存稳定在2GB左右。

问题定位

小模型链路性能瓶颈

通过性能分析(profiling)发现,tool链路中OCR enrichment阶段耗时约占总耗时的87%。调用序列分析显示,每个版面元素均独立调用DET模型进行文本行检测,再调用REC模型进行识别。对于Text、Title、Image Caption、Inline Equation等小文本区域,检测阶段引入的额外开销占比过高,且识别阶段batch size恒为1,无法充分利用NPU算力。

大模型链路性能瓶颈

GLM_OCR的VL请求采用同步串行方式调用,每个表格或印章区域的识别耗时约2.5~3s。后端vLLM-Ascend引擎虽支持continuous batching,但因请求无法并发到达,该特性无法生效,总耗时线性累加。

显存异常增长

内存分配序列分析显示,显存占用持续增长的根本原因在于动态shape场景下的内存池复用效率问题。当各裁剪区域尺寸不一致时,模型算子的输入shape持续变化,导致其所需内存大小也随之变动。GE默认的动态内存池中,若已分配的空闲内存块不满足新的内存请求,内存池便会向系统重新申请内存,而原有内存块因无法被有效复用而形成碎片与空洞,造成显存实际占用远高于理论需求。

如下图所示,profiling数据清晰呈现了内存分配随推理迭代不断攀升的趋势:
在这里插入图片描述

在这里插入图片描述

优化方案

整体架构调整

模型层面,将PaddlePaddle格式模型依次转换为ONNX格式,再进一步转换为NPU亲和的om格式,以利用昇腾GE引擎的融合算子能力。

架构层面,将文档处理与版面解析层从串行改造为并行流水线,引入生产者-消费者队列实现请求接收、版面检测与模型识别环节的解耦。整个并行流水线分为四个层次:

Tier 1:API层(api_ocr.py

API层通过asyncio.Queue实现请求的异步入队与消费。/submit_task接口将请求作为生产者放入全局队列后立即返回,后台由ThreadPoolExecutor管理的消费者线程池(_ocr_dispatcher_loop)持续从队列中拉取请求,转换为同步调用后提交至下游池服务。

# config.py:82-85
OCR_SUBMIT_ASYNC_ENABLED = os.getenv("OCR_SUBMIT_ASYNC_ENABLED", "1") == "1"
OCR_DISPATCHER_CONCURRENCY = int(os.getenv("OCR_DISPATCHER_CONCURRENCY", "0") or "0")

# api_ocr.py:76-80
_OCR_TASK_QUEUE: Optional[asyncio.Queue[dict[str, Any]]] = None
_OCR_DISPATCHER_EXECUTOR: Optional[ThreadPoolExecutor] = None

# api_ocr.py:548-560
def _start_ocr_dispatchers(concurrency: int) -> None:
    _OCR_TASK_QUEUE = asyncio.Queue(maxsize=maxsize)
    _OCR_DISPATCHER_EXECUTOR = ThreadPoolExecutor(
        max_workers=max(1, int(concurrency)),
        thread_name_prefix="ocr-dispatch",
    )
    for idx in range(max(1, int(concurrency))):
        _OCR_DISPATCHER_TASKS.append(asyncio.create_task(_ocr_dispatcher_loop(idx)))

# api_ocr.py:475-534
async def _ocr_dispatcher_loop(dispatcher_index: int) -> None:
    while True:
        payload = await queue.get()          # 消费者阻塞等待新请求
        response = await loop.run_in_executor(
            _OCR_DISPATCHER_EXECUTOR,         # 在线程池中执行同步OCR
            _run_queued_ocr_sync,
            payload,
        )
        queue.task_done()

Tier 2:Pool Server层(tool/ocr_pool_server.py

池服务端通过Unix Socket监听来自API层的请求。OcrPoolServer.submit()方法采用LRU策略从slots列表中获取空闲worker,将任务放入该worker的专属multiprocessing.Queue,然后阻塞等待worker返回结果。

# tool/ocr_pool_server.py:373-426
def submit(self, task: Dict[str, Any], timeout: float) -> Dict[str, Any]:
    slot = self._acquire_slot(...)           # 获取空闲worker(LRU策略)
    slot.task_queue.put(payload)             # 生产者:放入worker队列
    result = waiter.get(timeout=...)         # 阻塞等待结果
    return result

# tool/ocr_pool_server.py:428-451
def _acquire_slot(self, ...):
    idle = [slot for slot in self.slots if not slot.busy]
    if config.SCHED_POLICY == "least_recently_used":
        idle.sort(key=lambda slot: slot.last_used_at)
    slot.busy = True
    return slot

Tier 3:Worker消费者层(tool/ocr_instance_worker.py

每个Worker是常驻进程,在instance_worker_loop()中持续从task_queue阻塞等待任务。任务到达后根据file_type(pdf/docx/image)分发至对应的pipeline执行,执行完成后将结果写入result_queue

# tool/ocr_instance_worker.py:110-227
def instance_worker_loop(instance_id, task_queue, result_queue):
    while True:
        task = task_queue.get()              # 消费者:阻塞等待任务
        if task is None:
            return                           # 退出信号
        result = run_pipeline_task(task)     # 执行pipeline
        result_queue.put({"ok": True, "result": result})

# tool/ocr_instance_worker.py:69-107
def run_pipeline_task(task: Dict[str, Any]) -> Dict[str, Any]:
    file_type = str(task.get("file_type"))
    if file_type == "pdf":
        return main_pipeline_pdf(file_path, ...)
    if file_type == "docx":
        return main_pipeline_docx(file_path, ...)
    if file_type == "image":
        return main_pipeline_image(file_path, ...)

Tier 4:Pipeline内部并行(tool/pipeline_pdf.py + tool/ocr_aisbench.py

main_pipeline_from_raster_pages()采用五步串行流程,其中步骤4(ocr_enrich_sections())是核心并行点。该函数先通过_collect_ocr_items()按类别将元素分流至direct rec、det+rec和VL三条路径,随后启动VL线程异步调用vLLM-Ascend API,主线程同时执行AisBench小模型的batch推理,实现大模型与小模型的并行执行。

# tool/pipeline_pdf.py:126-207 (五步串行pipeline)
def main_pipeline_from_raster_pages(...):
    stage("步骤 1/5: 版面检测")       # detect_pdf_layout()
    stage("步骤 2/5: 文本合并")       # integrate_results()
    stage("步骤 3/5: 章节树构建")     # analyze_from_integrated_json()
    stage("步骤 4/5: OCR/VL 富化")    # ocr_enrich_sections() ← 核心并行点
    stage("步骤 5/5: 章节整理")       # refine_sections()

# tool/ocr_aisbench.py:1841-1895 (全局跨页VL + AisBench并行)
def ocr_enrich_sections(...):
    # 遍历所有页面,收集VL items和AisBench items
    for page_number in page_numbers:
        direct_items, det_items, page_vl_items = _collect_ocr_items(...)
        vl_items.extend(page_vl_items)
        aisbench_batches.append((direct_items, det_items))

# 统一并行调度
    _run_vl_items_with_aisbench_batches(vl_items, aisbench_batches, ...)

# tool/ocr_aisbench.py:1601-1746 (VL与AisBench并行执行)
def _run_vl_items_with_aisbench_batches(...):
    if vl_items and vl_mode == "api":
        vl_window_thread = threading.Thread(target=run_vl_api_window)
        vl_window_thread.start()           # VL线程启动,不阻塞

_run_aisbench_batches(aisbench_batches, device=device)  # 主线程执行小模型

if vl_window_thread is not None:
        vl_window_thread.join()            # 等待VL完成

# VL失败项回退到AisBench
    _run_aisbench_item_batches(fallback_direct_copy, fallback_det_copy, ...)

该四层流水线架构消除了请求接收、版面检测与模型识别各环节间的相互等待,实现了从请求入队到任务执行的全链路并行。

小模型OCR链路优化

Direct Rec快路径

策略:针对Text、Title、Image Caption、Inline Equation四类小文本区域,跳过DET检测步骤,直接将裁剪图送入REC模型识别。若返回空结果或置信度低于阈值,则回退至det+rec流程以保证准确性。该优化使约40%的区域跳过检测,基于747页PDF测试,总耗时下降明显。

代码实现

# tool/ocr_aisbench.py:47-48 - 定义 direct rec 目标类别与尺寸阈值
DIRECT_REC_CATEGORIES = {"Text", "Title", "Image Caption", "Inline Equation"}
DIRECT_REC_MAX_HEIGHT = 32

# tool/ocr_aisbench.py:992-997 - 判断是否走快路径
def _should_direct_rec(category_name: str, crop: Image.Image) -> bool:
    if category_name not in DIRECT_REC_CATEGORIES:
        return False
    if crop.width <= 0 or crop.height <= 0:
        return False
    return crop.height <= DIRECT_REC_MAX_HEIGHT

# tool/ocr_aisbench.py:1288-1299 - fallback 逻辑
if _should_direct_rec(category_name, crop):
    text = _run_aisbench_rec_direct(crop, device=device).strip()
    if not text:
        text = _run_aisbench_on_crop(crop, device=device).strip()
else:
    text = _run_aisbench_on_crop(crop, device=device).strip()

跨元素Rec Batch

策略:按页面收集所有OCR元素,统一组batch执行识别。执行顺序为:先执行所有direct rec,再执行det提取行crop,最后将所有crop合并为batch推理。单页crop数量限制为2000,防止内存溢出。该优化使rec batch调用次数从数万次降至千次级别,基于747页PDF测试,总耗时再次下降。

代码实现

# tool/ocr_aisbench.py:1443-1504 - 核心批处理流程
def _run_aisbench_item_batches(direct_items, det_items, *, device):
    model = _get_ocr_model(device) if (direct_items or det_items) else None

# 第一步:所有 direct rec 统一 batch 推理
    if model is not None and direct_items:
        direct_images = [np.asarray(crop...) for _, crop, _, _, _ in direct_items]
        direct_results = _recognize_bucketed(model, direct_images)
        for item, (text, score) in zip(direct_items, direct_results):
            if text and score >= model.drop_score:
                _set_ocr_text(element, text.strip(), ...)
            else:
                det_items.append(item)  # 失败回退到 det

# 第二步:det 提取行级 crop
    line_crops = []
    for element, crop, ... in det_items:
        crops, _ = model.extract_line_crops(image_bgr, crop.width, crop.height)
        line_crops.extend(crops)

# 第三步:所有行级 crop 合并为一个 batch 推理
    rec_results = _recognize_bucketed(model, line_crops)

# tool/ocr_aisbench.py:1507-1533 - 全局跨 chunk 合并
def _run_aisbench_batches(aisbench_batches, *, device):
    if not config.OCR_AISBENCH_GLOBAL_REC_BATCH_ENABLED:
        for direct_items, det_items in aisbench_batches:
            _run_aisbench_item_batches(direct_items, det_items, device=device)
        return
    # 合并所有 chunk → 统一一次 batch 推理
    merged_direct_items = []
    merged_det_items = []
    for direct_items, det_items in aisbench_batches:
        merged_direct_items.extend(direct_items)
        merged_det_items.extend(det_items)
    _run_aisbench_item_batches(merged_direct_items, merged_det_items, device=device)

宽度分桶与缓存

策略:不同crop宽度差异大会增加填充开销。将crop缩放到统一高度(32px),按宽度排序分组,宽度相近的crop放入同一batch(桶间隔32px)。同时引入重复crop缓存,以crop图像MD5为key存储识别结果,避免重叠候选框的重复推理。rec阶段增加全局宽度限制OCR_AISBENCH_REC_MAX_WIDTH(默认3200像素),使用贪心算法将crop装入有限宽度的batch,控制单次推理显存峰值。Batch size调参测试了2、4、8三档,基于747页PDF测试,batch=8时效果最优。

代码实现

# tool/ocr_aisbench.py:1006-1045 - 宽度分桶核心算法(贪心装箱)
def _make_rec_width_batches(items, model):
    prepared = [(item, _estimate_rec_width(item[1], model)) for item in items]
    prepared.sort(key=lambda pair: pair[1])          # 按宽度排序
    max_batch_size = model.rec_batch_size            # 默认8
    max_width_sum = model.rec_batch_max_width_sum    # 默认3200
    while start < len(prepared):
        chunk = []
        while pos < len(prepared):
            if len(chunk) + 1 > max_batch_size:
                break
            if candidate_width_sum > max_width_sum:
                break
            chunk.append(item)
        batches.append(chunk)
        start = pos
    return batches

# tool/ocr_aisbench.py:1075-1098 - MD5 缓存(LRU淘汰,容量20000)
def _rec_cache_key(image_bgr: np.ndarray) -> str:
    arr = np.ascontiguousarray(image_bgr)
    digest = hashlib.blake2b(digest_size=16)
    digest.update(str(arr.shape).encode("ascii"))
    digest.update(str(arr.dtype).encode("ascii"))
    digest.update(arr.tobytes())
    return digest.hexdigest()

# tool/ocr_aisbench.py:1102-1146 - 带缓存的分桶推理
def _recognize_bucketed(model, images_bgr):
    # 1. 查缓存
    misses = []
    for idx, image_bgr in enumerate(images_bgr):
        key = _rec_cache_key(image_bgr)
        cached = _rec_cache_get(key)
        if cached is not None:
            results[idx] = cached       # 缓存命中
        else:
            misses.append((idx, image_bgr, key))

# 2. 未命中项走宽度分桶 → batch 推理
    batches = _make_rec_width_batches(misses, model)
    for chunk, _ in batches:
        chunk_results = model.recognize_batch([image for _, image, _ in chunk])
        for (original_index, _, key), rec_result in zip(chunk, chunk_results):
            results[original_index] = rec_result
            _rec_cache_put(key, rec_result)   # 写入缓存
    return results

# tool/ocr_aisbench.py:694-710 - batch 相关配置
self.rec_batch_size = _as_int_env("OCR_AISBENCH_REC_BATCH_SIZE", 8)
self.rec_batch_max_width_sum = _as_int_env("OCR_AISBENCH_REC_BATCH_MAX_WIDTH_SUM", 3200)
self.rec_max_w = _as_int_env("OCR_AISBENCH_REC_MAX_WIDTH", 3200)

大模型VL并发优化

GLM_OCR以API服务形式部署,后端使用vLLM-Ascend引擎,支持continuous batching。原同步串行调用方式无法发挥该特性。优化方案如下:

  1. 新增配置项VL_OCR_API_CONCURRENCY,控制每个worker内并发提交VL请求的线程数。
  2. 遍历所有页面,收集VL items(表格和印章区域)。
  3. 使用ThreadPoolExecutor异步提交所有VL请求。
  4. 主线程不等待VL结果,继续执行AisBench OCR小模型任务。
  5. 小模型完成后,阻塞等待VL结果。

parse_model != tool时,所有页面的VL items统一进入全局并发队列,允许vLLM-Ascend跨页面进行continuous batching。若VL请求失败或返回空,自动fallback至AisBench备用模型。

代码实现

# config.py:288-294 - 并发配置
VL_OCR_API_CONCURRENCY = max(1, min(int(os.getenv("VL_OCR_API_CONCURRENCY", "32")), 256))

# tool/ocr_aisbench.py:1849-1876 - 全局跨页收集 VL items
vl_items: List[OCRItem] = []
aisbench_batches: List[Tuple[List[OCRItem], List[OCRItem]]] = []
for page_number in page_numbers:
    direct_items, det_items, page_vl_items = _collect_ocr_items(...)
    vl_items.extend(page_vl_items)           # 所有页面的 VL items 合并
    aisbench_batches.append((direct_items, det_items))

# tool/ocr_aisbench.py:1631-1692 - VL 滑动窗口并发调度
def run_vl_api_window() -> None:
    max_workers = min(vl_api_concurrency, len(vl_items))
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(run_vl_slot, idx) for idx in range(max_workers)]
        for future in futures:
            future.result()

# tool/ocr_aisbench.py:1621-1629 - Fallback 机制
def queue_fallback(item: OCRItem) -> None:
    with fallback_lock:
        _queue_aisbench_item(item, fallback_direct_items, fallback_det_items)

进程池与Supervisor稳定性修复

高并发压测中暴露出worker进程残留、socket绑定冲突、supervisor状态卡死等问题。修复措施包括:

  • 调整启动顺序:先检查并绑定socket,成功后再创建worker。
  • 优化重启流程:pause → shutdown(wait=True)→ settle → unpause → 确认ready。
  • 超时清理:pool启动超时后强制终止进程组并清理socket文件。
  • 状态查询不自动拉起dead pool,避免健康检查引发连锁重启。

代码实现

# tool/ocr_pool_server.py:554-591 - 先 bind socket,再创建 worker
def serve_forever(socket_path: str) -> None:
    if sock_file.exists():
        try:
            with socket.socket(...) as probe:
                probe.connect(socket_path)
            raise RuntimeError("socket already in use")
        except OSError:
            os.unlink(socket_path)          # 清理残留socket
    sock.bind(socket_path)                  # 先bind
    sock.listen(listen_backlog)
    pool = OcrPoolServer()
    pool.start()                            # 绑定成功后才创建worker

# tool/ocr_scheduler.py:545-583 - 重启流程
def pool_restart() -> Dict[str, Any]:
    set_supervisor_paused(True)             # 1. pause
    pool_shutdown(pause_supervisor=False)   # 2. shutdown (wait=True)
    time.sleep(OCR_POOL_RESTART_SETTLE_SECONDS)  # 3. settle
    set_supervisor_paused(False)            # 4. unpause
    ensure_pool_ready(...)                  # 5. 确认ready

# tool/ocr_pool_supervisor.py:282-307 - 超时强制终止
deadline = time.time() + OCR_POOL_STARTUP_TIMEOUT_SECONDS
while time.time() < deadline:
    if _pool_status_ready(data):
        return
os.killpg(proc.pid, signal.SIGTERM)         # 超时强杀进程组

# tool/ocr_scheduler.py:190-204 - 状态查询不自动拉起
def status(self) -> Dict[str, Any]:
    resp = _pool_request({"type": "status"}, ensure_running=False)  # 不拉起
    return resp.get("data") or {}

显存优化

针对前述显存增长问题,通过设置环境变量GE_USE_STATIC_MEMORY=3对GE引擎的内存分配策略进行调整。该参数启用了静态分配池机制:GE引擎预先缓存一块较大的虚拟地址段,所有动态shape分配均在该地址段内通过偏移量(offset)进行管理,使内存拼接复用更加灵活,有效消除了碎片与空洞,大幅减少了向系统反复申请内存的次数。配置后,显存占用从8GB降至2GB,且推理时延的性能损失控制在2%以内。

总结

通过上述优化,在747页PDF文档上(单卡单进程)优化效果明显。
关键优化手段汇总:

优化类别 具体方法 适用场景
模型格式适配 Paddle → ONNX → om,启用GE融合算子 从其他框架迁移至昇腾NPU
流水线架构 生产者-消费者队列,解耦请求与推理环节 多阶段串行链路,存在相互等待
小模型调用结构 Direct Rec快路径 + 跨元素Batch + 宽度分桶 + 缓存 版面元素种类固定,存在大量规则小文本
大模型并发 API线程池并发 + 主任务重叠执行 + 全局队列 后端支持continuous batching(vLLM-Ascend、TGI)
显存管理 GE_USE_STATIC_MEMORY=3 动态shape模型,NPU显存只增不减
进程生命周期 先绑定socket再创建worker;重启流程规范化 高并发服务,需避免僵尸进程与端口冲突

可参考的优化方向:

  1. 模型格式转换与算子融合:将PyTorch、PaddlePaddle模型转换为ONNX后,利用ATC工具转换为om格式,可充分利用GE引擎的融合算子能力。转换时需关注动态shape场景下的精度校准。
  2. 检测-识别流水线解耦:对于布局规则的文档,可根据版面元素类别决定是否跳过检测阶段,结合置信度阈值设置回退策略,兼顾速度与精度。
  3. 跨页面动态批处理:不局限于单页内组batch,可在全局收集一定数量(或总像素宽度)的crop后再推理。使用贪心算法限制单batch总宽度,可有效控制显存峰值。
  4. 异步并发与流水线重叠:将大模型请求异步化,主线程继续执行小模型任务,实现任务级流水线重叠,可显著降低端到端时延。
  5. 静态内存分配策略:NPU上显存随时间增长或碎片化严重时,可尝试GE_USE_STATIC_MEMORY=3,或查找对应版本的static_memory_policy参数。
  6. 进程池生命周期管理:supervisor模式服务应遵循:启动时资源检查→绑定监听→创建工作进程;重启时暂停接收→等待完成→清理残留→恢复;监控接口不自动拉起。
  7. 缓存粒度选择:重复crop缓存的key可选用图像下采样后的哈希值替代完整MD5,以降低计算开销,同时需控制缓存容量上限。
Logo

作为“人工智能6S店”的官方数字引擎,为AI开发者与企业提供一个覆盖软硬件全栈、一站式门户。

更多推荐