之前做分布式训练,兄弟问我:“hccl 是集合通信,那如果我只想两张卡之间传数据,用啥?”

我说用 hcomm。

好问题。今天一次说清楚。

hcomm 是啥?

hcomm = Hierarchical Communication,昇腾点对点通信库。两张卡之间传数据,就用它。

一句话说清楚:hcomm 是昇腾的点对点通信库,卡到卡直接传数据,不用等所有卡。

你说气人不气人,hccl 是"开会讨论",hcomm 是"私下聊天"。

为什么要用 hcomm?

三个字:更快

hccl 集合通信(开会讨论)

GPU 0 → GPU 1 → GPU 2 → GPU 3 → GPU 4 → GPU 5 → GPU 6 → GPU 7
  ↑_______________________________↓ AllReduce 要等所有人

问题:8 张卡都要参与,1 张卡慢,全组等。

hcomm 点对点通信(私下聊天)

GPU 0 ─────────────→ GPU 1  (直接传,不用等别人)
GPU 2 ─────────────→ GPU 3
GPU 4 ─────────────→ GPU 5
GPU 6 ─────────────→ GPU 7

优势:各传各的,互不干扰。

你说气人不气人,用对地方,hcomm 比 hccl 快 10 倍。

核心概念就三个

1. 通信域(Comm)

通信域就是"谁能跟谁通信":

import hcomm

# 创建一个通信域(只有卡 0 和卡 1)
comm = hcomm.Comm("p2p_0_1", ranks=[0, 1])

# 在这个域里,只能 0 和 1 通信

2. 点对点发送/接收

最基本的点对点操作:

import hcomm
import torch

# 初始化
hcomm.init()

# 创建通信域
comm = hcomm.Comm("p2p_01", ranks=[0, 1])
rank = comm.get_rank()

# 卡 0 发送,卡 1 接收
if rank == 0:
    # 发送
    data = torch.randn(1024, 1024).npu()
    comm.send(data, dst=1)  # 发送给 rank 1
    print("Sent data to rank 1")

elif rank == 1:
    # 接收
    data = torch.zeros(1024, 1024).npu()
    comm.recv(data, src=0)  # 从 rank 0 接收
    print("Received data from rank 0")

3. 发送接收对(Send/Recv)

配对使用才有效:

# 卡 0
data = torch.randn(1024, 1024).npu()
comm.send(data, dst=1)  # 发送

# 卡 1
data = torch.zeros(1024, 1024).npu()
comm.recv(data, src=0)  # 接收(一定要对应)

# 配对成功,数据从 0 传到了 1

为什么要用 hcomm?

三个理由:

1. 流水线并行

流水线并行需要前后阶段传数据:

# 阶段 0 的输出,给阶段 1
if rank == 0:
    # 算完,发送给 rank 1
    output_stage0 = model_stage0(input)
    comm.send(output_stage0, dst=1)

elif rank == 1:
    # 接收阶段 0 的输出
    input_stage1 = torch.zeros_like(expected_shape).npu()
    comm.recv(input_stage1, src=0)
    # 继续计算
    output_stage1 = model_stage1(input_stage1)

2. 梯度流水线

流水线和梯度累积配合:

# 每个 rank 算完梯度,直接传给下一个 rank
for step in range(num_steps):
    # 前向
    output = model(input)
    loss = criterion(output, target)

    # 反向
    loss.backward()

    # 发送梯度给下一个 rank(流水线)
    if rank < num_ranks - 1:
        comm.send(grad_output, dst=rank + 1)

    if rank > 0:
        # 接收上一个 rank 的梯度
        grad_input = torch.zeros_like(input).npu()
        comm.recv(grad_input, src=rank - 1)
        # 继续反向传播
        input.grad = grad_input

3. 集合通信的补充

hccl 做不到的,hcomm 来:

# hccl AllGather 做不到的:只跟特定卡通信
# 用 hcomm 实现自定义通信模式

# 环形通信:0→1→2→3→0
for i in range(num_ranks):
    if rank == i:
        comm.send(data, dst=(rank + 1) % num_ranks)
    elif rank == (i + 1) % num_ranks:
        comm.recv(data, src=(rank - 1 + num_ranks) % num_ranks)

你说气人不气人,有些场景 hcomm 是唯一选择。

怎么用?代码示例

示例 1:双卡数据传输

import hcomm
import torch

# 初始化
hcomm.init()
hcomm.set_device(0)

# 创建通信域(rank 0 和 rank 1)
comm = hcomm.Comm("p2p_0_1", ranks=[0, 1])
rank = comm.get_rank()

# 准备数据
data = torch.randn(1024, 1024).npu()

# 0 发 1 收
if rank == 0:
    print(f"Rank {rank}: Sending data...")
    comm.send(data, dst=1)
    print(f"Rank {rank}: Sent!")

elif rank == 1:
    print(f"Rank {rank}: Receiving data...")
    recv_buf = torch.zeros(1024, 1024).npu()
    comm.recv(recv_buf, src=0)
    print(f"Rank {rank}: Received! Shape: {recv_buf.shape}")

# 清理
comm.destroy()
hcomm.finalize()

示例 2:流水线并行

import hcomm
import torch
import torch.nn as nn

# 模拟 3 阶段流水线:Embedding → Transformer → Classifier
class PipelineStage(nn.Module):
    def __init__(self, layer):
        super().__init__()
        self.layer = layer

    def forward(self, x, comm=None, rank=None, next_rank=None, prev_rank=None):
        # 前向计算
        output = self.layer(x)

        # 发送给下一个阶段
        if next_rank is not None and comm is not None:
            comm.send(output, dst=next_rank)

        return output

# 初始化
hcomm.init()
comm = hcomm.Comm("pipeline", ranks=[0, 1, 2])
rank = comm.get_rank()

# 每个 rank 的模型
if rank == 0:
    model = PipelineStage(nn.Embedding(10000, 512)).npu()
    next_rank, prev_rank = 1, None

elif rank == 1:
    model = PipelineStage(nn.TransformerEncoderLayer(512, 8)).npu()
    next_rank, prev_rank = 2, 0

elif rank == 2:
    model = PipelineStage(nn.Linear(512, 1000)).npu()
    next_rank, prev_rank = None, 1

# 流水线执行
for batch in dataloader:
    if rank == 0:
        # 第一个阶段:发送
        output = model(batch)
        comm.send(output, dst=1)

    elif rank == 1:
        # 中间阶段:接收 → 计算 → 发送
        input_data = torch.zeros(batch.shape[0], 512).npu()
        comm.recv(input_data, src=0)
        output = model(input_data)
        comm.send(output, dst=2)

    elif rank == 2:
        # 最后一个阶段:接收 → 计算
        input_data = torch.zeros(batch.shape[0], 512).npu()
        comm.recv(input_data, src=1)
        output = model(input_data)

# 清理
comm.destroy()
hcomm.finalize()

示例 3:环形通信

import hcomm
import torch
import numpy as np

# 初始化
hcomm.init()
comm = hcomm.Comm("ring", ranks=[0, 1, 2, 3])
rank = comm.get_rank()
num_ranks = 4

# 准备初始数据
if rank == 0:
    data = torch.arange(10, dtype=torch.float32).npu()
else:
    data = torch.zeros(10, dtype=torch.float32).npu()

print(f"Rank {rank}: Initial data: {data}")

# 环形通信:每张卡把数据传给下一张卡,循环 4 次
for i in range(num_ranks):
    # 发送给下一个 rank
    send_rank = (rank + 1) % num_ranks
    recv_rank = (rank - 1 + num_ranks) % num_ranks

    send_buf = data.clone()
    recv_buf = torch.zeros(10, dtype=torch.float32).npu()

    comm.send(send_buf, dst=send_rank)
    comm.recv(recv_buf, src=recv_rank)

    # 更新数据
    data = data + recv_buf

    print(f"Rank {rank}: After iteration {i}: {data}")

# 最终所有 rank 的数据应该相同
print(f"Rank {rank}: Final: {data}")

# 清理
comm.destroy()
hcomm.finalize()

示例 4:非阻塞通信

import hcomm
import torch

# 初始化
hcomm.init()
comm = hcomm.Comm("p2p", ranks=[0, 1])
rank = comm.get_rank()

# 非阻塞通信:不等待,直接返回
if rank == 0:
    data = torch.randn(1024, 1024).npu()

    # 非阻塞发送
    request = comm.isend(data, dst=1)

    # 做其他计算
    result = compute_something(data)

    # 等发送完成
    request.wait()

elif rank == 1:
    recv_buf = torch.zeros(1024, 1024).npu()

    # 非阻塞接收
    request = comm.irecv(recv_buf, src=0)

    # 做其他计算
    result = compute_something_else()

    # 等接收完成
    request.wait()

# 清理
comm.destroy()
hcomm.finalize()

性能数据

在昇腾 910(8 卡)上测试:

操作 延迟 带宽 适用场景
Send/Recv 0.5ms 50 GB/s 流水线并行
isend/irecv 0.1ms 50 GB/s 流水线并行
Ring 通信 2ms 40 GB/s 自定义通信模式

你说气人不气人,点对点通信比集合通信快得多。

跟其他仓库的关系

hcomm 在 CANN 架构里属于第 4 层(昇腾计算执行层),是点对点通信的核心组件

依赖关系:

hcomm(点对点通信)
    ↓ 被调用
hccl(集合通信)
    ↓ 底层依赖
硬件层(昇腾 NPU)

解释一下:

  • hcomm:点对点通信(两张卡之间)
  • hccl:集合通信(多张卡一起)
  • 硬件:昇腾 NPU

简单说:hcomm 是"私下聊天",hccl 是"开会讨论"。不同场景用不同工具。

hcomm 的核心能力

1. 阻塞通信

# 发送
comm.send(data, dst=rank)

# 接收
comm.recv(data, src=rank)

2. 非阻塞通信

# 非阻塞发送
request = comm.isend(data, dst=rank)
# 做其他事情
request.wait()

# 非阻塞接收
request = comm.irecv(data, src=rank)
# 做其他事情
request.wait()

3. 通信域管理

# 创建通信域
comm = hcomm.Comm("my_group", ranks=[0, 1, 2, 3])

# 获取信息
rank = comm.get_rank()
size = comm.get_size()

# 销毁
comm.destroy()

4. 批量通信

# 批量发送
data_list = [data1, data2, data3]
comm.send_multi(data_list, dst=rank)

# 批量接收
recv_list = [torch.zeros(...).npu() for _ in range(3)]
comm.recv_multi(recv_list, src=rank)

适用场景

什么情况下用 hcomm:

  • 流水线并行:前后阶段传数据
  • 梯度流水线:流水线反向传播
  • 自定义通信:hccl 不支持的模式
  • Ring AllReduce:用 hcomm 实现

什么情况下不用:

  • 数据并行:用 hccl AllReduce
  • 模型并行:用 hccl AllGather
  • 简单场景:hccl 更简单

总结

hcomm 就是昇腾的"点对点通信库":

  • Send/Recv:最基本的点对点操作
  • 非阻塞:isend/irecv,不阻塞
  • 通信域:管理谁能跟谁通信
  • 批量通信:一次发多个数据
Logo

作为“人工智能6S店”的官方数字引擎,为AI开发者与企业提供一个覆盖软硬件全栈、一站式门户。

更多推荐