请添加图片描述

前言

CANN 自带的算子库已经覆盖了大部分常用算子,但总会遇到不支持的。这时候要用 Ascend C 自己写一个算子。

Ascend C 是昇腾的算子编程语言,语法类似 C++,但专门针对达芬奇架构做了优化。这篇文章讲怎么从零写一个矩阵乘法算子。


开发环境准备

安装算子开发包

# CANN 工具包自带算子开发工具
which canndev
# 输出:/usr/local/Ascend/ascend-toolkit/latest/bin/canndev

# 如果没有,安装 toolkit
apt install ascend-toolkit

创建算子工程

# 用 canndev 创建算子工程
canndev create --project=matmul_custom --output=./workspace

# 工程结构
tree ./workspace/matmul_custom/
# output:
# matmul_custom/
# ├── CMakeLists.txt
# ├── framework/
# │   └── tensor_add_impl.cpp  # 算子实现
# ├── op_proto/
# │   └── tensor_add.h            # 算子原型
# ├── unittest/
# │   └── test_tensor_add.py     # 单元测试
# └── CMakeLists.txt

算子原型定义

第一步是定义算子的原型:输入、输出、属性。

头文件定义

// matmul_custom.h
#ifndef MATMUL_CUSTOM_H
#define MATMUL_CUSTOM_H

#include "tiking_pub_api.h"

namespace acl {
namespace ops {

class MatMulCustom {
public:
    MatMulCustom() = default;
    ~MatMulCustom() = default;
    
    // 初始化算子
    graphStatus Init(const Tensor& a, const Tensor& b, Tensor& c,
                    bool trans_a = false, bool trans_b = false);
    
    // 推理接口
    graphStatus InferShape();
    
    // 数据类型推断
    graphStatus InferDataType();
};

} // namespace ops
} // namespace acl

#endif // MATMUL_CUSTOM_H

实现文件

// matmul_custom.cpp
#include "matmul_custom.h"

namespace acl {
namespace ops {

graphStatus MatMulCustom::Init(const Tensor& a, const Tensor& b, Tensor& c,
                               bool trans_a, bool trans_b) {
    // 设置输入
    (void)ge::Node::GetNodeFromTensor(a);
    (void)ge::Node::GetNodeFromTensor(b);
    
    // 设置属性
    SetAttr("trans_a", trans_a);
    SetAttr("trans_b", trans_b);
    
    return GRAPH_SUCCESS;
}

graphStatus MatMulCustom::InferShape() {
    // 获取输入 shape
    std::vector<int64_t> shape_a;
    std::vector<int64_t> shape_b;
    GetInputDesc(0).GetShape(shape_a);
    GetInputDesc(1).GetShape(shape_b);
    
    // 计算输出 shape
    bool trans_a = GetAttr("trans_a")->GetBool();
    bool trans_b = GetAttr("trans_b")->GetBool();
    
    int64_t m = trans_a ? shape_a[1] : shape_a[0];
    int64_t n = trans_b ? shape_b[0] : shape_b[1];
    
    std::vector<int64_t> shape_c = {m, n};
    GetOutputDesc(0).SetShape(shape_c);
    
    return GRAPH_SUCCESS;
}

graphStatus MatMulCustom::InferDataType() {
    // 输出数据类型和输入 A 一致
    DataType dtype = GetInputDesc(0).GetDataType();
    GetOutputDesc(0).SetDataType(dtype);
    return GRAPH_SUCCESS;
}

} // namespace ops
} // namespace acl

算子实现(核心)

这是最关键的部分:用 Ascend C 实现矩阵乘法。

基本框架

#include "kernel_operator.h"

class MatMulCustomKernel {
public:
    __aicore__ inline MatMulCustomKernel() {}
    
    __aicore__ void Init(GM_ADDR a, GM_ADDR b, GM_ADDR c,
                          int32_t M, int32_t N, int32_t K);
    
    __aicore__ void Process();

private:
    GlobalTensor<float> aGm;  // 输入 A(GM 内存)
    GlobalTensor<float> bGm;  // 输入 B(GM 内存)
    GlobalTensor<float> cGm;  // 输出 C(GM 内存)
    
    LocalTensor<float> aLocal;  // 输入 A(UB 内存)
    LocalTensor<float> bLocal;  // 输入 B(UB 内存)
    LocalTensor<float> cLocal;  // 输出 C(UB 内存)
    
    int32_t M, N, K;
};

extern "C" __global__ void matmul_custom(GM_ADDR a, GM_ADDR b, GM_ADDR c,
                                          int32_t M, int32_t N, int32_t K) {
    MatMulCustomKernel op;
    op.Init(a, b, c, M, N, K);
    op.Process();
}

Init 函数

__aicore__ void MatMulCustomKernel::Init(GM_ADDR a, GM_ADDR b, GM_ADDR c,
                                          int32_t M, int32_t N, int32_t K) {
    // 设置 GM 地址
    aGm.SetGlobalBuffer((__gm__ float*)a);
    bGm.SetGlobalBuffer((__gm__ float*)b);
    cGm.SetGlobalBuffer((__gm__ float*)c);
    
    // 保存参数
    this->M = M;
    this->N = N;
    this->K = K;
    
    // 分配 UB 空间
    constexpr int32_t BLOCK_SIZE = 32;  // 一个 block 的大小
    aLocal = GetUbBlock<float>(M * K / BLOCK_SIZE);
    bLocal = GetUbBlock<float>(K * N / BLOCK_SIZE);
    cLocal = GetUbBlock<float>(M * N / BLOCK_SIZE);
}

Process 函数(核心计算)

__aicore__ void MatMulCustomKernel::Process() {
    // 1. 把数据从 GM 搬到 UB
    CopyIn(aGm, aLocal, M * K);
    CopyIn(bGm, bLocal, K * N);
    
    // 2. 矩阵乘法计算
    MatMul(cLocal, aLocal, bLocal, M, N, K);
    
    // 3. 把结果从 UB 写回 GM
    CopyOut(cLocal, cGm, M * N);
}

// 矩阵乘法实现
void MatMul(LocalTensor<float>& c, LocalTensor<float>& a, LocalTensor<float>& b,
            int32_t M, int32_t N, int32_t K) {
    // 简化版本:逐行逐列计算
    for (int32_t i = 0; i < M; i++) {
        for (int32_t j = 0; j < N; j++) {
            float sum = 0.0f;
            for (int32_t k = 0; k < K; k++) {
                sum += a[i * K + k] * b[k * N + j];
            }
            c[i * N + j] = sum;
        }
    }
}

优化版本(使用 Cube Unit)

#include "lib_api.h"

// 使用 Cube Unit 做矩阵乘法
void MatMulCube(LocalTensor<float>& c, LocalTensor<float>& a, LocalTensor<float>& b,
               int32_t M, int32_t N, int32_t K) {
    // 调用 Cube Unit 的 MatMul 接口
    matmul_t matmul_para;
    matmul_para.M = M;
    matmul_para.N = N;
    matmul_para.K = K;
    matmul_para.is_trans_a = false;
    matmul_para.is_trans_b = false;
    
    // 用 Cube Unit 计算
    MatMul(c, a, b, matmul_para);
}

编译算子

CMake 配置

# CMakeLists.txt
cmake_minimum_required(VERSION 3.10)
project(matmul_custom)

# 设置 Ascend C 编译选项
set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -march=davinci")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -O2")

# 添加算子实现文件
add_library(matmul_custom SHARED
    matmul_custom.cpp
    matmul_custom_kernel.cpp
)

# 链接 Ascend C 库
target_link_libraries(matmul_custom
    ascendcl
    tbe_operator
)

编译命令

# 创建编译目录
mkdir build && cd build

# 配置
cmake ..

# 编译
make -j8

# 输出:libmatmul_custom.so

在 PyTorch 里调用

注册自定义算子

import torch
import torch.npu
import ctypes

# 加载自定义算子库
lib = ctypes.CDLL("./libmatmul_custom.so")

# 定义 Python 接口
def matmul_custom(a: torch.Tensor, b: torch.Tensor) -> torch.Tensor:
    """自定义矩阵乘法"""
    # 检查输入
    assert a.dim() == 2 and b.dim() == 2
    assert a.shape[1] == b.shape[0]
    
    M, K = a.shape
    K2, N = b.shape
    
    # 创建输出 tensor
    c = torch.empty(M, N, device='npu:0', dtype=torch.float32)
    
    # 调用自定义算子
    lib.matmul_custom(
        a.data_ptr(),
        b.data_ptr(),
        c.data_ptr(),
        M, N, K
    )
    
    return c

测试自定义算子

# 创建测试数据
a = torch.randn(128, 256, device='npu:0')
b = torch.randn(256, 64, device='npu:0')

# 用自定义算子计算
c_custom = matmul_custom(a, b)

# 用 PyTorch 内置算子计算(参考)
c_ref = torch.matmul(a, b)

# 对比精度
cosine_sim = torch.nn.functional.cosine_similarity(
    c_custom.flatten(), c_ref.flatten(), dim=0
)
print(f"余弦相似度: {cosine_sim.item():.6f}")

# 对比性能
import time

# 预热
for _ in range(10):
    matmul_custom(a, b)

torch.npu.synchronize()
start = time.time()

for _ in range(100):
    matmul_custom(a, b)

torch.npu.synchronize()
end = time.time()

print(f"自定义算子延迟: {(end - start) / 100 * 1000:.2f} ms")

性能优化

使用 Cube Unit

达芬奇架构的 Cube Unit 是专门为矩阵乘法设计的硬件单元,比用 Vector Unit 算快得多。

// 使用 Cube Unit
#include "lib_api.h"

void MatMulOptimized(LocalTensor<float>& c, LocalTensor<float>& a, LocalTensor<float>& b,
                     int32_t M, int32_t N, int32_t K) {
    // 检查是否可以用 Cube
    if (M >= 16 && N >= 16 && K >= 16) {
        // 用 Cube Unit
        matmul_t params;
        params.M = M;
        params.N = N;
        params.K = K;
        MatMul(c, a, b, params);
    } else {
        // 用小矩阵算法
        MatMulSmall(c, a, b, M, N, K);
    }
}

分块计算

当矩阵太大,UB 放不下时,要分块计算。

constexpr int32_t TILE_SIZE = 32;  // 块大小

void MatMulTiled(LocalTensor<float>& c, GlobalTensor<float>& aGm, GlobalTensor<float>& bGm,
                int32_t M, int32_t N, int32_t K) {
    // 分块计算
    for (int32_t i = 0; i < M; i += TILE_SIZE) {
        for (int32_t j = 0; j < N; j += TILE_SIZE) {
            // 搬运当前块
            CopyInA(aGm, aLocal, i, min(i + TILE_SIZE, M), K);
            CopyInB(bGm, bLocal, j, min(j + TILE_SIZE, N), K);
            
            // 计算当前块
            MatMulTile(cLocal, aLocal, bLocal,
                       min(TILE_SIZE, M - i),
                       min(TILE_SIZE, N - j),
                       K);
            
            // 写回当前块
            CopyOutC(cLocal, cGm, i, j, min(TILE_SIZE, M - i), min(TILE_SIZE, N - j));
        }
    }
}

调试技巧

打印调试

// 在算子实现里加打印
#include <cstdio>

__aicore__ void MatMulCustomKernel::Process() {
    // 打印输入参数
    printf("M=%d, N=%d, K=%d\n", M, N, K);
    
    // 打印输入数据(前 10 个)
    for (int i = 0; i < min(10, M * K); i++) {
        printf("a[%d]=%f\n", i, aLocal.GetValue(i));
    }
    
    // 计算结果
    MatMul(cLocal, aLocal, bLocal, M, N, K);
    
    // 打印输出数据(前 10 个)
    for (int i = 0; i < min(10, M * N); i++) {
        printf("c[%d]=%f\n", i, cLocal.GetValue(i));
    }
}

用 Ascend CL 调试

import acl

# 初始化
acl.init()
acl.rt.set_device(0)

# 运行算子(调试模式)
acl.rt.set_op_execute_mode("debug")

# 运行
# ...(运行算子)

# 查看日志
# 日志在 /var/log/Ascend/ascend_toolkit/matmul_custom.log

常见问题

问题一:编译报错

error: 'matmul_t' was not declared in this scope

解决:包含正确的头文件

#include "lib_api.h"  // 包含 matmul_t 的定义

问题二:运行时报错

[ERROR] Kernel execute failed: out of memory

解决:减小块大小,或者检查 UB 大小是否足够

// 检查 UB 大小
uint32_t ub_size = GetUbSize();
printf("UB size: %u bytes\n", ub_size);

// 减小块大小
constexpr int32_t TILE_SIZE = 16;  // 从 32 改成 16

问题三:精度不达标

余弦相似度: 0.97 (应该 > 0.99)

解决:检查计算精度,可能需要用 FP32

// 用 FP32 计算
GlobalTensor<float> aGm;  // float = FP32
GlobalTensor<float> bGm;
GlobalTensor<float> cGm;

// 不要用 FP16
// GlobalTensor<half> aGm;  // half = FP16(精度可能不够)

参考资源

  • Ascend C 编程指南: https://www.hiascend.com/document/detail/zh/CANN/
  • 算子开发最佳实践: https://www.hiascend.com/document/detail/zh/CANN/
  • 算子样例代码: https://atomgit.com/cann/samples
  • 达芬奇架构白皮书: https://www.hiascend.com/document/detail/zh/CANN/

总结

用 Ascend C 开发自定义算子,流程是:定义算子原型 → 实现算子逻辑 → 编译成 .so → 在 PyTorch 里调用。核心是要理解达芬奇架构的内存层次(GM → UB → 计算单元),以及 Cube Unit 和 Vector Unit 的适用场景。性能优化的关键是尽量用 Cube Unit 做矩阵乘法,太大的矩阵要分块计算。调试可以用 printf 打印中间结果,或者用 Ascend CL 的调试模式。精度问题通常是因为用了 FP16,改成 FP32 一般能解决。

Logo

作为“人工智能6S店”的官方数字引擎,为AI开发者与企业提供一个覆盖软硬件全栈、一站式门户。

更多推荐