Ascend C 算子开发实战:从零写一个矩阵乘法
本文介绍了如何使用Ascend C语言在昇腾平台上开发自定义矩阵乘法算子。主要内容包括: 开发环境准备:安装CANN工具包并创建算子工程框架 算子原型定义:通过头文件和实现文件定义算子的输入输出及属性 核心算子实现: 使用GlobalTensor和LocalTensor管理内存 实现基本的矩阵乘法计算流程(GM->UB->计算->GM) 提供优化版本建议(使用Cube Unit加速) 文章详细展示

前言
CANN 自带的算子库已经覆盖了大部分常用算子,但总会遇到不支持的。这时候要用 Ascend C 自己写一个算子。
Ascend C 是昇腾的算子编程语言,语法类似 C++,但专门针对达芬奇架构做了优化。这篇文章讲怎么从零写一个矩阵乘法算子。
开发环境准备
安装算子开发包
# CANN 工具包自带算子开发工具
which canndev
# 输出:/usr/local/Ascend/ascend-toolkit/latest/bin/canndev
# 如果没有,安装 toolkit
apt install ascend-toolkit
创建算子工程
# 用 canndev 创建算子工程
canndev create --project=matmul_custom --output=./workspace
# 工程结构
tree ./workspace/matmul_custom/
# output:
# matmul_custom/
# ├── CMakeLists.txt
# ├── framework/
# │ └── tensor_add_impl.cpp # 算子实现
# ├── op_proto/
# │ └── tensor_add.h # 算子原型
# ├── unittest/
# │ └── test_tensor_add.py # 单元测试
# └── CMakeLists.txt
算子原型定义
第一步是定义算子的原型:输入、输出、属性。
头文件定义
// matmul_custom.h
#ifndef MATMUL_CUSTOM_H
#define MATMUL_CUSTOM_H
#include "tiking_pub_api.h"
namespace acl {
namespace ops {
class MatMulCustom {
public:
MatMulCustom() = default;
~MatMulCustom() = default;
// 初始化算子
graphStatus Init(const Tensor& a, const Tensor& b, Tensor& c,
bool trans_a = false, bool trans_b = false);
// 推理接口
graphStatus InferShape();
// 数据类型推断
graphStatus InferDataType();
};
} // namespace ops
} // namespace acl
#endif // MATMUL_CUSTOM_H
实现文件
// matmul_custom.cpp
#include "matmul_custom.h"
namespace acl {
namespace ops {
graphStatus MatMulCustom::Init(const Tensor& a, const Tensor& b, Tensor& c,
bool trans_a, bool trans_b) {
// 设置输入
(void)ge::Node::GetNodeFromTensor(a);
(void)ge::Node::GetNodeFromTensor(b);
// 设置属性
SetAttr("trans_a", trans_a);
SetAttr("trans_b", trans_b);
return GRAPH_SUCCESS;
}
graphStatus MatMulCustom::InferShape() {
// 获取输入 shape
std::vector<int64_t> shape_a;
std::vector<int64_t> shape_b;
GetInputDesc(0).GetShape(shape_a);
GetInputDesc(1).GetShape(shape_b);
// 计算输出 shape
bool trans_a = GetAttr("trans_a")->GetBool();
bool trans_b = GetAttr("trans_b")->GetBool();
int64_t m = trans_a ? shape_a[1] : shape_a[0];
int64_t n = trans_b ? shape_b[0] : shape_b[1];
std::vector<int64_t> shape_c = {m, n};
GetOutputDesc(0).SetShape(shape_c);
return GRAPH_SUCCESS;
}
graphStatus MatMulCustom::InferDataType() {
// 输出数据类型和输入 A 一致
DataType dtype = GetInputDesc(0).GetDataType();
GetOutputDesc(0).SetDataType(dtype);
return GRAPH_SUCCESS;
}
} // namespace ops
} // namespace acl
算子实现(核心)
这是最关键的部分:用 Ascend C 实现矩阵乘法。
基本框架
#include "kernel_operator.h"
class MatMulCustomKernel {
public:
__aicore__ inline MatMulCustomKernel() {}
__aicore__ void Init(GM_ADDR a, GM_ADDR b, GM_ADDR c,
int32_t M, int32_t N, int32_t K);
__aicore__ void Process();
private:
GlobalTensor<float> aGm; // 输入 A(GM 内存)
GlobalTensor<float> bGm; // 输入 B(GM 内存)
GlobalTensor<float> cGm; // 输出 C(GM 内存)
LocalTensor<float> aLocal; // 输入 A(UB 内存)
LocalTensor<float> bLocal; // 输入 B(UB 内存)
LocalTensor<float> cLocal; // 输出 C(UB 内存)
int32_t M, N, K;
};
extern "C" __global__ void matmul_custom(GM_ADDR a, GM_ADDR b, GM_ADDR c,
int32_t M, int32_t N, int32_t K) {
MatMulCustomKernel op;
op.Init(a, b, c, M, N, K);
op.Process();
}
Init 函数
__aicore__ void MatMulCustomKernel::Init(GM_ADDR a, GM_ADDR b, GM_ADDR c,
int32_t M, int32_t N, int32_t K) {
// 设置 GM 地址
aGm.SetGlobalBuffer((__gm__ float*)a);
bGm.SetGlobalBuffer((__gm__ float*)b);
cGm.SetGlobalBuffer((__gm__ float*)c);
// 保存参数
this->M = M;
this->N = N;
this->K = K;
// 分配 UB 空间
constexpr int32_t BLOCK_SIZE = 32; // 一个 block 的大小
aLocal = GetUbBlock<float>(M * K / BLOCK_SIZE);
bLocal = GetUbBlock<float>(K * N / BLOCK_SIZE);
cLocal = GetUbBlock<float>(M * N / BLOCK_SIZE);
}
Process 函数(核心计算)
__aicore__ void MatMulCustomKernel::Process() {
// 1. 把数据从 GM 搬到 UB
CopyIn(aGm, aLocal, M * K);
CopyIn(bGm, bLocal, K * N);
// 2. 矩阵乘法计算
MatMul(cLocal, aLocal, bLocal, M, N, K);
// 3. 把结果从 UB 写回 GM
CopyOut(cLocal, cGm, M * N);
}
// 矩阵乘法实现
void MatMul(LocalTensor<float>& c, LocalTensor<float>& a, LocalTensor<float>& b,
int32_t M, int32_t N, int32_t K) {
// 简化版本:逐行逐列计算
for (int32_t i = 0; i < M; i++) {
for (int32_t j = 0; j < N; j++) {
float sum = 0.0f;
for (int32_t k = 0; k < K; k++) {
sum += a[i * K + k] * b[k * N + j];
}
c[i * N + j] = sum;
}
}
}
优化版本(使用 Cube Unit)
#include "lib_api.h"
// 使用 Cube Unit 做矩阵乘法
void MatMulCube(LocalTensor<float>& c, LocalTensor<float>& a, LocalTensor<float>& b,
int32_t M, int32_t N, int32_t K) {
// 调用 Cube Unit 的 MatMul 接口
matmul_t matmul_para;
matmul_para.M = M;
matmul_para.N = N;
matmul_para.K = K;
matmul_para.is_trans_a = false;
matmul_para.is_trans_b = false;
// 用 Cube Unit 计算
MatMul(c, a, b, matmul_para);
}
编译算子
CMake 配置
# CMakeLists.txt
cmake_minimum_required(VERSION 3.10)
project(matmul_custom)
# 设置 Ascend C 编译选项
set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -march=davinci")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -O2")
# 添加算子实现文件
add_library(matmul_custom SHARED
matmul_custom.cpp
matmul_custom_kernel.cpp
)
# 链接 Ascend C 库
target_link_libraries(matmul_custom
ascendcl
tbe_operator
)
编译命令
# 创建编译目录
mkdir build && cd build
# 配置
cmake ..
# 编译
make -j8
# 输出:libmatmul_custom.so
在 PyTorch 里调用
注册自定义算子
import torch
import torch.npu
import ctypes
# 加载自定义算子库
lib = ctypes.CDLL("./libmatmul_custom.so")
# 定义 Python 接口
def matmul_custom(a: torch.Tensor, b: torch.Tensor) -> torch.Tensor:
"""自定义矩阵乘法"""
# 检查输入
assert a.dim() == 2 and b.dim() == 2
assert a.shape[1] == b.shape[0]
M, K = a.shape
K2, N = b.shape
# 创建输出 tensor
c = torch.empty(M, N, device='npu:0', dtype=torch.float32)
# 调用自定义算子
lib.matmul_custom(
a.data_ptr(),
b.data_ptr(),
c.data_ptr(),
M, N, K
)
return c
测试自定义算子
# 创建测试数据
a = torch.randn(128, 256, device='npu:0')
b = torch.randn(256, 64, device='npu:0')
# 用自定义算子计算
c_custom = matmul_custom(a, b)
# 用 PyTorch 内置算子计算(参考)
c_ref = torch.matmul(a, b)
# 对比精度
cosine_sim = torch.nn.functional.cosine_similarity(
c_custom.flatten(), c_ref.flatten(), dim=0
)
print(f"余弦相似度: {cosine_sim.item():.6f}")
# 对比性能
import time
# 预热
for _ in range(10):
matmul_custom(a, b)
torch.npu.synchronize()
start = time.time()
for _ in range(100):
matmul_custom(a, b)
torch.npu.synchronize()
end = time.time()
print(f"自定义算子延迟: {(end - start) / 100 * 1000:.2f} ms")
性能优化
使用 Cube Unit
达芬奇架构的 Cube Unit 是专门为矩阵乘法设计的硬件单元,比用 Vector Unit 算快得多。
// 使用 Cube Unit
#include "lib_api.h"
void MatMulOptimized(LocalTensor<float>& c, LocalTensor<float>& a, LocalTensor<float>& b,
int32_t M, int32_t N, int32_t K) {
// 检查是否可以用 Cube
if (M >= 16 && N >= 16 && K >= 16) {
// 用 Cube Unit
matmul_t params;
params.M = M;
params.N = N;
params.K = K;
MatMul(c, a, b, params);
} else {
// 用小矩阵算法
MatMulSmall(c, a, b, M, N, K);
}
}
分块计算
当矩阵太大,UB 放不下时,要分块计算。
constexpr int32_t TILE_SIZE = 32; // 块大小
void MatMulTiled(LocalTensor<float>& c, GlobalTensor<float>& aGm, GlobalTensor<float>& bGm,
int32_t M, int32_t N, int32_t K) {
// 分块计算
for (int32_t i = 0; i < M; i += TILE_SIZE) {
for (int32_t j = 0; j < N; j += TILE_SIZE) {
// 搬运当前块
CopyInA(aGm, aLocal, i, min(i + TILE_SIZE, M), K);
CopyInB(bGm, bLocal, j, min(j + TILE_SIZE, N), K);
// 计算当前块
MatMulTile(cLocal, aLocal, bLocal,
min(TILE_SIZE, M - i),
min(TILE_SIZE, N - j),
K);
// 写回当前块
CopyOutC(cLocal, cGm, i, j, min(TILE_SIZE, M - i), min(TILE_SIZE, N - j));
}
}
}
调试技巧
打印调试
// 在算子实现里加打印
#include <cstdio>
__aicore__ void MatMulCustomKernel::Process() {
// 打印输入参数
printf("M=%d, N=%d, K=%d\n", M, N, K);
// 打印输入数据(前 10 个)
for (int i = 0; i < min(10, M * K); i++) {
printf("a[%d]=%f\n", i, aLocal.GetValue(i));
}
// 计算结果
MatMul(cLocal, aLocal, bLocal, M, N, K);
// 打印输出数据(前 10 个)
for (int i = 0; i < min(10, M * N); i++) {
printf("c[%d]=%f\n", i, cLocal.GetValue(i));
}
}
用 Ascend CL 调试
import acl
# 初始化
acl.init()
acl.rt.set_device(0)
# 运行算子(调试模式)
acl.rt.set_op_execute_mode("debug")
# 运行
# ...(运行算子)
# 查看日志
# 日志在 /var/log/Ascend/ascend_toolkit/matmul_custom.log
常见问题
问题一:编译报错
error: 'matmul_t' was not declared in this scope
解决:包含正确的头文件
#include "lib_api.h" // 包含 matmul_t 的定义
问题二:运行时报错
[ERROR] Kernel execute failed: out of memory
解决:减小块大小,或者检查 UB 大小是否足够
// 检查 UB 大小
uint32_t ub_size = GetUbSize();
printf("UB size: %u bytes\n", ub_size);
// 减小块大小
constexpr int32_t TILE_SIZE = 16; // 从 32 改成 16
问题三:精度不达标
余弦相似度: 0.97 (应该 > 0.99)
解决:检查计算精度,可能需要用 FP32
// 用 FP32 计算
GlobalTensor<float> aGm; // float = FP32
GlobalTensor<float> bGm;
GlobalTensor<float> cGm;
// 不要用 FP16
// GlobalTensor<half> aGm; // half = FP16(精度可能不够)
参考资源
- Ascend C 编程指南: https://www.hiascend.com/document/detail/zh/CANN/
- 算子开发最佳实践: https://www.hiascend.com/document/detail/zh/CANN/
- 算子样例代码: https://atomgit.com/cann/samples
- 达芬奇架构白皮书: https://www.hiascend.com/document/detail/zh/CANN/
总结
用 Ascend C 开发自定义算子,流程是:定义算子原型 → 实现算子逻辑 → 编译成 .so → 在 PyTorch 里调用。核心是要理解达芬奇架构的内存层次(GM → UB → 计算单元),以及 Cube Unit 和 Vector Unit 的适用场景。性能优化的关键是尽量用 Cube Unit 做矩阵乘法,太大的矩阵要分块计算。调试可以用 printf 打印中间结果,或者用 Ascend CL 的调试模式。精度问题通常是因为用了 FP16,改成 FP32 一般能解决。
更多推荐

所有评论(0)