在昇腾NPU上写生产级代码这几年,踩过的坑大概能凑一篇避坑指南了。有些是自己写的bug,有些是对平台特性理解不到位导致的,还有一些是昇腾Runtime和CUDA的表面相似性带来的认知陷阱——看起来差不多,实际上差了十万八千里。

写这篇的目的不是批判昇腾平台——昇腾NPU本身的性能表现是值得认可的——而是把真实遇到的问题整理出来,让后来者少走弯路。内容基于实际开发经验,部分场景涉及CANN版本的演进,描述可能因具体版本而有所差异。


一、显存管理的常见错误

1.1 分配了没释放,显存泄漏到崩溃

这是最常见的错误,而且昇腾Runtime的显存泄漏比CUDA更隐蔽。CUDA有cuda-memcheck这样的工具可以检测显存泄漏,昇腾这边的工具链还没那么成熟,显存泄漏通常表现为"跑着跑着越来越慢,最后直接OOM"。

典型场景:Python的异常处理分支里忘记调用acl.rt.free。代码在正常路径下有资源释放,但某个if分支里直接raise了,释放代码没有走到。

import acl
import numpy as np

acl.init()
device_id = 0
acl.rt.set_device(device_id)

# 分配显存
data_size = 1024 * 1024  # 1MB
ptr, ret = acl.rt.malloc(data_size, acl.GDDR_MEM)

host_data = np.random.randn(1024).astype(np.float32)
acl.rt.memcpy(ptr, data_size, host_data.ctypes.data, data_size, acl.HOST_TO_DEVICE)

# ⚠️ 典型错误:只在正常路径里释放,异常分支漏掉了
try:
    if host_data.sum() < 0:
        raise ValueError("Negative sum detected")
    # ... 正常使用ptr
    print("Normal path, data processed")
except ValueError as e:
    # 这里直接raise了,ptr的释放代码永远不会执行
    print(f"Error: {e}")
    raise  # ← 坑在这里:raise之前应该先释放ptr

# 如果没有异常,正常路径里释放
acl.rt.free(ptr)  # ← 这行只有在try没抛异常时才会执行
acl.rt.reset_device(device_id)
acl.finalize()

正确的做法是用context manager或者finally确保释放一定会执行:

# 正确的资源管理模式:用Python的try-finally确保释放
ptr = None
try:
    ptr, _ = acl.rt.malloc(data_size, acl.GDDR_MEM)
    acl.rt.memcpy(ptr, data_size, host_data.ctypes.data, data_size, acl.HOST_TO_DEVICE)
    
    if host_data.sum() < 0:
        raise ValueError("Negative sum detected")
    
    # 正常使用ptr
finally:
    # 无论是否抛异常,finally块都会执行
    if ptr is not None:
        acl.rt.free(ptr)

注释解释WHY:昇腾的Device Memory没有Python那样的GC机制——Python的解释器退出进程时只会释放Host内存,C层的设备内存必须手动释放。如果在某个异常分支里提前raise,后续的资源释放代码根本执行不到。try-finally是处理这类场景的标准写法,相当于给资源释放加了个保险。

1.2 显存碎片化:小块分配导致的内存池耗尽

昇腾Runtime的显存分配器不是 slab 型的,而是基于伙伴系统(Buddy System)的变体。每次malloc按2的幂次分配空间,如果反复申请和释放不同大小的显存块,就会产生大量小的空闲区间——这些区间单独看都不够用,但加起来远超实际需要的显存。

# ⚠️ 典型场景:循环里反复申请/释放不同大小的显存
acl.init()
device_id = 0
acl.rt.set_device(device_id)

# 模拟一个典型的"动态Shape推理"场景
# 每次处理的图片尺寸不同,导致分配的显存大小也不同
image_sizes = [
    (3, 224, 224),   # batch 1
    (3, 640, 640),   # batch 2 — 显存需求翻倍
    (3, 224, 224),   # batch 3 — 又回到小尺寸
    (3, 480, 480),   # batch 4 — 中等尺寸
    (3, 224, 224),   # batch 5
    # ... 如此反复
]

ptrs = []
for size in image_sizes:
    data_size = size[0] * size[1] * size[2] * 4  # float32
    ptr, _ = acl.rt.malloc(data_size, acl.GDDR_MEM)
    ptrs.append(ptr)
    # ... 处理 ...

# 一次性释放(通常这样做是对的)
for ptr in ptrs:
    acl.rt.free(ptr)

# 但如果ptrs里有些已经被释放,有些没有(比如中途出错)
# 显存池里就留下了大小不一的小碎片
# 后续申请大块显存时,即使总空闲显存足够,也申请失败

解决方案是使用预分配显存池:

# 正确的做法:启动时一次性申请大显存池
acl.init()
acl.rt.set_device(0)

# 根据最大可能需求一次性申请整个显存池
max_memory = 8 * 1024**3  # 8GB,按最大batch size预留
pool_ptr, _ = acl.rt.malloc(max_memory, acl.GDDR_MEM)

# 将显存池注册为可用内存池
acl.rt.register_mem(pool_ptr, max_memory, acl.ACMMemAttr{"memory_type": "extra"})

# 后续所有显存分配都从池子里分,不直接调用malloc
def alloc_from_pool(size):
    # 从预分配的池子里分配,自动管理碎片
    return acl.rt.malloc_from_mem(size, pool_ptr)

# 这样即使多次分配/释放,也不会产生显存碎片
ptr1 = alloc_from_pool(1024**2)   # 1MB
ptr2 = alloc_from_pool(4 * 1024**2)  # 4MB
acl.rt.free(ptr1)
ptr3 = alloc_from_pool(2 * 1024**2)  # 2MB — 复用ptr1的空间,不产生碎片

注释解释WHY:显存碎片化的危害是隐性的——程序看起来没问题,分配释放逻辑都对,但跑到某个时间点突然报"显存不足",即使free_mem显示还有大量空闲空间。这是因为碎片化的空闲区域是离散的,无法满足连续大块申请。预分配显存池的本质是把多次物理分配变成一次+多次逻辑分配,从根本上规避碎片问题。

1.3 Host-Device拷贝的隐式同步

昇腾Runtime的acl.rt.memcpy行为和CUDA的cudaMemcpy有一点根本性差异:昇腾的memcpy默认是同步的(显式等待拷贝完成才返回),但如果指定了目标buffer是某个Stream关联的内存,则会变成隐式异步——看起来像同步API,实际上是异步行为。这个"看起来同步实际异步"的边界非常模糊,是新手踩坑的重灾区。

# ⚠️ 隐式同步问题:拷贝操作看似简单,实际可能阻塞你以为是并行的操作
acl.init()
acl.rt.set_device(0)
stream1, _ = acl.rt.create_stream()
stream2, _ = acl.rt.create_stream()

host_data = np.random.randn(1024).astype(np.float32)
device_ptr, _ = acl.rt.malloc(host_data.nbytes, acl.GDDR_MEM)

# 场景:Stream1在跑模型推理,Stream2准备下一批数据
# Stream1的推理操作已经在硬件上异步执行
# 此时我们想用Stream2把下一批数据传到设备上(预加载)
acl.rt.memcpy(device_ptr, host_data.nbytes,
              host_data.ctypes.data, host_data.nbytes,
              acl.HOST_TO_DEVICE)  # ← 这里有个坑

# 继续往Stream1提交操作
# 问题:上面的memcpy可能在Stream1上引入了隐式同步
# Stream1正在跑的推理可能需要等待memcpy完成(如果它们访问同一块内存)
# 结果是两个Stream的实际执行变成了串行,完全没并行起来

acl.rt.reset_device(0)
acl.finalize()

正确的异步拷贝方式是明确指定目标和来源:

# 正确的异步预加载:显式使用异步拷贝接口
acl.init()
acl.rt.set_device(0)
stream1, _ = acl.rt.create_stream()
stream2, _ = acl.rt.create_stream()

host_data1 = np.random.randn(1024).astype(np.float32)
host_data2 = np.random.randn(1024).astype(np.float32)
device_ptr1, _ = acl.rt.malloc(host_data1.nbytes, acl.GDDR_MEM)
device_ptr2, _ = acl.rt.malloc(host_data2.nbytes, acl.GDDR_MEM)

# 关键:用acl.rt.memcpy的async变体,明确指定拷贝方向和大小
# acl.rt.memcpy_async是非阻塞的,立即返回
acl.rt.memcpy_async(device_ptr1, host_data1.nbytes,
                    host_data1.ctypes.data, host_data1.nbytes,
                    acl.HOST_TO_DEVICE, stream2)  # ← 绑定到Stream2

# Stream2在异步搬运数据,Stream1可以同时跑推理
# 两个操作物理上可以并行

# 需要等数据到达设备后再使用device_ptr1时,再同步
acl.rt.synchronize_stream(stream2)  # 只同步Stream2,不阻塞Stream1

acl.rt.reset_device(0)
acl.finalize()

注释解释WHY:昇腾的隐式同步规则很复杂,和CUDA的"host-device拷贝总是同步"规则完全不同。在昇腾上,错误的memcpy调用可能无声无息地引入设备级别的同步点,把你精心设计的双Stream并行计划完全破坏。解决方法是永远用显式的异步接口memcpy_async)并在完成后显式同步,不依赖默认行为的隐式语义。


二、异步执行的理解误区

2.1 “异步调用=立即返回”:对了一半

昇腾Runtime的acl.mdl.execute有两种形式:同步的execute和异步的execute_async。很多人以为异步调用会立即返回、后面可以立即跑其他代码——这个理解只对了一半。

异步调用确实会立即返回,调用本身不阻塞。但这里有个陷阱:异步调用提交到Stream的操作,和Host端后续代码之间没有自动同步。如果你在异步推理调用之后立即读取输出buffer,读到的很可能是旧数据(推理还没跑完)。

# ⚠️ 常见错误:以为异步执行后不需要同步
acl.init()
acl.rt.set_device(0)
model_id, _ = acl.mdl.load_from_file("/path/to/model.om")

# 准备输入输出
input_ptr, _ = acl.rt.malloc(input_size, acl.GDDR_MEM)
output_ptr, _ = acl.rt.malloc(output_size, acl.GDDR_MEM)

input_dataset = acl.mdl.create_dataset()
acl.mdl.add_dataset_buffer(input_dataset, input_ptr, input_size)
output_dataset = acl.mdl.create_dataset()
acl.mdl.add_dataset_buffer(output_dataset, output_ptr, output_size)

# 提交异步推理
acl.mdl.execute_async(model_id, input_dataset, output_dataset, stream)
print("推理已提交")  # ← 异步,推理还没开始!

# 立即读取结果(⚠️ 错误!推理还在队列里根本没跑)
result = np.empty(output_size // 4, dtype=np.float32)
acl.rt.memcpy(result.ctypes.data, output_size, output_ptr, output_size, acl.DEVICE_TO_HOST)
print(result[0])  # ← 读到的可能是上次的结果或者未初始化的内存

# 必须等推理完成
acl.rt.synchronize_stream(stream)  # ← 加上这个才能安全读取

# 现在读到的才是真正的推理结果
acl.rt.memcpy(result.ctypes.data, output_size, output_ptr, output_size, acl.DEVICE_TO_HOST)
print(result[0])  # 正确

# 清理
acl.mdl.unload(model_id)
acl.rt.reset_device(0)
acl.finalize()

异步的核心价值是"重叠",不是"跳过":

# 正确使用异步:重叠数据准备和计算
acl.init()
acl.rt.set_device(0)
stream1, _ = acl.rt.create_stream()
stream2, _ = acl.rt.create_stream()

# 两个推理任务,可以完全并行
task1_input, _ = acl.rt.malloc(input_size, acl.GDDR_MEM)
task2_input, _ = acl.rt.malloc(input_size, acl.GDDR_MEM)

# 先把第一批数据拷进去
acl.rt.memcpy(task1_input, input_size, host_data1.ctypes.data, input_size, acl.HOST_TO_DEVICE)

# 提交第一批推理
acl.mdl.execute_async(model_id, task1_input, task1_output, stream1)

# 在第一批推理的同时,准备第二批数据(数据准备和计算重叠)
acl.rt.memcpy_async(task2_input, input_size, host_data2.ctypes.data, input_size,
                    acl.HOST_TO_DEVICE, stream2)

# 两边都在异步跑,此时可以并行处理其他事情
# ...

# 分别等待各自的结果
acl.rt.synchronize_stream(stream1)  # 等task1完成
result1 = read_result(task1_output)

acl.rt.synchronize_stream(stream2)  # 等task2完成
result2 = read_result(task2_output)

注释解释WHY:异步编程的本质是让不相关的操作尽可能并行执行,减少等待时间。不是"异步调用之后不需要等待",而是"不同任务之间可以并行,让各自的等待时间重叠"。如果两个任务之间有数据依赖(task2的输入需要task1的输出),那异步没有意义,还是得串行。异步适合的场景是:任务A的数据准备和任务B的计算重叠、多个完全独立的推理请求并行处理。


三、多线程安全的问题

3.1 Runtime的线程模型:ACL是多线程安全的吗?

昇腾Runtime(ACL)不是完全线程安全的,但也不是完全线程不安全的。这个边界的划分很微妙,很多踩坑都是因为没搞清楚哪些操作可以在多线程里并行、哪些必须串行。

基本规则:同一个Device上的同一个Stream,ACL内部会做同步。不同Stream之间可以并行。不同Device之间天然并行。

# ⚠️ 线程安全错误:在同一Stream上并发提交操作
import threading

acl.init()
acl.rt.set_device(0)
stream, _ = acl.rt.create_stream()

# 两个线程同时在同一个Stream上提交操作
def worker1():
    # 两个线程共享同一个Stream
    # ACL内部对同一Stream的操作不做互斥保护
    # 结果是操作顺序不确定,可能导致硬件状态冲突
    for _ in range(10):
        acl.mdl.execute_async(model_id, input1_dataset, output1_dataset, stream)
        # 不做任何同步直接返回,两个线程的调用可能交叉

def worker2():
    for _ in range(10):
        acl.mdl.execute_async(model_id, input2_dataset, output2_dataset, stream)

t1 = threading.Thread(target=worker1)
t2 = threading.Thread(target=worker2)
t1.start(); t2.start()
t1.join(); t2.join()
# ⚠️ 这个代码的行为是未定义的,可能崩溃、可能乱序、可能正确

# 正确做法:每个线程用独立的Stream
stream1, _ = acl.rt.create_stream()
stream2, _ = acl.rt.create_stream()

def worker1_correct():
    for _ in range(10):
        acl.mdl.execute_async(model_id, input1_dataset, output1_dataset, stream1)
    acl.rt.synchronize_stream(stream1)

def worker2_correct():
    for _ in range(10):
        acl.mdl.execute_async(model_id, input2_dataset, output2_dataset, stream2)
    acl.rt.synchronize_stream(stream2)

# 不同Stream之间的调度由昇腾硬件自己处理,用户态不需要加锁
t1 = threading.Thread(target=worker1_correct)
t2 = threading.Thread(target=worker2_correct)
t1.start(); t2.start()
t1.join(); t2.join()
print("Both workers completed safely")

注释解释WHY:昇腾Runtime对同一个Stream的多个并发调用是不保证原子性的。两个线程同时向同一个Stream提交操作,它们的实际执行顺序取决于硬件调度,不取决于代码里的提交顺序。如果操作之间有数据依赖(比如线程1的输出是线程2的输入),这种交叉提交会产生完全错误的结果。每个线程用独立的Stream是避免这类问题的最简单方式——昇腾会负责多个Stream之间的并行调度。

3.2 模型加载的竞态条件

另一个常见的多线程问题是模型加载。如果多个线程同时调用acl.mdl.load_from_file加载同一个模型,在某些CANN版本里会触发竞态条件——可能两个线程都成功加载了同一个模型(浪费显存),也可能其中一个加载失败。

# ⚠️ 多线程加载同一模型的竞态问题
import threading

def load_model(path):
    model_id, ret = acl.mdl.load_from_file(path)
    if ret != 0:
        print(f"Load failed: {ret}")
        return None
    print(f"Loaded model, id={model_id}")
    return model_id

# 5个线程同时加载同一个模型
# 每个线程都占用一份模型占用的显存
# 假设模型需要2GB显存,5个线程同时加载 → 至少10GB显存占用
paths = ["/path/to/model.om"] * 5
threads = [threading.Thread(target=lambda p=p: load_model(p)) for p in paths]

# 如果显存不够,可能部分加载失败,或者系统变慢

正确的做法是单例模式加载,多线程共享同一个模型ID:

# 单例加载 + 引用计数安全释放
import threading

_model_lock = threading.Lock()
_model_id = None
_load_count = 0

def load_model_once(path):
    global _model_id, _load_count
    with _model_lock:
        if _load_count == 0:
            _model_id, ret = acl.mdl.load_from_file(path)
            if ret != 0:
                raise RuntimeError(f"Model load failed: {ret}")
        _load_count += 1
        return _model_id

def unload_model_once():
    global _model_id, _load_count
    with _model_lock:
        _load_count -= 1
        if _load_count == 0:
            acl.mdl.unload(_model_id)
            _model_id = None

# 使用时
model_id = load_model_once("/path/to/model.om")
# 多个线程共享同一个model_id,每个线程可以独立创建自己的input/output dataset
# 但模型本身只加载一次

注释解释WHY:昇腾的模型加载是一个相对重的操作——它涉及读.om文件、解析图结构、分配模型运行时的各种内部buffer。如果每个线程都独立加载一个模型,显存占用会成倍增加。正确的做法是单例加载,多线程共享同一个model_id。昇腾Runtime允许多个线程使用同一个model_id提交推理(只要它们使用各自的dataset对象),这是设计支持的使用模式。


四、性能相关的几个常见误区

4.1 batch size越大越好

这是从CUDA迁移过来最容易犯的错误。CUDA生态里很多人习惯了"尽量把显存用满",batch size越大吞吐量越高。但昇腾NPU的算力利用率和batch size的关系更复杂——存在一个最优batch size区间,超过之后性能收益递减。

原因在于:昇腾AI Core的矩阵乘法单元(Cube)利用率和输入数据的shape有关,当矩阵维度太小时(如batch size=1),Cube利用率很低(大量硬件空转);但当batch size过大时,显存带宽成为瓶颈(搬运数据的速度跟不上计算速度),Cube又要等数据。所以需要通过 profiling工具(如aclprof)测出当前模型的最优batch size区间。

4.2 多Device并行 = 多卡推理

昇腾多卡推理时,不同卡之间需要显式同步才能保证数据一致性。torch.nn.DataParallel在CUDA上是开箱即用的,在昇腾上用torch.npu的方式会有所不同——需要确保数据正确分配到各卡、结果正确归约。多卡场景推荐用HCCL做集合通信。

# 昇腾多卡推理的简化示意
import torch
import torch.distributed as dist

# 初始化分布式环境(昇腾用NCCL backend)
dist.init_process_group(backend="hccl", init_method="env://", world_size=8, rank=0)

# 每个进程绑定到一张卡
local_rank = int(os.environ["RANK"])
torch.npu.set_device(f"npu:{local_rank}")

# 数据分片:每张卡只处理1/8的数据
model = load_model().npu(local_rank)
inputs = load_shard_for_rank(rank=local_rank, world_size=8)

# 每张卡独立推理
outputs = model(inputs)

# 结果归约到rank=0(AllReduce)
if local_rank == 0:
    torch.distributed.all_reduce(outputs, op=dist.ReduceOp.SUM)

注释解释WHY:昇腾多卡的HCCL backend(对应CUDA的NCCL)需要显式初始化和同步,不像单卡场景那么透明。DataParallel在底层其实也是用了集合通信,但在昇腾上用torch.npu的方式直接调用HCCL接口更可控。world_size等于总卡数,rank是当前卡在全局中的编号,这两个值必须和实际部署环境对应。


仓库链接:https://atomgit.com/cann/runtime

Logo

作为“人工智能6S店”的官方数字引擎,为AI开发者与企业提供一个覆盖软硬件全栈、一站式门户。

更多推荐