前言

cann-samples 仓是昇腾官方的示例代码库,涵盖了从基础到进阶的各种场景。这篇文章从零开始,用 cann-samples 把 ResNet50 的推理走通。

cann-samples 仓的结构

仓库目录

git clone https://atomgit.com/cann/cann-samples
cd cann-samples

tree -L 2
# cann-samples/
# ├── basic/          # 基础示例:HelloWorld、算子调用
# ├── cv/             # 计算机视觉:图像分类、目标检测
# ├── nlp/            # 自然语言处理:文本分类、序列模型
# ├── audio/          # 音频处理:语音识别、音频分类
# └── utils/          # 通用工具:数据加载、性能测试

快速定位示例

每个示例都有完整的 README.md,告诉你需要什么输入、如何运行、预期输出是什么。

基础示例:HelloWorld 算子调用

第一个昇腾程序

# 01_hello_world.py
import cann
import numpy as np

# Step 1: initialise ACL (Ascend Computing Language) before any other call
cann.init()

# Step 2: create a 3x3 float32 tensor placed on the NPU
SHAPE = (3, 3)
npu_input = cann.Tensor(shape=SHAPE, dtype=cann.DTYPE.FLOAT32, device=cann.DEVICE.NPU)

# Step 3: generate random host data and load it into the tensor
host_data = np.random.randn(*SHAPE).astype(np.float32)
npu_input.from_numpy(host_data)

# Step 4: call an operator (element-wise add of the tensor with itself)
npu_sum = cann.ops.add(npu_input, npu_input)

# Step 5: fetch the result back to the host and compare it with 2 * input
host_sum = npu_sum.to_numpy()
matches = np.allclose(host_data * 2, host_sum)
print(f"输入:\n{host_data}")
print(f"输出:\n{host_sum}")
print(f"结果正确: {matches}")

# 运行:python 01_hello_world.py

理解 ACL 初始化

ACL 是昇腾的统一 API 层,初始化是所有操作的前置条件:

# 02_init_acl.py
import cann

# Option 1: default initialisation
# NOTE(review): options 1 and 2 are alternatives. Run top to bottom, this
# script calls cann.init() twice; keep only one of them unless repeated
# initialisation is known to be safe.
cann.init()

# Option 2: initialise with an explicit configuration
# NOTE(review): the log-level mapping below is the article's; CANN's
# ASCEND_GLOBAL_LOG_LEVEL variable uses the reverse order (0=DEBUG ... 3=ERROR).
# Confirm which convention cann.Config follows.
config = cann.Config({
    "device_id": 0,           # use NPU 0
    "rank_id": 0,             # distributed rank (0 for a single card)
    "log_level": 3,           # log level: 0=ERROR, 1=WARNING, 2=INFO, 3=DEBUG
    "acl_config": "acl.json"  # path of the ACL configuration file
})
cann.init(config)

# 3. Confirm that initialisation succeeded
print(f"ACL 版本: {cann.__version__}")
print(f"设备数量: {cann.get_device_count()}")
# Sample output:
# ACL 版本: 5.1.RC3
# 设备数量: 8

图像分类:ResNet50 端到端推理

完整推理流程

# 03_resnet50_inference.py
import cann
import numpy as np
from PIL import Image

# 1. Bring up the runtime
cann.init()

# 2. Load the model (OM format) and report its input shape
MODEL_PATH = "resnet50.om"
model = cann.model.load(MODEL_PATH)
input_shape = model.get_input_shape()
print(f"模型加载成功,输入shape: {input_shape}")
# Sample output: 模型加载成功,输入shape: [1, 3, 224, 224]

# 3. 图片预处理
def preprocess(image_path):
    """Load an image and apply the standard ImageNet evaluation preprocessing.

    The shorter side is resized to 256 px (aspect ratio preserved), the
    central 224x224 region is cropped, pixel values are scaled to [0, 1] and
    each channel is normalised with the ImageNet mean and std.

    Args:
        image_path: Path of the image file to load.

    Returns:
        A C-contiguous float32 ``numpy.ndarray`` of shape ``(1, 3, 224, 224)``
        in NCHW layout.
    """
    # The context manager closes the underlying file; convert() returns an
    # independent RGB copy that stays valid afterwards.
    with Image.open(image_path) as src:
        img = src.convert("RGB")

    # Resize so that the shorter side is exactly 256. The short side is set
    # directly because w * (256 / w) can evaluate to 255.999... in floating
    # point, which int() would truncate to 255.
    w, h = img.size
    if w <= h:
        new_w, new_h = 256, int(256 * h / w)
    else:
        new_w, new_h = int(256 * w / h), 256
    img = img.resize((new_w, new_h))

    # Centre-crop to 224x224
    left = (new_w - 224) // 2
    top = (new_h - 224) // 2
    img = img.crop((left, top, left + 224, top + 224))

    # Convert to a float32 (H, W, C) array scaled to [0, 1]
    img_array = np.asarray(img, dtype=np.float32) / 255.0

    # ImageNet normalisation; float32 constants keep the arithmetic in
    # float32 instead of promoting the whole image to float64.
    mean = np.array([0.485, 0.456, 0.406], dtype=np.float32)
    std = np.array([0.229, 0.224, 0.225], dtype=np.float32)
    img_array = (img_array - mean) / std

    # (H, W, C) -> (C, H, W) -> (1, C, H, W)
    img_array = img_array.transpose(2, 0, 1)[np.newaxis, :, :, :]

    # transpose() only returns a strided view; ascontiguousarray() lays the
    # data out in real NCHW order so a raw buffer copy sees the right layout.
    return np.ascontiguousarray(img_array, dtype=np.float32)

# 4. Run inference: preprocess on the host, wrap as a tensor, execute the model
nchw_batch = preprocess("test_image.jpg")
logits_tensor = model.execute(cann.Tensor.from_numpy(nchw_batch))

# 5. Post-process: take the index of the largest value in the output
logits = logits_tensor.to_numpy()
top1_index = int(np.argmax(logits))
print(f"预测类别: {top1_index}")

# 6. Clean up: release the model first, then shut the runtime down
model.release()
cann.shutdown()

批量推理

# 04_batch_inference.py
import glob

import cann
import numpy as np

# NOTE: preprocess() is the function defined in 03_resnet50_inference.py; this
# script does not define it, so copy (or import) it here before running.

batch_size = 8

# Initialise the runtime
cann.init()
try:
    model = cann.model.load("resnet50.om")
    try:
        # sorted(): glob.glob() returns paths in arbitrary order, which would
        # make the processing order differ between runs.
        image_paths = sorted(glob.glob("test_images/*.jpg"))
        results = []

        for i in range(0, len(image_paths), batch_size):
            batch_paths = image_paths[i:i + batch_size]

            # Each preprocess() result has a leading batch axis of length 1,
            # so concatenating along axis 0 builds the batch.
            batch_array = np.concatenate(
                [preprocess(path) for path in batch_paths], axis=0
            )
            # NOTE(review): the last batch may hold fewer than batch_size
            # images; confirm the OM model accepts a variable batch dimension.
            batch_tensor = cann.Tensor.from_numpy(batch_array)

            # Inference
            outputs_np = model.execute(batch_tensor).to_numpy()

            # Record the top-1 class of every image in the batch
            for path, out in zip(batch_paths, outputs_np):
                pred = int(np.argmax(out))
                results.append((path, pred))
                print(f"{path} -> 类别 {pred}")

        print(f"批量推理完成: {len(results)} 张图片")
    finally:
        # Release the model even if inference raised
        model.release()
finally:
    cann.shutdown()

性能调优:batch size 和并行数配置

调整 batch size

# 05_batch_size_tuning.py
import cann
import time
import numpy as np

cann.init()
model = cann.model.load("resnet50.om")

# Batch sizes to benchmark
batch_sizes = [1, 2, 4, 8, 16, 32, 64]
n_warmup = 10   # untimed runs before each measurement
n_iters = 100   # timed runs per batch size

try:
    for bs in batch_sizes:
        # Random input of the right shape; the values do not matter for timing
        dummy = np.random.randn(bs, 3, 224, 224).astype(np.float32)
        input_tensor = cann.Tensor.from_numpy(dummy)

        # Warm-up runs are excluded from the measurement
        for _ in range(n_warmup):
            model.execute(input_tensor)

        # Timed runs. perf_counter() is a monotonic, high-resolution clock;
        # time.time() follows the wall clock and may jump when it is adjusted.
        # NOTE(review): assumes execute() returns only after the result is
        # ready; if it is asynchronous, synchronise before reading the clock.
        start = time.perf_counter()
        for _ in range(n_iters):
            model.execute(input_tensor)
        elapsed = time.perf_counter() - start

        seconds_per_iter = elapsed / n_iters
        avg_latency = seconds_per_iter * 1000  # ms per batch
        throughput = bs / seconds_per_iter     # images per second

        print(f"Batch={bs:3d}  Latency={avg_latency:6.2f}ms  Throughput={throughput:7.1f} FPS")
finally:
    # Release the model and shut down even if one batch size fails
    model.release()
    cann.shutdown()

# Sample output:
# Batch=  1  Latency=  1.23ms  Throughput=   813.0 FPS
# Batch=  8  Latency=  4.56ms  Throughput=  1754.4 FPS
# Batch= 16  Latency=  8.12ms  Throughput=  1970.4 FPS
# Batch= 32  Latency= 14.23ms  Throughput=  2248.2 FPS
# Batch= 64  Latency= 26.45ms  Throughput=  2420.8 FPS

batch size 选型建议

| 场景 | 推荐 batch size | 原因 |
| --- | --- | --- |
| 延迟敏感(在线推理) | 1~4 | 快速响应 |
| 吞吐敏感(批处理) | 16~32 | 吞吐最优 |
| 超大模型(显存受限) | 1~2 | 显存限制 |
| 延迟不敏感(离线处理) | 64+ | 吞吐最高 |

NPU 并行数配置

# 06_parallel_config.py
import cann

# 配置 NPU 并行执行
config = cann.Config()

# Schemes 1 and 2 are alternatives. Both write the same keys, so when the
# script runs top to bottom the values set by scheme 2 are the ones that stay.

# Scheme 1: single thread (latency-sensitive workloads)
config.set("acl_thread", 1)
config.set("acl_device", 0)

# Scheme 2: multiple threads (throughput-sensitive workloads)
config.set("acl_thread", 4)  # 4 threads executing concurrently
config.set("acl_device", 0)

# Scheme 3: multi-card parallelism (large-scale inference).
# One model instance is loaded per device and every handle is kept: rebinding
# a single `model` variable in the loop would leave only the last instance
# reachable for execute()/release().
models = [
    cann.model.load("resnet50.om", device_id=device_id)
    for device_id in range(8)
]

# Verify the configuration
# NOTE(review): `config` is never passed to cann.init() in this snippet;
# confirm that cann.get_config() actually reflects the values set above.
print(f"当前线程数: {cann.get_config('acl_thread')}")
print(f"当前设备ID: {cann.get_config('acl_device')}")

性能分析工具

# 07_profiling.py
import cann
import numpy as np

# ACL initialisation is a precondition for every other call
cann.init()
try:
    # Start profiling
    profiler = cann.Profiler()
    profiler.start()

    model = None
    try:
        # Run inference
        model = cann.model.load("resnet50.om")
        # One dummy input is created up front and reused, so host-side random
        # number generation is not repeated inside the profiled loop.
        input_tensor = cann.Tensor.from_numpy(
            np.random.randn(1, 3, 224, 224).astype(np.float32)
        )
        for _ in range(100):
            _ = model.execute(input_tensor)
    finally:
        # Stop profiling even when inference raised, then free the model
        profiler.stop()
        if model is not None:
            model.release()

    # Export the report
    profiler.export("resnet50_profile.json")

    # Print the summary
    print(profiler.summary())
finally:
    cann.shutdown()
# Sample output:
# Operator          Calls    Avg(ms)    Total(ms)
# Conv2d            100      0.523      52.3
# BatchNorm2d        100      0.089       8.9
# ReLU              100      0.042       4.2
# MatMul             50      0.312      15.6
# Softmax           100      0.067       6.7

踩坑记录:Atlas 服务器上的环境变量问题

常见错误汇总

| 错误 | 原因 | 解决方式 |
| --- | --- | --- |
| ACL 初始化失败 | 驱动版本不匹配 | 升级/降级驱动到匹配版本 |
| 模型加载失败 | OM 文件路径问题 | 使用绝对路径 |
| 推理结果全零 | 输入数据未正确传输到 NPU | 检查 Tensor.from_numpy() |
| batch size 受限 | 显存不足 | 减小 batch 或优化模型 |
| 多线程卡死 | ACL 线程数配置错误 | 设为 1 或偶数 |

环境变量配置

#!/bin/bash
# 07_env_config.sh
# (The shebang has to be the very first line of the file to take effect.)

# Required environment variables
export ASCEND_DEVICE_ID=0
export ASCEND_VISIBLE_DEVICES=0,1,2,3,4,5,6,7

# CANN installation path
export ASCEND_TOOLKIT_HOME=/usr/local/Ascend/ascend-toolkit
# ${VAR:+:$VAR} appends the previous value only when it is non-empty. A bare
# trailing ':' would add an empty entry, which the dynamic loader treats as
# the current directory.
export LD_LIBRARY_PATH="$ASCEND_TOOLKIT_HOME/runtime/lib64${LD_LIBRARY_PATH:+:$LD_LIBRARY_PATH}"
export PATH="$ASCEND_TOOLKIT_HOME/bin:$PATH"

# ACL settings (optional)
export ASCEND_GLOBAL_LOG_LEVEL=3
export ASCEND_GLOBAL_DUMP_PATH=./dump

# Run
python 03_resnet50_inference.py

# Show NPU status
npu-smi info

资源泄漏排查

# 08_resource_leak_check.py
import cann

# 每次运行后检查资源
def check_resources():
    """Print the allocated-tensor count, loaded-model count and memory figures.

    Memory values come from ``cann.mem`` and are printed in units of
    1024**3 bytes with two decimals.
    """
    bytes_per_gb = 1024**3

    tensor_count = cann.Tensor.get_allocated_count()
    print(f"已分配 Tensor 数: {tensor_count}")

    model_count = cann.model.get_loaded_count()
    print(f"已加载模型数: {model_count}")

    allocated_gb = cann.mem.get_allocated() / bytes_per_gb
    print(f"显存使用: {allocated_gb:.2f} GB")

    peak_gb = cann.mem.get_peak() / bytes_per_gb
    print(f"显存峰值: {peak_gb:.2f} GB")

# Use inside a loop to make sure resources are released on every iteration
import numpy as np  # local import: this script does not import numpy at the top

cann.init()  # ACL initialisation is a precondition for every other call

# Dummy input with the ResNet50 input shape [1, 3, 224, 224]; the loop below
# needs an input tensor and the script did not define one.
input_tensor = cann.Tensor.from_numpy(
    np.random.randn(1, 3, 224, 224).astype(np.float32)
)

for epoch in range(1000):
    model = cann.model.load("resnet50.om")
    try:
        _ = model.execute(input_tensor)
    finally:
        # Release the model even when execute() raises; otherwise the leak
        # check itself would leak.
        model.release()

    # Report every 100 iterations
    if epoch % 100 == 0:
        check_resources()

cann.shutdown()

# If the reported numbers keep growing, something is leaking

进阶示例:目标检测模型

在图像分类基础上,目标检测需要更复杂的预处理和后处理:

# 09_object_detection.py
import cann
import numpy as np
from PIL import Image

# Initialise the runtime, then load the YOLOv5s model file (OM format)
cann.init()
model = cann.model.load("yolov5s.om")

def preprocess_detection(image_path):
    """Letterbox an image to 640x640 for YOLOv5 and return it with its scale.

    The image is resized so that its longer side is 640 px (aspect ratio
    preserved), pasted into the top-left corner of a 640x640 grey canvas,
    scaled to [0, 1] and converted to channel-first layout.

    Args:
        image_path: Path of the image file to load.

    Returns:
        A tuple ``(img_array, scale)``. ``img_array`` is a C-contiguous
        float32 array of shape ``(3, 640, 640)``; ``scale`` is the resize
        factor ``640 / max(width, height)``, needed to map detections back
        to the original image.
    """
    # The context manager closes the underlying file; convert() returns an
    # independent RGB copy that stays valid afterwards.
    with Image.open(image_path) as src:
        img = src.convert("RGB")
    w, h = img.size

    # Resize factor that brings the longer side to 640
    scale = 640 / max(w, h)
    # round() avoids 639 caused by floating-point truncation; the clamp to
    # [1, 640] keeps very elongated images from collapsing to a zero-sized
    # side, which resize() rejects.
    new_w = min(640, max(1, round(w * scale)))
    new_h = min(640, max(1, round(h * scale)))
    img_resized = img.resize((new_w, new_h), Image.BILINEAR)

    # Pad to 640x640 with grey; the image sits at the top-left corner, so
    # no offset has to be undone later.
    img_padded = Image.new("RGB", (640, 640), (114, 114, 114))
    img_padded.paste(img_resized, (0, 0))

    # Scale to [0, 1], then (H, W, C) -> (C, H, W). transpose() only returns
    # a strided view; ascontiguousarray() lays the data out in real CHW order.
    img_array = np.asarray(img_padded, dtype=np.float32) / 255.0
    img_array = np.ascontiguousarray(img_array.transpose(2, 0, 1))

    return img_array, scale

def _iou_one_to_many(box, others):
    """Return the IoU of one (cx, cy, w, h) box against each row of `others`."""
    half_w, half_h = box[2] / 2, box[3] / 2
    others_half_w, others_half_h = others[:, 2] / 2, others[:, 3] / 2

    inter_w = (np.minimum(box[0] + half_w, others[:, 0] + others_half_w)
               - np.maximum(box[0] - half_w, others[:, 0] - others_half_w))
    inter_h = (np.minimum(box[1] + half_h, others[:, 1] + others_half_h)
               - np.maximum(box[1] - half_h, others[:, 1] - others_half_h))
    # Negative extents mean the boxes do not overlap on that axis
    inter = np.clip(inter_w, 0, None) * np.clip(inter_h, 0, None)

    union = box[2] * box[3] + others[:, 2] * others[:, 3] - inter
    # Zero-area boxes would divide by zero; their IoU becomes 0 instead
    return inter / np.maximum(union, 1e-12)


def postprocess_detect(outputs, scale, orig_size, conf_thresh=0.5, iou_thresh=0.45):
    """Filter raw detector output, run NMS and map boxes to the original image.

    Only the first image of the batch is processed. Boxes are assumed to be
    in centre format ``(cx, cy, w, h)`` as YOLOv5 emits them; confirm this
    against the exported model.

    Args:
        outputs: Model output whose first element has shape
            ``(5 + num_classes, num_boxes)``, optionally with leading batch
            axes. Objects exposing ``to_numpy()`` are converted first.
        scale: Resize factor returned by ``preprocess_detection``.
        orig_size: ``(width, height)`` of the original image; boxes are
            clipped to it.
        conf_thresh: Minimum ``objectness * class score`` for a box to be kept.
        iou_thresh: Boxes overlapping an already kept box by at least this
            IoU are suppressed (class-agnostic).

    Returns:
        A list of 1-D arrays ``[cx, cy, w, h, class_id, score]`` in original
        image coordinates, sorted by descending score. Empty list when
        nothing passes the threshold.

    Raises:
        ValueError: If ``scale`` is not positive or the output does not have
            at least one class column.
    """
    if scale <= 0:
        raise ValueError(f"scale must be positive, got {scale!r}")

    # Accept a device tensor, a list of tensors, or plain arrays
    raw = outputs.to_numpy() if hasattr(outputs, "to_numpy") else outputs
    first = raw[0]
    if hasattr(first, "to_numpy"):
        first = first.to_numpy()
    predictions = np.asarray(first, dtype=np.float64)

    # Drop leading batch axes so that (1, C, N) and (C, N) are both accepted
    while predictions.ndim > 2:
        predictions = predictions[0]
    if predictions.ndim != 2 or predictions.shape[0] < 6:
        raise ValueError(
            "expected output of shape (5 + num_classes, num_boxes), "
            f"got {predictions.shape}"
        )
    predictions = predictions.T  # (num_boxes, 5 + num_classes)

    # Per-box best class, weighted by the objectness score in column 4
    class_scores = predictions[:, 5:] * predictions[:, 4:5]
    class_ids = class_scores.argmax(axis=1)
    best_scores = class_scores[np.arange(len(class_ids)), class_ids]

    confident = best_scores > conf_thresh
    if not confident.any():
        return []
    boxes = np.column_stack(
        (predictions[confident, :4], class_ids[confident], best_scores[confident])
    )

    # Highest score first
    boxes = boxes[boxes[:, 5].argsort()[::-1]]

    # Greedy NMS: keep the best box, drop everything overlapping it too much
    keep = []
    while len(boxes) > 0:
        best = boxes[0]
        keep.append(best)
        rest = boxes[1:]
        if len(rest) == 0:
            break
        boxes = rest[_iou_one_to_many(best, rest) < iou_thresh]

    # Map from the resized input back to the original image. No offset is
    # needed because preprocessing pastes the image at the top-left corner.
    orig_w, orig_h = orig_size
    kept = np.array(keep)
    x1 = np.clip((kept[:, 0] - kept[:, 2] / 2) / scale, 0, orig_w)
    y1 = np.clip((kept[:, 1] - kept[:, 3] / 2) / scale, 0, orig_h)
    x2 = np.clip((kept[:, 0] + kept[:, 2] / 2) / scale, 0, orig_w)
    y2 = np.clip((kept[:, 1] + kept[:, 3] / 2) / scale, 0, orig_h)
    kept[:, 0] = (x1 + x2) / 2
    kept[:, 1] = (y1 + y2) / 2
    kept[:, 2] = x2 - x1
    kept[:, 3] = y2 - y1

    return list(kept)

# End-to-end flow
image_path = "test.jpg"
try:
    image, scale = preprocess_detection(image_path)
    # image[np.newaxis] adds the leading batch axis
    input_tensor = cann.Tensor.from_numpy(image[np.newaxis])
    outputs = model.execute(input_tensor)

    # Only the size is needed here; the context manager closes the file again
    with Image.open(image_path) as original:
        orig_w, orig_h = original.size
    detections = postprocess_detect(outputs, scale, (orig_w, orig_h))

    print(f"检测到 {len(detections)} 个目标")
    for det in detections:
        x, y, w, h, class_id, score = det
        print(f"  类别 {int(class_id)}: 置信度 {score:.2f}, 位置 ({x:.0f}, {y:.0f}, {w:.0f}, {h:.0f})")
finally:
    # Release the model and shut the runtime down, as the classification
    # examples do, even when a step above raised.
    model.release()
    cann.shutdown()

总结

cann-samples 是学习 CANN 最快的路径,每个示例都跑一遍,上手速度翻倍。

| 阶段 | 内容 | 建议时间 |
| --- | --- | --- |
| HelloWorld | 理解 ACL 初始化和基本 tensor 操作 | 30 分钟 |
| ResNet50 | 走通从图片到预测的完整流程 | 1 小时 |
| 性能调优 | batch size、并行数、profiling | 2 小时 |
| 踩坑记录 | 环境变量、资源泄漏、错误排查 | 持续 |

进阶路线图

基础 (1-2天)
  -> HelloWorld + ResNet50 推理
  -> 熟悉 ACL API 和调试方法

进阶 (3-5天)
  -> 目标检测、语义分割等 CV 任务
  -> 多卡并行推理
  -> Profiling 性能分析

生产 (1周+)
  -> 模型量化(FP16/INT8)
  -> 流水线优化
  -> 自定义算子开发

仓库地址:https://atomgit.com/cann/cann-samples

Logo

作为“人工智能6S店”的官方数字引擎,为AI开发者与企业提供一个覆盖软硬件全栈的一站式门户。

更多推荐