小模型在昇腾NPU上的推理部署:【基于昇腾 NPU 的 OCR 业务系统性能优化实践】
作者:昇腾实战派
知识地图:https://blog.csdn.net/Lumos_Lovegood/article/details/161601003
背景概述
在将多模态OCR业务从GPU迁移至昇腾NPU的过程中,由于硬件架构的差异,原有推理链路在性能和资源管理方面出现了新的瓶颈。本文基于一套五层架构的文档解析系统,详细记录了在昇腾NPU环境下的性能调优过程,涵盖推理调用结构重组、并发模型重构以及底层内存分配策略调整。通过一系列优化,最终在单卡单进程场景下实现了显著的性能提升和资源优化。
系统架构概述
该文档解析系统采用流水线架构,核心功能是对PDF或图片进行版面分析,输出包含文字、表格、印章等元素的结构化JSON。系统自顶向下分为五层:
- 请求接入与缓存层:基于文件MD5进行缓存拦截,命中则直接返回结果。
- 网关调度层:负责任务队列初始化、模型调度与推理上下文准备。
- 版面解析与分流层:调用版面检测模型识别元素类型,随后按类别分流——文本类任务路由至PPOCR_V5 DET/REC(小模型),表格与印章类任务路由至GLM_OCR(大模型)。
- 模型执行层:基于昇腾NPU和AisBench框架,加载om格式模型执行推理。
- 结构化输出层:融合各模型输出结果,生成最终JSON响应。
系统整体架构如下图所示:

初始性能与资源问题
在747页PDF文档上(单卡单进程)完成迁移后首轮测试,性能对标数据如下:
- tool(小模型链路):GPU基线287.93s,NPU初始394.34s(慢37%)。
- vl_table_seal(大模型链路):GPU基线516.12s,NPU初始1225.00s(慢137%)。
同时,通过npu-smi监测发现,NPU显存占用达到8GB且随推理轮次持续增长,而GPU环境下显存稳定在2GB左右。
问题定位
小模型链路性能瓶颈
通过性能分析(profiling)发现,tool链路中OCR enrichment阶段耗时约占总耗时的87%。调用序列分析显示,每个版面元素均独立调用DET模型进行文本行检测,再调用REC模型进行识别。对于Text、Title、Image Caption、Inline Equation等小文本区域,检测阶段引入的额外开销占比过高,且识别阶段batch size恒为1,无法充分利用NPU算力。
大模型链路性能瓶颈
GLM_OCR的VL请求采用同步串行方式调用,每个表格或印章区域的识别耗时约2.5~3s。后端vLLM-Ascend引擎虽支持continuous batching,但因请求无法并发到达,该特性无法生效,总耗时线性累加。
显存异常增长
内存分配序列分析显示,显存占用持续增长的根本原因在于动态shape场景下的内存池复用效率问题。当各裁剪区域尺寸不一致时,模型算子的输入shape持续变化,导致其所需内存大小也随之变动。GE默认的动态内存池中,若已分配的空闲内存块不满足新的内存请求,内存池便会向系统重新申请内存,而原有内存块因无法被有效复用而形成碎片与空洞,造成显存实际占用远高于理论需求。
如下图所示,profiling数据清晰呈现了内存分配随推理迭代不断攀升的趋势:

优化方案
整体架构调整
模型层面,将PaddlePaddle格式模型依次转换为ONNX格式,再进一步转换为NPU亲和的om格式,以利用昇腾GE引擎的融合算子能力。
架构层面,将文档处理与版面解析层从串行改造为并行流水线,引入生产者-消费者队列实现请求接收、版面检测与模型识别环节的解耦。整个并行流水线分为四个层次:
Tier 1:API层(api_ocr.py)
API层通过asyncio.Queue实现请求的异步入队与消费。/submit_task接口将请求作为生产者放入全局队列后立即返回,后台由ThreadPoolExecutor管理的消费者线程池(_ocr_dispatcher_loop)持续从队列中拉取请求,转换为同步调用后提交至下游池服务。
# config.py:82-85
OCR_SUBMIT_ASYNC_ENABLED = os.getenv("OCR_SUBMIT_ASYNC_ENABLED", "1") == "1"
OCR_DISPATCHER_CONCURRENCY = int(os.getenv("OCR_DISPATCHER_CONCURRENCY", "0") or "0")
# api_ocr.py:76-80
_OCR_TASK_QUEUE: Optional[asyncio.Queue[dict[str, Any]]] = None
_OCR_DISPATCHER_EXECUTOR: Optional[ThreadPoolExecutor] = None
# api_ocr.py:548-560
def _start_ocr_dispatchers(concurrency: int) -> None:
_OCR_TASK_QUEUE = asyncio.Queue(maxsize=maxsize)
_OCR_DISPATCHER_EXECUTOR = ThreadPoolExecutor(
max_workers=max(1, int(concurrency)),
thread_name_prefix="ocr-dispatch",
)
for idx in range(max(1, int(concurrency))):
_OCR_DISPATCHER_TASKS.append(asyncio.create_task(_ocr_dispatcher_loop(idx)))
# api_ocr.py:475-534
async def _ocr_dispatcher_loop(dispatcher_index: int) -> None:
while True:
payload = await queue.get() # 消费者阻塞等待新请求
response = await loop.run_in_executor(
_OCR_DISPATCHER_EXECUTOR, # 在线程池中执行同步OCR
_run_queued_ocr_sync,
payload,
)
queue.task_done()
Tier 2:Pool Server层(tool/ocr_pool_server.py)
池服务端通过Unix Socket监听来自API层的请求。OcrPoolServer.submit()方法采用LRU策略从slots列表中获取空闲worker,将任务放入该worker的专属multiprocessing.Queue,然后阻塞等待worker返回结果。
# tool/ocr_pool_server.py:373-426
def submit(self, task: Dict[str, Any], timeout: float) -> Dict[str, Any]:
slot = self._acquire_slot(...) # 获取空闲worker(LRU策略)
slot.task_queue.put(payload) # 生产者:放入worker队列
result = waiter.get(timeout=...) # 阻塞等待结果
return result
# tool/ocr_pool_server.py:428-451
def _acquire_slot(self, ...):
idle = [slot for slot in self.slots if not slot.busy]
if config.SCHED_POLICY == "least_recently_used":
idle.sort(key=lambda slot: slot.last_used_at)
slot.busy = True
return slot
Tier 3:Worker消费者层(tool/ocr_instance_worker.py)
每个Worker是常驻进程,在instance_worker_loop()中持续从task_queue阻塞等待任务。任务到达后根据file_type(pdf/docx/image)分发至对应的pipeline执行,执行完成后将结果写入result_queue。
# tool/ocr_instance_worker.py:110-227
def instance_worker_loop(instance_id, task_queue, result_queue):
while True:
task = task_queue.get() # 消费者:阻塞等待任务
if task is None:
return # 退出信号
result = run_pipeline_task(task) # 执行pipeline
result_queue.put({"ok": True, "result": result})
# tool/ocr_instance_worker.py:69-107
def run_pipeline_task(task: Dict[str, Any]) -> Dict[str, Any]:
file_type = str(task.get("file_type"))
if file_type == "pdf":
return main_pipeline_pdf(file_path, ...)
if file_type == "docx":
return main_pipeline_docx(file_path, ...)
if file_type == "image":
return main_pipeline_image(file_path, ...)
Tier 4:Pipeline内部并行(tool/pipeline_pdf.py + tool/ocr_aisbench.py)
main_pipeline_from_raster_pages()采用五步串行流程,其中步骤4(ocr_enrich_sections())是核心并行点。该函数先通过_collect_ocr_items()按类别将元素分流至direct rec、det+rec和VL三条路径,随后启动VL线程异步调用vLLM-Ascend API,主线程同时执行AisBench小模型的batch推理,实现大模型与小模型的并行执行。
# tool/pipeline_pdf.py:126-207 (五步串行pipeline)
def main_pipeline_from_raster_pages(...):
stage("步骤 1/5: 版面检测") # detect_pdf_layout()
stage("步骤 2/5: 文本合并") # integrate_results()
stage("步骤 3/5: 章节树构建") # analyze_from_integrated_json()
stage("步骤 4/5: OCR/VL 富化") # ocr_enrich_sections() ← 核心并行点
stage("步骤 5/5: 章节整理") # refine_sections()
# tool/ocr_aisbench.py:1841-1895 (全局跨页VL + AisBench并行)
def ocr_enrich_sections(...):
# 遍历所有页面,收集VL items和AisBench items
for page_number in page_numbers:
direct_items, det_items, page_vl_items = _collect_ocr_items(...)
vl_items.extend(page_vl_items)
aisbench_batches.append((direct_items, det_items))
# 统一并行调度
_run_vl_items_with_aisbench_batches(vl_items, aisbench_batches, ...)
# tool/ocr_aisbench.py:1601-1746 (VL与AisBench并行执行)
def _run_vl_items_with_aisbench_batches(...):
if vl_items and vl_mode == "api":
vl_window_thread = threading.Thread(target=run_vl_api_window)
vl_window_thread.start() # VL线程启动,不阻塞
_run_aisbench_batches(aisbench_batches, device=device) # 主线程执行小模型
if vl_window_thread is not None:
vl_window_thread.join() # 等待VL完成
# VL失败项回退到AisBench
_run_aisbench_item_batches(fallback_direct_copy, fallback_det_copy, ...)
该四层流水线架构消除了请求接收、版面检测与模型识别各环节间的相互等待,实现了从请求入队到任务执行的全链路并行。
小模型OCR链路优化
Direct Rec快路径
策略:针对Text、Title、Image Caption、Inline Equation四类小文本区域,跳过DET检测步骤,直接将裁剪图送入REC模型识别。若返回空结果或置信度低于阈值,则回退至det+rec流程以保证准确性。该优化使约40%的区域跳过检测,基于747页PDF测试,总耗时下降明显。
代码实现:
# tool/ocr_aisbench.py:47-48 - 定义 direct rec 目标类别与尺寸阈值
DIRECT_REC_CATEGORIES = {"Text", "Title", "Image Caption", "Inline Equation"}
DIRECT_REC_MAX_HEIGHT = 32
# tool/ocr_aisbench.py:992-997 - 判断是否走快路径
def _should_direct_rec(category_name: str, crop: Image.Image) -> bool:
if category_name not in DIRECT_REC_CATEGORIES:
return False
if crop.width <= 0 or crop.height <= 0:
return False
return crop.height <= DIRECT_REC_MAX_HEIGHT
# tool/ocr_aisbench.py:1288-1299 - fallback 逻辑
if _should_direct_rec(category_name, crop):
text = _run_aisbench_rec_direct(crop, device=device).strip()
if not text:
text = _run_aisbench_on_crop(crop, device=device).strip()
else:
text = _run_aisbench_on_crop(crop, device=device).strip()
跨元素Rec Batch
策略:按页面收集所有OCR元素,统一组batch执行识别。执行顺序为:先执行所有direct rec,再执行det提取行crop,最后将所有crop合并为batch推理。单页crop数量限制为2000,防止内存溢出。该优化使rec batch调用次数从数万次降至千次级别,基于747页PDF测试,总耗时再次下降。
代码实现:
# tool/ocr_aisbench.py:1443-1504 - 核心批处理流程
def _run_aisbench_item_batches(direct_items, det_items, *, device):
model = _get_ocr_model(device) if (direct_items or det_items) else None
# 第一步:所有 direct rec 统一 batch 推理
if model is not None and direct_items:
direct_images = [np.asarray(crop...) for _, crop, _, _, _ in direct_items]
direct_results = _recognize_bucketed(model, direct_images)
for item, (text, score) in zip(direct_items, direct_results):
if text and score >= model.drop_score:
_set_ocr_text(element, text.strip(), ...)
else:
det_items.append(item) # 失败回退到 det
# 第二步:det 提取行级 crop
line_crops = []
for element, crop, ... in det_items:
crops, _ = model.extract_line_crops(image_bgr, crop.width, crop.height)
line_crops.extend(crops)
# 第三步:所有行级 crop 合并为一个 batch 推理
rec_results = _recognize_bucketed(model, line_crops)
# tool/ocr_aisbench.py:1507-1533 - 全局跨 chunk 合并
def _run_aisbench_batches(aisbench_batches, *, device):
if not config.OCR_AISBENCH_GLOBAL_REC_BATCH_ENABLED:
for direct_items, det_items in aisbench_batches:
_run_aisbench_item_batches(direct_items, det_items, device=device)
return
# 合并所有 chunk → 统一一次 batch 推理
merged_direct_items = []
merged_det_items = []
for direct_items, det_items in aisbench_batches:
merged_direct_items.extend(direct_items)
merged_det_items.extend(det_items)
_run_aisbench_item_batches(merged_direct_items, merged_det_items, device=device)
宽度分桶与缓存
策略:不同crop宽度差异大会增加填充开销。将crop缩放到统一高度(32px),按宽度排序分组,宽度相近的crop放入同一batch(桶间隔32px)。同时引入重复crop缓存,以crop图像MD5为key存储识别结果,避免重叠候选框的重复推理。rec阶段增加全局宽度限制OCR_AISBENCH_REC_MAX_WIDTH(默认3200像素),使用贪心算法将crop装入有限宽度的batch,控制单次推理显存峰值。Batch size调参测试了2、4、8三档,基于747页PDF测试,batch=8时效果最优。
代码实现:
# tool/ocr_aisbench.py:1006-1045 - 宽度分桶核心算法(贪心装箱)
def _make_rec_width_batches(items, model):
prepared = [(item, _estimate_rec_width(item[1], model)) for item in items]
prepared.sort(key=lambda pair: pair[1]) # 按宽度排序
max_batch_size = model.rec_batch_size # 默认8
max_width_sum = model.rec_batch_max_width_sum # 默认3200
while start < len(prepared):
chunk = []
while pos < len(prepared):
if len(chunk) + 1 > max_batch_size:
break
if candidate_width_sum > max_width_sum:
break
chunk.append(item)
batches.append(chunk)
start = pos
return batches
# tool/ocr_aisbench.py:1075-1098 - MD5 缓存(LRU淘汰,容量20000)
def _rec_cache_key(image_bgr: np.ndarray) -> str:
arr = np.ascontiguousarray(image_bgr)
digest = hashlib.blake2b(digest_size=16)
digest.update(str(arr.shape).encode("ascii"))
digest.update(str(arr.dtype).encode("ascii"))
digest.update(arr.tobytes())
return digest.hexdigest()
# tool/ocr_aisbench.py:1102-1146 - 带缓存的分桶推理
def _recognize_bucketed(model, images_bgr):
# 1. 查缓存
misses = []
for idx, image_bgr in enumerate(images_bgr):
key = _rec_cache_key(image_bgr)
cached = _rec_cache_get(key)
if cached is not None:
results[idx] = cached # 缓存命中
else:
misses.append((idx, image_bgr, key))
# 2. 未命中项走宽度分桶 → batch 推理
batches = _make_rec_width_batches(misses, model)
for chunk, _ in batches:
chunk_results = model.recognize_batch([image for _, image, _ in chunk])
for (original_index, _, key), rec_result in zip(chunk, chunk_results):
results[original_index] = rec_result
_rec_cache_put(key, rec_result) # 写入缓存
return results
# tool/ocr_aisbench.py:694-710 - batch 相关配置
self.rec_batch_size = _as_int_env("OCR_AISBENCH_REC_BATCH_SIZE", 8)
self.rec_batch_max_width_sum = _as_int_env("OCR_AISBENCH_REC_BATCH_MAX_WIDTH_SUM", 3200)
self.rec_max_w = _as_int_env("OCR_AISBENCH_REC_MAX_WIDTH", 3200)
大模型VL并发优化
GLM_OCR以API服务形式部署,后端使用vLLM-Ascend引擎,支持continuous batching。原同步串行调用方式无法发挥该特性。优化方案如下:
- 新增配置项
VL_OCR_API_CONCURRENCY,控制每个worker内并发提交VL请求的线程数。 - 遍历所有页面,收集VL items(表格和印章区域)。
- 使用
ThreadPoolExecutor异步提交所有VL请求。 - 主线程不等待VL结果,继续执行AisBench OCR小模型任务。
- 小模型完成后,阻塞等待VL结果。
当parse_model != tool时,所有页面的VL items统一进入全局并发队列,允许vLLM-Ascend跨页面进行continuous batching。若VL请求失败或返回空,自动fallback至AisBench备用模型。
代码实现:
# config.py:288-294 - 并发配置
VL_OCR_API_CONCURRENCY = max(1, min(int(os.getenv("VL_OCR_API_CONCURRENCY", "32")), 256))
# tool/ocr_aisbench.py:1849-1876 - 全局跨页收集 VL items
vl_items: List[OCRItem] = []
aisbench_batches: List[Tuple[List[OCRItem], List[OCRItem]]] = []
for page_number in page_numbers:
direct_items, det_items, page_vl_items = _collect_ocr_items(...)
vl_items.extend(page_vl_items) # 所有页面的 VL items 合并
aisbench_batches.append((direct_items, det_items))
# tool/ocr_aisbench.py:1631-1692 - VL 滑动窗口并发调度
def run_vl_api_window() -> None:
max_workers = min(vl_api_concurrency, len(vl_items))
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = [executor.submit(run_vl_slot, idx) for idx in range(max_workers)]
for future in futures:
future.result()
# tool/ocr_aisbench.py:1621-1629 - Fallback 机制
def queue_fallback(item: OCRItem) -> None:
with fallback_lock:
_queue_aisbench_item(item, fallback_direct_items, fallback_det_items)
进程池与Supervisor稳定性修复
高并发压测中暴露出worker进程残留、socket绑定冲突、supervisor状态卡死等问题。修复措施包括:
- 调整启动顺序:先检查并绑定socket,成功后再创建worker。
- 优化重启流程:pause → shutdown(wait=True)→ settle → unpause → 确认ready。
- 超时清理:pool启动超时后强制终止进程组并清理socket文件。
- 状态查询不自动拉起dead pool,避免健康检查引发连锁重启。
代码实现:
# tool/ocr_pool_server.py:554-591 - 先 bind socket,再创建 worker
def serve_forever(socket_path: str) -> None:
if sock_file.exists():
try:
with socket.socket(...) as probe:
probe.connect(socket_path)
raise RuntimeError("socket already in use")
except OSError:
os.unlink(socket_path) # 清理残留socket
sock.bind(socket_path) # 先bind
sock.listen(listen_backlog)
pool = OcrPoolServer()
pool.start() # 绑定成功后才创建worker
# tool/ocr_scheduler.py:545-583 - 重启流程
def pool_restart() -> Dict[str, Any]:
set_supervisor_paused(True) # 1. pause
pool_shutdown(pause_supervisor=False) # 2. shutdown (wait=True)
time.sleep(OCR_POOL_RESTART_SETTLE_SECONDS) # 3. settle
set_supervisor_paused(False) # 4. unpause
ensure_pool_ready(...) # 5. 确认ready
# tool/ocr_pool_supervisor.py:282-307 - 超时强制终止
deadline = time.time() + OCR_POOL_STARTUP_TIMEOUT_SECONDS
while time.time() < deadline:
if _pool_status_ready(data):
return
os.killpg(proc.pid, signal.SIGTERM) # 超时强杀进程组
# tool/ocr_scheduler.py:190-204 - 状态查询不自动拉起
def status(self) -> Dict[str, Any]:
resp = _pool_request({"type": "status"}, ensure_running=False) # 不拉起
return resp.get("data") or {}
显存优化
针对前述显存增长问题,通过设置环境变量GE_USE_STATIC_MEMORY=3对GE引擎的内存分配策略进行调整。该参数启用了静态分配池机制:GE引擎预先缓存一块较大的虚拟地址段,所有动态shape分配均在该地址段内通过偏移量(offset)进行管理,使内存拼接复用更加灵活,有效消除了碎片与空洞,大幅减少了向系统反复申请内存的次数。配置后,显存占用从8GB降至2GB,且推理时延的性能损失控制在2%以内。
总结
通过上述优化,在747页PDF文档上(单卡单进程)优化效果明显。
关键优化手段汇总:
| 优化类别 | 具体方法 | 适用场景 |
|---|---|---|
| 模型格式适配 | Paddle → ONNX → om,启用GE融合算子 | 从其他框架迁移至昇腾NPU |
| 流水线架构 | 生产者-消费者队列,解耦请求与推理环节 | 多阶段串行链路,存在相互等待 |
| 小模型调用结构 | Direct Rec快路径 + 跨元素Batch + 宽度分桶 + 缓存 | 版面元素种类固定,存在大量规则小文本 |
| 大模型并发 | API线程池并发 + 主任务重叠执行 + 全局队列 | 后端支持continuous batching(vLLM-Ascend、TGI) |
| 显存管理 | GE_USE_STATIC_MEMORY=3 | 动态shape模型,NPU显存只增不减 |
| 进程生命周期 | 先绑定socket再创建worker;重启流程规范化 | 高并发服务,需避免僵尸进程与端口冲突 |
可参考的优化方向:
- 模型格式转换与算子融合:将PyTorch、PaddlePaddle模型转换为ONNX后,利用ATC工具转换为om格式,可充分利用GE引擎的融合算子能力。转换时需关注动态shape场景下的精度校准。
- 检测-识别流水线解耦:对于布局规则的文档,可根据版面元素类别决定是否跳过检测阶段,结合置信度阈值设置回退策略,兼顾速度与精度。
- 跨页面动态批处理:不局限于单页内组batch,可在全局收集一定数量(或总像素宽度)的crop后再推理。使用贪心算法限制单batch总宽度,可有效控制显存峰值。
- 异步并发与流水线重叠:将大模型请求异步化,主线程继续执行小模型任务,实现任务级流水线重叠,可显著降低端到端时延。
- 静态内存分配策略:NPU上显存随时间增长或碎片化严重时,可尝试
GE_USE_STATIC_MEMORY=3,或查找对应版本的static_memory_policy参数。 - 进程池生命周期管理:supervisor模式服务应遵循:启动时资源检查→绑定监听→创建工作进程;重启时暂停接收→等待完成→清理残留→恢复;监控接口不自动拉起。
- 缓存粒度选择:重复crop缓存的key可选用图像下采样后的哈希值替代完整MD5,以降低计算开销,同时需控制缓存容量上限。
更多推荐



所有评论(0)