昇腾NPU性能调优Checklist——从“能跑“到“跑得快“的20步
昇腾NPU性能调优Checklist——从"能跑"到"跑得快"的20步
·

性能调优是一个系统化工程,不是靠玄学调参。这份Checklist将调优过程拆分为20个可执行的步骤,每步都有明确的判断标准、操作命令和避坑指南。
第一阶段:建立基线(Step 1-5)
在动手优化前,必须先知道“现在有多慢”以及“瓶颈在哪里”。
Step 1:测量当前性能(Baseline)
不要只看平均值,要关注 P99延迟 和 吞吐量。
import time
import torch
import numpy as np
def measure_latency(model, input_data, num_runs=200, warmup=20):
"""
测量延迟的规范方法
关键点:
1. 必须Warmup(排除JIT编译/初始化开销)
2. 循环内只同步一次(避免频繁同步掩盖真实耗时)
3. 使用 perf_counter() 高精度计时
"""
# 预热
for _ in range(warmup):
_ = model(input_data)
torch.npu.synchronize()
latencies = []
for _ in range(num_runs):
t0 = time.perf_counter()
_ = model(input_data)
# 注意:这里不立即同步,等循环结束统一同步或仅最后同步
# 若需精确单轮延迟,需在循环内 sync,但会略微影响结果
# 推荐:循环内不sync,循环后sync一次获取整体时间,再除以次数
# 但为了统计分布,通常每轮都sync确保数据完整
torch.npu.synchronize()
t1 = time.perf_counter()
latencies.append((t1 - t0) * 1000)
latencies = np.array(latencies)
print(f"Latency P50: {np.percentile(latencies, 50):.2f}ms")
print(f"Latency P90: {np.percentile(latencies, 90):.2f}ms")
print(f"Latency P99: {np.percentile(latencies, 99):.2f}ms")
print(f"Throughput: {1000 / np.mean(latencies):.1f} samples/s")
return {"p50": np.percentile(latencies, 50), "p99": np.percentile(latencies, 99)}
# 执行
model = torch.jit.load("model_fp16.om").eval()
inp = torch.randn(1, 3, 224, 224).npu()
baseline = measure_latency(model, inp)
# 记录:baseline = {"p50": 45.2, "p99": 61.3}
Step 2:检查NPU利用率
利用率高不代表快,但利用率低一定有问题。
# 方法1:实时监控(发现持续性低负载)
watch -n 1 'npu-smi info -t utilization,power,temperature -i 0'
# 方法2:Profiling抓Trace(发现间歇性卡顿)
python3 << 'EOF'
import torch
with torch.npu.profile('./trace.json'):
for _ in range(100):
model(inp)
print("Trace saved to trace.json")
EOF
# 分析工具:
# 1. 浏览器打开 https://ui.perfetto.dev/
# 2. 导入 trace.json
# 3. 查看 Device Utilization 轨道
# 4. 关键判断:
# - > 80%: 正常,优化空间小
# - 50~80%: 有优化空间 (IO或调度问题)
# - < 50%: 严重瓶颈 (必须优化)
Step 3:确认数据传输不是瓶颈
PCIe传输是常见的隐形杀手。
import torch
import numpy as np
import time
# 测试纯计算耗时
t_model_start = time.perf_counter()
for _ in range(100):
out = model(inp) # inp已在NPU上
torch.npu.synchronize()
t_model = (time.perf_counter() - t_model_start) / 100 * 1000
# 测试含传输的总耗时
t_total_start = time.perf_counter()
for _ in range(100):
# 模拟真实场景:CPU生成 -> 传输 -> 推理
inp_cpu = np.random.randn(1, 3, 224, 224).astype(np.float32)
inp_npu = torch.from_numpy(inp_cpu).npu() # 触发传输
out = model(inp_npu)
torch.npu.synchronize()
t_total = (time.perf_counter() - t_total_start) / 100 * 1000
overhead = t_total - t_model
ratio = overhead / t_total * 100
print(f"Model Only: {t_model:.2f}ms | Total: {t_total:.2f}ms | Overhead: {overhead:.2f}ms ({ratio:.1f}%)")
# 判断标准:
# - overhead < 5%: 正常
# - 5% ~ 20%: 需优化 (考虑异步传输)
# - > 20%: 严重瓶颈 (必须优化)
Step 4:定位最大瓶颈(算子级分析)
找出耗时最长的Top 5算子。
import cann
# 开启详细算子日志
cann.set_op_trace_mode(True)
# 运行推理并抓取Trace
with torch.npu.profile('./op_trace.json'):
model(inp)
# 分析 op_trace.json (或使用msprof工具)
# 典型瓶颈特征:
# - Conv2d/MatMul 占 60%+: 算子融合或减少计算量
# - DataCopy 占 30%+: 格式转换太多 (NCHW <-> NC1HWC0)
# - Reshape/Transpose 占 20%+: 数据排布不合理
# - Softmax 占 15%+: Shape不规则,尝试融合
Step 5:确认Batch Size是否合理
寻找吞吐量拐点。
results = {}
for bs in [1, 2, 4, 8, 16, 32]:
inp = torch.randn(bs, 3, 224, 224).npu()
# Warmup
for _ in range(10): model(inp)
torch.npu.synchronize()
# Measure
t0 = time.perf_counter()
for _ in range(100): model(inp)
torch.npu.synchronize()
latency = (time.perf_counter() - t0) / 100 * 1000
throughput = bs * 1000 / latency
results[bs] = {"latency": latency, "throughput": throughput}
print(f"BS={bs:2d}: Latency={latency:.2f}ms, Throughput={throughput:.1f}/s")
# 策略:
# - 实时服务 (低延迟): 选最小Batch (通常1或2)
# - 离线批处理 (高吞吐): 选Throughput峰值对应的Batch
# - 拐点判断: 过了某个点后,Throughput不再增加甚至下降
第二阶段:逐项优化(Step 6-15)
Step 6:启用算子融合(最有效手段之一)
减少Kernel Launch次数。
# 检查融合情况
python3 -c "
from cann.graph import load_model
g = load_model('model_fp16.om')
print(g.get_fusion_report())
# 输出示例:
# [x] Conv+BN+ReLU fused
# [ ] Conv+Conv+Conv NOT fused <-- 重点优化对象
"
# ATC编译时强制开启激进融合
atc --model=model.onnx \
--fusion_switch_file=aggressive_fusion.cfg \
--op_compiler_params="enable_tiling=true,enable_fusion=true" \
...
Step 7:消除 NCHW ↔ NC1HWC0 格式转换
格式转换是NPU的大忌。
# 诊断:在模型前后打印Shape和Stride
def check_format(model, inp):
out = model(inp)
print(f"Inp: {inp.shape}, Stride: {inp.stride()}")
print(f"Out: {out.shape}, Stride: {out.stride()}")
return out
# 解决策略:
# 1. 尽量保持NC1HWC0格式贯穿整个网络
# 2. 如果必须用NCHW,将所有NCHW算子集中在一起,减少切换次数
# 3. 使用 `--input_format` 参数指定输入格式
Step 8:检查并启用混合精度 (FP16/BF16)
提升速度并降低显存占用。
# ATC编译配置
atc --model=model.onnx \
--precision_mode=allow_mix_precision \
--insert_op_conf=mix.cfg \
...
# mix.cfg 示例
{
"mixed_precision_ratio": 0.8, # 80%算子转FP16
"skip_layers": ["output_layer"] # 最后一层保留FP32防止溢出
}
Step 9:启用图优化 (Graph Engine, GE)
让编译器自动进行常量折叠、死代码消除。
atc --model=model.onnx \
--graph_engine_mode=high_performance \
...
Step 10:启用连续Batch (Continuous Batching)
解决静态Batch带来的等待延迟。
class ContinuousBatcher:
def __init__(self, model, max_batch=16, timeout_ms=10):
self.model = model
self.max_batch = max_batch
self.pending = []
def add_request(self, req_id, input_data):
self.pending.append((req_id, input_data))
if len(self.pending) >= self.max_batch:
return self._run_batch()
# 可选:超时自动触发
def _run_batch(self):
inputs = torch.stack([x for _, x in self.pending])
outputs = self.model(inputs)
results = {req_id: out for (req_id, _), out in zip(self.pending, outputs)}
self.pending = []
return results
# 配合 CANN 动态Shape特性效果更佳
Step 11:优化数据预处理 (AIPP)
将预处理固化进OM,利用硬件加速单元。
# aipp.cfg 示例
aipp_op {
related_input_rank: 0
src_image_size_w: 224
src_image_size_h: 224
resize: 224
mean_ax1: 123.675
var_reci_ax1: 0.017124761
# ... 其他归一化参数
}
# 编译
atc --model=model.onnx \
--insert_op_conf=aipp.cfg \
...
Step 12:减少小算子数量
避免频繁的Kernel Launch。
# ❌ 错误写法:100次小运算
class BadModel(torch.nn.Module):
def forward(self, x):
for _ in range(100):
x = x + 1
x = x * 2
return x
# ✅ 正确写法:数学合并
class GoodModel(torch.nn.Module):
def forward(self, x):
return x * 2 + 2 # 一次完成
Step 13:检查Tiling策略
大图推理时的内存分块优化。
# 查看默认Tiling
python3 -c "import cann; print(cann.op_info('Conv2d').tiling_params)"
# 手动调整Tiling (针对特定大模型)
atc --model=model.onnx \
--te_config='{"tile_memory_size": 16777216}' \
...
Step 14:检查内存排布 (Contiguous)
确保数据访问连续性,减少Cache Miss。
# 检查Stride
x = torch.randn(1, 3, 224, 224).npu()
print(x.stride())
# 如果中间结果不连续,强制连续化
output = output.contiguous()
Step 15:启用L2 Cache优化
复用中间结果。
atc --model=model.onnx \
--enable_l2_cache=true \
...
第三阶段:进阶调优与监控(Step 16-20)
Step 16:多卡通信优化 (HCCL)
如果是多机多卡训练/推理,检查通信带宽。
# 检查网络带宽
iperf3 -c <peer_ip>
# 调整HCCL参数
export HCCL_CONNECT_TIMEOUT=600
export HCCL_STREAM_PRIORITY=0
Step 17:显存管理优化
防止碎片化导致的OOM。
# 定期清理缓存
torch.npu.empty_cache()
# 避免在循环中创建大量临时Tensor
# 使用 `del` 及时释放
del intermediate_tensor
Step 18:使用 msprof 深度分析
CANN自带的火焰图分析工具。
# 生成profile文件
msprof --mode=trace --output=profile.prof ./train.py
# 分析
msprof --input=profile.prof --output=report.html
Step 19:温度与功耗控制
过热会导致降频。
# 监控
watch -n 1 'npu-smi info -t temperature -i 0'
# 优化:
# 1. 改善机房散热
# 2. 降低频率限制 (if allowed)
# 3. 避免长时间满负荷运行
Step 20:自动化回归测试
建立性能基线库,每次代码更新自动对比。
# 伪代码
def regression_test(new_model, baseline_metrics):
new_metrics = measure_latency(new_model)
if new_metrics['p99'] > baseline_metrics['p99'] * 1.05:
raise PerformanceRegressionError("P99延迟增加超过5%!")
总结:调优路线图
- 先测: 建立Baseline,量化当前性能。
- 找病: 通过Profiler和Utilization找到瓶颈(IO? 计算? 通信?)。
- 开刀: 按优先级实施优化(融合 > 格式 > 精度 > 并行)。
- 复查: 再次测量,确保优化有效且无副作用。
- 固化: 将最佳配置写入脚本,形成自动化流程。
记住:没有银弹。最好的优化方案是针对你的具体模型和数据特征量身定制的。
更多推荐




所有评论(0)