之前做算子开发,兄弟问我:“哥,我想用 Python 写 Ascend C 算子,不用 C++,有办法吗?”

我说有,pyasc。

好问题。今天一次说清楚。

pyasc 是啥?

pyasc = Python Ascend C Binding,昇腾的 Python Ascend C 绑定。让能用 Python 写昇腾 C 算子。

一句话说清楚:pyasc 是昇腾的 Python Ascend C 绑定,让你在 Python 里直接调用 Ascend C API 写算子,不用 C++。

你说气人不气人,原来要写 C++ 才能写算子,现在用 Python 就行。

为什么要用 pyasc?

三个字:Python

不用 pyasc(C++ 写算子)

// C++ Ascend C 代码
#include "acl/acl.h"
#include "tbe/tbe.h"

// 定义算子
extern "C" void conv2d(void* args, void** kwargs) {
    // 获取输入输出
    aclfloat16* input = (aclfloat16*)kwargs[0];
    aclfloat16* weight = (aclfloat16*)kwargs[1];
    aclfloat16* output = (aclfloat16*)kwargs[2];

    // 获取属性
    int64_t stride = *(int64_t*)kwargs[3];
    int64_t padding = *(int64_t*)kwargs[4];

    // 写计算逻辑(几百行...)
    for (int n = 0; n < N; n++) {
        for (int k = 0; k < K; k++) {
            // ... 计算逻辑 ...
        }
    }

    // 返回
    return;
}

// 编译、注册、调用...
// 复杂死了!

用 pyasc(Python 写算子)

# Python Ascend C 代码
import pyasc

@pyasc.operator("Conv2d")
def conv2d(input, weight, output, stride=1, padding=0):
    # 直接写计算逻辑
    N, C, H, W = input.shape
    K, _, kH, kW = weight.shape
    
    for n in range(N):
        for k in range(K):
            for oh in range(out_h):
                for ow in range(out_w):
                    # 计算逻辑
                    s = 0
                    for c in range(C):
                        for kh in range(kH):
                            for kw in range(kW):
                                ih = oh * stride + kh - padding
                                iw = ow * stride + kw - padding
                                s += input[n, c, ih, iw] * weight[k, c, kh, kw]
                    output[n, k, oh, ow] = s

# 一行注册
pyasc.register(conv2d)

# 直接调用
output = conv2d(input, weight, stride=2, padding=3)

你说气人不气人,用 Python 代码少了 90%。

核心概念就三个

1. 算子装饰器

用装饰器定义算子:

import pyasc
import numpy as np

@pyasc.operator("MyOp")
def my_operator(input_a, input_b, output, alpha=1.0):
    """
    我的算子描述
    """
    # 计算逻辑(Python 实现)
    output[:] = input_a + alpha * input_b

2. Tensor 表述

用 NumPy 数组表示 Tensor:

import pyasc
import numpy as np

# 输入 Tensor(NumPy 数组)
input = np.random.randn(1, 3, 224, 224).astype(np.float16)
input = pyasc.tensor(input)  # 包装成 pyasc Tensor

# 输出 Tensor
output = pyasc.empty((1, 64, 112, 112), dtype=np.float16)
output = pyasc.tensor(output)

# 或让 pyasc 自动创建
output = pyasc.tensor(shape=(1, 64, 112, 112), dtype=np.float16)

3. 执行引擎

执行算子:

import pyasc

# 编译算子
pyasc.compile("MyOp")

# 调用算子
output = my_operator(input, weight, alpha=0.5)

# 或批量调用
outputs = pyasc.batch([
    (input1, weight1),
    (input2, weight2),
    (input3, weight3),
], func=my_operator)

为什么要用 pyasc?

三个理由:

1. 代码量少

同样一个算子:

方式 代码行数 说明
C++ Ascend C 300 行 内存管理、类型转换、循环
pyasc 30 行 直接写逻辑

2. 开发快

Python 调试比 C++ 快:

# Python 调试
# 1. 出错直接报异常
# 2. 可以用 pdb/IPython 调试
# 3. 修改完就能跑

# C++ 调试
# 1. 编译很慢
# 2. gdb 调试麻烦
# 3. 改完要重新编译

你说气人不气人,开发效率高 10 倍。

3. 生态好

Python 库丰富:

# 用 NumPy、SciPy、Pillow
import numpy as np
from PIL import Image
from scipy import ndimage

# 数据预处理
img = np.array(Image.open("test.jpg"))

# NumPy 处理
filtered = ndimage.gaussian_filter(img, sigma=1.5)

# 传给 Ascend C 算子
input_tensor = pyasc.tensor(filtered)
output = conv2d(input_tensor)

怎么用?代码示例

示例 1:卷积算子

import pyasc
import numpy as np

@pyasc.operator("Conv2d")
def conv2d(input, weight, output, stride=1, padding=0):
    """
    2D Convolution operator
    
    Args:
        input: Input tensor (N, C, H, W)
        weight: Weight tensor (K, C, kH, kW)
        output: Output tensor (N, K, oH, oW)
        stride: Convolution stride
        padding: Padding size
    """
    N, C, H, W = input.shape
    K, _, kH, kW = weight.shape
    
    # 计算输出尺寸
    oH = (H - kH + 2 * padding) // stride + 1
    oW = (W - kW + 2 * padding) // stride + 1
    
    # 确保输出形状正确
    output.resize((N, K, oH, oW))
    
    # 零初始化
    output[:] = 0
    
    # 卷积计算
    for n in range(N):
        for k in range(K):
            for c in range(C):
                for oh in range(oH):
                    for ow in range(oW):
                        s = 0
                        for kh in range(kH):
                            for kw in range(kW):
                                ih = oh * stride + kh - padding
                                iw = ow * stride + kw - padding
                                if 0 <= ih < H and 0 <= iw < W:
                                    s += input[n, c, ih, iw] * weight[k, c, kh, kw]
                        output[n, k, oh, ow] += s

# 注册
pyasc.register(conv2d)

# 测试
input = np.random.randn(1, 3, 224, 224).astype(np.float32)
weight = np.random.randn(64, 3, 7, 7).astype(np.float32)
output = np.empty((1, 64, 112, 112), dtype=np.float32)

# 调用
conv2d(input, weight, output, stride=2, padding=3)
print(f"Output shape: {output.shape}")

示例 2:矩阵乘法

import pyasc
import numpy as np

@pyasc.operator("Matmul")
def matmul(a, b, c, transpose_a=False, transpose_b=False):
    """
    Matrix multiplication
    
    Args:
        a: Matrix A (M, K) or (K, M)
        b: Matrix B (K, N) or (N, K)
        c: Output (M, N)
        transpose_a: Transpose A
        transpose_b: Transpose B
    """
    # 转置(如果需要)
    if transpose_a:
        a = a.T
    if transpose_b:
        b = b.T
    
    # 确保形状正确
    M, K = a.shape
    K_, N = b.shape
    
    if K != K_:
        raise ValueError(f"A's K ({K}) != B's K ({K_})")
    
    # 重置输出形状
    c.resize((M, N))
    
    # 矩阵乘法
    # 使用 NumPy 的风格但手动实现
    for m in range(M):
        for n in range(N):
            s = 0
            for k in range(K):
                s += a[m, k] * b[k, n]
            c[m, n] = s

# 注册
pyasc.register(matmul)

# 测试
a = np.random.randn(1024, 512).astype(np.float32)
b = np.random.randn(512, 2048).astype(np.float32)
c = np.empty((1024, 2048), dtype=np.float32)

matmul(a, b, c)
print(f"Output shape: {c.shape}")

示例 3:Pooling

import pyasc
import numpy as np

@pyasc.operator("MaxPool2d")
def max_pool2d(input, output, kernel_size=2, stride=2, padding=0):
    """
    Max pooling
    
    Args:
        input: Input tensor (N, C, H, W)
        output: Output tensor (N, C, oH, oW)
        kernel_size: Pooling window size
        stride: Stride
        padding: Padding
    """
    N, C, H, W = input.shape
    
    # 计算输出尺寸
    oH = (H - kernel_size + 2 * padding) // stride + 1
    oW = (W - kernel_size + 2 * padding) // stride + 1
    
    output.resize((N, C, oH, oW))
    
    # Max pooling
    for n in range(N):
        for c in range(C):
            for oh in range(oH):
                for ow in range(oW):
                    # 计算窗口区域
                    h_start = oh * stride - padding
                    w_start = ow * stride - padding
                    
                    # 找最大值
                    max_val = -np.inf
                    for kh in range(kernel_size):
                        for kw in range(kernel_size):
                            ih = h_start + kh
                            iw = w_start + kw
                            if 0 <= ih < H and 0 <= iw < W:
                                max_val = max(max_val, input[n, c, ih, iw])
                    
                    output[n, c, oh, ow] = max_val

# 注册
pyasc.register(max_pool2d)

# 测试
input = np.random.randn(1, 64, 224, 224).astype(np.float32)
output = np.empty((1, 64, 112, 112), dtype=np.float32)

max_pool2d(input, output, kernel_size=2, stride=2)
print(f"Output shape: {output.shape}")

示例 4:编译和加速

import pyasc
import numpy as np
import time

@pyasc.operator("MyOp")
def my_op(input, weight, output, alpha=1.0):
    """简单的算子"""
    output[:] = input + alpha * weight

# 第一次运行(JIT 编译)
input = np.random.randn(1024, 1024)
weight = np.random.randn(1024, 1024)
output = np.empty_like(input)

# JIT 编译
start = time.time()
for _ in range(10):
    my_op(input, weight, output)
first_time = (time.time() - start) / 10

# 编译为原生代码(Ahead-of-Time)
pyasc.compile("MyOp", backend="npu", opt_level=3)

# 再次运行(原生代码)
start = time.time()
for _ in range(10):
    my_op(input, weight, output)
second_time = (time.time() - start) / 10

print(f"JIT time: {first_time*1000:.2f} ms")
print(f"AOT time: {second_time*1000:.2f} ms")
print(f"Speedup: {first_time/second_time:.1f}x")

性能数据

pyasc vs C++:

方式 开发时间 运行时间 说明
C++ Ascend C 1 天 1x 需要编译
pyasc (JIT) 1 小时 1.5x 即时运行
pyasc (AOT) 1 小时 1.1x 编译后运行

你说气人不气人,pyasc 开发快 10 倍,运行只慢一点点。

跟其他仓库的关系

pyasc 在 CANN 架构里属于第 1 层(昇腾计算语言层),是Python 绑定层

依赖关系:

pyasc(Python 绑定)
    ↓ 调用
Ascend C(算子编程语言)
    ↓ 编译
昇腾运行时
    ↓ 执行
硬件(昇腾 NPU)

解释一下:

  • Ascend C:算子编程语言(C++)
  • pyasc:Python 绑定,方便用
  • 昇腾运行时:底层运行时
  • 硬件:昇腾 NPU

简单说:pyasc 是 Ascend C 的 Python 外套。脱掉外套是 C++,穿上外套是 Python。

pyasc 的核心能力

1. 算子定义

@pyasc.operator("MyOp")
def my_op(input, output, param=1.0):
    # 算子逻辑
    output[:] = input * param

2. Tensor 操作

# 创建
t = pyasc.tensor(shape=(1, 3, 224, 224), dtype=np.float32)

# 转换
t = pyasc.from_numpy(arr)
arr = t.numpy()

# slicing/索引
t[0, :, :, :] = value
value = t[0, 0, 0, 0]

3. 执行模式

# JIT(即时执行)
output = func(input)

# AOT(编译后执行)
pyasc.compile(func, backend="npu", opt_level=3)
output = func(input)

# 批处理
outputs = pyasc.batch(inputs, func)

适用场景

什么情况下用 pyasc:

  • 快速原型:验证算子算法
  • Python 开发者:不想学 C++
  • 教学演示:Python 代码更易懂

什么情况下不用:

  • 极致性能:还是得用 C++
  • 已有算子:直接用 ops-xxx

总结

pyasc 就是昇腾的"Python Ascend C 绑定":

  • 装饰器定义:用 @pyasc.operator
  • NumPy 风格:用数组,不用指针
  • JIT/AOT:即时/编译执行
  • Python 生态:用 NumPy、Pillow…
Logo

作为“人工智能6S店”的官方数字引擎,为AI开发者与企业提供一个覆盖软硬件全栈、一站式门户。

更多推荐