分布式训练和推理里,有一个东西你看不见但时刻在影响性能:通信

8 卡跑模型,7 卡在算,1 卡在等——通信拖慢整体。128 卡训练,通信时间占比 30%——优化通信就是优化训练速度。

昇腾的通信库叫 hccl(Huawei Collective Communication Library)。

hccl 在 CANN 架构中的位置

第 4 层:昇腾计算执行层
  └─ HCCL 集合通信库  ← 今天的主角

硬件层:昇腾 AI 硬件
  └─ HCCS(昇腾互联) + RoCE(以太网卡)

hccl 是第 4 层的核心组件,负责多卡、多机之间的数据同步。

核心通信原语

hccl 提供了 6 种基础通信原语:

原语 功能 典型场景
AllReduce 所有节点求和/最大/最小 梯度同步
AllGather 所有节点收集全部数据 特征聚合
ReduceScatter 求和后分发 分布式优化器
Broadcast 从一个节点广播到所有 参数同步
AlltoAll 完全交换 流水并行
Reduce 归约到一个节点 结果收集

AllReduce:最常用的原语

数据并行里,每个 GPU 算完梯度后,要同步给其他 GPU:

import torch.distributed as dist

# 初始化
dist.init_process_group(backend="hccl", world_size=8, rank=0)

# AllReduce:所有节点同步一个 tensor 并求和
tensor = torch.randn(1024, 1024).npu()
dist.all_reduce(tensor, op=dist.ReduceOp.SUM)

# 之后 tensor 的值 = 所有节点原始值的和

底层原理

AllReduce 的实现有多种算法,不同场景选不同算法:

  • Ring 算法:适合大 tensor,分 N-1 步完成
  • Tree 算法:适合小 tensor,log(N) 步完成
  • Rabenseifner 算法:混合策略,自动选择
# 手动指定算法
dist.all_reduce(tensor, op=dist.ReduceOp.SUM, algorithm="nccl")

AllGather:收集全部数据

# 每个节点有一个 tensor,AllGather 后每个节点都有全部 tensor
tensor = torch.randn(1024).npu()
output_list = [torch.zeros(1024).npu() for _ in range(8)]

dist.all_gather(output_list, tensor)
# 之后 output_list[0] = rank 0 的数据
#       output_list[1] = rank 1 的数据
#       ...

ReduceScatter:反向的 Gather

# 所有节点都有完整数据,求和后分发给自己的一部分
input_list = [torch.randn(1024).npu() for _ in range(8)]
output = torch.zeros(1024).npu()

dist.reduce_scatter(output, input_list, op=dist.ReduceOp.SUM)
# 之后 output = sum(input_list[0:8]) 的前 1024 个元素

AlltoAll:完全交换

流水线并行里,上游要把数据发给下游,下游要把数据发回上游:

# 每个节点发送不同数据给所有其他节点
send_tensor = torch.randn(1024).npu()
recv_tensor = torch.zeros(1024 * 8).npu()

dist.all_to_all(recv_tensor, send_tensor)

通信算法与拓扑

Ring 算法

Ring 是最常用的 AllReduce 算法:

Rank 0 ──→ Rank 1 ──→ Rank 2 ──→ ... ──→ Rank 7 ──→ Rank 0

分 N-1 步完成,每一步每个节点发给下一个节点。

优点:带宽利用充分
缺点:延迟高(O(N) 步)

Tree 算法

二叉树结构:

         Rank 0
        /      \
    Rank 1    Rank 2
    /   \     /   \
  R3    R4  R5    R6
   \    /    \    /
       R7 (叶子)

优点:延迟低(O(log N) 步)
缺点:根节点带宽压力大

HCCS vs RoCE:链路选择

昇腾支持两种互联方式:

链路 带宽 延迟 适用场景
HCCS 100 GB/s 1-2 μs 单机 8 卡
RoCE 100 GB/s 3-5 μs 多机互联
# 手动指定通信域的链路类型
hccl_info = HCCLInfo(world_size=8, rank=0)
hccl_info.set_transport("hccl")  # 使用 HCCS
# 或者
hccl_info.set_transport("roce")  # 使用 RoCE

性能调优

1. 通信域设计

把经常通信的节点放同一个通信域:

# 创建一个 8 卡通信域
pg = dist.new_group(
    ranks=[0, 1, 2, 3, 4, 5, 6, 7],
    backend="hccl"
)

# 数据并行用这个域
dist.all_reduce(grad, group=pg)

2. 融合通信

小 tensor 的通信开销大,融合成大 tensor 再通信:

# 错误:每个参数单独通信
for param in model.parameters():
    dist.all_reduce(param.grad)

# 正确:融合后通信
all_grads = torch.cat([p.grad.flatten() for p in model.parameters()])
dist.all_reduce(all_grads)

3. 异步通信

计算和通信重叠:

# 异步通信:先发起通信,不等待完成
handle = dist.all_reduce_async(grad, op=dist.ReduceOp.SUM)

# 同时做其他计算
loss.backward()
optimizer.step()

# 需要用结果时再等待
dist.wait(handle)

4. 通信原语选择

不同的原语开销不同:

原语 开销排序(小 → 大)
Reduce 1x
Broadcast 1x
AllReduce 2x
AllGather 3x
ReduceScatter 3x
AlltoAll 4x

能用简单原语就不用复杂的。

通信调度

计算与通信重叠

# Stream 并行:计算 Stream 和通信 Stream 分开
compute_stream = torch.npu.Stream()
comm_stream = torch.npu.Stream()

# 通信 Stream
with torch.npu.stream(comm_stream):
    dist.all_reduce(grad)

# 计算 Stream
with torch.npu.stream(compute_stream):
    loss.backward()

# 同步
torch.npu.synchronize()

流水线并行里的通信调度

# 前向传播时的通信
def forward_step(x, rank, world_size):
    # 接收上游数据
    if rank > 0:
        x = recv_from_rank(rank - 1)
    
    # 本地计算
    x = model(x)
    
    # 发送给下游
    if rank < world_size - 1:
        send_to_rank(rank + 1, x)
    
    return x

# 反向传播时的通信(反过来)
def backward_step(grad, rank, world_size):
    if rank < world_size - 1:
        send_to_rank(rank + 1, grad)
    
    # 本地梯度计算
    grad = model.backward(grad)
    
    if rank > 0:
        recv_from_rank(rank - 1)
    
    return grad

常见坑和解决方案

坑 1:通信超时

# 现象:训练跑一段时间就卡住
# 原因:某个节点卡住或网络抖动

# 解决 1:增加超时时间
dist.init_process_group(
    backend="hccl",
    timeout=datetime.timedelta(hours=2)
)

# 解决 2:检查链路状态
import hccl
hccl.get_device_status()  # 查看每张卡的状态
hccl.check_link_health()  # 检查 HCCS 链路

坑 2:通信成为瓶颈

# 现象:GPU 利用率低,通信占比高
# 原因:通信太频繁或 tensor 太小

# 解决 1:减少通信频率
# 梯度累积:4 步算一次梯度
for i in range(4):
    loss = model(batch[i])
    loss.backward()

dist.all_reduce(all_grads)  # 4 步同步一次

# 解决 2:增大通信粒度
# 把多个小 tensor 拼接成一个大 tensor

坑 3:跨机通信慢

# 现象:单机快,跨机慢
# 原因:RoCE 带宽不如 HCCS

# 解决 1:检查 RoCE 配置
hccl.set_rtr_config(roce_v2_priority=3)  # 高优先级

# 解决 2:用 RDMA
hccl.enable_rdma()

# 解决 3:调整窗口大小
hccl.set_rdma_window_size(128)

坑 4:梯度同步错误

# 现象:loss 不下降或震荡
# 原因:梯度同步顺序错误

# 解决:检查梯度是否全部同步
for name, param in model.named_parameters():
    if param.grad is not None:
        dist.all_reduce(param.grad, op=dist.ReduceOp.SUM)
        param.grad /= world_size

性能数据

在 Atlas A2(8× Ascend 910)上实测 DeepSeek-V3 训练:

通信配置 吞吐量 通信占比 GPU 利用率
基线(无优化) 45 samples/s 35% 60%
+ 融合通信 68 samples/s 22% 75%
+ HCCS 链路 95 samples/s 15% 85%
+ 异步通信 112 samples/s 10% 90%
+ 通信调度优化 125 samples/s 8% 93%

参考资料

  • hccl:集合通信库 → https://atomgit.com/cann/hccl
  • hcomm:通信基础库 → https://atomgit.com/cann/hcomm
  • hixl:单边通信库 → https://atomgit.com/cann/hixl
  • cann-recipes-train:训练配方,含通信优化示例 → https://atomgit.com/cann/cann-recipes-train
  • cann-samples:通信调优样例 → https://atomgit.com/cann/cann-samples
Logo

作为“人工智能6S店”的官方数字引擎,为AI开发者与企业提供一个覆盖软硬件全栈、一站式门户。

更多推荐