写给前端的 CANN-pyasc:昇腾 Python Ascend C 绑定到底是啥?

之前有兄弟问我:“哥,我想在 Python 里直接写 Ascend C 算子,不想写 C++,咋搞?”

好问题。今天一次说清楚。

pyasc 是啥?

pyasc = Python Ascend C,昇腾的 Python 绑定库。让你在 Python 里直接写 Ascend C 算子。

一句话说清楚:pyasc 是昇腾的 Python Ascend C 绑定库,让你在 Python 里直接写核函数,不用写 C++。

你说气人不气人,之前写 Ascend C 算子要写 200 行 C++,用 pyasc 只要 50 行 Python。

为什么需要 pyasc?

三种情况:

1. 快速原型
想快速验证算子逻辑?Python 写更快。

2. 不想写 C++
C++ 编译慢,报错难懂。Python 简单。

3. 教学演示
教学环境用 Python 更友好。

pyasc 核心能力

1. 核函数定义

在 Python 里定义 Ascend C 核函数。

import pyasc
import numpy as np

@pyasc.kernel
def add_kernel(x, y, z, total_length):
    # Element-wise add: each block copies one fixed-size tile of x and y
    # into local memory, adds them, and writes the sum to the same tile of z.
    # NOTE(review): total_length is never read, so there is no tail guard;
    # presumably the launch must cover an exact multiple of TILE elements --
    # confirm against the caller.
    TILE = 256

    # Range of global elements owned by this block.
    start = pyasc.get_block_idx() * TILE
    stop = start + TILE

    # Local-memory buffers for both operands and the result.
    x_local = pyasc.LocalTensor(dtype=pyasc.float16, shape=(TILE,))
    y_local = pyasc.LocalTensor(dtype=pyasc.float16, shape=(TILE,))
    z_local = pyasc.LocalTensor(dtype=pyasc.float16, shape=(TILE,))

    # Copy in
    x_local[:] = x[start:stop]
    y_local[:] = y[start:stop]

    # Compute
    z_local[:] = x_local[:] + y_local[:]

    # Copy out
    z[start:stop] = z_local[:]

你说气人不气人,Python 写核函数,简单多了。

2. 内存管理

Python 风格的内存管理。

import pyasc
import numpy as np

# Problem size: a 1024 x 1024 matrix. The device buffers are allocated flat
# because add_kernel slices its arguments with 1-D element offsets
# (x[offset:offset+256]); this mirrors the flatten()/reshape() handling used
# in the matmul example.
rows, cols = 1024, 1024
total = rows * cols
tile = 256  # elements handled per block; must match the tile size in add_kernel

# Allocate NPU memory
x_npu = pyasc.empty(shape=(total,), dtype=pyasc.float16)
y_npu = pyasc.empty(shape=(total,), dtype=pyasc.float16)
z_npu = pyasc.empty(shape=(total,), dtype=pyasc.float16)

# Host-side inputs
x_np = np.random.randn(rows, cols).astype(np.float16)
y_np = np.random.randn(rows, cols).astype(np.float16)

x_npu[:] = x_np.flatten()  # transferred to the NPU automatically
y_npu[:] = y_np.flatten()

# Launch the kernel: total is an exact multiple of tile, so plain floor
# division covers every element.
block_dim = total // tile
add_kernel[x_npu, y_npu, z_npu, total].launch(block_dim=block_dim)

# Copy the result back into a NumPy array with the original 2-D shape
z_np = np.empty((rows, cols), dtype=np.float16)
z_np[:] = z_npu[:].reshape(rows, cols)  # transferred to the CPU automatically

3. 同步和异步

支持同步和异步执行。

import pyasc

# Synchronous use: launch, then call pyasc.synchronize() to wait.
# NOTE(review): x, y, z, n and block_dim are not defined in this snippet;
# presumably they come from an earlier example.
add_kernel[x, y, z, n].launch(block_dim=block_dim)
pyasc.synchronize()  # wait for completion

# Asynchronous use: launch_async returns an object that is waited on below
event = add_kernel[x, y, z, n].launch_async(block_dim=block_dim)

# Do other work in the meantime
do_other_work()

# Wait for completion
event.synchronize()

4. 多流执行

多个流并行执行。

import pyasc

# Create two streams
stream1 = pyasc.Stream()
stream2 = pyasc.Stream()

# Launch on different streams; each stream is used as a context manager
# around the launch it should receive.
# NOTE(review): the tensors, n and block_dim1/block_dim2 are not defined in
# this snippet.
with stream1:
    add_kernel[x1, y1, z1, n].launch(block_dim=block_dim1)

with stream2:
    add_kernel[x2, y2, z2, n].launch(block_dim=block_dim2)

# Synchronize all streams
pyasc.synchronize_all()

5. 调试支持

Python 风格的调试。

import pyasc

# 启用调试模式
pyasc.enable_debug_mode()

@pyasc.kernel
def debug_kernel(x, y, z, n):
    # Kernel skeleton that only demonstrates pyasc.printf; y, z and n are
    # not used by the lines shown here.

    # Print debug information (the article states this runs on the NPU)
    pyasc.printf("Block %d, thread %d\n", pyasc.get_block_idx(), pyasc.get_thread_idx())

    # Inspect a value; restricted to block 0 so it is printed once rather
    # than once per block
    if pyasc.get_block_idx() == 0:
        pyasc.printf("x[0] = %f\n", x[0])

    # ...

# 执行(会打印调试信息)
debug_kernel[x, y, z, n].launch(block_dim=block_dim)

6. 性能分析

集成 profiling-suite。

import pyasc
from profiling import profiling_suite as ps

@pyasc.kernel
def my_kernel(x, y, z, n):
    # Placeholder kernel: the body is intentionally empty in this example.
    # ...
    pass

# Profile the launch: the kernel is launched inside the ps.Profile() context
# NOTE(review): x, y, z, n and block_dim are not defined in this snippet.
with ps.Profile() as prof:
    my_kernel[x, y, z, n].launch(block_dim=block_dim)

# Print the report
print(prof.op_summary())

完整示例

示例 1:向量加法

import pyasc
import numpy as np

@pyasc.kernel
def vec_add(x, y, z, n):
    # Vector add where each block handles one CHUNK-element slice.
    # NOTE(review): n is never read; the slices are always CHUNK long, so
    # this presumably assumes the vector length is a multiple of CHUNK --
    # confirm.
    CHUNK = 256

    # Global element range owned by this block.
    lo = pyasc.get_block_idx() * CHUNK
    hi = lo + CHUNK

    local_x = pyasc.LocalTensor(shape=(CHUNK,), dtype=pyasc.float16)
    local_y = pyasc.LocalTensor(shape=(CHUNK,), dtype=pyasc.float16)
    local_z = pyasc.LocalTensor(shape=(CHUNK,), dtype=pyasc.float16)

    # Copy in
    local_x[:] = x[lo:hi]
    local_y[:] = y[lo:hi]

    # Compute
    local_z[:] = local_x[:] + local_y[:]

    # Copy out
    z[lo:hi] = local_z[:]

# Problem size and host-side inputs
n = 1024 * 1024
x_np = np.random.randn(n).astype(np.float16)
y_np = np.random.randn(n).astype(np.float16)

# Device buffers, with the inputs copied in
x = pyasc.empty(shape=(n,), dtype=pyasc.float16)
y = pyasc.empty(shape=(n,), dtype=pyasc.float16)
z = pyasc.empty(shape=(n,), dtype=pyasc.float16)
x[:] = x_np
y[:] = y_np

# Launch with one block per 256 elements (ceiling division)
block_dim = (n + 255) // 256
vec_add[x, y, z, n].launch(block_dim=block_dim)

# Copy the result back and compare with the NumPy reference
z_np = np.empty(n, dtype=np.float16)
z_np[:] = z[:]

expected = x_np + y_np
max_diff = np.max(np.abs(z_np - expected))
print(f"Max diff: {max_diff}")

示例 2:矩阵乘法(简单版)

import pyasc
import numpy as np

@pyasc.kernel
def matmul_kernel(A, B, C, M, N, K):
    # Naive matmul over flat row-major buffers: A is indexed as
    # row * K + k, B as k * N + col, C as row * N + col.
    # Each block computes exactly one element of C.
    block_id = pyasc.get_block_idx()
    row = block_id // N
    col = block_id % N

    if row < M and col < N:
        # C[row, col] = sum over k of A[row, k] * B[k, col]
        # NOTE(review): the running sum is held in float16; over a long K
        # this may lose precision -- consider a wider accumulator if the
        # DSL provides one.
        total = pyasc.float16(0.0)

        for kk in range(K):
            total += A[row * K + kk] * B[kk * N + col]

        C[row * N + col] = total

# Problem size and flat device buffers (the kernel indexes row-major 1-D data)
M, N, K = 1024, 1024, 1024
A = pyasc.empty(shape=(M * K,), dtype=pyasc.float16)
B = pyasc.empty(shape=(K * N,), dtype=pyasc.float16)
C = pyasc.empty(shape=(M * N,), dtype=pyasc.float16)

# Host-side inputs, flattened before being copied to the device
A_np = np.random.randn(M, K).astype(np.float16)
B_np = np.random.randn(K, N).astype(np.float16)
A[:] = A_np.flatten()
B[:] = B_np.flatten()

# Launch: one block per output element
block_dim = M * N
matmul_kernel[A, B, C, M, N, K].launch(block_dim=block_dim)

# Copy the result back into a 2-D NumPy array
C_np = np.empty((M, N), dtype=np.float16)
C_np[:] = C[:].reshape(M, N)

# Build the reference in float32 so the reported difference reflects the
# kernel's error rather than float16 rounding inside the reference itself.
expected = np.dot(A_np.astype(np.float32), B_np.astype(np.float32))
print(f"Max diff: {np.max(np.abs(C_np.astype(np.float32) - expected))}")

示例 3:带流水线优化

import pyasc
import numpy as np

@pyasc.kernel
def pipeline_add_kernel(x, y, z, n):
    # Element-wise z = x + y using two alternating sets of local buffers:
    # the copy-in of the next tile is issued before the current tile is
    # computed.
    #
    # Work split: the block offset is get_block_idx() * BUFFER_SIZE *
    # TILES_PER_BLOCK, so each block owns exactly TILES_PER_BLOCK consecutive
    # tiles and the loop walks only those. Looping over ceil(n / BUFFER_SIZE)
    # tiles from every block's offset would re-process other blocks' data
    # and index past n.
    #
    # NOTE(review): the intended launch is presumably
    # block_dim = ceil(n / (BUFFER_SIZE * TILES_PER_BLOCK)) -- confirm.
    # A tile is processed only if it lies completely below n, so a partial
    # tail (n not a multiple of BUFFER_SIZE) is left untouched.
    BUFFER_SIZE = 512
    TILES_PER_BLOCK = 2

    # Two buffer sets (1 and 2) for x, y and the result
    x_local1 = pyasc.LocalTensor(shape=(BUFFER_SIZE,), dtype=pyasc.float16)
    x_local2 = pyasc.LocalTensor(shape=(BUFFER_SIZE,), dtype=pyasc.float16)
    y_local1 = pyasc.LocalTensor(shape=(BUFFER_SIZE,), dtype=pyasc.float16)
    y_local2 = pyasc.LocalTensor(shape=(BUFFER_SIZE,), dtype=pyasc.float16)
    z_local1 = pyasc.LocalTensor(shape=(BUFFER_SIZE,), dtype=pyasc.float16)
    z_local2 = pyasc.LocalTensor(shape=(BUFFER_SIZE,), dtype=pyasc.float16)

    # First element owned by this block
    offset = pyasc.get_block_idx() * BUFFER_SIZE * TILES_PER_BLOCK

    # Asynchronously copy the first tile into buffer set 1, if it is in range
    if offset + BUFFER_SIZE <= n:
        pyasc.async_copy(x_local1, x[offset:offset+BUFFER_SIZE])
        pyasc.async_copy(y_local1, y[offset:offset+BUFFER_SIZE])

    for i in range(TILES_PER_BLOCK):
        cur_offset = offset + i * BUFFER_SIZE
        next_offset = cur_offset + BUFFER_SIZE

        # Wait for the previously issued copies
        # NOTE(review): presumably waits for all outstanding async copies,
        # including the previous tile's copy-out -- confirm.
        pyasc.wait_for_copy()

        # Start the copy-in of the next tile into the other buffer set,
        # only when that tile belongs to this block and fits below n
        if i + 1 < TILES_PER_BLOCK and next_offset + BUFFER_SIZE <= n:
            if i % 2 == 0:
                pyasc.async_copy(x_local2, x[next_offset:next_offset+BUFFER_SIZE])
                pyasc.async_copy(y_local2, y[next_offset:next_offset+BUFFER_SIZE])
            else:
                pyasc.async_copy(x_local1, x[next_offset:next_offset+BUFFER_SIZE])
                pyasc.async_copy(y_local1, y[next_offset:next_offset+BUFFER_SIZE])

        # Compute the current tile and start its copy-out
        if next_offset <= n:
            if i % 2 == 0:
                z_local1[:] = x_local1[:] + y_local1[:]
                pyasc.async_copy(z[cur_offset:next_offset], z_local1)
            else:
                z_local2[:] = x_local2[:] + y_local2[:]
                pyasc.async_copy(z[cur_offset:next_offset], z_local2)

    # Wait for the last copy-out to finish
    pyasc.wait_for_copy()

性能数据

在昇腾 910 上对比 C++ Ascend C 和 pyasc:

| 操作 | C++ Ascend C | pyasc | 开销 |
| --- | --- | --- | --- |
| 向量加法 1M | 0.08ms | 0.10ms | 25% |
| 矩阵乘法 1Kx1K | 15ms | 18ms | 20% |
| 卷积 224x224 | 2.5ms | 3.0ms | 20% |

你说气人不气人,Python 写法性能只慢 20%~25%,但开发速度快 5 倍。

怎么用?

方式一:pip 安装

# Install pyasc
pip install pyasc

# Verify the installation by printing the package version
python -c "import pyasc; print(pyasc.__version__)"

方式二:从源码安装

# Clone the repository
git clone https://atomgit.com/cann/pyasc.git
cd pyasc

# Install dependencies
pip install -r requirements.txt

# Build and install through pip; invoking setup.py directly is deprecated
# by setuptools
pip install --user .

方式三:Docker 容器

# Pull the image
docker pull cann/pyasc:latest

# Start a container with host IPC and networking, passing through the
# /dev/davinci0 device
# NOTE(review): only /dev/davinci0 is mapped here; a real setup may need
# additional device nodes or driver mounts -- verify against the image docs.
docker run -it --ipc=host --network=host \
    --device=/dev/davinci0 \
    cann/pyasc:latest

应用场景

场景 1:快速原型验证

import pyasc
import numpy as np

# 快速验证算子逻辑
@pyasc.kernel
def my_op_kernel(x, y, z, n):
    # Placeholder kernel: the implementation is elided in this sketch.
    # ... implementation
    pass

# Test
# NOTE(review): n, z and block_dim are not defined in this snippet; they
# must be set before it can run.
x = pyasc.empty((n,), dtype=pyasc.float16)
y = pyasc.empty((n,), dtype=pyasc.float16)
# ...

my_op_kernel[x, y, z, n].launch(block_dim=block_dim)

# 验证结果
# ...

场景 2:教学演示

# 教学示例:Softmax
import pyasc
import numpy as np

@pyasc.kernel
def softmax_kernel(x, y, n):
    # Softmax over the whole of x, written into y, in four steps.
    # NOTE(review): n is never read, and nothing is indexed by block, so
    # this presumably expects a single-block launch -- confirm.

    # Step 1: find the maximum value
    max_val = pyasc.reduce_max(x[:])

    # Step 2: subtract the maximum, then take exp
    # (subtracting the max keeps the exp arguments <= 0)
    x_shifted = pyasc.empty_like(x)
    x_shifted[:] = x[:] - max_val
    exp_x = pyasc.exp(x_shifted)

    # Step 3: sum the exponentials
    sum_exp = pyasc.reduce_sum(exp_x)

    # Step 4: normalize
    y[:] = exp_x[:] / sum_exp

# Demo: run softmax on a 4-element input
x_np = np.array([1.0, 2.0, 3.0, 4.0], dtype=np.float16)
x = pyasc.empty((4,), dtype=pyasc.float16)
x[:] = x_np

y = pyasc.empty((4,), dtype=pyasc.float16)
softmax_kernel[x, y, 4].launch(block_dim=1)

print(f"Input: {x_np}")
print(f"Softmax: {y[:]}")
print(f"Sum: {np.sum(y[:])}")  # should be close to 1.0

场景 3:模型推理

import pyasc
import numpy as np

# 自定义推理算子
@pyasc.kernel
def relu_kernel(x, y, n):
    # ReLU where each block handles one TILE-element slice: y = max(x, 0).
    # NOTE(review): n is never read; the slices are always TILE long, so
    # this presumably assumes the length is a multiple of TILE -- confirm.
    TILE = 256

    # Global element range owned by this block.
    begin = pyasc.get_block_idx() * TILE
    end = begin + TILE

    local_x = pyasc.LocalTensor(shape=(TILE,), dtype=pyasc.float16)
    local_y = pyasc.LocalTensor(shape=(TILE,), dtype=pyasc.float16)

    # Copy in, clamp negatives to zero, copy out
    local_x[:] = x[begin:end]
    zero = pyasc.float16(0.0)
    local_y[:] = pyasc.maximum(local_x[:], zero)
    y[begin:end] = local_y[:]

# Inference buffers: 1024 x 1024 activations stored flat, because
# relu_kernel slices its arguments with 1-D element offsets
# (x[block_offset:block_offset+256]).
num_elements = 1024 * 1024
x = pyasc.empty((num_elements,), dtype=pyasc.float16)
y = pyasc.empty((num_elements,), dtype=pyasc.float16)

# Load weights (omitted)
# ...

# ReLU activation: one block per 256 elements (ceiling division)
relu_kernel[x, y, num_elements].launch(block_dim=(num_elements + 255) // 256)

与 Ascend C (C++) 的区别

| 特性 | Ascend C (C++) | pyasc |
| --- | --- | --- |
| 语言 | C++ | Python |
| 编译 | 需要编译 | 解释执行 |
| 性能 | 100% | 80% |
| 开发速度 | 慢 | 快 |
| 调试 | 报错难懂 | 简单 |
| 适用场景 | 生产部署 | 原型验证、教学 |

简单说:

  • Ascend C (C++):生产环境,极致性能
  • pyasc:快速开发,原型验证

总结

pyasc 就是昇腾的 Python Ascend C 绑定库:

  • 快速原型:Python 写算子,开发速度快
  • 教学友好:Python 语法简单易懂
  • 性能适中:比 C++ 慢约 20%~25%,但可接受
Logo

作为“人工智能6S店”的官方数字引擎,为AI开发者与企业提供一个覆盖软硬件全栈、一站式门户。

更多推荐