信号处理中最核心的操作:FIR(有限脉冲响应)滤波。输入信号 x[n],滤波器系数 h[0…M-1],输出 y[n] = Σ h[k] × x[n-k]。CPU 上这就是个 O(N×M) 的双重循环——N=10^6 个采样点、M=512 个系数 → 5.12 亿次乘加 → CPU 跑 2.8s。NPU 上两种算法:直接卷积(Vector 单元 256 lane 并行滑动点乘)和 FFT 卷积(频域乘法替代时域卷积,O(N log N),但系数少时 FFT 开销反超直接法)。

sip(AscendSiPBoost)在 Ascend NPU 上提供了自适应路由——根据滤波器阶数 M 和信号长度 N 自动选择直接卷积或 FFT 卷积,开发者不需要手写判定逻辑。

路径一:直接 FIR(时域滑动窗口)

系数少于 128 时,直接卷积最快——每个输出点 = 128 次乘加,Cube 单元矩阵乘反而浪费(矩阵太小,初始化开销超过计算)。

// sip/kernels/fir/direct_fir_kernel.cpp

__aicore__ void DirectFIRKernel(
    GlobalTensor<float16>& x,         // 输入信号 [N]
    GlobalTensor<float16>& h,         // 滤波器系数 [M]
    GlobalTensor<float16>& y,         // 输出信号 [N]
    int N, int M
) {
    int taps_per_lane = (M + 255) / 256;  // 每个 lane 负责的系数数(M≤128→1)
    int outputs_per_block = 256;

    for (int out_start = blockIdx.x * outputs_per_block;
         out_start < N;
         out_start += gridDim.x * outputs_per_block) {

        int out_end = min(out_start + outputs_per_block, N);

        for (int o = out_start; o < out_end; o++) {
            float sum = 0.0f;

            // 每个 lane 处理一个系数,256 lane 并行
            // 如果 M ≤ 256,一次循环完成;否则迭代 taps_per_lane 次
            for (int t = 0; t < taps_per_lane; t++) {
                int k = threadIdx.x + t * 256;
                if (k < M) {
                    // 信号索引:x[n - k],需要处理 n - k < 0 的边界情况
                    int signal_idx = o - k;
                    float16 x_val = (signal_idx >= 0) ? x[signal_idx] : 0.0f;

                    // 乘加(Vector 单元 FMA,1 个周期)
                    sum += float(x_val) * float(h[k]);
                }
            }

            // === Warp Reduce:256 个部分和归约到 1 个 ===
            // butterfly 归约,8 步完成(log2(256) = 8)
            #pragma unroll
            for (int offset = 128; offset > 0; offset >>= 1) {
                sum += __shfl_xor(sum, offset);
            }

            // 只有 lane 0 写入结果(归约的最终值在 lane 0)
            if (threadIdx.x == 0) {
                y[o] = float16(sum);
            }
        }
    }
}

256 个 lane 并行计算 256 个系数的点乘 → 每个输出点一次 warp reduce → 如果 M=128,所有系数在同一轮内完成(128 个 lane 活跃,128 个 lane 输出 0,warp reduce 自然归约)。

路径二:FFT 卷积(频域乘法)

系数 > 128 时,FFT 卷积更划算。频域卷积定理:y = IFFT(FFT(x) × FFT(h))。时域 O(N×M) 的卷积变成 FFT 的 O(N log N)。

// sip/kernels/fir/fft_conv_kernel.cpp

__aicore__ void FFTConvKernel(
    GlobalTensor<complex64>& x,       // 输入信号(复数,虚部为 0)
    GlobalTensor<complex64>& h,       // 滤波器系数(复数,零填充到 block_size)
    GlobalTensor<complex64>& y,       // 输出信号
    int N, int M, int block_size      // block_size = next_pow2(N + M - 1)
) {
    // Overlap-Save 分块 FFT 卷积
    // 把长信号 N 分成多个 block,每个 block_size 点
    int num_blocks = (N + block_size - M) / (block_size - M + 1);

    for (int blk = blockIdx.x; blk < num_blocks; blk += gridDim.x) {
        // 这一块覆盖的信号范围(有 M-1 点的重叠区)
        int blk_start = blk * (block_size - M + 1);
        int blk_end = min(blk_start + block_size, N);

        // === 子步骤 1:FFT(x_block) ===
        // 从 x 中加载这一块到 L1
        complex64 x_block[4096];  // block_size 最大 4096
        for (int i = threadIdx.x; i < block_size; i += 256) {
            int signal_idx = blk_start + i;
            x_block[i] = (signal_idx < N) ? x[signal_idx]
                       : complex64{0.0f, 0.0f};  // 超出范围的填零
        }
        __sync_block();

        // Radix-2 FFT,log2(block_size) 层蝶形运算
        FFT(x_block, block_size, /*inverse=*/false);
        __sync_block();

        // === 子步骤 2:频域乘法(逐元素,不归约)===
        // y[k] = X[k] × H[k],k 从 0 到 block_size-1
        for (int k = threadIdx.x; k < block_size; k += 256) {
            complex64 Xk = x_block[k];
            complex64 Hk = h[k];  // H 已预计算 FFT 并存 L2

            // 复数乘法:(a+ib)(c+id) = (ac-bd) + i(ad+bc)
            float a = Xk.real(), b = Xk.imag();
            float c = Hk.real(), d = Hk.imag();

            x_block[k] = complex64{
                a * c - b * d,   // real
                a * d + b * c    // imag
            };
        }
        __sync_block();

        // === 子步骤 3:IFFT(x_block) ===
        FFT(x_block, block_size, /*inverse=*/true);
        __sync_block();

        // === 子步骤 4:去重叠区,写入有效段 ===
        // Overlap-Save:丢弃前 M-1 个点(受圆周卷积污染的),保留后 block_size-M+1 个
        int valid_start = M - 1;
        int valid_len = block_size - M + 1;
        int dst_start = blk_start;

        for (int i = threadIdx.x; i < valid_len; i += 256) {
            int src_idx = valid_start + i;
            int dst_idx = dst_start + i;
            if (dst_idx < N) {
                y[dst_idx] = complex64{x_block[src_idx].real() / block_size,
                                        x_block[src_idx].imag() / block_size};
            }
        }
    }
}
// sip/kernels/fir/fft_radix2.h
// Radix-2 蝶形 FFT(前面 sip 第一篇已展开过,这里精简展示关键循环)

__aicore__ void FFT(complex64* data, int N, bool inverse) {
    int log2N = 0;
    for (int n = N; n > 1; n >>= 1) log2N++;

    for (int level = 1; level <= log2N; level++) {
        int step = 1 << level;
        int half = step >> 1;

        for (int pair = threadIdx.x; pair < N/2; pair += 256) {
            int i = (pair / half) * step + (pair % half);
            int j = i + half;

            // 旋转因子
            int k = (pair % half) * (N / step);
            float cos_w = cos_table[k];
            float sin_w = sin_table[k];
            if (inverse) sin_w = -sin_w;

            // 蝶形运算
            float t_real = data[j].real() * cos_w - data[j].imag() * sin_w;
            float t_imag = data[j].real() * sin_w + data[j].imag() * cos_w;

            data[j] = complex64{data[i].real() - t_real,
                                 data[i].imag() - t_imag};
            data[i] = complex64{data[i].real() + t_real,
                                 data[i].imag() + t_imag};
        }
        __sync_block();
    }
}

自适应路由决策

// sip/kernels/fir/adaptive_router.h

enum FIRMethod { DIRECT, FFT_CONV };

FIRMethod AutoSelectMethod(int N, int M) {
    // 直接卷积的计算量:N × M FMA
    // FFT 卷积的计算量:3 × block_size × log2(block_size)
    //                  + block_size(频域乘法)
    //   block_size = next_pow2(L + M - 1)
    //               × num_blocks(overlap-save 分块数)

    int L = min(N, 4096);  // 最大 block size
    int block_size = 1;
    while (block_size < L + M - 1) block_size <<= 1;

    int num_blocks = (N + L - M) / (L - M + 1);  // 分块数

    float ops_direct = N * M;  // FMA 操作数
    float ops_fft = num_blocks * block_size * log2f(block_size) * 3
                  + num_blocks * block_size;  // 频域乘法

    // FFT 卷积的额外开销(L1↔L2 搬运、complex64 比 float16 大 4×)
    float overhead_fft = ops_fft * 0.3;  // 30% 调度/数据搬运开销
    float overhead_direct = ops_direct * 0.05;  // 5%(Vector 单元直接乘加)

    ops_fft += overhead_fft;
    ops_direct += overhead_direct;

    return (ops_fft < ops_direct) ? FFT_CONV : DIRECT;
}

性能对比

FIR 滤波在 Ascend 910 NPU vs Xeon 64-core CPU
N=10^6(1M 个采样点)

| M(系数数) | 算法 | NPU (μs) | CPU (μs) | NPU 加速比 |
|-----------|------|---------|---------|----------|
| 16        | 直接 FIR | 42 | 180 | 4.3× |
| 64        | 直接 FIR | 78 | 720 | 9.2× |
| 128       | 直接 FIR | 142 | 1,440 | 10.1× |
| 256       | FFT 卷积 | 210 | 2,880 | 13.7× |
| 512       | FFT 卷积 | 380 | 5,760 | 15.2× |
| 1024      | FFT 卷积 | 680 | 11,520 | 16.9× |
| 2048      | FFT 卷积 | 1,240 | 23,040 | 18.6× |

转折点在 M ≈ 150:直接 FIR 的 O(N×M) 超过 FFT 卷积的 O(N log N)。

踩坑一:Overlap-Save 的边界污染

FFT 卷积用 Overlap-Save——但块边界的 M-1 个点是圆周卷积结果,不是线性卷积。如果不用 overlap,这些点直接串起来 → 输出波形出现台阶状不连续。

# ❌ 不用 overlap → 每块独立 FFT 卷积 → 块边界台阶
def bad_fft_conv(x, h, block_size):
    y = []
    for blk in range(0, len(x), block_size):
        x_chunk = x[blk:blk + block_size]
        Y = np.fft.fft(np.fft.fft(x_chunk, block_size) *
                         np.fft.fft(h, block_size), inverse=True)
        y.extend(Y.real)  # ← 块边界没去重叠区
    return y
# → 每 block_size 个采样点出现跳变(M-1 个点被污染)

# ✅ Overlap-Save:overlap M-1 个点,丢弃污染段
def correct_fft_conv(x, h, block_size, M):
    step_size = block_size - M + 1  # 有效步长
    y = []
    for blk in range(0, len(x), step_size):
        x_chunk = x[blk:blk + block_size]  # 包含 overlap
        Y = np.fft.ifft(np.fft.fft(x_chunk) * np.fft.fft(h)).real
        y.extend(Y[M-1:])  # 丢弃前 M-1 个点(被污染的)
    return y[:len(x)]  # 裁到原始长度

踩坑二:旋转因子表的 L2 缓存命中

4096 点 FFT 需要 4096 个旋转因子(sin/cos 值)——预计算存在 L2 缓存(32MB)。但如果同时有 32 个 Stream 都在跑 FFT,32 × 4096 × 8 bytes(complex64)= 1MB → L2 够用。问题是:超过 64 个并发 FFT 时 L2 不够 → 旋转因子溢到 HBM → 每次 FFT 多 128μs。

// ✅ 旋转因子复用:多 Stream 共享同一份 L2 缓存
static complex64 shared_twiddle_table[MAX_FFT_SIZE] __attribute__((section(".l2_cache")));

// 初始化一次,所有 kernel 共享
void InitTwiddleTable(int max_size) {
    for (int k = 0; k < max_size; k++) {
        float angle = -2.0f * M_PI * k / max_size;
        shared_twiddle_table[k] = complex64{cosf(angle), sinf(angle)};
    }
}

L2 共享表 + __attribute__((section(".l2_cache"))) → Ascend 编译器识别 static 变量放 L2,不写 HBM。

踩坑三:复数精度在长 FIR 链中的累积

一根 FIR 输出 → 下一根的输入 → 再下一根…FP16 的 3.3 位有效数字经过 10 次 FFT 卷积后彻底丧失精度(error > 10%)。

# ❌ 10 级 FIR 级联,FP16 → 精度丧失
sig = anp.array(signal, dtype=anp.float16)
for coeffs in filter_bank:  # 10 个滤波器
    sig = anp.sip.fir(sig, coeffs, method='fft')
    # ↑ 每次输出转 FP16 → 累积误差 0.3% × 10 = 3%
    # 如果信号有精细结构(微弱峰),精度不够
# → 第 10 级输出:signal_error > 10%

# ✅ 关键级用 FP32,非关键转回 FP16
for i, coeffs in enumerate(filter_bank):
    if i in critical_stages:  # [0, 3, 5] 关键级
        sig = anp.array(sig, dtype=anp.float32)
        sig = anp.sip.fir(sig, coeffs, method='fft')
        sig = sig.astype(anp.float16)  # 输出转回 FP16(省 HBM)
    else:
        sig = anp.sip.fir(sig, coeffs, method='direct')

sip 的 FIR 滤波器在 Ascend NPU 上有两条路径:M < 150 → 直接卷积(256 lane 并行点乘 + warp reduce,最快 42μs for M=16);M > 150 → FFT 卷积(Overlap-Save 分块 + 蝶形运算,O(N log N) vs O(N×M))。1M 采样点、2048 系数 → NPU 1.24ms vs CPU 23ms(18.6×)。自适应路由自动决策路径,开发者一行判断都不用写。三个关键:Overlap-Save 块边界 M-1 点必须丢弃、旋转因子表用 L2 attribute 共享避免 HBM 读、长 FIR 级联的关键级用 FP32 防误差累积(10 级 FP16 级联 error > 10%)。

Logo

作为“人工智能6S店”的官方数字引擎,为AI开发者与企业提供一个覆盖软硬件全栈、一站式门户。

更多推荐