昇腾 CANN cann-samples 仓:从 HelloWorld 到 ResNet50 推理
本文介绍了如何使用昇腾官方示例库cann-samples实现ResNet50模型推理的全流程。首先展示了cann-samples仓库的结构,包含基础示例、计算机视觉、自然语言处理等模块。然后通过HelloWorld示例演示了ACL初始化和基本算子调用方法。重点讲解了ResNet50端到端推理的实现,包括图像预处理、模型加载、推理执行和后处理过程。文章还提供了批量推理和性能调优的代码示例,展示了如何
·
前言
cann-samples 仓是昇腾官方的示例代码库,涵盖了从基础到进阶的各种场景。这篇文章从零开始,用 cann-samples 把 ResNet50 的推理走通。
cann-samples 仓的结构
仓库目录
git clone https://atomgit.com/cann/cann-samples
cd cann-samples
tree -L 2
# cann-samples/
# ├── basic/ # 基础示例:HelloWorld、算子调用
# ├── cv/ # 计算机视觉:图像分类、目标检测
# ├── nlp/ # 自然语言处理:文本分类、序列模型
# ├── audio/ # 音频处理:语音识别、音频分类
# └── utils/ # 通用工具:数据加载、性能测试
快速定位示例
每个示例都有完整的 README.md,告诉你需要什么输入、如何运行、预期输出是什么。
基础示例:HelloWorld 算子调用
第一个昇腾程序
# 01_hello_world.py
import cann
import numpy as np
# 1. 初始化 ACL(Ascend Computing Language)
cann.init()
# 2. 创建一个 tensor
device_tensor = cann.Tensor(
shape=(3, 3),
dtype=cann.DTYPE.FLOAT32,
device=cann.DEVICE.NPU
)
# 3. 填充数据
data = np.random.randn(3, 3).astype(np.float32)
device_tensor.from_numpy(data)
# 4. 调用算子(矩阵加法)
result = cann.ops.add(device_tensor, device_tensor)
# 5. 把结果拿回 CPU
output = result.to_numpy()
print(f"输入:\n{data}")
print(f"输出:\n{output}")
print(f"结果正确: {np.allclose(data * 2, output)}")
# 运行:python 01_hello_world.py
理解 ACL 初始化
ACL 是昇腾的统一 API 层,初始化是所有操作的前置条件:
# 02_init_acl.py
import cann
# 方式1:默认初始化
cann.init()
# 方式2:指定配置
config = cann.Config({
"device_id": 0, # 使用第 0 个 NPU
"rank_id": 0, # 分布式 rank(单卡设为 0)
"log_level": 3, # 日志级别:0=ERROR, 1=WARNING, 2=INFO, 3=DEBUG
"acl_config": "acl.json" # 配置文件路径
})
cann.init(config)
# 3. 确认初始化成功
print(f"ACL 版本: {cann.__version__}")
print(f"设备数量: {cann.get_device_count()}")
# 输出:
# ACL 版本: 5.1.RC3
# 设备数量: 8
图像分类:ResNet50 端到端推理
完整推理流程
# 03_resnet50_inference.py
import cann
import numpy as np
from PIL import Image
# 1. 初始化
cann.init()
# 2. 加载模型(OM 格式)
model = cann.model.load("resnet50.om")
print(f"模型加载成功,输入shape: {model.get_input_shape()}")
# 输出:模型加载成功,输入shape: [1, 3, 224, 224]
# 3. 图片预处理
def preprocess(image_path):
"""ImageNet 标准预处理"""
img = Image.open(image_path).convert("RGB")
# 缩放到 256
w, h = img.size
scale = 256 / min(w, h)
new_w, new_h = int(w * scale), int(h * scale)
img = img.resize((new_w, new_h))
# 中心裁剪到 224
left = (new_w - 224) // 2
top = (new_h - 224) // 2
img = img.crop((left, top, left + 224, top + 224))
# 转 numpy 数组 (H, W, C) -> (C, H, W)
img_array = np.array(img).astype(np.float32) / 255.0
# ImageNet 标准化
mean = np.array([0.485, 0.456, 0.406])
std = np.array([0.229, 0.224, 0.225])
img_array = (img_array - mean) / std
# (H, W, C) -> (C, H, W) -> (N, C, H, W)
img_array = img_array.transpose(2, 0, 1)
img_array = img_array[np.newaxis, :, :, :]
return img_array.astype(np.float32)
# 4. 执行推理
image = preprocess("test_image.jpg")
input_tensor = cann.Tensor.from_numpy(image)
output_tensor = model.execute(input_tensor)
# 5. 后处理
output = output_tensor.to_numpy()
pred_class = int(np.argmax(output))
print(f"预测类别: {pred_class}")
# 6. 清理资源
model.release()
cann.shutdown()
批量推理
# 04_batch_inference.py
import cann
import glob
# 初始化
cann.init()
model = cann.model.load("resnet50.om")
# 批量处理图片
image_paths = glob.glob("test_images/*.jpg")
batch_size = 8
results = []
for i in range(0, len(image_paths), batch_size):
batch_paths = image_paths[i:i+batch_size]
batch_images = []
for path in batch_paths:
img = preprocess(path)
batch_images.append(img)
# 拼接 batch
batch_tensor = cann.Tensor.from_numpy(np.concatenate(batch_images, axis=0))
# 推理
outputs = model.execute(batch_tensor)
outputs_np = outputs.to_numpy()
# 记录结果
for j, out in enumerate(outputs_np):
pred = int(np.argmax(out))
results.append((batch_paths[j], pred))
print(f"{batch_paths[j]} -> 类别 {pred}")
print(f"批量推理完成: {len(results)} 张图片")
# 清理
model.release()
cann.shutdown()
性能调优:batch size 和并行数配置
调整 batch size
# 05_batch_size_tuning.py
import cann
import time
import numpy as np
cann.init()
model = cann.model.load("resnet50.om")
# 不同的 batch size 测试
batch_sizes = [1, 2, 4, 8, 16, 32, 64]
for bs in batch_sizes:
# 准备输入
dummy = np.random.randn(bs, 3, 224, 224).astype(np.float32)
input_tensor = cann.Tensor.from_numpy(dummy)
# Warmup
for _ in range(10):
_ = model.execute(input_tensor)
# 正式测试
n_iters = 100
start = time.time()
for _ in range(n_iters):
_ = model.execute(input_tensor)
elapsed = time.time() - start
avg_latency = (elapsed / n_iters) * 1000 # ms
throughput = bs / (elapsed / n_iters) # FPS
print(f"Batch={bs:3d} Latency={avg_latency:6.2f}ms Throughput={throughput:7.1f} FPS")
# 输出示例:
# Batch= 1 Latency= 1.23ms Throughput= 813.0 FPS
# Batch= 8 Latency= 4.56ms Throughput= 1754.4 FPS
# Batch= 16 Latency= 8.12ms Throughput= 1970.4 FPS
# Batch= 32 Latency= 14.23ms Throughput= 2248.2 FPS
# Batch= 64 Latency= 26.45ms Throughput= 2420.8 FPS
model.release()
cann.shutdown()
batch size 选型建议:
| 场景 | 推荐 batch size | 原因 |
|---|---|---|
| 延迟敏感(在线推理) | 1~4 | 快速响应 |
| 吞吐敏感(批处理) | 16~32 | 吞吐最优 |
| 超大模型(显存受限) | 1~2 | 显存限制 |
| 延迟不敏感(离线处理) | 64+ | 吞吐最高 |
NPU 并行数配置
# 06_parallel_config.py
import cann
# 配置 NPU 并行执行
config = cann.Config()
# 方案1:单线程(延迟敏感场景)
config.set("acl_thread", 1)
config.set("acl_device", 0)
# 方案2:多线程(吞吐敏感场景)
config.set("acl_thread", 4) # 4 个线程并发执行
config.set("acl_device", 0)
# 方案3:多卡并行(大规模推理)
for device_id in range(8):
model = cann.model.load("resnet50.om", device_id=device_id)
# 每个 device 独立加载一个模型实例
# 验证配置
print(f"当前线程数: {cann.get_config('acl_thread')}")
print(f"当前设备ID: {cann.get_config('acl_device')}")
性能分析工具
# 07_profiling.py
import cann
# 开启 profiling
profiler = cann.Profiler()
profiler.start()
# 执行推理
model = cann.model.load("resnet50.om")
for _ in range(100):
input_tensor = cann.Tensor.from_numpy(np.random.randn(1, 3, 224, 224).astype(np.float32))
_ = model.execute(input_tensor)
# 停止 profiling 并导出报告
profiler.stop()
profiler.export("resnet50_profile.json")
# 查看报告
print(profiler.summary())
# 输出示例:
# Operator Calls Avg(ms) Total(ms)
# Conv2d 100 0.523 52.3
# BatchNorm2d 100 0.089 8.9
# ReLU 100 0.042 4.2
# MatMul 50 0.312 15.6
# Softmax 100 0.067 6.7
踩坑记录:Atlas 服务器上的环境变量问题
常见错误汇总
| 错误 | 原因 | 解决方式 |
|---|---|---|
| ACL 初始化失败 | 驱动版本不匹配 | 升级/降级驱动到匹配版本 |
| 模型加载失败 | OM 文件路径问题 | 使用绝对路径 |
| 推理结果全零 | 输入数据未正确传输到 NPU | 检查 Tensor.from_numpy() |
| batch size 受限 | 显存不足 | 减小 batch 或优化模型 |
| 多线程卡死 | ACL 线程数配置错误 | 设为 1 或偶数 |
环境变量配置
# 07_env_config.sh
#!/bin/bash
# 必须设置的环境变量
export ASCEND_DEVICE_ID=0
export ASCEND_VISIBLE_DEVICES=0,1,2,3,4,5,6,7
# CANN 安装路径
export ASCEND_TOOLKIT_HOME=/usr/local/Ascend/ascend-toolkit
export LD_LIBRARY_PATH=$ASCEND_TOOLKIT_HOME/runtime/lib64:$LD_LIBRARY_PATH
export PATH=$ASCEND_TOOLKIT_HOME/bin:$PATH
# ACL 配置(可选)
export ASCEND_GLOBAL_LOG_LEVEL=3
export ASCEND_GLOBAL_DUMP_PATH=./dump
# 运行
python 03_resnet50_inference.py
# 查看 NPU 状态
npu-smi info
资源泄漏排查
# 08_resource_leak_check.py
import cann
# 每次运行后检查资源
def check_resources():
print(f"已分配 Tensor 数: {cann.Tensor.get_allocated_count()}")
print(f"已加载模型数: {cann.model.get_loaded_count()}")
print(f"显存使用: {cann.mem.get_allocated() / 1024**3:.2f} GB")
print(f"显存峰值: {cann.mem.get_peak() / 1024**3:.2f} GB")
# 在循环中使用,确保资源释放
for epoch in range(1000):
model = cann.model.load("resnet50.om")
_ = model.execute(input_tensor)
# 释放模型资源
model.release()
# 每 100 次检查一次
if epoch % 100 == 0:
check_resources()
# 如果发现资源持续增长,说明有泄漏
进阶示例:目标检测模型
在图像分类基础上,目标检测需要更复杂的预处理和后处理:
# 09_object_detection.py
import cann
import numpy as np
from PIL import Image
cann.init()
model = cann.model.load("yolov5s.om")
def preprocess_detection(image_path):
"""YOLOv5 预处理:640x640 方形输入"""
img = Image.open(image_path).convert("RGB")
w, h = img.size
# 计算缩放比例
scale = 640 / max(w, h)
new_w, new_h = int(w * scale), int(h * scale)
img_resized = img.resize((new_w, new_h), Image.BILINEAR)
# 填充到 640x640(灰色填充)
img_padded = Image.new("RGB", (640, 640), (114, 114, 114))
img_padded.paste(img_resized, (0, 0))
# 归一化
img_array = np.array(img_padded).astype(np.float32) / 255.0
img_array = img_array.transpose(2, 0, 1)
return img_array.astype(np.float32), scale
def postprocess_detect(outputs, scale, orig_size, conf_thresh=0.5):
"""NMS 后处理"""
# 解析输出([batch, 5+num_classes, num_boxes])
predictions = outputs[0] # shape: (1, 85, 8400)
boxes = []
for pred in predictions.transpose(2, 1): # (8400, 85)
obj_conf = pred[4]
class_scores = pred[5:] * obj_conf
class_id = np.argmax(class_scores)
class_score = class_scores[class_id]
if class_score > conf_thresh:
x, y, w, h = pred[0:4]
boxes.append([x, y, w, h, class_id, class_score])
# NMS
boxes = np.array(boxes)
if len(boxes) == 0:
return []
# 按置信度排序
boxes = boxes[boxes[:, 5].argsort()[::-1]]
# NMS
keep = []
while len(boxes) > 0:
keep.append(boxes[0])
if len(boxes) == 1:
break
# 计算 IoU
ious = [compute_iou(boxes[0], b) for b in boxes[1:]]
boxes = boxes[1:][np.array(ious) < 0.45]
return keep
# 完整流程
image, scale = preprocess_detection("test.jpg")
input_tensor = cann.Tensor.from_numpy(image[np.newaxis])
outputs = model.execute(input_tensor)
orig_w, orig_h = Image.open("test.jpg").size
detections = postprocess_detect(outputs, scale, (orig_w, orig_h))
print(f"检测到 {len(detections)} 个目标")
for det in detections:
x, y, w, h, class_id, score = det
print(f" 类别 {int(class_id)}: 置信度 {score:.2f}, 位置 ({x:.0f}, {y:.0f}, {w:.0f}, {h:.0f})")
总结
cann-samples 是学习 CANN 最快的路径,每个示例都跑一遍,上手速度翻倍。
| 阶段 | 内容 | 建议时间 |
|---|---|---|
| HelloWorld | 理解 ACL 初始化和基本 tensor 操作 | 30 分钟 |
| ResNet50 | 走通从图片到预测的完整流程 | 1 小时 |
| 性能调优 | batch size、并行数、profiling | 2 小时 |
| 踩坑记录 | 环境变量、资源泄漏、错误排查 | 持续 |
进阶路线图:
基础 (1-2天)
-> HelloWorld + ResNet50 推理
-> 熟悉 ACL API 和调试方法
进阶 (3-5天)
-> 目标检测、语义分割等 CV 任务
-> 多卡并行推理
-> Profiling 性能分析
生产 (1周+)
-> 模型量化(FP16/INT8)
-> 流水线优化
-> 自定义算子开发
仓库地址:https://atomgit.com/cann/cann-samples
更多推荐


所有评论(0)