在这里插入图片描述

性能调优是一个系统化工程,不是靠玄学调参。这份Checklist将调优过程拆分为20个可执行的步骤,每步都有明确的判断标准、操作命令和避坑指南。


第一阶段:建立基线(Step 1-5)

在动手优化前,必须先知道“现在有多慢”以及“瓶颈在哪里”。

Step 1:测量当前性能(Baseline)

不要只看平均值,要关注 P99延迟吞吐量

import time
import torch
import numpy as np

def measure_latency(model, input_data, num_runs=200, warmup=20):
    """
    测量延迟的规范方法
    
    关键点:
    1. 必须Warmup(排除JIT编译/初始化开销)
    2. 循环内只同步一次(避免频繁同步掩盖真实耗时)
    3. 使用 perf_counter() 高精度计时
    """
    # 预热
    for _ in range(warmup):
        _ = model(input_data)
    torch.npu.synchronize()
    
    latencies = []
    for _ in range(num_runs):
        t0 = time.perf_counter()
        _ = model(input_data)
        # 注意:这里不立即同步,等循环结束统一同步或仅最后同步
        # 若需精确单轮延迟,需在循环内 sync,但会略微影响结果
        # 推荐:循环内不sync,循环后sync一次获取整体时间,再除以次数
        # 但为了统计分布,通常每轮都sync确保数据完整
        torch.npu.synchronize() 
        t1 = time.perf_counter()
        latencies.append((t1 - t0) * 1000)
    
    latencies = np.array(latencies)
    
    print(f"Latency P50: {np.percentile(latencies, 50):.2f}ms")
    print(f"Latency P90: {np.percentile(latencies, 90):.2f}ms")
    print(f"Latency P99: {np.percentile(latencies, 99):.2f}ms")
    print(f"Throughput: {1000 / np.mean(latencies):.1f} samples/s")
    
    return {"p50": np.percentile(latencies, 50), "p99": np.percentile(latencies, 99)}

# 执行
model = torch.jit.load("model_fp16.om").eval()
inp = torch.randn(1, 3, 224, 224).npu()
baseline = measure_latency(model, inp)
# 记录:baseline = {"p50": 45.2, "p99": 61.3}

Step 2:检查NPU利用率

利用率高不代表快,但利用率低一定有问题。

# 方法1:实时监控(发现持续性低负载)
watch -n 1 'npu-smi info -t utilization,power,temperature -i 0'

# 方法2:Profiling抓Trace(发现间歇性卡顿)
python3 << 'EOF'
import torch
with torch.npu.profile('./trace.json'):
    for _ in range(100):
        model(inp)
print("Trace saved to trace.json")
EOF

# 分析工具:
# 1. 浏览器打开 https://ui.perfetto.dev/
# 2. 导入 trace.json
# 3. 查看 Device Utilization 轨道
# 4. 关键判断:
#    - > 80%: 正常,优化空间小
#    - 50~80%: 有优化空间 (IO或调度问题)
#    - < 50%: 严重瓶颈 (必须优化)

Step 3:确认数据传输不是瓶颈

PCIe传输是常见的隐形杀手。

import torch
import numpy as np
import time

# 测试纯计算耗时
t_model_start = time.perf_counter()
for _ in range(100):
    out = model(inp) # inp已在NPU上
torch.npu.synchronize()
t_model = (time.perf_counter() - t_model_start) / 100 * 1000

# 测试含传输的总耗时
t_total_start = time.perf_counter()
for _ in range(100):
    # 模拟真实场景:CPU生成 -> 传输 -> 推理
    inp_cpu = np.random.randn(1, 3, 224, 224).astype(np.float32)
    inp_npu = torch.from_numpy(inp_cpu).npu() # 触发传输
    out = model(inp_npu)
    torch.npu.synchronize()
t_total = (time.perf_counter() - t_total_start) / 100 * 1000

overhead = t_total - t_model
ratio = overhead / t_total * 100

print(f"Model Only: {t_model:.2f}ms | Total: {t_total:.2f}ms | Overhead: {overhead:.2f}ms ({ratio:.1f}%)")

# 判断标准:
# - overhead < 5%: 正常
# - 5% ~ 20%: 需优化 (考虑异步传输)
# - > 20%: 严重瓶颈 (必须优化)

Step 4:定位最大瓶颈(算子级分析)

找出耗时最长的Top 5算子。

import cann

# 开启详细算子日志
cann.set_op_trace_mode(True)

# 运行推理并抓取Trace
with torch.npu.profile('./op_trace.json'):
    model(inp)

# 分析 op_trace.json (或使用msprof工具)
# 典型瓶颈特征:
# - Conv2d/MatMul 占 60%+: 算子融合或减少计算量
# - DataCopy 占 30%+: 格式转换太多 (NCHW <-> NC1HWC0)
# - Reshape/Transpose 占 20%+: 数据排布不合理
# - Softmax 占 15%+: Shape不规则,尝试融合

Step 5:确认Batch Size是否合理

寻找吞吐量拐点。

results = {}
for bs in [1, 2, 4, 8, 16, 32]:
    inp = torch.randn(bs, 3, 224, 224).npu()
    
    # Warmup
    for _ in range(10): model(inp)
    torch.npu.synchronize()
    
    # Measure
    t0 = time.perf_counter()
    for _ in range(100): model(inp)
    torch.npu.synchronize()
    latency = (time.perf_counter() - t0) / 100 * 1000
    
    throughput = bs * 1000 / latency
    results[bs] = {"latency": latency, "throughput": throughput}
    print(f"BS={bs:2d}: Latency={latency:.2f}ms, Throughput={throughput:.1f}/s")

# 策略:
# - 实时服务 (低延迟): 选最小Batch (通常1或2)
# - 离线批处理 (高吞吐): 选Throughput峰值对应的Batch
# - 拐点判断: 过了某个点后,Throughput不再增加甚至下降

第二阶段:逐项优化(Step 6-15)

Step 6:启用算子融合(最有效手段之一)

减少Kernel Launch次数。

# 检查融合情况
python3 -c "
from cann.graph import load_model
g = load_model('model_fp16.om')
print(g.get_fusion_report())
# 输出示例:
# [x] Conv+BN+ReLU fused
# [ ] Conv+Conv+Conv NOT fused  <-- 重点优化对象
"

# ATC编译时强制开启激进融合
atc --model=model.onnx \
    --fusion_switch_file=aggressive_fusion.cfg \
    --op_compiler_params="enable_tiling=true,enable_fusion=true" \
    ...

Step 7:消除 NCHW ↔ NC1HWC0 格式转换

格式转换是NPU的大忌。

# 诊断:在模型前后打印Shape和Stride
def check_format(model, inp):
    out = model(inp)
    print(f"Inp: {inp.shape}, Stride: {inp.stride()}")
    print(f"Out: {out.shape}, Stride: {out.stride()}")
    return out

# 解决策略:
# 1. 尽量保持NC1HWC0格式贯穿整个网络
# 2. 如果必须用NCHW,将所有NCHW算子集中在一起,减少切换次数
# 3. 使用 `--input_format` 参数指定输入格式

Step 8:检查并启用混合精度 (FP16/BF16)

提升速度并降低显存占用。

# ATC编译配置
atc --model=model.onnx \
    --precision_mode=allow_mix_precision \
    --insert_op_conf=mix.cfg \
    ...

# mix.cfg 示例
{
  "mixed_precision_ratio": 0.8,  # 80%算子转FP16
  "skip_layers": ["output_layer"] # 最后一层保留FP32防止溢出
}

Step 9:启用图优化 (Graph Engine, GE)

让编译器自动进行常量折叠、死代码消除。

atc --model=model.onnx \
    --graph_engine_mode=high_performance \
    ...

Step 10:启用连续Batch (Continuous Batching)

解决静态Batch带来的等待延迟。

class ContinuousBatcher:
    def __init__(self, model, max_batch=16, timeout_ms=10):
        self.model = model
        self.max_batch = max_batch
        self.pending = []
        
    def add_request(self, req_id, input_data):
        self.pending.append((req_id, input_data))
        if len(self.pending) >= self.max_batch:
            return self._run_batch()
        # 可选:超时自动触发
        
    def _run_batch(self):
        inputs = torch.stack([x for _, x in self.pending])
        outputs = self.model(inputs)
        results = {req_id: out for (req_id, _), out in zip(self.pending, outputs)}
        self.pending = []
        return results

# 配合 CANN 动态Shape特性效果更佳

Step 11:优化数据预处理 (AIPP)

将预处理固化进OM,利用硬件加速单元。

# aipp.cfg 示例
aipp_op {
    related_input_rank: 0
    src_image_size_w: 224
    src_image_size_h: 224
    resize: 224
    mean_ax1: 123.675
    var_reci_ax1: 0.017124761
    # ... 其他归一化参数
}

# 编译
atc --model=model.onnx \
    --insert_op_conf=aipp.cfg \
    ...

Step 12:减少小算子数量

避免频繁的Kernel Launch。

# ❌ 错误写法:100次小运算
class BadModel(torch.nn.Module):
    def forward(self, x):
        for _ in range(100):
            x = x + 1
            x = x * 2
        return x

# ✅ 正确写法:数学合并
class GoodModel(torch.nn.Module):
    def forward(self, x):
        return x * 2 + 2  # 一次完成

Step 13:检查Tiling策略

大图推理时的内存分块优化。

# 查看默认Tiling
python3 -c "import cann; print(cann.op_info('Conv2d').tiling_params)"

# 手动调整Tiling (针对特定大模型)
atc --model=model.onnx \
    --te_config='{"tile_memory_size": 16777216}' \
    ...

Step 14:检查内存排布 (Contiguous)

确保数据访问连续性,减少Cache Miss。

# 检查Stride
x = torch.randn(1, 3, 224, 224).npu()
print(x.stride()) 

# 如果中间结果不连续,强制连续化
output = output.contiguous() 

Step 15:启用L2 Cache优化

复用中间结果。

atc --model=model.onnx \
    --enable_l2_cache=true \
    ...

第三阶段:进阶调优与监控(Step 16-20)

Step 16:多卡通信优化 (HCCL)

如果是多机多卡训练/推理,检查通信带宽。

# 检查网络带宽
iperf3 -c <peer_ip>

# 调整HCCL参数
export HCCL_CONNECT_TIMEOUT=600
export HCCL_STREAM_PRIORITY=0

Step 17:显存管理优化

防止碎片化导致的OOM。

# 定期清理缓存
torch.npu.empty_cache()

# 避免在循环中创建大量临时Tensor
# 使用 `del` 及时释放
del intermediate_tensor

Step 18:使用 msprof 深度分析

CANN自带的火焰图分析工具。

# 生成profile文件
msprof --mode=trace --output=profile.prof ./train.py

# 分析
msprof --input=profile.prof --output=report.html

Step 19:温度与功耗控制

过热会导致降频。

# 监控
watch -n 1 'npu-smi info -t temperature -i 0'

# 优化:
# 1. 改善机房散热
# 2. 降低频率限制 (if allowed)
# 3. 避免长时间满负荷运行

Step 20:自动化回归测试

建立性能基线库,每次代码更新自动对比。

# 伪代码
def regression_test(new_model, baseline_metrics):
    new_metrics = measure_latency(new_model)
    if new_metrics['p99'] > baseline_metrics['p99'] * 1.05:
        raise PerformanceRegressionError("P99延迟增加超过5%!")

总结:调优路线图

  1. 先测: 建立Baseline,量化当前性能。
  2. 找病: 通过Profiler和Utilization找到瓶颈(IO? 计算? 通信?)。
  3. 开刀: 按优先级实施优化(融合 > 格式 > 精度 > 并行)。
  4. 复查: 再次测量,确保优化有效且无副作用。
  5. 固化: 将最佳配置写入脚本,形成自动化流程。

记住:没有银弹。最好的优化方案是针对你的具体模型和数据特征量身定制的。

Logo

作为“人工智能6S店”的官方数字引擎,为AI开发者与企业提供一个覆盖软硬件全栈、一站式门户。

更多推荐