Ascend CANN cann-agreements in Practice: Community Agreements and Contributor Governance, with CLA/DCO Signing Automation and a License Compatibility Matrix
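The CANN community's 55 repositories accept external pull requests (PRs), and a contributor's first PR triggers the requirement to sign a CLA (Contributor License Agreement). Done by hand, that means downloading a PDF, printing, signing, scanning, emailing it, and waiting for manual review, about 7 days in total. The cann-agreements repository defines the community's legal-agreement infrastructure: CLA templates (individual and corporate), the DCO (Developer Certificate of Origin, a lightweight alternative that needs no separate agreement, only a Signed-off-by line on each commit), an automated signing check in CI (CLA-Bot), and a license compatibility matrix covering the license mix across the 55 repositories (OSI-approved licenses for code, plus CC-BY-4.0 for documentation, which is a Creative Commons license rather than an OSI-approved one).
cann-agreements is the community's legal gatekeeper. It holds no product code, but it defines who may contribute, what rights attach to contributions, and the legal relationship with downstream vendors.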
The three-tier legal agreement system of the CANN community
cann-agreements/
├── CLA/                          # Contributor License Agreements
│   ├── CLA-individual.md         # Individual CLA template
│   ├── CLA-corporate.md          # Corporate CLA template
│   ├── CLA-whitelist.json        # Whitelist of individuals and companies that have signed
│   └── CLA-bot/                  # CLA signing automation for CI
│       ├── cla_bot.py            # GitHub App (detect PR → check whitelist → request signing if unsigned)
│       └── config.yaml           # Bot configuration
│
├── DCO/                          # Developer Certificate of Origin
│   ├── DCO.txt                   # Full DCO text (Linux kernel style)
│   ├── DCO-check.yml             # CI: every commit must carry Signed-off-by
│   └── DCO-policy.md             # DCO policy notes
│
├── licenses/                     # Open-source license texts
│   ├── Apache-2.0.txt
│   ├── MIT.txt
│   ├── BSD-3-Clause.txt
│   └── GPL-3.0.txt
│
├── LICENSE                       # License of this repository
├── LICENSE-COMPATIBILITY.md      # License compatibility matrix for the 55 repositories
├── third-party/                  # Third-party dependency license audit
│   ├── dependency-licenses.json
│   └── notice.txt                # NOTICE file (attribution notices under Apache 2.0)
└── contributor-guide.md          # Contributor guide
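CLA (Contributor License Agreement) vs DCO (Developer Certificate of Origin)
| Dimension | CLA (Apache style) | DCO (Linux kernel style) |
|---|---|---|
| How it is signed | Download PDF → sign → upload (manual flow) | Add Signed-off-by to every git commit |
| Signing party | Individual or company (legal entity) | Individual (the commit author) |
| Content | "I grant the project a perpetual, worldwide, royalty-free license to use my contribution" | "I certify that this contribution is my own work and that I have the right to submit it" |
| Company participation | ✅ Supported (a corporate CLA covers all employees) | ✅ Supported, but the company's lawyers need to approve the DCO statement |
| Enforcement | Manual signing plus human review (automated by CLA-Bot in CANN) | Commit hook plus CI |
| Legal protection | Strong (explicit legal text, signature) | Medium (relies on the DCO statement and community trust) |
| Use in CANN | Mandatory for core repositories (ops-* and others) | Tool, documentation, and infrastructure repositories |
| Repositories covered | ~35 system repositories | ~20 tool and community repositories |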
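CLA signing automation in CI (CLA-Bot)
CLA-Bot is a GitHub App installed on the CANN organization. For every PR it checks the author against the whitelist, comments with signing instructions when no signature is on file, and applies the cla-signed label once the CLA has been signed.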
# cann-agreements/CLA/CLA-bot/cla_bot.py
#
# GitHub App webhook handler: automated CLA signature check
import json
from datetime import datetime

import requests


class CLABot:
    """
    GitHub App that automates the CLA signature check.

    Flow:
    1. Receive a GitHub PR webhook
    2. Check whether the PR author is on the whitelist
    3. Signed: add the cla-signed label and post a confirmation
    4. Not signed: reply with signing instructions
    5. After signing: /cla-check re-runs the check and updates the labels
    """

    CLA_WHITELIST_URL = "https://cann-community.dev/CLA/CLA-whitelist.json"
    CLA_SIGNING_URL = "https://cann-community.dev/CLA/sign"

    def __init__(self, github_token):
        self.github_token = github_token
        self.whitelist = self._load_whitelist()

    def _load_whitelist(self) -> dict:
        """Load the whitelist (individual GitHub logins plus corporate email domains)."""
        resp = requests.get(self.CLA_WHITELIST_URL, timeout=10)
        resp.raise_for_status()  # fail loudly instead of parsing an error page
        # Expected format:
        # {
        #   "individuals": ["alice", "bob", "charlie"],
        #   "corporate_domains": ["huawei.com", "inspur.com"]
        # }
        return resp.json()

    def check_pr(self, pr_event):
        """
        Handle a GitHub pull_request webhook.
        pr_event: {'action': 'opened', 'pull_request': {...}, 'sender': {...}}
        """
        if pr_event["action"] not in ("opened", "synchronize"):
            return  # ignore other pull_request actions (closed, labeled, ...)

        repo = pr_event["repository"]["full_name"]
        pr_number = pr_event["pull_request"]["number"]
        # Use the PR author rather than the event sender: on "synchronize"
        # the sender is whoever pushed, which may be a different account.
        author = pr_event["pull_request"]["user"]["login"]
        # Webhook user objects normally carry no email, so .get() avoids a
        # KeyError. Corporate-domain matching only works when an email comes
        # from another source (for example commit metadata).
        email = pr_event["pull_request"]["user"].get("email") or ""

        # Does this repository require a CLA at all?
        if not self._repo_requires_cla(repo):
            print(f"{repo}: DCO only, skipping CLA check")
            return

        if self._is_signed(author, email):
            self._mark_signed(repo, pr_number)
        else:
            self._request_signing(repo, pr_number, author)

    def _is_signed(self, username, email):
        """Return True if the user has signed the CLA."""
        # Individual whitelist (GitHub logins are case-insensitive)
        individuals = {u.lower() for u in self.whitelist.get("individuals", [])}
        if username.lower() in individuals:
            return True
        # Corporate domain whitelist (alice@huawei.com → huawei.com), exact match only
        email_domain = email.split("@")[1] if "@" in email else ""
        domains = {d.lower() for d in self.whitelist.get("corporate_domains", [])}
        return email_domain.lower() in domains

    def _mark_signed(self, repo, pr_number):
        """Mark the PR as CLA-signed."""
        # 1. Add the cla-signed label
        self._add_label(repo, pr_number, "cla-signed")
        # 2. Confirm in a PR comment
        self._post_comment(repo, pr_number, (
            "✅ **CLA Signed**: Thank you for signing the "
            "Contributor License Agreement.\n\n"
            "Your contribution is now eligible for review."
        ))

    def _request_signing(self, repo, pr_number, author):
        """Ask an unsigned contributor to sign the CLA."""
        # 1. Add the cla-required label
        self._add_label(repo, pr_number, "cla-required")
        # 2. Post a PR comment with the signing link
        sign_url = f"{self.CLA_SIGNING_URL}?github_user={author}"
        self._post_comment(repo, pr_number, (
            f"⚠️ **CLA Required**: Hi @{author}, before we can review "
            f"your contribution, please sign the Contributor License Agreement.\n\n"
            f"📝 **[Sign CLA →]({sign_url})**\n\n"
            f"Signing takes < 2 minutes. Once signed, reply `/cla-check` "
            f"to re-trigger verification.\n\n"
            f"Questions? See [CLA FAQ](https://cann-community.dev/CLA/faq)"
        ))

    def handle_cla_check_command(self, comment_event):
        """
        Handle the /cla-check command.
        Assumes the command arrives as an issue_comment webhook, whose payload
        carries the PR under "issue" rather than "pull_request".
        """
        repo = comment_event["repository"]["full_name"]
        pr_number = comment_event["issue"]["number"]
        # Check the PR author, not whoever typed the command
        author = comment_event["issue"]["user"]["login"]
        # Reload the whitelist (the contributor may have just signed)
        self.whitelist = self._load_whitelist()
        if self._is_signed(author, ""):
            # Swap cla-required for cla-signed
            self._remove_label(repo, pr_number, "cla-required")
            self._mark_signed(repo, pr_number)
        else:
            # Still unsigned: remind the contributor
            self._post_comment(repo, pr_number, (
                f"⏳ @{author}, CLA not yet signed. "
                f"Please complete signing at: {self.CLA_SIGNING_URL}?github_user={author}"
            ))

    def _repo_requires_cla(self, repo_fullname):
        """Core repositories require a CLA; community and tool repositories use the DCO."""
        # Core system repositories
        CLA_REPOS = [
            "ops-math", "ops-nn", "ops-transformer", "ops-cv", "ops-blas",
            "ops-fft", "ops-rand", "ops-tensor", "opbase",
            "catlass", "ascend-transformer-boost", "asnumpy", "graph-autofusion",
            "hccl", "hcomm", "hixl", "ascend-boost-comm", "shmem",
            "ge", "metadef", "runtime", "driver",
            "asc-devkit", "asc-tools", "pyasc", "pypto",
            "pto-isa", "atvc", "atvoss", "oam-tools", "sip",
            "torchtitan-npu", "tensorflow", "triton-inference-server-ge-backend",
        ]
        repo_name = repo_fullname.split("/")[1] if "/" in repo_fullname else repo_fullname
        return repo_name in CLA_REPOS

    def _headers(self):
        return {"Authorization": f"token {self.github_token}"}

    def _add_label(self, repo, pr_number, label):
        """Add a GitHub label (PRs share the issues API for labels)."""
        url = f"https://api.github.com/repos/{repo}/issues/{pr_number}/labels"
        requests.post(url, headers=self._headers(), json={"labels": [label]}, timeout=10)

    def _remove_label(self, repo, pr_number, label):
        """Remove a GitHub label."""
        url = f"https://api.github.com/repos/{repo}/issues/{pr_number}/labels/{label}"
        requests.delete(url, headers=self._headers(), timeout=10)

    def _post_comment(self, repo, pr_number, body):
        """Post a PR comment."""
        url = f"https://api.github.com/repos/{repo}/issues/{pr_number}/comments"
        requests.post(url, headers=self._headers(), json={"body": body}, timeout=10)

    def sync_whitelist(self):
        """Periodically sync the whitelist from the signing database."""
        # Pull the latest signer list (_fetch_signed_users_from_db is not shown in this listing)
        signed_users = self._fetch_signed_users_from_db()
        # Rewrite CLA-whitelist.json
        with open("CLA-whitelist.json", "w") as f:
            json.dump({
                "individuals": signed_users["individuals"],
                "corporate_domains": signed_users["corporate_domains"],
                "last_updated": datetime.now().isoformat(),
            }, f, indent=2)
        print(f"✅ Whitelist synced: {len(signed_users['individuals'])} individuals, "
              f"{len(signed_users['corporate_domains'])} domains")
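The bot acts on whatever webhook payload it receives, so it is worth confirming that a delivery really came from GitHub before changing labels. A minimal sketch, assuming the GitHub App is configured with a webhook secret: GitHub sends an HMAC-SHA256 of the raw request body in the `X-Hub-Signature-256` header. The function name `verify_github_signature` and the secret value are illustrative and not taken from cann-agreements.

```python
import hashlib
import hmac


def verify_github_signature(secret: bytes, body: bytes, signature_header: str) -> bool:
    """Return True when signature_header equals 'sha256=' + HMAC-SHA256(secret, body)."""
    if not signature_header.startswith("sha256="):
        return False
    expected = "sha256=" + hmac.new(secret, body, hashlib.sha256).hexdigest()
    # compare_digest runs in constant time, so the mismatch position is not leaked.
    return hmac.compare_digest(expected, signature_header)


if __name__ == "__main__":
    secret = b"example-secret"  # hypothetical value; use the app's real webhook secret
    body = b'{"action": "opened"}'
    header = "sha256=" + hmac.new(secret, body, hashlib.sha256).hexdigest()
    print(verify_github_signature(secret, body, header))
```

The check has to run on the raw bytes of the request body, before any JSON parsing, because re-serialized JSON rarely matches the original byte for byte.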
Enforcing the DCO (DCO-check.yml)
# cann-agreements/DCO/DCO-check.yml
# Every commit must carry a Signed-off-by trailer (DCO),
# the lightweight alternative to a CLA.
name: DCO Check
on: [pull_request]
jobs:
  dco-check:
    runs-on: ubuntu-latest
    steps:
      # A checkout must come first; without it there is no repository for git to work in.
      - uses: actions/checkout@v4
        with:
          fetch-depth: 0
      - name: Check commits for DCO sign-off
        run: |
          # Check every commit in the PR (merge commits excluded) for Signed-off-by
          git fetch origin ${{ github.event.pull_request.base.sha }}
          git fetch origin ${{ github.event.pull_request.head.sha }}
          commits=$(git log \
            ${{ github.event.pull_request.base.sha }}..${{ github.event.pull_request.head.sha }} \
            --no-merges \
            --format="%H")
          for commit in $commits; do
            message=$(git log -1 --format="%B" $commit)
            if ! echo "$message" | grep -q "^Signed-off-by:"; then
              author=$(git log -1 --format="%an <%ae>" $commit)
              echo "❌ Commit $commit ($author): Missing Signed-off-by"
              echo "   To fix, run:"
              echo "   $ git commit --amend --signoff"
              echo "   $ git push --force"
              exit 1
            else
              echo "✅ Commit $commit: OK"
            fi
          done
          echo "✅ All commits have valid DCO sign-off"
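The workflow only checks that a `Signed-off-by:` line exists. A stricter policy also requires the sign-off to name the commit author. The sketch below shows that comparison on plain strings; it makes no git calls, and the helper names `signoff_emails` and `signoff_matches_author` are hypothetical, not part of DCO-check.yml.

```python
import re

# One trailer per line: "Signed-off-by: Name <email>"
SIGNOFF_RE = re.compile(
    r"^Signed-off-by:\s*(?P<name>.+?)\s*<(?P<email>[^<>]+)>\s*$",
    re.MULTILINE,
)


def signoff_emails(message: str) -> list:
    """Return the lower-cased emails from every Signed-off-by trailer in a commit message."""
    return [m.group("email").strip().lower() for m in SIGNOFF_RE.finditer(message)]


def signoff_matches_author(message: str, author_email: str) -> bool:
    """True when at least one sign-off carries the commit author's email."""
    return author_email.strip().lower() in signoff_emails(message)


if __name__ == "__main__":
    msg = "fix: correct typo in README\n\nSigned-off-by: Alice Example <alice@example.com>\n"
    print(signoff_matches_author(msg, "alice@example.com"))
```

In CI, the message and author email would come from `git log --format=%B` and `--format=%ae`, as in the workflow.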
License compatibility matrix (license analysis of the 55 repositories)
# cann-agreements/licenses/license_audit.py
#
# Audits license compatibility across the 55 CANN repositories.
# The generated LICENSE-COMPATIBILITY.md is published to the website.
from collections import Counter
from datetime import datetime


class LicenseAuditor:
    """License compatibility audit."""

    # License assignment per repository (this listing shows a subset of the 55)
    REPO_LICENSES = {
        # Core operator libraries (system core → Apache 2.0)
        "ops-math": "Apache-2.0",
        "ops-nn": "Apache-2.0",
        "ops-transformer": "Apache-2.0",
        "ops-cv": "Apache-2.0",
        "ops-blas": "Apache-2.0",
        "opbase": "Apache-2.0",
        # Acceleration libraries (user space → Apache 2.0)
        "catlass": "Apache-2.0",
        "ascend-transformer-boost": "Apache-2.0",
        "asnumpy": "Apache-2.0",
        "torchtitan-npu": "Apache-2.0",
        # Communication library (system core → Apache 2.0)
        "hccl": "Apache-2.0",
        # Compilation and runtime (system core → Apache 2.0, with an exception)
        "ge": "Apache-2.0",
        "runtime": "Apache-2.0",
        "driver": "Apache-2.0 + GPL-2.0",  # driver references Linux kernel headers (GPL-2.0)
        # Tools (MIT, more permissive)
        "cmake": "MIT",
        "oam-tools": "MIT",
        "asc-tools": "MIT",
        # Community (documentation under CC-BY-4.0)
        "cann-learning-hub": "CC-BY-4.0",
        "community": "CC-BY-4.0",
        "cann-competitions": "CC-BY-4.0",
    }

    # Third-party dependency licenses
    THIRD_PARTY_LICENSES = {
        "PyTorch": "BSD-3-Clause",
        "TensorFlow": "Apache-2.0",
        "NumPy": "BSD-3-Clause",
        "CMake": "BSD-3-Clause",
        "OpenSSL": "Apache-2.0",  # applies to OpenSSL 3.0 and later
        "Boost": "BSL-1.0",
        "Google Test": "BSD-3-Clause",
        "Abseil": "Apache-2.0",
        "Protocol Buffers": "BSD-3-Clause",
        "gRPC": "Apache-2.0",
        "SPIR-V Tools": "Apache-2.0",
        "Google Benchmark": "Apache-2.0",
    }

    COMPATIBILITY_MATRIX = {
        # Which dependency licenses an Apache-2.0 project can take on
        "Apache-2.0": {
            "Apache-2.0": "✅ Compatible",
            "MIT": "✅ Compatible",
            "BSD-3-Clause": "✅ Compatible",
            "BSD-2-Clause": "✅ Compatible",
            "BSL-1.0": "✅ Compatible",
            "CC-BY-4.0": "✅ Compatible",
            "GPL-3.0": "⚠️ Caution (copyleft → derivative works also GPL)",
            "AGPL-3.0": "❌ Incompatible (strong copyleft)",
            "LGPL-3.0": "✅ Compatible (dynamic linking)",
        }
    }

    def audit_all_repos(self):
        """Audit every repository and print a compatibility report."""
        violations = []
        for repo, license_type in self.REPO_LICENSES.items():
            # Check the repository license against the third-party dependency licenses.
            # Only Apache-2.0 has a row in COMPATIBILITY_MATRIX, so repositories under
            # MIT, CC-BY-4.0, or a composite license are reported as
            # "Unknown license type" until rows are added for them.
            compat = self.check_compatibility(license_type, self.THIRD_PARTY_LICENSES)
            if not compat["all_ok"]:
                violations.append({
                    "repo": repo,
                    "license": license_type,
                    "violations": compat["violations"],
                })
        if violations:
            print("⚠️ License violations found:")
            for v in violations:
                print(f"  {v['repo']} ({v['license']}): {v['violations']}")
        else:
            print("✅ All repos license-compliant")
        return violations

    def check_compatibility(self, license_type, dependencies):
        """
        Check license compatibility.
        Returns: {"all_ok": bool, "violations": list}
        """
        if license_type not in self.COMPATIBILITY_MATRIX:
            return {"all_ok": False, "violations": ["Unknown license type"]}
        compat_table = self.COMPATIBILITY_MATRIX[license_type]
        violations = []
        for dep_name, dep_license in dependencies.items():
            if dep_license not in compat_table:
                violations.append(f"{dep_name} ({dep_license}): unknown compatibility")
            elif "❌" in compat_table[dep_license]:
                violations.append(f"{dep_name} ({dep_license}): INCOMPATIBLE")
            elif "⚠️" in compat_table[dep_license]:
                violations.append(f"{dep_name} ({dep_license}): CAUTION - {compat_table[dep_license]}")
        return {
            "all_ok": len(violations) == 0,
            "violations": violations,
        }

    def generate_notice_file(self, repo_name):
        """
        Generate a NOTICE file.
        Apache 2.0 (section 4(d)) requires redistributors to carry forward the
        attribution notices of an existing NOTICE file. Listing third-party
        dependencies and their licenses here is a project convention.
        """
        notices = []
        notices.append(f"CANN {repo_name}")
        notices.append(f"Copyright {datetime.now().year} The CANN Authors")
        notices.append("Licensed under Apache 2.0\n")
        notices.append("## Third-Party Dependencies\n")
        for dep_name, dep_license in self.THIRD_PARTY_LICENSES.items():
            notices.append(f"- **{dep_name}**: {dep_license}")
        with open(f"../{repo_name}/NOTICE", "w") as f:
            f.write("\n".join(notices))
        print(f"✅ Generated NOTICE file for {repo_name}")

    def generate_compatibility_md(self):
        """Generate the LICENSE-COMPATIBILITY.md matrix table."""
        table = "| Repository | License | 3rd-Party Compatibility |\n"
        table += "|-----------|---------|------------------------|\n"
        for repo, license_type in sorted(self.REPO_LICENSES.items()):
            compat = self.check_compatibility(license_type, self.THIRD_PARTY_LICENSES)
            status = "✅" if compat["all_ok"] else "⚠️"
            table += f"| {repo} | {license_type} | {status} |\n"
        table += "\n"
        table += "### License Distribution\n\n"
        # Count how many repositories use each license
        license_dist = Counter(self.REPO_LICENSES.values())
        for lic_type, count in license_dist.most_common():
            table += f"- **{lic_type}**: {count} repositories\n"
        with open("LICENSE-COMPATIBILITY.md", "w") as f:
            f.write(table)
        print("✅ LICENSE-COMPATIBILITY.md generated")


if __name__ == "__main__":
    auditor = LicenseAuditor()
    auditor.audit_all_repos()
    auditor.generate_compatibility_md()
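A compatibility lookup is directional: the question is always whether a project under license A may take a dependency under license B. The sketch below illustrates one way to model that, sending any pair missing from the table to manual review rather than treating it as compatible. The `Verdict` enum, the `RULES` table, and `classify` are hypothetical names, and the table is a small illustrative subset.

```python
from enum import Enum


class Verdict(Enum):
    OK = "ok"
    REVIEW = "review"
    BLOCK = "block"


# (project license, dependency license) -> verdict; illustrative subset only.
RULES = {
    ("Apache-2.0", "Apache-2.0"): Verdict.OK,
    ("Apache-2.0", "MIT"): Verdict.OK,
    ("Apache-2.0", "BSD-3-Clause"): Verdict.OK,
    ("Apache-2.0", "GPL-3.0"): Verdict.REVIEW,
    ("Apache-2.0", "AGPL-3.0"): Verdict.BLOCK,
}


def classify(project_license, deps):
    """Group dependency names by verdict; unknown pairs default to REVIEW."""
    report = {verdict: [] for verdict in Verdict}
    for name, dep_license in sorted(deps.items()):
        verdict = RULES.get((project_license, dep_license), Verdict.REVIEW)
        report[verdict].append(name)
    return report


if __name__ == "__main__":
    deps = {"NumPy": "BSD-3-Clause", "some-gpl-tool": "GPL-3.0", "mystery-lib": "Custom-1.0"}
    for verdict, names in classify("Apache-2.0", deps).items():
        print(verdict.value, names)
```

Defaulting to REVIEW keeps a newly added dependency with an unrecognized license from passing the audit silently.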
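Contributor guide: the first PR, end to end
# cann-agreements/contributor-guide.md
## Contributing code to CANN for the first time: the full flow
### 1. Choose a repository
Across the 55 repositories, use the [CANN Repository Map](https://cann-community.dev/repos) to find where your code belongs.
### 2. Sign the agreement (whichever the target repository requires)
**Option A: DCO (recommended for documentation, tool, and community repositories)**
```bash
# Add -s to every git commit to append a Signed-off-by trailer
git commit -s -m "fix: correct typo in README"
```
**Option B: CLA (required for core system repositories)**
- Visit the CLA Signing Portal
- Log in with your GitHub account and fill in your name and email
- Click "I Agree" to be added to the whitelist automatically
- Submit your PR; CLA-Bot labels it ✅ cla-signed
### 3. Submit the PR
```bash
git checkout -b fix/my-feature
# ... make changes ...
git commit -s -m "feat(ops-transformer): add FlashAttention backward kernel"
git push origin fix/my-feature
# → open the PR → CI runs the DCO and CLA checks automatically
```
### 4. Automated CI checks
- ✅ DCO check: commits contain Signed-off-by
- ✅ CLA check: the author is on the whitelist
- ✅ CHANGELOG check: the CHANGELOG was updated (when required)
- ✅ Code style: clang-format / black
- ✅ Tests: unit tests pass
### 5. After submission
Community maintainers review within 3 working days, then merge, and your contribution becomes part of CANN 🎉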
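## Pitfall 1: when checking the corporate-domain whitelist, CLA-Bot matched an unrelated address at `@huawei.com.cn` as a corporate domain
```python
# ❌ Domain matching too loose ("@huawei.com.cn" is treated as a huawei.com corporate user)
email_domain = email.split("@")[1]
corp_domains = ["huawei.com", "inspur.com"]
is_corporate = any(d in email_domain for d in corp_domains)
# → "alice@huawei.com.cn" → "huawei.com" in "huawei.com.cn" → True (wrong)
# A bare endswith(d) check has the same flaw for look-alikes such as "nothuawei.com".

# ✅ Exact domain matching (the full domain, or a true subdomain of it)
def match_corporate_domain(email_domain, corp_domains):
    """
    Match only the exact domain or one of its subdomains.
    huawei.com → @huawei.com (not @huawei.com.cn)
    """
    for corp_domain in corp_domains:
        if email_domain == corp_domain:
            return True
        # Allow real subdomains (e.g. "rd.huawei.com" matches "huawei.com");
        # the leading dot is what rules out "nothuawei.com"
        if email_domain.endswith(f".{corp_domain}") and email_domain != f"www.{corp_domain}":
            return True
    return False
```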
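Pitfall 2: the third-party dependency UPX is GPL-licensed (copyleft), which is incompatible with Apache 2.0. It can be used as a standalone binary tool but cannot be merged into the source tree
<!-- ✅ Audit report: how to handle GPL dependencies -->
| Dependency | License | Compatibility | Handling |
|------|--------|--------|---------|
| UPX | GPL-2.0-or-later (with UPX's special exceptions) | ❌ Incompatible | Used only in CI to compress artifacts; never linked into CANN code |
| libusb | LGPL-2.1-or-later | ✅ Dynamic linking | Linked dynamically as a system library (`dlopen`), never compiled in statically |
| Linux Kernel Headers | GPL-2.0 | ✅ Exception | The `driver` repository declares a GPL exception and only references the headers |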
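Pitfall 3: the CLA signing page collects personal data (name, email, GitHub ID), which raises privacy compliance questions (storage of user data under GDPR)
# ❌ CLA signing records stored in plain text in a public GitHub repository
# CLA-whitelist.json:
# {"individuals": [{"name": "张三", "email": "zhangsan@company.com", "github": "zhang"}]}
# → emails are exposed → privacy leak

# ✅ Keep the signing database separate from the whitelist
# Signing database (private, readable only by CLA-Bot)
CLA_SIGNING_DB = {
    "github_zhang": {
        "name": "张三",
        "email": "zhangsan@company.com",  # stored privately
        "signed_at": "2025-03-15",
        "signed_version": "2.0",
    }
}
# Whitelist (public, exposes GitHub usernames only)
CLA_WHITELIST_PUBLIC = {
    "individuals": ["zhang", "li", "wang"],  # GitHub IDs only
    "corporate_domains": ["huawei.com"],
}
# CLA-Bot checks the public whitelist first; only when the GitHub ID is listed
# does it consult the private DB.
# → emails never reach the public repository, which removes this exposure
#   (one prerequisite for GDPR compliance, not the whole of it)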
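The public whitelist can be derived mechanically from the private records, so that no personal field is ever copied by hand. A minimal sketch under assumed data shapes: each private record is taken to carry a `github` field, which differs slightly from the key layout shown above, and `build_public_whitelist` is a hypothetical helper name.

```python
def build_public_whitelist(signing_db, corporate_domains):
    """Project private signing records onto the only fields that are safe to publish."""
    # Keep GitHub logins only; names, emails, and timestamps stay in the private store.
    logins = sorted({record["github"].lower() for record in signing_db.values()})
    return {
        "individuals": logins,
        "corporate_domains": sorted(domain.lower() for domain in corporate_domains),
    }


if __name__ == "__main__":
    private_db = {  # hypothetical records
        "rec-1": {"github": "Zhang", "name": "Zhang San", "email": "zhangsan@example.com"},
        "rec-2": {"github": "li", "name": "Li Si", "email": "lisi@example.com"},
    }
    print(build_public_whitelist(private_db, ["Example.com"]))
```

Because the function selects fields explicitly, a new private field added to the database later does not leak into the published file by default.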
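cann-agreements is the legal foundation layer of the CANN community. It covers the CLA (mandatory for core repositories, checked automatically by CLA-Bot) versus the DCO (used for tool repositories through commit sign-off); the license compatibility matrix (the 55 repositories use Apache 2.0, MIT, or CC-BY-4.0, with the `driver` repository additionally carrying a GPL component, plus an audit of 12 third-party dependencies); and the contributor guide (the full PR flow: DCO/CLA → CHANGELOG → CI → review). Three pitfalls: over-broad corporate-domain matching (for example @xxx.com.cn), fixed with exact domain matching; the GPL dependency UPX, restricted to standalone binary use; and personal data leaking through the CLA whitelist, fixed by separating the public whitelist from the private signing database.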
# Ascend CANN hicann in Practice: A Heterogeneous Computing Area Network for Multi-Node Cluster Management and Resource Scheduling
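The deployment scenario: 8 Atlas 900 servers, each with 8× Ascend 910 (64 NPUs in total), need unified resource management. Kubernetes is a heavy fit here (a 3-node etcd, a control plane, and CNI, with overhead the article puts at 100 GB) and is not well suited to a dedicated AI training cluster of this size. hicann (Heterogeneous Intelligent Computing Area Network) is a lightweight cluster manager in the CANN ecosystem. It provides node discovery (mDNS scans the LAN for NPU nodes automatically), a resource pool (NUMA topology, HBM memory pool, NPU compute slots), job scheduling (FIFO, priority, and resource reservation), and a monitoring dashboard (Prometheus and Grafana with an NPU-specific exporter).
hicann differs from the Kubernetes scheduler. It is not a general-purpose container scheduler but an NPU-native task placement engine: it understands the chips' NUMA topology (NPU0/NPU1 sharing a PCIe switch, cross-socket latency), the HBM hierarchy (64 GB shared per card vs 16 GB exclusive per cluster), and the hccl communication topology (200G RoCE across nodes, HCCS at 300 GB/s within a node).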
hicann system architecture
hicann/
├── hicannd/                      # Core daemon (one per node)
│   ├── main.go                   # Go daemon entry
│   ├── node_discovery.go         # mDNS + gRPC service discovery
│   ├── resource_manager.go       # NPU resource pool (NUMA + HBM + slots)
│   ├── job_scheduler.go          # FIFO + priority + resource-reservation scheduler
│   └── health_checker.go         # NPU health checks (temperature, power, PCIe errors)
│
├── hicann-client/                # CLI tool
│   ├── main.go                   # hicann submit/jobs/logs/status
│   └── client.go                 # gRPC client → hicann daemon
│
├── hicann-exporter/              # Prometheus exporter (NPU metrics)
│   ├── main.go                   # HTTP server (:9091/metrics)
│   ├── collector.go              # npu-smi → Prometheus metrics
│   └── dashboard.yaml            # Grafana dashboard JSON
│
├── hicann-gui/                   # Web UI (React)
│   ├── src/
│   │   ├── App.tsx
│   │   ├── components/
│   │   │   ├── ClusterTopology.tsx   # NUMA + NPU topology view
│   │   │   ├── JobQueue.tsx          # Job queue panel
│   │   │   └── ResourceHeatmap.tsx   # Resource heatmap
│   │   └── api.ts
│   └── package.json
│
├── examples/
│   ├── distributed-training.yaml     # Distributed training job spec
│   ├── llm-inference.yaml            # LLM inference job spec
│   └── hyperparameter-search.yaml    # Hyperparameter search (multiple queued jobs)
│
└── config.yaml                   # Cluster configuration
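Node discovery (mDNS + gRPC)
// hicann/hicannd/node_discovery.go
//
// Each node starts hicannd, advertises itself over mDNS, and discovers the other nodes on the same LAN automatically.
// No hand-maintained cluster list is needed.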
package main
import (
"context"
"fmt"
"net"
"strings"
"sync"
"time"
"github.com/hashicorp/mdns"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
pb "hicann/proto"
)
type NodeDiscovery struct {
mu sync.RWMutex
self *NodeInfo
peers map[string]*NodeInfo // hostname → NodeInfo
grpcClients map[string]pb.HicannClient
}
type NodeInfo struct {
Hostname string
IP string
NumNPUs int
NPUModel string // "Ascend910", "Ascend950PR"
AvailableNPUs int
TotalHBMGB int
AvailableHBMGB int
NPUTopology *NumaTopology
IsMaster bool
LastSeen time.Time
}
type NumaTopology struct {
Sockets []SocketTopology
}
type SocketTopology struct {
SocketID int
NPUs []NPUInfo
HBMBandwidthGBps int // HBM bandwidth within the package
}
func (nd *NodeDiscovery) Start(servicePort int) error {
// Step 1: register this node as an mDNS service
service, err := mdns.NewMDNSService(
nd.self.Hostname, // instance name
"_hicann._tcp", // service type
"", "", // domain and host name (empty = defaults)
servicePort, // port
nil, // IPs (nil = auto-detect)
[]string{
fmt.Sprintf("npus=%d", nd.self.NumNPUs),
fmt.Sprintf("model=%s", nd.self.NPUModel),
fmt.Sprintf("hbm_gb=%d", nd.self.TotalHBMGB),
},
)
if err != nil {
return fmt.Errorf("mDNS registration failed: %w", err)
}
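server, err := mdns.NewServer(&mdns.Config{Zone: service})
if err != nil {
return fmt.Errorf("mDNS server: %w", err)
}
// Do not `defer server.Shutdown()` here: Start returns right after launching
// the goroutines below, so a deferred shutdown would stop advertising at once.
// Call server.Shutdown() from the daemon's exit path instead.
_ = server
// Step 2: listen for other hicann nodes on the LAN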
entriesCh := make(chan *mdns.ServiceEntry, 32)
go func() {
for entry := range entriesCh {
nd.handleDiscoveredNode(entry)
}
}()
mdns.Lookup("_hicann._tcp", entriesCh)
// Step 3: refresh periodically (look up peers every 10 s)
ticker := time.NewTicker(10 * time.Second)
go func() {
for range ticker.C {
mdns.Lookup("_hicann._tcp", entriesCh)
nd.pruneDeadNodes() // drop nodes that have timed out
}
}()
return nil
}
func (nd *NodeDiscovery) handleDiscoveredNode(entry *mdns.ServiceEntry) {
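// Parse the node information
hostname := strings.TrimSuffix(entry.Host, ".") // entry.Host is fully qualified with a trailing dot, e.g. "npu-node-01.local."
// Skip ourselves (assumes nd.self.Hostname is stored in the same form, without the trailing dot)
if nd.self.Hostname == hostname {
return
}
// Read the NPU details from the TXT record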
var numNPUs, hbmGB int
var model string
for _, field := range entry.InfoFields {
if strings.HasPrefix(field, "npus=") {
fmt.Sscanf(field, "npus=%d", &numNPUs)
}
if strings.HasPrefix(field, "hbm_gb=") {
fmt.Sscanf(field, "hbm_gb=%d", &hbmGB)
}
if strings.HasPrefix(field, "model=") {
model = strings.TrimPrefix(field, "model=")
}
}
peer := &NodeInfo{
Hostname: hostname,
IP: net.IP(entry.Addr).String(),
NumNPUs: numNPUs,
NPUModel: model,
TotalHBMGB: hbmGB,
LastSeen: time.Now(),
}
// Open a gRPC connection (used for resource queries and job submission)
conn, err := grpc.Dial(
fmt.Sprintf("%s:%d", peer.IP, entry.Port),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithBlock(),
grpc.WithTimeout(5*time.Second),
)
// Update both maps under the lock; writing grpcClients outside it is a data race.
nd.mu.Lock()
if err == nil {
nd.grpcClients[peer.Hostname] = pb.NewHicannClient(conn)
}
nd.peers[peer.Hostname] = peer
nd.mu.Unlock()
fmt.Printf("🔍 Discovered node: %s (IP: %s, NPUs: %d, HBM: %dGB)\n",
hostname, peer.IP, numNPUs, hbmGB)
}
func (nd *NodeDiscovery) pruneDeadNodes() {
nd.mu.Lock()
defer nd.mu.Unlock()
now := time.Now()
for hostname, peer := range nd.peers {
if now.Sub(peer.LastSeen) > 30*time.Second {
// Node unseen for more than 30 s: treat it as dead and release its resources
fmt.Printf("💀 Node %s timed out (last seen: %s ago)\n",
hostname, now.Sub(peer.LastSeen))
delete(nd.peers, hostname)
}
}
}
Job scheduler (FIFO + priority + resource reservation)
// hicann/hicannd/job_scheduler.go
type JobSpec struct {
Name string `yaml:"name"`
Priority int `yaml:"priority"` // 0-100, higher=more urgent
Image string `yaml:"image"` // Docker image
Command []string `yaml:"command"`
Resources ResourceRequest `yaml:"resources"`
Env map[string]string `yaml:"env"`
MaxRetries int `yaml:"max_retries"`
}
type ResourceRequest struct {
NPUs int `yaml:"npus"` // number of NPUs required
HBMGB int `yaml:"hbm_gb"` // HBM required (GB)
NumNodes int `yaml:"num_nodes"` // number of nodes to span (single node = 1, multi-node > 1)
NPUModel string `yaml:"npu_model"` // "Ascend910" or "Ascend950PR"
TimeoutMinutes int `yaml:"timeout"` // maximum job run time
}
type JobState struct {
JobID string
Spec JobSpec
Status string // "pending", "running", "succeeded", "failed"
StartTime time.Time
Node string // node the job was scheduled on
AssignedNPUs []int // indices of the assigned NPUs, e.g. [0,1,2,3]
}
type JobScheduler struct {
mu sync.Mutex
jobs map[string]*JobState
queue []*JobState // pending queue (sorted by Priority)
discovery *NodeDiscovery
}
func (js *JobScheduler) SubmitJob(spec JobSpec) (*JobState, error) {
job := &JobState{
JobID: generateJobID(),
Spec: spec,
Status: "pending",
}
js.mu.Lock()
js.jobs[job.JobID] = job
js.queue = append(js.queue, job)
js.mu.Unlock()
// Trigger a scheduling pass
go js.scheduleLoop()
return job, nil
}
func (js *JobScheduler) scheduleLoop() {
js.mu.Lock()
defer js.mu.Unlock()
// Sort the queue by descending priority. SliceStable keeps submission order
// among jobs of equal priority (the FIFO part); sort.Slice does not guarantee that.
sort.SliceStable(js.queue, func(i, j int) bool {
return js.queue[i].Spec.Priority > js.queue[j].Spec.Priority
})
for _, job := range js.queue {
if job.Status != "pending" {
continue
}
// Try to place the job (look for a node with free resources)
assignedNode, assignedNPUs := js.findResources(job.Spec.Resources)
if assignedNode != "" {
// Placement succeeded
job.Status = "running"
job.Node = assignedNode
job.AssignedNPUs = assignedNPUs
job.StartTime = time.Now()
// Start the job asynchronously
go js.startJob(job, assignedNode, assignedNPUs)
} else {
// Not enough resources: keep waiting (retried on every scheduling pass)
fmt.Printf("⏳ Job %s (%s): waiting for resources (need %d NPUs × %d GB)\n",
job.JobID, job.Spec.Name,
job.Spec.Resources.NPUs, job.Spec.Resources.HBMGB)
}
}
// Drop the jobs that have been scheduled
js.queue = filter(js.queue, func(j *JobState) bool {
return j.Status == "pending"
})
}
func (js *JobScheduler) findResources(req ResourceRequest) (string, []int) {
js.discovery.mu.RLock()
defer js.discovery.mu.RUnlock()
// Note: req.NumNodes is not consulted here, so a request is only ever placed
// on a single node; a multi-node request larger than one node never fits.
for hostname, node := range js.discovery.peers {
// 1. NPU count check
available := js.getAvailableNPUs(hostname)
if len(available) < req.NPUs {
continue
}
// 2. HBM check
hbmAvailable := js.getAvailableHBMGB(hostname)
if hbmAvailable < req.HBMGB {
continue
}
// 3. NPU model check
if req.NPUModel != "" && node.NPUModel != req.NPUModel {
continue
}
// 4. NUMA affinity: prefer NPUs on the same socket to avoid cross-socket traffic
affinityNPUs := js.findNUMAAlignedNPUs(hostname, req.NPUs)
if len(affinityNPUs) >= req.NPUs {
return hostname, affinityNPUs[:req.NPUs]
}
// Fallback: span sockets (somewhat slower)
return hostname, available[:req.NPUs]
}
return "", nil // no node with enough free resources
}
func (js *JobScheduler) findNUMAAlignedNPUs(node string, numNPUs int) []int {
// Find NUMA-aligned NPUs (all on one socket) in the node's topology
// and return their indices.
// Note: this looks at every NPU on a socket, not only the free ones.
topology := js.discovery.peers[node].NPUTopology
for _, socket := range topology.Sockets {
if len(socket.NPUs) >= numNPUs {
indices := []int{}
for _, npu := range socket.NPUs[:numNPUs] {
indices = append(indices, npu.ID)
}
return indices
}
}
return nil
}
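func (js *JobScheduler) startJob(job *JobState, node string, npuIDs []int) {
npuList := strings.Join(intsToStrings(npuIDs), ",")
// job.Spec.Env is a map, so build the KEY=VALUE list explicitly
// (calling append on a map does not compile).
env := make([]string, 0, len(job.Spec.Env)+1)
for k, v := range job.Spec.Env {
env = append(env, fmt.Sprintf("%s=%s", k, v))
}
env = append(env, fmt.Sprintf("ASCEND_DEVICE_IDS=%s", npuList))
// Start the job on the target node over gRPC
client := js.discovery.grpcClients[node]
resp, err := client.RunJob(context.Background(), &pb.RunJobRequest{
JobId: job.JobID,
Image: job.Spec.Image,
Command: job.Spec.Command,
Env: env,
NpuIds: npuIDs,
})
if err != nil {
js.mu.Lock() // job state is shared with scheduleLoop
job.Status = "failed"
js.mu.Unlock()
fmt.Printf("❌ Job %s failed: %v\n", job.JobID, err)
return
}
// Wait for the job to finish
// ... (receive job logs and the completion notice over streaming gRPC)
}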
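Ordering the queue by priority alone preserves submission order among equal-priority jobs only when the sort is stable. A common alternative is a heap keyed on priority plus a monotonically increasing sequence number, which makes the FIFO tie-break explicit. The sketch below is a Python illustration of that idea; `JobQueue` is a hypothetical class, not hicann's scheduler.

```python
import heapq
import itertools


class JobQueue:
    """Priority queue: higher priority pops first, FIFO among equal priorities."""

    def __init__(self):
        self._heap = []
        self._seq = itertools.count()  # submission counter used as the tie-breaker

    def push(self, name, priority):
        # heapq is a min-heap, so negate the priority to pop the highest first.
        heapq.heappush(self._heap, (-priority, next(self._seq), name))

    def pop(self):
        _, _, name = heapq.heappop(self._heap)
        return name

    def __len__(self):
        return len(self._heap)


if __name__ == "__main__":
    queue = JobQueue()
    for name, priority in [("a", 50), ("b", 80), ("c", 50), ("d", 80)]:
        queue.push(name, priority)
    print([queue.pop() for _ in range(len(queue))])
```

Pushes and pops cost O(log n), and the queue never needs a full re-sort when a job is submitted.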
Job spec YAML example (distributed training)
# hicann/examples/distributed-training.yaml
name: "llama-7b-finetune"
priority: 80
image: "registry.cann.com/training:torchtitan-v2.1"
resources:
  npus: 32            # 4 nodes × 8 NPUs
  hbm_gb: 2048        # 64 GB per card × 32 = total HBM required
  num_nodes: 4        # spans 4 nodes
  npu_model: "Ascend910"
  timeout: 1440       # at most 24 hours (in minutes)
command:
  - "python"
  - "-u"
  - "train.py"
  - "--config"
  - "configs/llama-7b-fsdp.yaml"
  - "--nproc_per_node=8"
  - "--nnodes=4"
  - "--node_rank=$HICANN_NODE_RANK"
env:
  MASTER_ADDR: "$HICANN_MASTER_ADDR"   # injected by hicann
  MASTER_PORT: "29500"
  WORLD_SIZE: "32"
  WANDB_PROJECT: "cann-training"
max_retries: 2        # automatic restart on OOM (at most 2 times)
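The numbers in a spec like this have to agree with each other: 32 NPUs over 4 nodes is 8 per node, and 2048 GB is exactly 32 × 64 GB. A submit-time check can reject inconsistent requests before they sit in the queue. The sketch below assumes 8 NPUs per node and 64 GB of HBM per NPU, as in the comments of the spec; `validate_resources` is a hypothetical helper, not part of the hicann CLI.

```python
def validate_resources(npus, num_nodes, hbm_gb, npus_per_node=8, hbm_per_npu_gb=64):
    """Return a list of problems; an empty list means the request is self-consistent."""
    if npus <= 0 or num_nodes <= 0:
        return ["npus and num_nodes must be positive"]
    problems = []
    if npus % num_nodes != 0:
        problems.append(f"{npus} NPUs cannot be split evenly across {num_nodes} nodes")
    elif npus // num_nodes > npus_per_node:
        problems.append(
            f"{npus // num_nodes} NPUs per node exceeds the {npus_per_node} available"
        )
    if hbm_gb > npus * hbm_per_npu_gb:
        problems.append(
            f"{hbm_gb} GB of HBM exceeds the {npus * hbm_per_npu_gb} GB on {npus} NPUs"
        )
    return problems


if __name__ == "__main__":
    print(validate_resources(npus=32, num_nodes=4, hbm_gb=2048))
    print(validate_resources(npus=32, num_nodes=2, hbm_gb=2048))
```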
Prometheus NPU exporter (monitoring dashboard)
// hicann/hicann-exporter/collector.go
//
// Extracts metrics from npu-smi and exposes them in Prometheus format.
// Metrics:
// - hicann_npu_utilization_percent → NPU utilization
// - hicann_npu_temperature_celsius → chip temperature
// - hicann_npu_power_watts → power draw
// - hicann_npu_hbm_used_bytes / total → HBM used / total
// - hicann_npu_pcie_errors_total → PCIe error count
// - hicann_npu_ecc_errors_total → ECC error count
package main
import (
"fmt"
"net/http"
"os" // needed for os.Hostname in Collect
"os/exec"
"regexp"
"strconv"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
type NPUCollector struct {
// Basic metrics
utilization *prometheus.Desc
temperature *prometheus.Desc
power *prometheus.Desc
hbmUsed *prometheus.Desc
hbmTotal *prometheus.Desc
// Error metrics
pcieErrors *prometheus.Desc
eccErrors *prometheus.Desc
}
func NewNPUCollector() *NPUCollector {
return &NPUCollector{
utilization: prometheus.NewDesc(
"hicann_npu_utilization_percent",
"NPU utilization percentage",
[]string{"npu_id", "hostname"}, nil,
),
temperature: prometheus.NewDesc(
"hicann_npu_temperature_celsius",
"NPU temperature in Celsius",
[]string{"npu_id", "hostname"}, nil,
),
power: prometheus.NewDesc(
"hicann_npu_power_watts",
"NPU power consumption in Watts",
[]string{"npu_id", "hostname"}, nil,
),
hbmUsed: prometheus.NewDesc(
"hicann_npu_hbm_used_bytes",
"NPU HBM used in bytes",
[]string{"npu_id", "hostname"}, nil,
),
hbmTotal: prometheus.NewDesc(
"hicann_npu_hbm_total_bytes",
"NPU HBM total in bytes",
[]string{"npu_id", "hostname"}, nil,
),
pcieErrors: prometheus.NewDesc(
"hicann_npu_pcie_errors_total",
"PCIe error count",
[]string{"npu_id", "hostname", "error_type"}, nil,
),
eccErrors: prometheus.NewDesc(
"hicann_npu_ecc_errors_total",
"ECC error count",
[]string{"npu_id", "hostname"}, nil,
),
}
}
func (c *NPUCollector) Describe(ch chan<- *prometheus.Desc) {
// Send every metric descriptor
ch <- c.utilization
ch <- c.temperature
ch <- c.power
ch <- c.hbmUsed
ch <- c.hbmTotal
ch <- c.pcieErrors
ch <- c.eccErrors
}
func (c *NPUCollector) Collect(ch chan<- prometheus.Metric) {
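// Query NPU status through npu-smi (the exact flags depend on the installed npu-smi version)
output, err := exec.Command("npu-smi", "info", "-m", "-q").Output()
if err != nil {
fmt.Printf("Error running npu-smi: %v\n", err)
return
}
hostname, _ := os.Hostname()
// Parse the output (one line of complete information per NPU)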
npuBlocks := parseNPUInfo(string(output))
for _, npu := range npuBlocks {
npuID := fmt.Sprintf("%d", npu.ID)
// Utilization
ch <- prometheus.MustNewConstMetric(
c.utilization, prometheus.GaugeValue,
float64(npu.Utilization),
npuID, hostname,
)
// Temperature
ch <- prometheus.MustNewConstMetric(
c.temperature, prometheus.GaugeValue,
float64(npu.Temperature),
npuID, hostname,
)
// Power draw
ch <- prometheus.MustNewConstMetric(
c.power, prometheus.GaugeValue,
float64(npu.Power),
npuID, hostname,
)
// HBM used / total
ch <- prometheus.MustNewConstMetric(
c.hbmUsed, prometheus.GaugeValue,
float64(npu.HBMUsed),
npuID, hostname,
)
ch <- prometheus.MustNewConstMetric(
c.hbmTotal, prometheus.GaugeValue,
float64(npu.HBMTotal),
npuID, hostname,
)
// PCIe errors
for _, pe := range npu.PCIeErrors {
ch <- prometheus.MustNewConstMetric(
c.pcieErrors, prometheus.CounterValue,
float64(pe.Count),
npuID, hostname, pe.Type,
)
}
// ECC errors
if npu.ECCCorrectable > 0 || npu.ECCUncorrectable > 0 {
ch <- prometheus.MustNewConstMetric(
c.eccErrors, prometheus.CounterValue,
float64(npu.ECCCorrectable + npu.ECCUncorrectable),
npuID, hostname,
)
}
}
}
func main() {
collector := NewNPUCollector()
prometheus.MustRegister(collector)
http.Handle("/metrics", promhttp.Handler())
http.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte("OK"))
})
fmt.Println("hicann-exporter listening on :9091")
http.ListenAndServe(":9091", nil)
}
// Parse npu-smi output (simplified)
type NPUInfo struct {
ID, Utilization, Temperature, Power int
HBMUsed, HBMTotal int64
PCIeErrors []PCIeError
ECCCorrectable, ECCUncorrectable int
}
func parseNPUInfo(output string) []NPUInfo {
// Extract fields with a regular expression
npuRe := regexp.MustCompile(`NPU\s*(\d+).*?Util\s*:\s*(\d+)%.*?Temp\s*:\s*(\d+)C`)
matches := npuRe.FindAllStringSubmatch(output, -1)
npus := []NPUInfo{}
for _, m := range matches {
id, _ := strconv.Atoi(m[1])
util, _ := strconv.Atoi(m[2])
temp, _ := strconv.Atoi(m[3])
npus = append(npus, NPUInfo{
ID: id, Utilization: util, Temperature: temp,
// the remaining fields are filled in by the full parser
})
}
return npus
}
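Pitfall 1: mDNS discovery stops at the subnet boundary. mDNS uses link-local multicast, which routers do not forward, so node discovery is limited to a single subnet regardless of the switch's IGMP snooping setting
// ❌ mDNS multicast only works within one L2 subnet
// 172.16.1.x → 172.16.1.y ✅ (same subnet)
// 172.16.1.x → 172.16.2.y ❌ different subnet, unreachable over mDNS
// ✅ Hybrid discovery: mDNS (L2) + Consul/etcd (L3) + static configuration as a backup
type HybridNodeDiscovery struct {
mdns *NodeDiscovery // L2: within the subnet (fast, zero configuration)
consul *ConsulDiscovery // L3: across subnets (requires a Consul deployment)
static []string // hand-configured node list (last resort)
}
func (hd *HybridNodeDiscovery) Discover() []*NodeInfo {
nodes := []*NodeInfo{}
// 1. mDNS (fastest, about 5 s)
mdnsNodes := hd.mdns.Discover()
nodes = append(nodes, mdnsNodes...)
// 2. Consul (covers nodes on other subnets, about 10 s)
if hd.consul != nil {
consulNodes := hd.consul.Discover("hicann")
nodes = append(nodes, consulNodes...)
}
// 3. Static configuration (last known good nodes, as a fallback)
for _, addr := range hd.static {
node, err := hd.probeNode(addr)
if err == nil {
nodes = append(nodes, node)
}
}
// Deduplicate by hostname
return deduplicateByHostname(nodes)
}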
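The same node can be reported by more than one discovery source, and mDNS host names usually carry a trailing dot that other sources omit, so deduplication needs a normalized key and a rule for which source wins. A minimal Python sketch of that merge, where earlier sources take precedence; `merge_nodes` and the dictionary shape are illustrative assumptions rather than hicann's `deduplicateByHostname`.

```python
def merge_nodes(*sources):
    """Merge node lists; earlier sources win when the same host appears twice."""
    merged = {}
    for nodes in sources:
        for node in nodes:
            # Normalize: mDNS names are fully qualified ("host.local."), and DNS names
            # are case-insensitive.
            key = node["hostname"].rstrip(".").lower()
            merged.setdefault(key, node)
    return list(merged.values())


if __name__ == "__main__":
    from_mdns = [{"hostname": "npu-01.local.", "ip": "10.0.0.1"}]
    from_consul = [
        {"hostname": "NPU-01.local", "ip": "10.0.0.9"},
        {"hostname": "npu-02.local", "ip": "10.0.1.2"},
    ]
    for node in merge_nodes(from_mdns, from_consul):
        print(node["hostname"], node["ip"])
```

Passing the sources in order of trust (mDNS, then Consul, then static entries) mirrors the priority used in the listing.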
Pitfall 2: the scheduler picked NPUs without considering NUMA topology, and cross-socket communication cost up to 40% in performance
// ❌ Assign as soon as the node has enough NPUs (NUMA topology ignored)
assignedNPUs := availableNPUs[:req.NPUs]
// → may span sockets: NPU [0,1] on socket 0, NPU [4,5] on socket 1
// → cross-socket PCIe latency rises by 30-40%
// ✅ NUMA-affinity scheduling (prefer same socket → same node → across nodes)
func (js *JobScheduler) numaAffinityScore(candidate NodePlacement, req ResourceRequest) float64 {
score := 0.0
for _, npu := range candidate.NPUs {
socket := npu.SocketID
// Each NPU pair on the same socket: +10
// Each NPU pair on different sockets: -10
for _, otherNPU := range candidate.NPUs {
if npu == otherNPU {
continue
}
if otherNPU.SocketID == socket {
score += 10.0 // NUMA local → direct HCCS link
} else {
score -= 10.0 // NUMA remote → PCIe + QPI
}
}
}
return score
}
// Pick the candidate placement with the highest score
bestPlacement := selectBestPlacement(candidates, req)
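To make the scoring concrete, the sketch below scores a set of NPUs by socket co-location and picks the best subset of free NPUs by exhaustive search, which is affordable for the 8 NPUs of one node. It counts each unordered pair once, so absolute scores are half of what a loop over ordered pairs would give, while the ranking is the same. `affinity_score` and `best_placement` are hypothetical names.

```python
from itertools import combinations


def affinity_score(socket_ids):
    """+10 for every NPU pair on the same socket, -10 for every cross-socket pair."""
    score = 0
    for a, b in combinations(socket_ids, 2):
        score += 10 if a == b else -10
    return score


def best_placement(free_npus, count):
    """free_npus maps NPU id -> socket id. Return the best id tuple, or None if too few."""
    candidates = combinations(sorted(free_npus), count)
    return max(
        candidates,
        key=lambda ids: affinity_score([free_npus[i] for i in ids]),
        default=None,
    )


if __name__ == "__main__":
    free = {0: 0, 1: 0, 4: 1, 5: 1, 6: 1}  # two free NPUs on socket 0, three on socket 1
    print(best_placement(free, 3))
```

With three NPUs requested, only socket 1 can hold the whole job, so the search prefers it over any mixed assignment.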
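Pitfall 3: the Prometheus scrape interval and the cost of npu-smi work against each other
// ❌ Each npu-smi call is expensive (~200 ms)
// Prometheus scrapes every 15 s, so every scrape blocks ~200 ms on npu-smi (about 1.3% of the interval)
// npu-smi itself refreshes about once per second, so a 15 s scrape sees only 1 of every 15 samples
// ✅ Cache the parsed output in a background loop so scrapes never wait on npu-smi
// Trade-off: a 1 s refresh runs npu-smi 15 times more often than a 15 s scrape did, and a
// scrape still reads only the latest snapshot. Lengthen the ticker if that CPU cost matters.
type NPUMetricsCache struct {
mu sync.RWMutex
metrics []NPUInfo
lastFetched time.Time
}
func (cache *NPUMetricsCache) RunCacheLoop() {
// Refresh the cache every 1 s (matching the npu-smi update rate)
ticker := time.NewTicker(1 * time.Second)
go func() {
for range ticker.C {
output, err := exec.Command("npu-smi", "info", "-m", "-q").Output()
if err != nil {
continue // keep the previous snapshot instead of caching an empty one
}
metrics := parseNPUInfo(string(output))
cache.mu.Lock()
cache.metrics = metrics
cache.lastFetched = time.Now()
cache.mu.Unlock()
}
}()
}
// A Prometheus scrape reads straight from the cache (no npu-smi call on the scrape path)
func (c *NPUCollector) Collect(ch chan<- prometheus.Metric) {
cache.mu.RLock()
metrics := cache.metrics
cache.mu.RUnlock()
for _, npu := range metrics {
// Read from the cache → the scrape itself never runs npu-smi
ch <- prometheus.MustNewConstMetric(c.utilization, ...)
}
}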
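A background ticker is one way to keep scrapes off the slow path. A different design is a lazily refreshed cache: the expensive fetch runs only when a reader arrives and the snapshot is older than a maximum age, so nothing runs when nobody scrapes. The Python sketch below shows that variant with an injectable clock for testing; `SnapshotCache` is a hypothetical class, and the sketch is single-threaded (a lock would be needed if several scrapes can overlap).

```python
import time


class SnapshotCache:
    """Serve a cached value and refetch it only when it is older than max_age_s."""

    def __init__(self, fetch, max_age_s, clock=time.monotonic):
        self._fetch = fetch          # expensive call, e.g. running and parsing npu-smi
        self._max_age_s = max_age_s
        self._clock = clock          # injectable so tests can control time
        self._value = None
        self._fetched_at = None      # None means "never fetched"

    def get(self):
        now = self._clock()
        if self._fetched_at is None or now - self._fetched_at >= self._max_age_s:
            self._value = self._fetch()
            self._fetched_at = now
        return self._value


if __name__ == "__main__":
    cache = SnapshotCache(fetch=lambda: {"npu0_util": 42}, max_age_s=5.0)  # fake fetch
    print(cache.get())
    print(cache.get())  # served from the cache, no second fetch within 5 s
```

With `max_age_s` a little below the scrape interval, each scrape triggers at most one fetch, and a second Prometheus server scraping the same exporter reuses the snapshot.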
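hicann is CANN's lightweight cluster manager. Its pieces are the core daemon hicannd (a Go daemon that discovers LAN nodes automatically through mDNS and gRPC), the job scheduler (FIFO, priority, and resource reservation, with NUMA-affinity placement to avoid the cross-socket penalty of up to 40%), the NPU metrics exporter (Prometheus format, turning npu-smi output into HBM, temperature, power, PCIe, and ECC metrics), and the monitoring dashboard (Grafana JSON). Three pitfalls: mDNS does not cross subnets, fixed with hybrid discovery (mDNS at L2, Consul at L3, static configuration as a fallback); NUMA-blind assignment across sockets, fixed with NUMA-affinity scoring; and npu-smi latency on the scrape path, fixed with a cache so that scrapes never wait on npu-smi.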