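The CANN community accepts external pull requests (PRs) across 55 repositories, and a contributor's first PR triggers the requirement to sign a CLA (Contributor License Agreement). The manual process (download the PDF, print, sign, scan, send by email, wait for manual review) takes about 7 days. The cann-agreements repository defines the community's legal-agreement infrastructure: CLA templates (individual and corporate), the DCO (Developer Certificate of Origin, a lightweight alternative that needs no separately signed document), an automated signing check in CI (CLA-Bot), and a license compatibility matrix (the license mix across the 55 repositories: OSI-approved licenses for code, plus CC-BY-4.0 for documentation).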

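cann-agreements is the community's legal gatekeeper. It is not a code repository, but it defines who may contribute, where contributions end up, and the legal relationship with downstream vendors.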

## The three-layer legal agreement system of the CANN community

cann-agreements/
├── CLA/                          # Contributor License Agreement
│   ├── CLA-individual.md         # Individual CLA template
│   ├── CLA-corporate.md          # Corporate CLA template
│   ├── CLA-whitelist.json        # Whitelist of individuals/companies that have signed the CLA
│   └── CLA-bot/                  # CI automation for CLA signing
│       ├── cla_bot.py            # GitHub App (detect PR → check whitelist → request signing if unsigned)
│       └── config.yaml           # Bot configuration
│
├── DCO/                          # Developer Certificate of Origin
│   ├── DCO.txt                   # Full DCO text (Linux kernel style)
│   ├── DCO-check.yml             # CI: enforce Signed-off-by on every commit
│   └── DCO-policy.md             # DCO policy notes
│
├── licenses/                     # Open-source licenses
│   ├── Apache-2.0.txt
│   ├── MIT.txt
│   ├── BSD-3-Clause.txt
│   └── GPL-3.0.txt
│
├── LICENSE                       # License of this repository
├── LICENSE-COMPATIBILITY.md      # License compatibility matrix for the 55 repositories
├── third-party/                  # License audit of third-party dependencies
│   ├── dependency-licenses.json
│   └── notice.txt                # NOTICE file (per Apache 2.0)
└── contributor-guide.md          # Contributor guide

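## CLA (Contributor License Agreement) vs DCO (Developer Certificate of Origin)

| Dimension | CLA (Apache style) | DCO (Linux kernel style) |
|-----------|--------------------|--------------------------|
| How it is signed | Download the PDF → sign → upload | Add `Signed-off-by` to every git commit |
| Who signs | An individual or a company (legal entity) | An individual (the commit author) |
| What it says | "I grant the project a perpetual, worldwide, royalty-free license to use my contribution" | "I certify that this commit is my own work and that I have the right to contribute it" |
| Company participation | ✅ Supported (a corporate CLA covers all of the company's employees) | ✅ Possible, but the company's lawyers need to approve the DCO statement |
| Enforcement | Manual signing + manual review | Commit hook + CI |
| Legal protection | Strong (explicit legal text, signed) | Medium (relies on the DCO statement plus community trust) |
| Use in CANN | Mandatory for core repositories (ops-* and others) | Used by tool, documentation, and infrastructure repositories |
| Repositories covered | ~35 system repositories | ~20 tool/community repositories |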

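## Automated CLA signing in CI (CLA-Bot)

CLA-Bot is a GitHub App installed on the CANN community organization. For every PR it checks the author against the whitelist, comments with signing instructions when the author has not signed, and applies the `cla-signed` label once the signature is on record.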

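# cann-agreements/CLA/CLA-bot/cla_bot.py
#
# GitHub App webhook → automated CLA signature check

import json
import requests
from datetime import datetime

class CLABot:
    """
    GitHub App that automates the CLA signature check.
    Flow:
    1. Receive the GitHub PR webhook
    2. Check whether the author is on the whitelist
    3. Signed → mark ✓ and add the cla-signed label
    4. Not signed → reply automatically with signing instructions
    5. Signature submitted → detect it and mark the PR automatically
    """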

    CLA_WHITELIST_URL = "https://cann-community.dev/CLA/CLA-whitelist.json"
    CLA_SIGNING_URL = "https://cann-community.dev/CLA/sign"

    def __init__(self, github_token):
        self.github_token = github_token
        self.whitelist = self._load_whitelist()

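    def _load_whitelist(self) -> dict:
        """Load the whitelist (individual GitHub usernames + corporate domains)."""
        resp = requests.get(self.CLA_WHITELIST_URL, timeout=10)
        resp.raise_for_status()  # fail loudly instead of parsing an error page
        data = resp.json()

        # Format:
        # {
        #   "individuals": ["alice", "bob", "charlie"],
        #   "corporate_domains": ["huawei.com", "inspur.com"]
        # }
        return data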

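    def check_pr(self, pr_event):
        """
        Handle a GitHub pull_request webhook.
        pr_event: {'action': 'opened', 'pull_request': {...}, 'sender': {...}}
        """
        action = pr_event["action"]
        if action not in ("opened", "synchronize"):
            return  # ignore other pull_request actions

        pr_number = pr_event["pull_request"]["number"]
        # Use the PR author rather than the event sender: on "synchronize"
        # the sender is whoever pushed, which may be another account.
        pr_author = pr_event["pull_request"]["user"]["login"]
        # Webhook user objects may not carry an email; fall back to "".
        pr_author_email = pr_event["pull_request"]["user"].get("email") or ""
        repo_fullname = pr_event["repository"]["full_name"]

        # Check whether this repository requires a CLA
        if not self._repo_requires_cla(repo_fullname):
            print(f"{repo_fullname}: DCO only, skipping CLA check")
            return

        # Check the signature status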
        signed = self._is_signed(pr_author, pr_author_email)

        if signed:
            self._mark_signed(pr_event)
        else:
            self._request_signing(pr_event)

    def _is_signed(self, username, email):
        """Check whether the user has signed the CLA."""
        # Check the individual whitelist
        if username.lower() in [u.lower() for u in self.whitelist.get("individuals", [])]:
            return True

        # Check the corporate-domain whitelist (@huawei.com → huawei.com)
        email_domain = email.split("@")[1] if "@" in email else ""
        if email_domain.lower() in [d.lower() for d in self.whitelist.get("corporate_domains", [])]:
            return True

        return False

    def _mark_signed(self, pr_event):
        """Mark the PR as CLA-signed."""
        repo = pr_event["repository"]["full_name"]
        pr_number = pr_event["pull_request"]["number"]

        # 1. Add the cla-signed label
        self._add_label(repo, pr_number, "cla-signed")

        # 2. Comment on the PR
        self._post_comment(repo, pr_number, (
            "✅ **CLA Signed**: Thank you for signing the "
            "Contributor License Agreement.\n\n"
            "Your contribution is now eligible for review."
        ))

    def _request_signing(self, pr_event):
        """Ask the author to sign the CLA (not yet signed)."""
        repo = pr_event["repository"]["full_name"]
        pr_number = pr_event["pull_request"]["number"]
        author = pr_event["pull_request"]["user"]["login"]  # PR author, not the event sender

        # 1. Add the cla-required label
        self._add_label(repo, pr_number, "cla-required")

        # 2. Comment on the PR with the signing link
        sign_url = f"{self.CLA_SIGNING_URL}?github_user={author}"
        self._post_comment(repo, pr_number, (
            f"⚠️ **CLA Required**: Hi @{author}, before we can review "
            f"your contribution, please sign the Contributor License Agreement.\n\n"
            f"📝 **[Sign CLA →]({sign_url})**\n\n"
            f"Signing takes < 2 minutes. Once signed, reply `/cla-check` "
            f"to re-trigger verification.\n\n"
            f"Questions? See [CLA FAQ](https://cann-community.dev/CLA/faq)"
        ))

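    def handle_cla_check_command(self, pr_event):
        """Handle the /cla-check command posted as a PR comment."""
        repo = pr_event["repository"]["full_name"]
        # A comment arrives as an issue_comment event, whose payload carries
        # the PR under "issue" rather than "pull_request".
        pr = pr_event.get("pull_request") or pr_event["issue"]
        pr_number = pr["number"]
        author = pr["user"]["login"]  # check the PR author, not the commenter

        # Reload the whitelist (the author may have just signed)
        self.whitelist = self._load_whitelist()

        if self._is_signed(author, ""):
            # Remove cla-required, add cla-signed
            self._remove_label(repo, pr_number, "cla-required")
            self._mark_signed({
                "repository": pr_event["repository"],
                "pull_request": {"number": pr_number},
            })
        else:
            # Still unsigned → remind the author
            self._post_comment(repo, pr_number, (
                f"⏳ @{author}, CLA not yet signed. "
                f"Please complete signing at: {self.CLA_SIGNING_URL}?github_user={author}"
            ))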

    def _repo_requires_cla(self, repo_fullname):
        """Decide which repositories need a CLA (core) rather than DCO (community tools)."""
        # Core system repositories
        CLA_REPOS = [
            "ops-math", "ops-nn", "ops-transformer", "ops-cv", "ops-blas",
            "ops-fft", "ops-rand", "ops-tensor", "opbase",
            "catlass", "ascend-transformer-boost", "asnumpy", "graph-autofusion",
            "hccl", "hcomm", "hixl", "ascend-boost-comm", "shmem",
            "ge", "metadef", "runtime", "driver",
            "asc-devkit", "asc-tools", "pyasc", "pypto",
            "pto-isa", "atvc", "atvoss", "oam-tools", "sip",
            "torchtitan-npu", "tensorflow", "triton-inference-server-ge-backend"
        ]

        repo_name = repo_fullname.split("/")[1] if "/" in repo_fullname else repo_fullname
        return repo_name in CLA_REPOS

    def _add_label(self, repo, pr_number, label):
        """Add a GitHub label."""
        url = f"https://api.github.com/repos/{repo}/issues/{pr_number}/labels"
        headers = {"Authorization": f"token {self.github_token}"}
        requests.post(url, headers=headers, json={"labels": [label]}, timeout=10)

    def _remove_label(self, repo, pr_number, label):
        """Remove a GitHub label."""
        url = f"https://api.github.com/repos/{repo}/issues/{pr_number}/labels/{label}"
        headers = {"Authorization": f"token {self.github_token}"}
        requests.delete(url, headers=headers, timeout=10)

    def _post_comment(self, repo, pr_number, body):
        """Post a comment on the PR."""
        url = f"https://api.github.com/repos/{repo}/issues/{pr_number}/comments"
        headers = {"Authorization": f"token {self.github_token}"}
        requests.post(url, headers=headers, json={"body": body}, timeout=10)

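    def sync_whitelist(self):
        """Periodically sync the whitelist from the signing database."""
        # Pull the latest list of signers from the signing database
        # (_fetch_signed_users_from_db is not shown in this listing)
        signed_users = self._fetch_signed_users_from_db()

        # Update CLA-whitelist.json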
        with open("CLA-whitelist.json", "w") as f:
            json.dump({
                "individuals": signed_users["individuals"],
                "corporate_domains": signed_users["corporate_domains"],
                "last_updated": datetime.now().isoformat()
            }, f, indent=2)

        print(f"✅ Whitelist synced: {len(signed_users['individuals'])} individuals, "
              f"{len(signed_users['corporate_domains'])} domains")
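
The bot above acts on whatever payload reaches `check_pr`. GitHub signs each webhook delivery with the configured webhook secret and sends the digest in the `X-Hub-Signature-256` header, so a receiver would normally verify that header before trusting the payload. The following is a minimal sketch of that check; the `verify_signature` helper and the sample secret and body are hypothetical and do not appear in the original bot.

```python
import hashlib
import hmac

PREFIX = "sha256="


def verify_signature(secret: bytes, body: bytes, header: str) -> bool:
    """Return True if `header` is a valid X-Hub-Signature-256 value for `body`."""
    if not header.startswith(PREFIX):
        return False
    expected = hmac.new(secret, body, hashlib.sha256).hexdigest()
    received = header[len(PREFIX):]
    # compare_digest runs in constant time, so the comparison does not leak
    # how many leading characters matched.
    return hmac.compare_digest(expected.encode(), received.encode())


# Hypothetical values, for illustration only.
SECRET = b"example-webhook-secret"
BODY = b'{"action": "opened"}'
GOOD_HEADER = PREFIX + hmac.new(SECRET, BODY, hashlib.sha256).hexdigest()
```

The check has to run on the raw request bytes, before JSON parsing, because re-serializing the payload can change the byte sequence that was signed.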

## Enforcing the DCO (DCO-check.yml)

# cann-agreements/DCO/DCO-check.yml
# Every commit must carry a Signed-off-by trailer (DCO),
# a lightweight alternative to the CLA

name: DCO Check

on: [pull_request]

jobs:
  dco-check:
    runs-on: ubuntu-latest
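    steps:
      # Without a checkout there is no repository for the git commands below;
      # fetch-depth: 0 fetches the full history so both SHAs are available.
      - uses: actions/checkout@v4
        with:
          fetch-depth: 0

      - name: Check commits for DCO sign-off
        run: |
          # Check that every commit (merge commits excluded) contains Signed-off-by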
          git fetch origin ${{ github.event.pull_request.base.sha }}
          git fetch origin ${{ github.event.pull_request.head.sha }}

          commits=$(git log \
            ${{ github.event.pull_request.base.sha }}..${{ github.event.pull_request.head.sha }} \
            --no-merges \
            --format="%H")

          for commit in $commits; do
            message=$(git log -1 --format="%B" $commit)

            if ! echo "$message" | grep -q "^Signed-off-by:"; then
              author=$(git log -1 --format="%an <%ae>" $commit)
              echo "❌ Commit $commit ($author): Missing Signed-off-by"
              echo "   To fix, run:"
              echo "   $ git commit --amend --signoff"
              echo "   $ git push --force"
              exit 1
            else
              echo "✅ Commit $commit: OK"
            fi
          done

          echo "✅ All commits have valid DCO sign-off"
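
The workflow above only checks that some line starts with `Signed-off-by:`. A stricter variant would also require that one of the sign-offs belongs to the commit author. The sketch below shows that comparison on plain strings; `has_valid_signoff` is a hypothetical helper, and in CI its inputs would come from `git log --format=%B` and `--format=%ae`.

```python
import re

# One trailer per line: "Signed-off-by: Name <email>"
SIGNOFF_RE = re.compile(
    r"^Signed-off-by:[ \t]*(?P<name>.+?)[ \t]*<(?P<email>[^<>\s]+)>[ \t]*$",
    re.MULTILINE,
)


def has_valid_signoff(message: str, author_email: str) -> bool:
    """True if some Signed-off-by trailer carries the author's email address."""
    wanted = author_email.strip().lower()
    return any(
        match.group("email").lower() == wanted
        for match in SIGNOFF_RE.finditer(message)
    )


msg = (
    "fix: correct typo in README\n"
    "\n"
    "Signed-off-by: Alice Example <alice@example.com>\n"
)
```

Comparing on the email alone is a design choice of this sketch; projects that also want the display name to match would compare the `name` group as well.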

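## License compatibility matrix (license analysis of the 55 repositories)

# cann-agreements/licenses/license_audit.py
#
# Audits license compatibility across the 55 CANN repositories.
# The generated LICENSE-COMPATIBILITY.md is published to the website.

from datetime import datetime  # used by generate_notice_file

class LicenseAuditor:
    """
    License compatibility audit
    """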

    # License assignment for the 55 repositories (this listing shows an excerpt)
    REPO_LICENSES = {
        # Core operator libraries (system core → Apache 2.0)
        "ops-math": "Apache-2.0",
        "ops-nn": "Apache-2.0",
        "ops-transformer": "Apache-2.0",
        "ops-cv": "Apache-2.0",
        "ops-blas": "Apache-2.0",
        "opbase": "Apache-2.0",

        # Acceleration libraries (user space → Apache 2.0)
        "catlass": "Apache-2.0",
        "ascend-transformer-boost": "Apache-2.0",
        "asnumpy": "Apache-2.0",
        "torchtitan-npu": "Apache-2.0",

        # Communication library (system core → Apache 2.0)
        "hccl": "Apache-2.0",

        # Compiler and runtime (system core → Apache 2.0 + exception)
        "ge": "Apache-2.0",
        "runtime": "Apache-2.0",
        "driver": "Apache-2.0 + GPL-3.0",  # the driver includes Linux kernel headers

        # Tools (MIT is more permissive)
        "cmake": "MIT",
        "oam-tools": "MIT",
        "asc-tools": "MIT",

        # Community (documentation under CC-BY-4.0)
        "cann-learning-hub": "CC-BY-4.0",
        "community": "CC-BY-4.0",
        "cann-competitions": "CC-BY-4.0",
    }

    # License audit of third-party dependencies
    THIRD_PARTY_LICENSES = {
        "PyTorch": "BSD-3-Clause",
        "TensorFlow": "Apache-2.0",
        "NumPy": "BSD-3-Clause",
        "CMake": "BSD-3-Clause",
        "OpenSSL": "Apache-2.0",
        "Boost": "BSL-1.0",
        "Google Test": "BSD-3-Clause",
        "Abseil": "Apache-2.0",
        "Protocol Buffers": "BSD-3-Clause",
        "gRPC": "Apache-2.0",
        "SPIR-V Tools": "Apache-2.0",
        "Google Benchmark": "Apache-2.0",
    }

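    COMPATIBILITY_MATRIX = {
        # Licenses that an Apache-2.0 project can take as dependencies.
        # Only the Apache-2.0 row is defined, so check_compatibility reports any
        # other repository license (MIT, CC-BY-4.0, ...) as "Unknown license type".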
        "Apache-2.0": {
            "Apache-2.0": "✅ Compatible",
            "MIT": "✅ Compatible",
            "BSD-3-Clause": "✅ Compatible",
            "BSD-2-Clause": "✅ Compatible",
            "BSL-1.0": "✅ Compatible",
            "CC-BY-4.0": "✅ Compatible",
            "GPL-3.0": "⚠️ Caution (copyleft → derivative works also GPL)",
            "AGPL-3.0": "❌ Incompatible (strong copyleft)",
            "LGPL-3.0": "✅ Compatible (dynamic linking)",
        }
    }

    def audit_all_repos(self):
        """
        Audit every repository and produce a compatibility report.
        """
        violations = []

        for repo, license_type in self.REPO_LICENSES.items():
            # Check the third-party dependencies against the repository's license
            compat = self.check_compatibility(license_type, self.THIRD_PARTY_LICENSES)
            if not compat["all_ok"]:
                violations.append({
                    "repo": repo,
                    "license": license_type,
                    "violations": compat["violations"]
                })

        if violations:
            print("⚠️  License violations found:")
            for v in violations:
                print(f"  {v['repo']} ({v['license']}): {v['violations']}")
        else:
            print("✅ All repos license-compliant")

        return violations

    def check_compatibility(self, license_type, dependencies):
        """
        Check license compatibility.
        Returns: {"all_ok": bool, "violations": list}
        """
        if license_type not in self.COMPATIBILITY_MATRIX:
            return {"all_ok": False, "violations": ["Unknown license type"]}

        compat_table = self.COMPATIBILITY_MATRIX[license_type]
        violations = []

        for dep_name, dep_license in dependencies.items():
            if dep_license not in compat_table:
                violations.append(f"{dep_name} ({dep_license}): unknown compatibility")
            elif "❌" in compat_table[dep_license]:
                violations.append(f"{dep_name} ({dep_license}): INCOMPATIBLE")
            elif "⚠️" in compat_table[dep_license]:
                violations.append(f"{dep_name} ({dep_license}): CAUTION - {compat_table[dep_license]}")

        return {
            "all_ok": len(violations) == 0,
            "violations": violations
        }

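    def generate_notice_file(self, repo_name):
        """
        Generate a NOTICE file listing the third-party dependencies and their
        licenses. (Apache 2.0 section 4(d) requires redistributions to carry
        the attribution notices from NOTICE; listing every dependency goes
        beyond that requirement.)
        """
        notices = []

        notices.append(f"CANN {repo_name}")
        notices.append(f"Copyright {datetime.now().year} The CANN Authors")
        notices.append("Licensed under Apache 2.0\n")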

        notices.append("## Third-Party Dependencies\n")
        for dep_name, dep_license in self.THIRD_PARTY_LICENSES.items():
            notices.append(f"- **{dep_name}**: {dep_license}")

        with open(f"../{repo_name}/NOTICE", "w") as f:
            f.write("\n".join(notices))

        print(f"✅ Generated NOTICE file for {repo_name}")

    def generate_compatibility_md(self):
        """Generate the LICENSE-COMPATIBILITY.md matrix table."""
        table = "| Repository | License | 3rd-Party Compatibility |\n"
        table += "|-----------|---------|------------------------|\n"

        for repo, license_type in sorted(self.REPO_LICENSES.items()):
            compat = self.check_compatibility(license_type, self.THIRD_PARTY_LICENSES)
            status = "✅" if compat["all_ok"] else "⚠️"
            table += f"| {repo} | {license_type} | {status} |\n"

        table += "\n"
        table += "### License Distribution\n\n"

        # Count how many repositories use each license
        from collections import Counter
        license_dist = Counter(self.REPO_LICENSES.values())
        for lic_type, count in license_dist.most_common():
            table += f"- **{lic_type}**: {count} repositories\n"

        with open("LICENSE-COMPATIBILITY.md", "w") as f:
            f.write(table)

        print("✅ LICENSE-COMPATIBILITY.md generated")


if __name__ == "__main__":
    auditor = LicenseAuditor()
    auditor.audit_all_repos()
    auditor.generate_compatibility_md()
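
`check_compatibility` decides by searching the matrix strings for an emoji, which silently breaks if the wording of an entry changes. One alternative is to store an explicit verdict per license. The sketch below is illustrative only: `Verdict`, `APACHE_DEPENDENCY_VERDICTS`, `classify`, and the `Foo` dependency are hypothetical names, and the verdicts are copied from the Apache-2.0 row of the matrix above.

```python
from enum import Enum


class Verdict(Enum):
    OK = "ok"
    CAUTION = "caution"
    INCOMPATIBLE = "incompatible"
    UNKNOWN = "unknown"


# Verdicts copied from the Apache-2.0 row of COMPATIBILITY_MATRIX.
APACHE_DEPENDENCY_VERDICTS = {
    "Apache-2.0": Verdict.OK,
    "MIT": Verdict.OK,
    "BSD-3-Clause": Verdict.OK,
    "BSD-2-Clause": Verdict.OK,
    "BSL-1.0": Verdict.OK,
    "CC-BY-4.0": Verdict.OK,
    "GPL-3.0": Verdict.CAUTION,
    "AGPL-3.0": Verdict.INCOMPATIBLE,
    "LGPL-3.0": Verdict.OK,
}


def classify(dependencies: dict[str, str]) -> dict[Verdict, list[str]]:
    """Group dependency names by verdict; unlisted licenses become UNKNOWN."""
    grouped: dict[Verdict, list[str]] = {verdict: [] for verdict in Verdict}
    for name, spdx_id in dependencies.items():
        verdict = APACHE_DEPENDENCY_VERDICTS.get(spdx_id, Verdict.UNKNOWN)
        grouped[verdict].append(name)
    return grouped


report = classify({"NumPy": "BSD-3-Clause", "UPX": "GPL-3.0", "Foo": "SSPL-1.0"})
```

Keeping the human-readable explanation in a separate mapping would let the report text change without affecting the decision logic.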

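## Contributor guide: the complete first-PR workflow

# cann-agreements/contributor-guide.md

## Contributing code to CANN for the first time: the complete workflow

### 1. Choose a repository
With 55 repositories, use the [CANN Repository Map](https://cann-community.dev/repos) to find where your code belongs.

### 2. Sign the agreement (one of two, depending on the repository)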

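**Option A: DCO (recommended for documentation, tool, and community repositories)**
```bash
# Add -s to every git commit to append a Signed-off-by trailer
git commit -s -m "fix: correct typo in README"
```

**Option B: CLA (required for core system repositories)**

1. Visit the CLA Signing Portal
2. Log in with your GitHub account → fill in your name and email
3. Click "I Agree" → you are added to the whitelist automatically
4. Open a PR → CLA-Bot marks it ✅ cla-signed automatically
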
### 3. Open a PR

git checkout -b fix/my-feature
# ... make changes ...
git commit -s -m "feat(ops-transformer): add FlashAttention backward kernel"
git push origin fix/my-feature
# → open the PR → CI runs the DCO and CLA checks automatically

### 4. Automated CI checks

- ✅ DCO check: every commit contains Signed-off-by
- ✅ CLA check: the author is on the whitelist
- ✅ CHANGELOG check: the CHANGELOG has been updated (where required)
- ✅ Code style: clang-format / black
- ✅ Tests: unit tests pass

### 5. After submission

Community maintainers review within 3 business days → merge → your contribution becomes part of CANN 🎉


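## Pitfall 1: when matching the corporate-domain whitelist, CLA-Bot mistook a non-corporate `@huawei.com.cn` address for the corporate domain

```python
# ❌ Domain match too loose: a substring test accepts any domain that merely
#    contains a whitelisted one
email_domain = email.split("@")[1]
corp_domains = ["huawei.com", "inspur.com"]
is_corporate = any(d in email_domain for d in corp_domains)
# → "alice@huawei.com.cn": "huawei.com" in "huawei.com.cn" → True (wrong)
# (A bare endswith(d) test is loose in a different way: it rejects
#  "huawei.com.cn" but accepts "nothuawei.com".)

# ✅ Exact domain match (the full domain, or a subdomain on a label boundary)
def match_corporate_domain(email_domain, corp_domains):
    """
    Match only the exact domain or one of its subdomains.
    huawei.com → @huawei.com (not @huawei.com.cn)
    """
    for corp_domain in corp_domains:
        if email_domain == corp_domain:
            return True
        # Allow true subdomains (e.g. "rd.huawei.com" matches "huawei.com")
        if email_domain.endswith(f".{corp_domain}") and email_domain != f"www.{corp_domain}":
            return True
    return False
```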

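## Pitfall 2: the third-party dependency UPX is GPL-3.0 (copyleft) and incompatible with Apache 2.0; it can be used in binary form, but its source cannot be merged into the codebase

<!-- ✅ Audit report: handling guide for GPL dependencies -->
| Dependency | License | Compatibility | Handling |
|------------|---------|---------------|----------|
| UPX | GPL-3.0 | ❌ Incompatible | Used only in CI to compress artifacts; never linked into CANN code |
| libusb | LGPL-3.0 | ✅ Dynamic linking | Linked dynamically as a system library (`dlopen`); never compiled in statically |
| Linux Kernel Headers | GPL-2.0 | ✅ Exception | The `driver` repository declares a GPL exception; only the headers are referenced |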

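## Pitfall 3: the CLA signing page collects personal data (name / email / GitHub ID), which brings privacy-compliance obligations (GDPR rules on storing user data)

# ❌ CLA signing records stored in plain text in a public GitHub repository
# CLA-whitelist.json:
# {"individuals": [{"name": "张三", "email": "zhangsan@company.com", "github": "zhang"}]}
# → email addresses exposed → privacy leak

# ✅ Keep the signing database separate from the whitelist
# Signing database (private, readable only by CLA-Bot)
CLA_SIGNING_DB = {
    "github_zhang": {
        "name": "张三",
        "email": "zhangsan@company.com",  # stored privately
        "signed_at": "2025-03-15",
        "signed_version": "2.0"
    }
}

# Whitelist (public, exposes GitHub usernames only)
CLA_WHITELIST_PUBLIC = {
    "individuals": ["zhang", "li", "wang"],  # GitHub IDs only
    "corporate_domains": ["huawei.com"],
}

# CLA-Bot checks the public whitelist first; if the GitHub ID is listed,
# it then consults the private DB.
# → user emails stay out of the public repository, which addresses the
#   public-exposure part of GDPR compliance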
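
The split can be enforced mechanically by generating the public file from the private store, so that no personal field is ever copied by hand. The sketch below assumes the record layout of `CLA_SIGNING_DB` above (keys of the form `github_<login>`); `to_public_whitelist` and the second sample record are hypothetical.

```python
def to_public_whitelist(signing_db: dict, corporate_domains: list[str]) -> dict:
    """Project the private signing DB onto the fields that are safe to publish."""
    prefix = "github_"
    logins = sorted(
        key[len(prefix):] for key in signing_db if key.startswith(prefix)
    )
    return {
        "individuals": logins,  # GitHub logins only: no names, no emails
        "corporate_domains": sorted(corporate_domains),
    }


# Sample records for illustration; only the keys are used by the projection.
private_db = {
    "github_zhang": {"name": "Zhang San", "email": "zhangsan@company.com",
                     "signed_at": "2025-03-15", "signed_version": "2.0"},
    "github_li": {"name": "Li Si", "email": "lisi@company.com",
                  "signed_at": "2025-04-01", "signed_version": "2.0"},
}
public = to_public_whitelist(private_db, ["huawei.com"])
```

Because the function only reads the dictionary keys, adding new private fields to a record cannot leak them into the published whitelist.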

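cann-agreements is the legal-agreement foundation of the CANN community. It covers the CLA (mandatory for core repositories, with CLA-Bot detecting and recording signatures automatically) versus the DCO (recommended for tool repositories, via commit sign-off); the license compatibility matrix (the 55 repositories use Apache 2.0, MIT, or CC-BY-4.0, with `driver` carrying an additional GPL component, plus an audit of 12 third-party dependencies); and the contributor guide (the full PR workflow: DCO/CLA → CHANGELOG → CI → review). The three pitfalls: corporate-domain mismatches on @xxx.com.cn, fixed by exact domain matching; the non-compliant GPL dependency UPX, restricted to binary use; and leaked CLA personal data, fixed by separating the private signing database from the public whitelist.

# Ascend CANN hicann in Practice: a Heterogeneous Computing Area Network for Multi-Node Cluster Management and Resource Scheduling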

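CANN cluster deployment scenario: 8 Atlas 900 servers (8 × Ascend 910 each, 64 NPUs in total) need unified resource management. Kubernetes is too heavyweight here (3 etcd nodes + control plane + CNI, about 100 GB of overhead) and is a poor fit for an AI training cluster. hicann (Heterogeneous Intelligent Computing Area Network) is the lightweight cluster manager of the CANN ecosystem: node discovery (mDNS scans the LAN for NPU nodes automatically), resource pools (NUMA topology + HBM memory pool + NPU compute slots), job scheduling (FIFO + priority + resource reservation), and a monitoring dashboard (Prometheus + Grafana with an NPU-specific exporter).

hicann differs from the Kubernetes scheduler. It is not a general-purpose container scheduler but an NPU-native task placement engine that understands the NUMA topology of DA chips (NPU0/NPU1 sharing a PCIe switch, cross-socket latency), the HBM bandwidth hierarchy (64 GB shared by the whole card vs 16 GB exclusive to each cluster), and the hccl communication topology (200G RoCE across nodes, 300 GB/s HCCS within a node).

## hicann system architecture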

hicann/
├── hicannd/                       # Core daemon (one per node)
│   ├── main.go                    # Go daemon entry
│   ├── node_discovery.go          # mDNS + gRPC service discovery
│   ├── resource_manager.go        # NPU resource pool (NUMA + HBM + slots)
│   ├── job_scheduler.go           # FIFO + priority + resource-reservation scheduler
│   └── health_checker.go          # NPU health checks (temperature / power / PCIe errors)
│
├── hicann-client/                 # CLI tool
│   ├── main.go                    # hicann submit/jobs/logs/status
│   └── client.go                  # gRPC client → hicann daemon
│
├── hicann-exporter/               # Prometheus exporter (NPU metrics)
│   ├── main.go                    # HTTP server (:9091/metrics)
│   ├── collector.go               # npu-smi → Prometheus metrics
│   └── dashboard.yaml             # Grafana dashboard JSON
│
├── hicann-gui/                    # Web UI (React)
│   ├── src/
│   │   ├── App.tsx
│   │   ├── components/
│   │   │   ├── ClusterTopology.tsx  # NUMA + NPU topology view
│   │   │   ├── JobQueue.tsx         # Job queue panel
│   │   │   └── ResourceHeatmap.tsx  # Resource heatmap
│   │   └── api.ts
│   └── package.json
│
├── examples/
│   ├── distributed-training.yaml   # Distributed training job spec
│   ├── llm-inference.yaml          # LLM inference job spec
│   └── hyperparameter-search.yaml  # Hyperparameter search (multiple queued jobs)
│
└── config.yaml                    # Cluster configuration

## Node discovery (mDNS + gRPC)

// hicann/hicannd/node_discovery.go
//
// Each node starts hicannd → announces itself over mDNS → discovers the other
// nodes on the same LAN automatically.
// No hand-maintained cluster list is needed.

package main

import (
    "fmt"
    "net"
    "strings"
    "sync"
    "time"

    "github.com/hashicorp/mdns"
    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials/insecure"
    pb "hicann/proto"
)

type NodeDiscovery struct {
    mu          sync.RWMutex
    self        *NodeInfo
    peers       map[string]*NodeInfo  // hostname → NodeInfo
    grpcClients map[string]pb.HicannClient
}

type NodeInfo struct {
    Hostname      string
    IP            string
    NumNPUs       int
    NPUModel      string   // "Ascend910", "Ascend950PR"
    AvailableNPUs int
    TotalHBMGB    int
    AvailableHBMGB int
    NPUTopology   *NumaTopology
    IsMaster      bool
    LastSeen      time.Time
}

type NumaTopology struct {
    Sockets []SocketTopology
}

type SocketTopology struct {
    SocketID int
    NPUs     []NPUInfo
    HBMBandwidthGBps int  // HBM bandwidth within the package
}

func (nd *NodeDiscovery) Start(servicePort int) error {
    // Step 1: register this node as an mDNS service
    service, err := mdns.NewMDNSService(
        nd.self.Hostname,           // instance name
        "_hicann._tcp",             // service type
        "", "",                     // domain and host name (defaults)
        servicePort,                // port
        nil,                        // IPs (nil = auto-detect)
        []string{
            fmt.Sprintf("npus=%d", nd.self.NumNPUs),
            fmt.Sprintf("model=%s", nd.self.NPUModel),
            fmt.Sprintf("hbm_gb=%d", nd.self.TotalHBMGB),
        },
    )
    if err != nil {
        return fmt.Errorf("mDNS registration failed: %w", err)
    }

    server, err := mdns.NewServer(&mdns.Config{Zone: service})
    if err != nil {
        return fmt.Errorf("mDNS server: %w", err)
    }
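    // Do not `defer server.Shutdown()` here: Start returns right after launching
    // the goroutines below, so a deferred shutdown would stop advertising at once.
    // Keep the server alive and shut it down when the daemon exits (not shown).
    _ = server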

    // Step 2: listen for other hicann nodes on the LAN
    entriesCh := make(chan *mdns.ServiceEntry, 32)
    go func() {
        for entry := range entriesCh {
            nd.handleDiscoveredNode(entry)
        }
    }()

    mdns.Lookup("_hicann._tcp", entriesCh)

    // Step 3: refresh periodically (10 s heartbeat)
    ticker := time.NewTicker(10 * time.Second)
    go func() {
        for range ticker.C {
            mdns.Lookup("_hicann._tcp", entriesCh)
            nd.pruneDeadNodes()  // drop nodes that have timed out
        }
    }()

    return nil
}

func (nd *NodeDiscovery) handleDiscoveredNode(entry *mdns.ServiceEntry) {
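    // Parse the node information
    hostname := entry.Host // e.g. "npu-node-01.local."

    // Skip ourselves. entry.Host is fully qualified and would never equal the
    // bare hostname, so compare on the mDNS instance name we registered.
    if strings.HasPrefix(entry.Name, nd.self.Hostname+".") {
        return
    }

    // Known peer: refresh the heartbeat and keep the existing gRPC client
    // instead of dialing again on every 10 s lookup.
    nd.mu.Lock()
    if known, ok := nd.peers[hostname]; ok {
        known.LastSeen = time.Now()
        nd.mu.Unlock()
        return
    }
    nd.mu.Unlock()

    // Parse NPU information from the TXT record
    var numNPUs, hbmGB int
    var model string
    for _, field := range entry.InfoFields {
        if strings.HasPrefix(field, "npus=") {
            fmt.Sscanf(field, "npus=%d", &numNPUs)
        }
        if strings.HasPrefix(field, "hbm_gb=") {
            fmt.Sscanf(field, "hbm_gb=%d", &hbmGB)
        }
        if strings.HasPrefix(field, "model=") {
            model = strings.TrimPrefix(field, "model=")
        }
    }

    peer := &NodeInfo{
        Hostname:   hostname,
        IP:         net.IP(entry.Addr).String(),
        NumNPUs:    numNPUs,
        NPUModel:   model,
        TotalHBMGB: hbmGB,
        LastSeen:   time.Now(),
    }

    // Open a gRPC connection (for resource queries and job submission)
    conn, err := grpc.Dial(
        fmt.Sprintf("%s:%d", peer.IP, entry.Port),
        grpc.WithTransportCredentials(insecure.NewCredentials()),
        grpc.WithBlock(),
        grpc.WithTimeout(5*time.Second),
    )

    // Write both maps under the lock; other goroutines read them.
    nd.mu.Lock()
    if err == nil {
        nd.grpcClients[peer.Hostname] = pb.NewHicannClient(conn)
    }
    nd.peers[peer.Hostname] = peer
    nd.mu.Unlock()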

    fmt.Printf("🔍 Discovered node: %s (IP: %s, NPUs: %d, HBM: %dGB)\n",
        hostname, peer.IP, numNPUs, hbmGB)
}

func (nd *NodeDiscovery) pruneDeadNodes() {
    nd.mu.Lock()
    defer nd.mu.Unlock()

    now := time.Now()
    for hostname, peer := range nd.peers {
        if now.Sub(peer.LastSeen) > 30*time.Second {
            // Node unreachable for > 30 s → treat it as dead and release its resources
            fmt.Printf("💀 Node %s timed out (last seen: %s ago)\n",
                hostname, now.Sub(peer.LastSeen))
            delete(nd.peers, hostname)
        }
    }
}
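
The liveness rule in `pruneDeadNodes` is easier to test in isolation when the clock is passed in as a parameter. The following Python sketch restates the same 30-second rule on a plain dictionary; `prune_dead_nodes` and the sample timestamps are hypothetical and are not part of hicann.

```python
def prune_dead_nodes(last_seen: dict[str, float], now: float,
                     timeout_s: float = 30.0) -> list[str]:
    """Delete peers not seen within `timeout_s`; return the removed hostnames."""
    # Collect first, then delete, so the dict is not mutated while iterating.
    dead = sorted(host for host, seen in last_seen.items() if now - seen > timeout_s)
    for host in dead:
        del last_seen[host]
    return dead


# Timestamps are seconds on an arbitrary clock, for illustration only.
peers = {"npu-node-01": 100.0, "npu-node-02": 75.0, "npu-node-03": 69.0}
removed = prune_dead_nodes(peers, now=100.0)
```

With a 10-second refresh and a 30-second timeout, a node is dropped only after roughly three consecutive missed lookups, which tolerates an occasional lost mDNS response.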

## Job scheduler (FIFO + priority + resource reservation)

// hicann/hicannd/job_scheduler.go

type JobSpec struct {
    Name       string            `yaml:"name"`
    Priority   int               `yaml:"priority"`   // 0-100, higher=more urgent
    Image      string            `yaml:"image"`      // Docker image
    Command    []string          `yaml:"command"`
    Resources  ResourceRequest   `yaml:"resources"`
    Env        map[string]string `yaml:"env"`
    MaxRetries int               `yaml:"max_retries"`
}

type ResourceRequest struct {
    NPUs      int    `yaml:"npus"`       // number of NPUs required
    HBMGB     int    `yaml:"hbm_gb"`     // HBM required (GB)
    NumNodes  int    `yaml:"num_nodes"`  // number of nodes (single machine = 1, multi-machine > 1)
    NPUModel  string `yaml:"npu_model"`  // "Ascend910" or "Ascend950PR"
    TimeoutMinutes int `yaml:"timeout"`  // maximum job run time
}

type JobState struct {
    JobID     string
    Spec      JobSpec
    Status    string    // "pending", "running", "succeeded", "failed"
    StartTime time.Time
    Node      string    // node the job was scheduled onto
    AssignedNPUs []int  // indices of the assigned NPUs, e.g. [0,1,2,3]
}

type JobScheduler struct {
    mu         sync.Mutex
    jobs       map[string]*JobState
    queue      []*JobState     // pending queue (sorted by Priority)
    discovery  *NodeDiscovery
}

func (js *JobScheduler) SubmitJob(spec JobSpec) (*JobState, error) {
    job := &JobState{
        JobID:  generateJobID(),
        Spec:   spec,
        Status: "pending",
    }

    js.mu.Lock()
    js.jobs[job.JobID] = job
    js.queue = append(js.queue, job)
    js.mu.Unlock()

    // Trigger a scheduling pass
    go js.scheduleLoop()

    return job, nil
}

func (js *JobScheduler) scheduleLoop() {
    js.mu.Lock()
    defer js.mu.Unlock()

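    // Sort the queue by Priority, descending. SliceStable keeps submission
    // order among equal priorities, which is what gives FIFO within a level.
    sort.SliceStable(js.queue, func(i, j int) bool {
        return js.queue[i].Spec.Priority > js.queue[j].Spec.Priority
    })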

    for _, job := range js.queue {
        if job.Status != "pending" {
            continue
        }

        // Try to place the job (look for a node with free resources)
        assignedNode, assignedNPUs := js.findResources(job.Spec.Resources)
        if assignedNode != "" {
            // Placement succeeded
            job.Status = "running"
            job.Node = assignedNode
            job.AssignedNPUs = assignedNPUs
            job.StartTime = time.Now()

            // Launch the job asynchronously
            go js.startJob(job, assignedNode, assignedNPUs)
        } else {
            // Not enough resources → wait (retried on every scheduling pass)
            fmt.Printf("⏳ Job %s (%s): waiting for resources (need %d NPUs × %d GB)\n",
                job.JobID, job.Spec.Name,
                job.Spec.Resources.NPUs, job.Spec.Resources.HBMGB)
        }
    }

    // Drop jobs that have been scheduled
    js.queue = filter(js.queue, func(j *JobState) bool {
        return j.Status == "pending"
    })
}

func (js *JobScheduler) findResources(req ResourceRequest) (string, []int) {
    js.discovery.mu.RLock()
    defer js.discovery.mu.RUnlock()

    // Note: placement is per node and req.NumNodes is not consulted here, so a
    // request larger than one node's NPU count never finds a match.
    for hostname, node := range js.discovery.peers {
        // 1. NPU count
        available := js.getAvailableNPUs(hostname)
        if len(available) < req.NPUs {
            continue
        }

        // 2. HBM
        hbmAvailable := js.getAvailableHBMGB(hostname)
        if hbmAvailable < req.HBMGB {
            continue
        }

        // 3. NPU model
        if req.NPUModel != "" && node.NPUModel != req.NPUModel {
            continue
        }

        // 4. NUMA affinity: prefer NPUs on the same socket to avoid
        // cross-socket allocation
        affinityNPUs := js.findNUMAAlignedNPUs(hostname, req.NPUs)
        if len(affinityNPUs) >= req.NPUs {
            return hostname, affinityNPUs[:req.NPUs]
        }

        // Fallback: span sockets (slightly lower performance)
        return hostname, available[:req.NPUs]
    }

    return "", nil  // no suitable node found
}

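func (js *JobScheduler) findNUMAAlignedNPUs(node string, numNPUs int) []int {
    // Find NUMA-aligned NPUs (all within one socket) in the node's topology
    // and return their indices. This checks socket size only, not whether
    // those NPUs are currently free.
    peer, ok := js.discovery.peers[node]
    if !ok || peer.NPUTopology == nil {
        return nil // topology unknown: the caller falls back to any free NPUs
    }

    for _, socket := range peer.NPUTopology.Sockets {
        if len(socket.NPUs) >= numNPUs {
            indices := []int{}
            for _, npu := range socket.NPUs[:numNPUs] {
                indices = append(indices, npu.ID)
            }
            return indices
        }
    }

    return nil
}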

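func (js *JobScheduler) startJob(job *JobState, node string, npuIDs []int) {
    npuList := strings.Join(intsToStrings(npuIDs), ",")

    // job.Spec.Env is a map, so flatten it to KEY=VALUE strings before adding
    // the device list. (Assumes RunJobRequest.Env is a []string; the proto
    // definition is not shown.)
    env := make([]string, 0, len(job.Spec.Env)+1)
    for k, v := range job.Spec.Env {
        env = append(env, fmt.Sprintf("%s=%s", k, v))
    }
    env = append(env, fmt.Sprintf("ASCEND_DEVICE_IDS=%s", npuList))

    // Start the job on the target node over gRPC
    js.discovery.mu.RLock()
    client := js.discovery.grpcClients[node]
    js.discovery.mu.RUnlock()

    var err error
    if client == nil {
        err = fmt.Errorf("no gRPC client for node %s", node)
    } else {
        _, err = client.RunJob(context.Background(), &pb.RunJobRequest{
            JobId:   job.JobID,
            Image:   job.Spec.Image,
            Command: job.Spec.Command,
            Env:     env,
            NpuIds:  npuIDs,
        })
    }

    if err != nil {
        js.mu.Lock() // Status is also read by the scheduling loop
        job.Status = "failed"
        js.mu.Unlock()
        fmt.Printf("❌ Job %s failed: %v\n", job.JobID, err)
        return
    }

    // Wait for the job to finish
    // ... (receive job logs and the completion notice over streaming gRPC)
}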
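
"FIFO + priority" means higher priority first and submission order among jobs of equal priority. Re-sorting the queue on every pass works; a heap keyed on priority plus a monotonically increasing sequence number yields the same order without a full sort. The Python sketch below illustrates that ordering only; `JobQueue` and the job names are hypothetical and not part of hicann.

```python
import heapq
import itertools


class JobQueue:
    """Priority queue: higher priority first, FIFO among equal priorities."""

    def __init__(self) -> None:
        self._heap: list[tuple[int, int, str]] = []
        self._seq = itertools.count()  # submission order, used as tie-breaker

    def submit(self, name: str, priority: int) -> None:
        # heapq is a min-heap, so the priority is negated.
        heapq.heappush(self._heap, (-priority, next(self._seq), name))

    def pop(self) -> str:
        return heapq.heappop(self._heap)[2]

    def __len__(self) -> int:
        return len(self._heap)


queue = JobQueue()
for name, priority in [("eval", 50), ("finetune", 80), ("sweep-1", 50), ("urgent", 80)]:
    queue.submit(name, priority)
order = [queue.pop() for _ in range(len(queue))]
```

The sequence number matters: without it, two jobs with the same priority would be ordered by name, which is not submission order.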

## Job spec YAML example (distributed training)

# hicann/examples/distributed-training.yaml

name: "llama-7b-finetune"
priority: 80
image: "registry.cann.com/training:torchtitan-v2.1"
resources:
  npus: 32          # 4 nodes × 8 NPUs
  hbm_gb: 2048      # 64 GB per card × 32 = total HBM required
  num_nodes: 4       # spans 4 nodes
  npu_model: "Ascend910"
  timeout: 1440       # at most 24 hours

command:
  - "python"
  - "-u"
  - "train.py"
  - "--config"
  - "configs/llama-7b-fsdp.yaml"
  - "--nproc_per_node=8"
  - "--nnodes=4"
  - "--node_rank=$HICANN_NODE_RANK"

env:
  MASTER_ADDR: "$HICANN_MASTER_ADDR"   # injected automatically by hicann
  MASTER_PORT: "29500"
  WORLD_SIZE: "32"
  WANDB_PROJECT: "cann-training"

max_retries: 2     # restart automatically after OOM (at most 2 times)
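
The resource numbers in this spec depend on each other: 4 nodes × 8 NPUs = 32 NPUs, and 32 × 64 GB = 2048 GB of HBM. A small check can catch inconsistent specs before submission. The sketch below is illustrative: `validate_resources` is hypothetical, it assumes whole-node allocation, and the defaults of 8 NPUs per node and 64 GB per card are taken from the example cluster and the comment in the spec.

```python
def validate_resources(res: dict, npus_per_node: int = 8,
                       hbm_per_npu_gb: int = 64) -> list[str]:
    """Return human-readable problems; an empty list means the spec is consistent."""
    problems = []
    expected_npus = res["num_nodes"] * npus_per_node
    if res["npus"] != expected_npus:
        problems.append(
            f"npus={res['npus']} but num_nodes x {npus_per_node} = {expected_npus}"
        )
    hbm_capacity = res["npus"] * hbm_per_npu_gb
    if res["hbm_gb"] > hbm_capacity:
        problems.append(
            f"hbm_gb={res['hbm_gb']} exceeds {res['npus']} x {hbm_per_npu_gb} GB"
        )
    return problems


# The resources block from distributed-training.yaml above.
spec = {"npus": 32, "hbm_gb": 2048, "num_nodes": 4}
```

Clusters that allow jobs to take part of a node would relax the first rule to an upper bound instead of an equality.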

## Prometheus NPU exporter (monitoring dashboard)

// hicann/hicann-exporter/collector.go
//
// Extracts metrics from npu-smi → Prometheus format
// Metrics:
// - hicann_npu_utilization_percent      → NPU utilization
// - hicann_npu_temperature_celsius      → chip temperature
// - hicann_npu_power_watts              → power draw
// - hicann_npu_hbm_used_bytes / total   → HBM used / total
// - hicann_npu_pcie_errors_total        → PCIe error count
// - hicann_npu_ecc_errors_total         → ECC error count
package main

import (
    "fmt"
    "net/http"
    "os" // needed for os.Hostname in Collect
    "os/exec"
    "regexp"
    "strconv"
    "strings"

    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promhttp"
)

type NPUCollector struct {
    // Basic metrics
    utilization *prometheus.Desc
    temperature *prometheus.Desc
    power       *prometheus.Desc
    hbmUsed     *prometheus.Desc
    hbmTotal    *prometheus.Desc

    // Error metrics
    pcieErrors  *prometheus.Desc
    eccErrors   *prometheus.Desc
}

func NewNPUCollector() *NPUCollector {
    return &NPUCollector{
        utilization: prometheus.NewDesc(
            "hicann_npu_utilization_percent",
            "NPU utilization percentage",
            []string{"npu_id", "hostname"}, nil,
        ),
        temperature: prometheus.NewDesc(
            "hicann_npu_temperature_celsius",
            "NPU temperature in Celsius",
            []string{"npu_id", "hostname"}, nil,
        ),
        power: prometheus.NewDesc(
            "hicann_npu_power_watts",
            "NPU power consumption in Watts",
            []string{"npu_id", "hostname"}, nil,
        ),
        hbmUsed: prometheus.NewDesc(
            "hicann_npu_hbm_used_bytes",
            "NPU HBM used in bytes",
            []string{"npu_id", "hostname"}, nil,
        ),
        hbmTotal: prometheus.NewDesc(
            "hicann_npu_hbm_total_bytes",
            "NPU HBM total in bytes",
            []string{"npu_id", "hostname"}, nil,
        ),
        pcieErrors: prometheus.NewDesc(
            "hicann_npu_pcie_errors_total",
            "PCIe error count",
            []string{"npu_id", "hostname", "error_type"}, nil,
        ),
        eccErrors: prometheus.NewDesc(
            "hicann_npu_ecc_errors_total",
            "ECC error count",
            []string{"npu_id", "hostname"}, nil,
        ),
    }
}

func (c *NPUCollector) Describe(ch chan<- *prometheus.Desc) {
    // Register all metric descriptors
    ch <- c.utilization
    ch <- c.temperature
    ch <- c.power
    ch <- c.hbmUsed
    ch <- c.hbmTotal
    ch <- c.pcieErrors
    ch <- c.eccErrors
}

func (c *NPUCollector) Collect(ch chan<- prometheus.Metric) {
    // Run npu-smi to read the current NPU state
    output, err := exec.Command("npu-smi", "info", "-m", "-q").Output()
    if err != nil {
        fmt.Printf("Error running npu-smi: %v\n", err)
        return
    }

    hostname, _ := os.Hostname()

    // Parse the output (assumes each line carries one NPU's complete record)
    npuBlocks := parseNPUInfo(string(output))

    for _, npu := range npuBlocks {
        npuID := fmt.Sprintf("%d", npu.ID)

        // Utilization
        ch <- prometheus.MustNewConstMetric(
            c.utilization, prometheus.GaugeValue,
            float64(npu.Utilization),
            npuID, hostname,
        )

        // Temperature
        ch <- prometheus.MustNewConstMetric(
            c.temperature, prometheus.GaugeValue,
            float64(npu.Temperature),
            npuID, hostname,
        )

        // Power draw
        ch <- prometheus.MustNewConstMetric(
            c.power, prometheus.GaugeValue,
            float64(npu.Power),
            npuID, hostname,
        )

        // HBM used / total
        ch <- prometheus.MustNewConstMetric(
            c.hbmUsed, prometheus.GaugeValue,
            float64(npu.HBMUsed),
            npuID, hostname,
        )
        ch <- prometheus.MustNewConstMetric(
            c.hbmTotal, prometheus.GaugeValue,
            float64(npu.HBMTotal),
            npuID, hostname,
        )

        // PCIe errors
        for _, pe := range npu.PCIeErrors {
            ch <- prometheus.MustNewConstMetric(
                c.pcieErrors, prometheus.CounterValue,
                float64(pe.Count),
                npuID, hostname, pe.Type,
            )
        }

        // ECC errors: export the counter even at zero so the series exists
        // before the first error and rate() has a baseline to work from
        ch <- prometheus.MustNewConstMetric(
            c.eccErrors, prometheus.CounterValue,
            float64(npu.ECCCorrectable+npu.ECCUncorrectable),
            npuID, hostname,
        )
    }
}

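func main() {
    collector := NewNPUCollector()
    prometheus.MustRegister(collector)

    http.Handle("/metrics", promhttp.Handler())
    http.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
        w.Write([]byte("OK"))
    })

    fmt.Println("hicann-exporter listening on :9091")
    // ListenAndServe only returns on failure (for example, the port is already in use)
    if err := http.ListenAndServe(":9091", nil); err != nil {
        fmt.Printf("hicann-exporter stopped: %v\n", err)
    }
}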

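// Parsing npu-smi output (simplified)
type NPUInfo struct {
    ID, Utilization, Temperature, Power int
    HBMUsed, HBMTotal                   int64
    PCIeErrors                          []PCIeError
    ECCCorrectable, ECCUncorrectable    int
}

// PCIeError is one per-type PCIe error counter; the fields are inferred
// from how Collect uses pe.Type and pe.Count.
type PCIeError struct {
    Type  string
    Count int
}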

func parseNPUInfo(output string) []NPUInfo {
    // Extract fields with a regular expression
    npuRe := regexp.MustCompile(`NPU\s*(\d+).*?Util\s*:\s*(\d+)%.*?Temp\s*:\s*(\d+)C`)
    matches := npuRe.FindAllStringSubmatch(output, -1)

    npus := []NPUInfo{}
    for _, m := range matches {
        id, _ := strconv.Atoi(m[1])
        util, _ := strconv.Atoi(m[2])
        temp, _ := strconv.Atoi(m[3])

        npus = append(npus, NPUInfo{
            ID: id, Utilization: util, Temperature: temp,
            // The remaining fields are filled in by the full parser
        })
    }
    return npus
}
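
The pattern in parseNPUInfo relies on a detail of Go's regexp syntax: `.` does not match a newline unless the `(?s)` flag is set, so a match can only succeed when an NPU's ID, utilization, and temperature all sit on the same line. The sketch below exercises the same pattern on a made-up input. The sample layout is hypothetical and is not real npu-smi output, and `Reading` and `parseReadings` are illustrative names.

```go
package main

import (
    "fmt"
    "regexp"
    "strconv"
)

// Same pattern as parseNPUInfo. Without (?s), "." stops at "\n",
// so every match is confined to a single line of input.
var npuRe = regexp.MustCompile(`NPU\s*(\d+).*?Util\s*:\s*(\d+)%.*?Temp\s*:\s*(\d+)C`)

// Reading holds the three fields the pattern captures.
type Reading struct{ ID, Util, Temp int }

func parseReadings(output string) []Reading {
    var out []Reading
    for _, m := range npuRe.FindAllStringSubmatch(output, -1) {
        id, _ := strconv.Atoi(m[1]) // \d+ guarantees digits; only overflow can fail
        util, _ := strconv.Atoi(m[2])
        temp, _ := strconv.Atoi(m[3])
        out = append(out, Reading{id, util, temp})
    }
    return out
}

func main() {
    // Hypothetical one-line-per-NPU layout, not real npu-smi output.
    sample := "NPU 0 | Util: 85% | Temp: 45C\nNPU 1 | Util: 12% | Temp: 39C\n"
    fmt.Println(parseReadings(sample)) // [{0 85 45} {1 12 39}]

    // The same fields split across lines produce no match at all.
    fmt.Println(len(parseReadings("NPU 0\nUtil: 85%\nTemp: 45C\n"))) // 0
}
```

If the real tool prints one field per line, the pattern would need `(?s)` or a block-by-block parser. Either way, it is worth pinning the expected format with a test like this one.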

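Pitfall 1: mDNS multicast fails across subnets. mDNS traffic is link-local multicast and is not forwarded between subnets, so node discovery is limited to a single subnet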

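// ❌ mDNS multicast only works within a single L2 subnet
// 172.16.1.x → 172.16.1.y  ✅ (same subnet)
// 172.16.1.x → 172.16.2.y  ❌ different subnet, unreachable over mDNS

// ✅ Hybrid discovery: mDNS (L2) + Consul/etcd (L3) + static configuration as a backup
type HybridNodeDiscovery struct {
    mdns   *NodeDiscovery   // L2: within the subnet (fast, zero configuration)
    consul *ConsulDiscovery // L3: across subnets (requires a Consul deployment)
    static []string         // manually configured node list (last resort)
}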

func (hd *HybridNodeDiscovery) Discover() []*NodeInfo {
    nodes := []*NodeInfo{}

    // 1. mDNS (fastest, 5 s)
    mdnsNodes := hd.mdns.Discover()
    nodes = append(nodes, mdnsNodes...)

    // 2. Consul (covers nodes in other subnets, 10 s)
    if hd.consul != nil {
        consulNodes := hd.consul.Discover("hicann")
        nodes = append(nodes, consulNodes...)
    }

    // 3. Static configuration (last known good nodes, as a fallback)
    for _, addr := range hd.static {
        node, err := hd.probeNode(addr)
        if err == nil {
            nodes = append(nodes, node)
        }
    }

    // Deduplicate (by Hostname)
    return deduplicateByHostname(nodes)
}
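
Discover calls deduplicateByHostname without showing it. A minimal sketch follows, assuming a hypothetical `NodeInfo` with `Hostname`, `Addr`, and `Source` fields (the real struct is not shown in the article). It keeps the first entry per hostname, so the source that was appended first (mDNS) takes precedence over Consul and static entries.

```go
package main

import "fmt"

// NodeInfo is a hypothetical minimal shape used only for this sketch.
type NodeInfo struct {
    Hostname string
    Addr     string
    Source   string // "mdns", "consul" or "static"
}

// deduplicateByHostname keeps the first entry seen for each hostname and
// preserves input order.
func deduplicateByHostname(nodes []*NodeInfo) []*NodeInfo {
    seen := make(map[string]struct{}, len(nodes))
    unique := make([]*NodeInfo, 0, len(nodes))
    for _, n := range nodes {
        if n == nil {
            continue // defensively skip empty results
        }
        if _, dup := seen[n.Hostname]; dup {
            continue
        }
        seen[n.Hostname] = struct{}{}
        unique = append(unique, n)
    }
    return unique
}

func main() {
    nodes := []*NodeInfo{
        {Hostname: "node-a", Addr: "172.16.1.10", Source: "mdns"},
        {Hostname: "node-b", Addr: "172.16.2.20", Source: "consul"},
        {Hostname: "node-a", Addr: "172.16.1.10", Source: "static"},
    }
    for _, n := range deduplicateByHostname(nodes) {
        fmt.Println(n.Hostname, n.Source)
    }
}
```

Keying on hostname assumes hostnames are unique across the cluster. If they are not, a key built from hostname plus address would be safer.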

Pitfall 2: the job scheduler ignores NUMA topology when allocating NPUs across nodes, and cross-socket communication costs about 40% in performance

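// ❌ Allocate as soon as the node has enough NPUs (NUMA topology ignored)
assignedNPUs := availableNPUs[:req.NPUs]
// → The set may span sockets: NPUs [0,1] on socket 0, NPUs [4,5] on socket 1
// → Cross-socket PCIe latency rises by 30-40%

// ✅ NUMA-affinity scheduling (prefer same socket → same node → cross-node)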
func (js *JobScheduler) numaAffinityScore(candidate NodePlacement, req ResourceRequest) float64 {
    score := 0.0

    for _, npu := range candidate.NPUs {
        socket := npu.SocketID
        // Each other NPU on the same socket: +10
        // Each other NPU on a different socket: -10
        for _, otherNPU := range candidate.NPUs {
            if npu == otherNPU {
                continue
            }
            if otherNPU.SocketID == socket {
                score += 10.0 // NUMA local → direct HCCS link
            } else {
                score -= 10.0 // NUMA remote → PCIe + QPI
            }
        }
    }

    return score
}

// Pick the highest-scoring candidate placement
bestPlacement := selectBestPlacement(candidates, req)
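
selectBestPlacement is called above but not shown. The sketch below is one way it could work, using hypothetical minimal `NPU` and `NodePlacement` types and dropping the `req` argument for brevity. It applies the same ±10 pairwise rule and returns the candidate with the highest score.

```go
package main

import "fmt"

// NPU and NodePlacement are hypothetical minimal types for this sketch.
type NPU struct{ ID, SocketID int }

type NodePlacement struct {
    Node string
    NPUs []NPU
}

// numaScore adds 10 for every ordered pair of NPUs on the same socket and
// subtracts 10 for every ordered pair split across sockets.
func numaScore(p NodePlacement) float64 {
    score := 0.0
    for i, a := range p.NPUs {
        for j, b := range p.NPUs {
            if i == j {
                continue
            }
            if a.SocketID == b.SocketID {
                score += 10
            } else {
                score -= 10
            }
        }
    }
    return score
}

// selectBestPlacement returns the highest-scoring candidate; ok is false
// when there are no candidates. Ties keep the earliest candidate.
func selectBestPlacement(candidates []NodePlacement) (best NodePlacement, ok bool) {
    for i, c := range candidates {
        if i == 0 || numaScore(c) > numaScore(best) {
            best = c
        }
    }
    return best, len(candidates) > 0
}

func main() {
    split := NodePlacement{Node: "node-1", NPUs: []NPU{{0, 0}, {1, 0}, {4, 1}, {5, 1}}}
    local := NodePlacement{Node: "node-2", NPUs: []NPU{{0, 0}, {1, 0}, {2, 0}, {3, 0}}}

    fmt.Println(numaScore(split), numaScore(local)) // -40 120
    best, _ := selectBestPlacement([]NodePlacement{split, local})
    fmt.Println(best.Node) // node-2
}
```

With four NPUs there are 12 ordered pairs. The split placement has 4 same-socket pairs and 8 cross-socket pairs (40 − 80 = −40), while the single-socket placement scores 12 × 10 = 120, so it wins.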

Pitfall 3: Prometheus scrape frequency mismatch. The cost of calling npu-smi conflicts with the Prometheus scrape interval

// ❌ Each npu-smi call is expensive (~200 ms)
// Prometheus scrapes every 15 s → one 200 ms npu-smi call per scrape → about 1.3% CPU wasted
// Yet npu-smi itself refreshes every 1 s → a 15 s scrape drops the 14 readings in between

// ✅ Cache plus sidecar (cache the npu-smi output)
type NPUMetricsCache struct {
    mu          sync.RWMutex
    metrics     []NPUInfo
    lastFetched time.Time
}

func (cache *NPUMetricsCache) RunCacheLoop() {
    // Refresh the cache every 1 s (matching npu-smi's update interval)
    ticker := time.NewTicker(1 * time.Second)
    go func() {
        for range ticker.C {
            output, err := exec.Command("npu-smi", "info", "-m", "-q").Output()
            if err != nil {
                continue // keep the last good snapshot if npu-smi fails
            }
            metrics := parseNPUInfo(string(output))

            cache.mu.Lock()
            cache.metrics = metrics
            cache.lastFetched = time.Now()
            cache.mu.Unlock()
        }
    }()
}

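// Prometheus scrapes read straight from the cache (no npu-smi call on the scrape path).
// Assumes a package-level `cache *NPUMetricsCache` that RunCacheLoop keeps fresh.
func (c *NPUCollector) Collect(ch chan<- prometheus.Metric) {
    cache.mu.RLock()
    metrics := cache.metrics
    cache.mu.RUnlock()

    hostname, _ := os.Hostname()
    for _, npu := range metrics {
        ch <- prometheus.MustNewConstMetric(
            c.utilization, prometheus.GaugeValue,
            float64(npu.Utilization),
            fmt.Sprintf("%d", npu.ID), hostname,
        )
        // Temperature, power, HBM, PCIe and ECC metrics are emitted the same way
    }
}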
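
A cache like this raises two questions the snippet above leaves open: what happens when a refresh fails, and how a reader can tell that the data is old. The sketch below is one possible answer, not hicann's actual code. `MetricsCache`, `Sample`, `Refresh`, and `Snapshot` are hypothetical names, and the fetch function is injected so the logic can be exercised without npu-smi.

```go
package main

import (
    "errors"
    "fmt"
    "sync"
    "time"
)

// errFetch simulates a failed npu-smi invocation.
var errFetch = errors.New("npu-smi failed")

// Sample is a hypothetical stand-in for the article's NPUInfo.
type Sample struct{ ID, Utilization int }

// MetricsCache stores the last successful fetch and when it happened.
type MetricsCache struct {
    mu          sync.RWMutex
    samples     []Sample
    lastFetched time.Time
}

// Refresh calls fetch once. On error the previous samples are kept, so a
// transient failure does not blank out the exported metrics.
func (c *MetricsCache) Refresh(fetch func() ([]Sample, error)) error {
    samples, err := fetch()
    if err != nil {
        return err
    }
    c.mu.Lock()
    c.samples, c.lastFetched = samples, time.Now()
    c.mu.Unlock()
    return nil
}

// Snapshot returns the cached samples and whether they are older than maxAge.
// A cache that has never been filled is always reported as stale.
func (c *MetricsCache) Snapshot(maxAge time.Duration) (samples []Sample, stale bool) {
    c.mu.RLock()
    defer c.mu.RUnlock()
    return c.samples, time.Since(c.lastFetched) > maxAge
}

func main() {
    cache := &MetricsCache{}
    _ = cache.Refresh(func() ([]Sample, error) { return []Sample{{0, 85}}, nil })
    err := cache.Refresh(func() ([]Sample, error) { return nil, errFetch })

    samples, stale := cache.Snapshot(5 * time.Second)
    fmt.Println(samples, stale, err)
}
```

A collector could use the stale flag to skip emitting metrics, or to export a separate staleness gauge, instead of silently serving old readings.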

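hicann is CANN's lightweight cluster manager. It consists of the core daemon hicannd (a Go daemon that discovers LAN nodes automatically over mDNS + gRPC), a job scheduler (FIFO + priority + resource reservation, with NUMA-affinity scheduling to avoid the roughly 40% cross-socket penalty), an NPU metrics exporter (Prometheus format, turning npu-smi output into HBM, temperature, power, PCIe, and ECC metrics), and a monitoring dashboard (Grafana JSON). The three pitfalls and their fixes: mDNS fails across subnets, fixed with hybrid discovery (mDNS at L2, Consul at L3, static configuration as a fallback); NUMA-blind allocation loses about 40% across sockets, fixed with NUMA-affinity scoring; and running npu-smi on every 15 s scrape wastes CPU, fixed with a 1 s cache sidecar that keeps npu-smi off the scrape path.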
