写给新手的 pyasc:昇腾 Python Ascend C 绑定到底是啥?
写给新手的 pyasc:昇腾 Python Ascend C 绑定到底是啥?
·
之前做算子开发,兄弟问我:“哥,我想用 Python 写 Ascend C 算子,不用 C++,有办法吗?”
我说有,pyasc。
好问题。今天一次说清楚。
pyasc 是啥?
pyasc = Python Ascend C Binding,昇腾的 Python Ascend C 绑定。让能用 Python 写昇腾 C 算子。
一句话说清楚:pyasc 是昇腾的 Python Ascend C 绑定,让你在 Python 里直接调用 Ascend C API 写算子,不用 C++。
你说气人不气人,原来要写 C++ 才能写算子,现在用 Python 就行。
为什么要用 pyasc?
三个字:Python。
不用 pyasc(C++ 写算子)
// C++ Ascend C 代码
#include "acl/acl.h"
#include "tbe/tbe.h"
// 定义算子
extern "C" void conv2d(void* args, void** kwargs) {
// 获取输入输出
aclfloat16* input = (aclfloat16*)kwargs[0];
aclfloat16* weight = (aclfloat16*)kwargs[1];
aclfloat16* output = (aclfloat16*)kwargs[2];
// 获取属性
int64_t stride = *(int64_t*)kwargs[3];
int64_t padding = *(int64_t*)kwargs[4];
// 写计算逻辑(几百行...)
for (int n = 0; n < N; n++) {
for (int k = 0; k < K; k++) {
// ... 计算逻辑 ...
}
}
// 返回
return;
}
// 编译、注册、调用...
// 复杂死了!
用 pyasc(Python 写算子)
# Python Ascend C 代码
import pyasc
@pyasc.operator("Conv2d")
def conv2d(input, weight, output, stride=1, padding=0):
# 直接写计算逻辑
N, C, H, W = input.shape
K, _, kH, kW = weight.shape
for n in range(N):
for k in range(K):
for oh in range(out_h):
for ow in range(out_w):
# 计算逻辑
s = 0
for c in range(C):
for kh in range(kH):
for kw in range(kW):
ih = oh * stride + kh - padding
iw = ow * stride + kw - padding
s += input[n, c, ih, iw] * weight[k, c, kh, kw]
output[n, k, oh, ow] = s
# 一行注册
pyasc.register(conv2d)
# 直接调用
output = conv2d(input, weight, stride=2, padding=3)
你说气人不气人,用 Python 代码少了 90%。
核心概念就三个
1. 算子装饰器
用装饰器定义算子:
import pyasc
import numpy as np
@pyasc.operator("MyOp")
def my_operator(input_a, input_b, output, alpha=1.0):
"""
我的算子描述
"""
# 计算逻辑(Python 实现)
output[:] = input_a + alpha * input_b
2. Tensor 表述
用 NumPy 数组表示 Tensor:
import pyasc
import numpy as np
# 输入 Tensor(NumPy 数组)
input = np.random.randn(1, 3, 224, 224).astype(np.float16)
input = pyasc.tensor(input) # 包装成 pyasc Tensor
# 输出 Tensor
output = pyasc.empty((1, 64, 112, 112), dtype=np.float16)
output = pyasc.tensor(output)
# 或让 pyasc 自动创建
output = pyasc.tensor(shape=(1, 64, 112, 112), dtype=np.float16)
3. 执行引擎
执行算子:
import pyasc
# 编译算子
pyasc.compile("MyOp")
# 调用算子
output = my_operator(input, weight, alpha=0.5)
# 或批量调用
outputs = pyasc.batch([
(input1, weight1),
(input2, weight2),
(input3, weight3),
], func=my_operator)
为什么要用 pyasc?
三个理由:
1. 代码量少
同样一个算子:
| 方式 | 代码行数 | 说明 |
|---|---|---|
| C++ Ascend C | 300 行 | 内存管理、类型转换、循环 |
| pyasc | 30 行 | 直接写逻辑 |
2. 开发快
Python 调试比 C++ 快:
# Python 调试
# 1. 出错直接报异常
# 2. 可以用 pdb/IPython 调试
# 3. 修改完就能跑
# C++ 调试
# 1. 编译很慢
# 2. gdb 调试麻烦
# 3. 改完要重新编译
你说气人不气人,开发效率高 10 倍。
3. 生态好
Python 库丰富:
# 用 NumPy、SciPy、Pillow
import numpy as np
from PIL import Image
from scipy import ndimage
# 数据预处理
img = np.array(Image.open("test.jpg"))
# NumPy 处理
filtered = ndimage.gaussian_filter(img, sigma=1.5)
# 传给 Ascend C 算子
input_tensor = pyasc.tensor(filtered)
output = conv2d(input_tensor)
怎么用?代码示例
示例 1:卷积算子
import pyasc
import numpy as np
@pyasc.operator("Conv2d")
def conv2d(input, weight, output, stride=1, padding=0):
"""
2D Convolution operator
Args:
input: Input tensor (N, C, H, W)
weight: Weight tensor (K, C, kH, kW)
output: Output tensor (N, K, oH, oW)
stride: Convolution stride
padding: Padding size
"""
N, C, H, W = input.shape
K, _, kH, kW = weight.shape
# 计算输出尺寸
oH = (H - kH + 2 * padding) // stride + 1
oW = (W - kW + 2 * padding) // stride + 1
# 确保输出形状正确
output.resize((N, K, oH, oW))
# 零初始化
output[:] = 0
# 卷积计算
for n in range(N):
for k in range(K):
for c in range(C):
for oh in range(oH):
for ow in range(oW):
s = 0
for kh in range(kH):
for kw in range(kW):
ih = oh * stride + kh - padding
iw = ow * stride + kw - padding
if 0 <= ih < H and 0 <= iw < W:
s += input[n, c, ih, iw] * weight[k, c, kh, kw]
output[n, k, oh, ow] += s
# 注册
pyasc.register(conv2d)
# 测试
input = np.random.randn(1, 3, 224, 224).astype(np.float32)
weight = np.random.randn(64, 3, 7, 7).astype(np.float32)
output = np.empty((1, 64, 112, 112), dtype=np.float32)
# 调用
conv2d(input, weight, output, stride=2, padding=3)
print(f"Output shape: {output.shape}")
示例 2:矩阵乘法
import pyasc
import numpy as np
@pyasc.operator("Matmul")
def matmul(a, b, c, transpose_a=False, transpose_b=False):
"""
Matrix multiplication
Args:
a: Matrix A (M, K) or (K, M)
b: Matrix B (K, N) or (N, K)
c: Output (M, N)
transpose_a: Transpose A
transpose_b: Transpose B
"""
# 转置(如果需要)
if transpose_a:
a = a.T
if transpose_b:
b = b.T
# 确保形状正确
M, K = a.shape
K_, N = b.shape
if K != K_:
raise ValueError(f"A's K ({K}) != B's K ({K_})")
# 重置输出形状
c.resize((M, N))
# 矩阵乘法
# 使用 NumPy 的风格但手动实现
for m in range(M):
for n in range(N):
s = 0
for k in range(K):
s += a[m, k] * b[k, n]
c[m, n] = s
# 注册
pyasc.register(matmul)
# 测试
a = np.random.randn(1024, 512).astype(np.float32)
b = np.random.randn(512, 2048).astype(np.float32)
c = np.empty((1024, 2048), dtype=np.float32)
matmul(a, b, c)
print(f"Output shape: {c.shape}")
示例 3:Pooling
import pyasc
import numpy as np
@pyasc.operator("MaxPool2d")
def max_pool2d(input, output, kernel_size=2, stride=2, padding=0):
"""
Max pooling
Args:
input: Input tensor (N, C, H, W)
output: Output tensor (N, C, oH, oW)
kernel_size: Pooling window size
stride: Stride
padding: Padding
"""
N, C, H, W = input.shape
# 计算输出尺寸
oH = (H - kernel_size + 2 * padding) // stride + 1
oW = (W - kernel_size + 2 * padding) // stride + 1
output.resize((N, C, oH, oW))
# Max pooling
for n in range(N):
for c in range(C):
for oh in range(oH):
for ow in range(oW):
# 计算窗口区域
h_start = oh * stride - padding
w_start = ow * stride - padding
# 找最大值
max_val = -np.inf
for kh in range(kernel_size):
for kw in range(kernel_size):
ih = h_start + kh
iw = w_start + kw
if 0 <= ih < H and 0 <= iw < W:
max_val = max(max_val, input[n, c, ih, iw])
output[n, c, oh, ow] = max_val
# 注册
pyasc.register(max_pool2d)
# 测试
input = np.random.randn(1, 64, 224, 224).astype(np.float32)
output = np.empty((1, 64, 112, 112), dtype=np.float32)
max_pool2d(input, output, kernel_size=2, stride=2)
print(f"Output shape: {output.shape}")
示例 4:编译和加速
import pyasc
import numpy as np
import time
@pyasc.operator("MyOp")
def my_op(input, weight, output, alpha=1.0):
"""简单的算子"""
output[:] = input + alpha * weight
# 第一次运行(JIT 编译)
input = np.random.randn(1024, 1024)
weight = np.random.randn(1024, 1024)
output = np.empty_like(input)
# JIT 编译
start = time.time()
for _ in range(10):
my_op(input, weight, output)
first_time = (time.time() - start) / 10
# 编译为原生代码(Ahead-of-Time)
pyasc.compile("MyOp", backend="npu", opt_level=3)
# 再次运行(原生代码)
start = time.time()
for _ in range(10):
my_op(input, weight, output)
second_time = (time.time() - start) / 10
print(f"JIT time: {first_time*1000:.2f} ms")
print(f"AOT time: {second_time*1000:.2f} ms")
print(f"Speedup: {first_time/second_time:.1f}x")
性能数据
pyasc vs C++:
| 方式 | 开发时间 | 运行时间 | 说明 |
|---|---|---|---|
| C++ Ascend C | 1 天 | 1x | 需要编译 |
| pyasc (JIT) | 1 小时 | 1.5x | 即时运行 |
| pyasc (AOT) | 1 小时 | 1.1x | 编译后运行 |
你说气人不气人,pyasc 开发快 10 倍,运行只慢一点点。
跟其他仓库的关系
pyasc 在 CANN 架构里属于第 1 层(昇腾计算语言层),是Python 绑定层。
依赖关系:
pyasc(Python 绑定)
↓ 调用
Ascend C(算子编程语言)
↓ 编译
昇腾运行时
↓ 执行
硬件(昇腾 NPU)
解释一下:
- Ascend C:算子编程语言(C++)
- pyasc:Python 绑定,方便用
- 昇腾运行时:底层运行时
- 硬件:昇腾 NPU
简单说:pyasc 是 Ascend C 的 Python 外套。脱掉外套是 C++,穿上外套是 Python。
pyasc 的核心能力
1. 算子定义
@pyasc.operator("MyOp")
def my_op(input, output, param=1.0):
# 算子逻辑
output[:] = input * param
2. Tensor 操作
# 创建
t = pyasc.tensor(shape=(1, 3, 224, 224), dtype=np.float32)
# 转换
t = pyasc.from_numpy(arr)
arr = t.numpy()
# slicing/索引
t[0, :, :, :] = value
value = t[0, 0, 0, 0]
3. 执行模式
# JIT(即时执行)
output = func(input)
# AOT(编译后执行)
pyasc.compile(func, backend="npu", opt_level=3)
output = func(input)
# 批处理
outputs = pyasc.batch(inputs, func)
适用场景
什么情况下用 pyasc:
- 快速原型:验证算子算法
- Python 开发者:不想学 C++
- 教学演示:Python 代码更易懂
什么情况下不用:
- 极致性能:还是得用 C++
- 已有算子:直接用 ops-xxx
总结
pyasc 就是昇腾的"Python Ascend C 绑定":
- 装饰器定义:用 @pyasc.operator
- NumPy 风格:用数组,不用指针
- JIT/AOT:即时/编译执行
- Python 生态:用 NumPy、Pillow…
更多推荐




所有评论(0)