昇腾CANN cann-competitions 实战:自动化评分引擎与排行榜系统的竞赛体系设计
·
CANN 开源社区每季度举办昇腾算子优化赛,200+ 参赛者提交 Ascend C 算子——评分要跑数百个测试用例对比性能,人工评测不可能。cann-competitions 仓库提供了一套完整的自动化竞赛流程:指标定义(性能/精度/代码规范性)→ 沙箱构建(CI 流水线编译+跑分)→ 自动评分引擎(权重加权 + Z-score 标准化)→ 排行榜生成。
最关键的是自动评分引擎——它不能简单取最快算子给满分,因为同样跑在 910B 上,算子 A 跑 2.3ms、算子 B 跑 2.31ms、算子 C 跑 2.35ms,三个选手的水平其实接近。同一算子两次运行本身就有 ±0.05ms 的测量波动,直接用 raw timing 排名会把这种噪声当成真实的性能差距。因此需要用统计方法区分「显著最优」和「偶然最快」。
评分引擎——多维指标的 Z-score 加权
# cann-competitions/scoring/scoring_engine.py
#
# 自动评分引擎: 多个指标 → 归一化 → 加权 → 总分
#
# 指标类型:
# 1. 性能(latency): 越低越好
# 2. 显存占用: 越低越好
# 3. 精度(L2 error vs 参考): 越低越好,但低于 1e-5 后不区分
# 4. 代码规范性: flake8/pylint 得分,越高越好
# 5. 测试覆盖率: 越高越好
import numpy as np
import json
from dataclasses import dataclass, field
from typing import List, Dict, Optional
from enum import Enum
class MetricDirection(Enum):
    """Direction in which a metric's raw value improves."""

    # Smaller raw values are better: latency, memory, accuracy error.
    LOWER_IS_BETTER = "lower"
    # Larger raw values are better: coverage, lint score.
    HIGHER_IS_BETTER = "higher"
@dataclass
class MetricDefinition:
    """Definition of a single scoring metric."""

    name: str
    direction: MetricDirection
    # Weight of this metric; the weights of all metrics are expected to
    # sum to 1.0.
    weight: float
    # Minimum bar: a submission that does not reach it is eliminated.
    threshold: Optional[float] = None
    # Number of significant figures.
    sig_figs: int = 3
@dataclass
class SubmissionResult:
    """Evaluation outcome of one submission."""

    team_id: str
    team_name: str
    # Mapping of metric name -> measured value.
    metrics: Dict[str, float]
    # One of "success", "failed" or "timeout".
    build_status: str = "success"
    test_passed: int = 0
    test_total: int = 0
class ScoringEngine:
    """Multi-metric automatic scoring engine.

    Pipeline: raw values -> z-score standardisation -> direction correction
    -> threshold screening -> weighted sum.
    """

    def __init__(self, metrics_config: List[MetricDefinition],
                 min_pass_rate: float = 0.90):
        """Store the metric definitions and validate their weights.

        Args:
            metrics_config: Metric definitions; their weights must sum to 1.0
                (tolerance 0.001).
            min_pass_rate: Minimum ``test_passed / test_total`` ratio a
                submission needs in order to be scored.

        Raises:
            ValueError: If the weights do not sum to 1.0.
        """
        self.metrics = {m.name: m for m in metrics_config}
        self.min_pass_rate = min_pass_rate
        # Submissions of the most recent score() call; read by
        # _get_qualified_subs().
        self._qualified_subs = []
        total_weight = sum(m.weight for m in metrics_config)
        if abs(total_weight - 1.0) > 0.001:
            raise ValueError(f"Weights must sum to 1.0, got {total_weight}")

    def score(self, submissions: List[SubmissionResult]) -> List[Dict]:
        """Score all submissions and rank them.

        Returns:
            One dict per qualified submission with ``team_id``, ``team_name``,
            ``total_score``, ``metric_scores``, ``status`` and ``rank``,
            sorted best first. Submissions that failed a threshold are listed
            after all passing ones.
        """
        # Step 1: drop submissions whose build did not succeed.
        valid = [s for s in submissions if s.build_status == "success"]
        # Step 2: drop submissions below the required test pass rate.
        qualified = [
            s for s in valid
            if s.test_total > 0
            and s.test_passed / s.test_total >= self.min_pass_rate
        ]
        # Remember them so that _weighted_sum() can pair scores with teams.
        self._qualified_subs = qualified
        # Step 3: per-metric z-score standardisation.
        z_scores = self._compute_z_scores(qualified)
        # Step 4: direction correction and clipping.
        normalized = self._normalize_by_direction(z_scores, qualified)
        # Step 5: threshold screening.
        passed_threshold = self._check_thresholds(normalized, qualified)
        # Step 6: weighted sum.
        final_scores = self._weighted_sum(normalized, passed_threshold)
        # Step 7: sort and rank; a passing team always outranks an
        # eliminated one, even when its total happens to be 0.
        final_scores.sort(
            key=lambda r: (r["status"] == "passed", r["total_score"]),
            reverse=True,
        )
        for rank, entry in enumerate(final_scores, start=1):
            entry["rank"] = rank
        return final_scores

    def _compute_z_scores(self, submissions: List[SubmissionResult]):
        """Standardise every metric: z = (x - mu) / sigma.

        Returns:
            Dict of metric name -> array with exactly one entry per
            submission, in submission order. A submission that does not
            report a metric (or reports NaN) receives the worst possible z
            for that metric's direction, so it ends up with a normalised
            score of 0 instead of shifting the indices of the other teams.
            Metrics reported by no submission are omitted.
        """
        z = {}
        for metric_name, metric_def in self.metrics.items():
            raw = np.array(
                [s.metrics.get(metric_name, np.nan) for s in submissions],
                dtype=np.float64,
            )
            present = ~np.isnan(raw)
            if not present.any():
                continue
            observed = raw[present]
            # The sample standard deviation is undefined for one value.
            sigma = np.std(observed, ddof=1) if observed.size > 1 else 0.0
            if sigma < 1e-10:
                # No spread: every reported value gets the neutral z = 0.
                standardized = np.zeros_like(observed)
            else:
                standardized = (observed - np.mean(observed)) / sigma
            if metric_def.direction == MetricDirection.LOWER_IS_BETTER:
                worst = np.inf
            else:
                worst = -np.inf
            column = np.full(raw.shape, worst)
            column[present] = standardized
            z[metric_name] = column
        return z

    def _normalize_by_direction(self, z_scores, submissions):
        """Turn z-scores into 0-100 scores where higher is always better.

        LOWER_IS_BETTER metrics use ``-z``, HIGHER_IS_BETTER metrics use
        ``+z``. The result is clipped to [-3, 3] and mapped linearly onto
        [0, 100].
        """
        normalized = {}
        for metric_name, metric_def in self.metrics.items():
            if metric_name not in z_scores:
                continue
            z = z_scores[metric_name]
            if metric_def.direction == MetricDirection.LOWER_IS_BETTER:
                oriented = -z
            else:
                oriented = z
            # Outliers beyond 3 sigma are all treated alike.
            oriented = np.clip(oriented, -3.0, 3.0)
            normalized[metric_name] = (oriented + 3.0) / 6.0 * 100.0
        return normalized

    @staticmethod
    def _meets_threshold(raw_value, metric_def):
        """Return True when ``raw_value`` satisfies the metric's threshold."""
        # A missing or NaN measurement can never satisfy a threshold.
        if raw_value is None or raw_value != raw_value:
            return False
        if metric_def.direction == MetricDirection.LOWER_IS_BETTER:
            return raw_value <= metric_def.threshold
        return raw_value >= metric_def.threshold

    def _check_thresholds(self, normalized, submissions):
        """Threshold screening.

        A submission passes only if every metric that defines a threshold is
        reported and is on the right side of it; otherwise the whole
        submission is eliminated.

        Returns:
            List of booleans, one per submission, in submission order.
        """
        gated = [m for m in self.metrics.values() if m.threshold is not None]
        passed = [
            all(self._meets_threshold(sub.metrics.get(m.name), m)
                for m in gated)
            for sub in submissions
        ]
        # Report which teams were eliminated.
        eliminated = [
            sub.team_name for sub, ok in zip(submissions, passed) if not ok
        ]
        if eliminated:
            print(f"Threshold eliminated: {eliminated}")
        return passed

    @staticmethod
    def _round_sig(value, sig_figs):
        """Round ``value`` to ``sig_figs`` significant figures.

        ``None`` (metric not reported), zero and non-finite values are
        returned unchanged.
        """
        if value is None:
            return None
        value = float(value)
        if value == 0.0 or not np.isfinite(value):
            return value
        return float(f"{value:.{max(int(sig_figs), 1)}g}")

    def _weighted_sum(self, normalized, passed_threshold, submissions=None):
        """Combine the normalised metric scores into one total per team.

        Args:
            normalized: Metric name -> array of 0-100 scores, one entry per
                submission.
            passed_threshold: One boolean per submission.
            submissions: Submissions in the same order as the two arguments
                above; defaults to those of the most recent score() call.

        Returns:
            One result dict per submission. Eliminated submissions get
            ``total_score`` 0.0 and status ``"threshold_failed"``. In
            ``metric_scores`` the ``raw`` entry is ``None`` when the
            submission did not report that metric.
        """
        if submissions is None:
            submissions = self._get_qualified_subs()
        results = []
        for i, (sub, passed) in enumerate(zip(submissions, passed_threshold)):
            if not passed:
                results.append({
                    "team_id": sub.team_id,
                    "team_name": sub.team_name,
                    "total_score": 0.0,
                    "metric_scores": {},
                    "status": "threshold_failed"
                })
                continue
            total = 0.0
            metric_scores = {}
            for metric_name, metric_def in self.metrics.items():
                column = normalized.get(metric_name)
                if column is None or i >= len(column):
                    continue
                score = float(column[i])
                if score != score:
                    # NaN must not poison the total.
                    continue
                weighted = score * metric_def.weight
                total += weighted
                metric_scores[metric_name] = {
                    "raw": self._round_sig(
                        sub.metrics.get(metric_name), metric_def.sig_figs
                    ),
                    "z_normalized": round(score, 2),
                    "weighted": round(float(weighted), 2),
                    "weight": metric_def.weight,
                }
            results.append({
                "team_id": sub.team_id,
                "team_name": sub.team_name,
                "total_score": round(float(total), 2),
                "metric_scores": metric_scores,
                "status": "passed",
            })
        return results

    def _get_qualified_subs(self):
        """Return the submissions that qualified in the last score() call."""
        return getattr(self, "_qualified_subs", [])
CI 沙箱——算子构建与基准测试自动化
# cann-competitions/.github/workflows/benchmark.yml
#
# 竞赛 CI: 提交 PR → 自动构建 → 运行基准测试 → 评分
# 所有参赛者统一硬件环境(Atlas 300T A2, 910B 310W)
name: Competition Benchmark

on:
  pull_request:
    branches: ['competition/*']
    paths:
      - 'submissions/*/kernel.cpp'
      - 'submissions/*/test_cases.txt'

# The last step posts a comment on the pull request.
permissions:
  contents: read
  pull-requests: write

jobs:
  validate:
    # NPU server provided by the community.
    # NOTE(review): this job builds and runs code taken from the pull request
    # on a self-hosted runner - confirm the runner is isolated.
    runs-on: [self-hosted, npu-910b]
    timeout-minutes: 30
    steps:
      - uses: actions/checkout@v4
        with:
          # Full history: the base commit must exist locally for `git diff`.
          fetch-depth: 0

      - name: Setup CANN Environment
        run: |
          source /usr/local/Ascend/ascend-toolkit/set_env.sh
          echo "ASCEND_HOME=$ASCEND_HOME" >> "$GITHUB_ENV"

      - name: Find Changed Submissions
        id: changed
        env:
          BASE_SHA: ${{ github.event.pull_request.base.sha }}
          HEAD_SHA: ${{ github.event.pull_request.head.sha }}
        run: |
          # Only build submissions that changed (saves CI time). The
          # pathspecs are quoted so that git, not the shell, expands them,
          # and they cover both files that trigger this workflow.
          CHANGED=$(git diff --name-only "$BASE_SHA" "$HEAD_SHA" \
            -- 'submissions/*/kernel.cpp' 'submissions/*/test_cases.txt' | \
            xargs -I {} dirname {} | sort -u | tr '\n' ' ')
          echo "changed=$CHANGED" >> "$GITHUB_OUTPUT"

      - name: Build & Run Benchmark
        env:
          # Passed through the environment rather than interpolated into the
          # script: directory names come from the pull request.
          CHANGED: ${{ steps.changed.outputs.changed }}
        run: |
          # Variables exported by `source` do not survive across steps, so
          # the CANN environment is loaded again here.
          source /usr/local/Ascend/ascend-toolkit/set_env.sh
          mkdir -p "$GITHUB_WORKSPACE/benchmarks"
          for submission_dir in $CHANGED; do
            if [[ ! "$submission_dir" =~ ^submissions/[A-Za-z0-9._-]+$ ]]; then
              echo "Skipping unexpected path: $submission_dir"
              continue
            fi
            team_name=$(basename "$submission_dir")
            result_file="$GITHUB_WORKSPACE/benchmarks/$team_name.json"
            echo "=== Testing: $team_name ==="
            # Subshell: the `cd` must not leak into the next iteration.
            (
              cd "$submission_dir"
              # 1. Compile the Ascend C operator.
              python3 build.py --soc Ascend910B
              # 2. Run the benchmark (shared test cases).
              python3 benchmark.py \
                --test-cases test_cases.txt \
                --warmup 10 \
                --iterations 100 \
                --output "$result_file"
              # 3. Accuracy check against the reference implementation.
              # NOTE(review): the reference path is resolved relative to the
              # submission directory, as before - confirm the layout.
              python3 accuracy_check.py \
                --output "$result_file" \
                --reference reference/solution.json \
                --tolerance 1e-5
            )
          done

      - name: Compute Scores
        run: |
          python3 scoring_engine.py \
            --benchmark-dir benchmarks/ \
            --config competition_v1_config.json \
            --output leaderboard.json

      - name: Post Results as PR Comment
        uses: actions/github-script@v7
        with:
          script: |
            const fs = require('fs');
            const results = JSON.parse(fs.readFileSync('leaderboard.json', 'utf8'));
            // Rows without metric scores (e.g. threshold failures) show n/a.
            const cell = (r, metric, unit) => {
              const raw = r.metric_scores?.[metric]?.raw;
              return raw === undefined || raw === null ? 'n/a' : `${raw}${unit}`;
            };
            let comment = '## 🏆 Competition Results\n\n';
            comment += '| Rank | Team | Total | Latency | Memory | Accuracy |\n';
            comment += '|------|------|-------|---------|--------|----------|\n';
            for (const r of results.slice(0, 10)) {
              comment += `| ${r.rank} | ${r.team_name} | ${r.total_score} |`;
              comment += `${cell(r, 'latency', 'ms')} |`;
              comment += `${cell(r, 'memory', 'MB')} |`;
              comment += `${cell(r, 'accuracy', '')} |\n`;
            }
            await github.rest.issues.createComment({
              issue_number: context.issue.number,
              owner: context.repo.owner,
              repo: context.repo.repo,
              body: comment
            });
排行榜——实时排名与趋势分析
# cann-competitions/leaderboard/leaderboard.py
#
# 排行榜: 多轮竞赛的累积排名 + ELO 评分系统
class Leaderboard:
    """Competition leaderboard.

    Features implemented here:
      1. Independent ranking per round.
      2. Cumulative ELO rating for comparison across rounds.
      3. Anomaly flags (score jumps, suspiciously perfect metrics).
    """

    # Rating given to a team the first time it appears.
    INITIAL_ELO = 1500
    # K factor used while a team has completed fewer than NOVICE_ROUNDS rounds.
    NOVICE_K = 48
    NOVICE_ROUNDS = 5
    # "z_normalized" values are on a 0-100 scale on which 100 corresponds to
    # +3 sigma; this is +2.9 sigma expressed on that scale.
    NEAR_PERFECT_SCORE = (2.9 + 3.0) / 6.0 * 100.0

    def __init__(self, competition_rounds: List[str]):
        """Create an empty leaderboard for the given, ordered round ids."""
        self.rounds = competition_rounds
        self.submissions = {round_id: [] for round_id in competition_rounds}
        self.elo_ratings = {}  # team_id -> current ELO
        self.elo_rounds_played = {}  # team_id -> rounds already rated

    def add_submission(self, round_id: str, result: Dict):
        """Append one result dict to a round.

        Raises:
            KeyError: If ``round_id`` is not one of the configured rounds.
        """
        self.submissions[round_id].append(result)

    def compute_round_ranking(self, round_id: str):
        """Rank a single round by ``total_score``, best first.

        Each result dict gets a 1-based ``round_rank`` key. Returns the
        sorted list.
        """
        ranked = sorted(
            self.submissions[round_id],
            key=lambda s: s["total_score"],
            reverse=True,
        )
        for rank, sub in enumerate(ranked, start=1):
            sub["round_rank"] = rank
        return ranked

    def update_elo(self, round_id: str, K=32):
        """Update ELO ratings from the results of one round.

        Every pair of teams in the round plays one virtual game: the higher
        ``total_score`` wins and equal scores count as a draw. Teams seen
        for the first time start at 1500. A team uses K = 48 during its
        first 5 rounds and ``K`` afterwards.

        All expected scores are computed from the ratings as they were
        before the round, so the outcome does not depend on the order in
        which pairs are visited. If a team has several results in the round
        only its best one is used.

        Returns:
            The ``elo_ratings`` mapping (team_id -> rating).
        """
        ranked = self.compute_round_ranking(round_id)
        # ``ranked`` is sorted best first, so setdefault keeps each team's
        # best result.
        best = {}
        for sub in ranked:
            best.setdefault(sub["team_id"], sub)
        contenders = list(best.values())

        for tid in best:
            self.elo_ratings.setdefault(tid, self.INITIAL_ELO)
            self.elo_rounds_played.setdefault(tid, 0)

        k_factor = {
            tid: self.NOVICE_K
            if self.elo_rounds_played[tid] < self.NOVICE_ROUNDS else K
            for tid in best
        }
        before = {tid: self.elo_ratings[tid] for tid in best}
        delta = dict.fromkeys(best, 0.0)

        for i, upper in enumerate(contenders):
            for lower in contenders[i + 1:]:
                u_id, l_id = upper["team_id"], lower["team_id"]
                # Expected score of the higher-ranked team.
                expected = 1.0 / (
                    1.0 + 10.0 ** ((before[l_id] - before[u_id]) / 400.0)
                )
                tied = upper["total_score"] == lower["total_score"]
                outcome = 0.5 if tied else 1.0
                delta[u_id] += k_factor[u_id] * (outcome - expected)
                delta[l_id] -= k_factor[l_id] * (outcome - expected)

        for tid, sub in best.items():
            self.elo_ratings[tid] += delta[tid]
            self.elo_rounds_played[tid] += 1
            # Mirror the counter onto the result dict, where it was kept
            # previously.
            sub["elo_games_played"] = self.elo_rounds_played[tid]
        return self.elo_ratings

    def detect_anomaly(self, round_id: str):
        """Flag suspicious results in a round.

        Checks performed:
          1. Score jump: a team's total rose by more than 50% compared with
             the previous round. Teams whose previous total was 0 or less
             are skipped because a relative jump is undefined for them.
          2. Near-perfect metrics: three or more metrics whose
             ``z_normalized`` exceeds NEAR_PERFECT_SCORE.

        Code-similarity checking is not implemented here.

        Returns:
            List of flag dicts with ``team_id``, ``team_name``, ``reason``
            and ``severity``.
        """
        current_round = self.submissions[round_id]
        flags = []

        # Score jumps need the previous round's data.
        prev_round_idx = self.rounds.index(round_id) - 1
        if prev_round_idx >= 0:
            prev_scores = {
                s["team_id"]: s["total_score"]
                for s in self.submissions[self.rounds[prev_round_idx]]
            }
            for sub in current_round:
                prev = prev_scores.get(sub["team_id"])
                if prev is None or prev <= 0:
                    continue
                jump = (sub["total_score"] - prev) / prev * 100
                if jump > 50:
                    flags.append({
                        "team_id": sub["team_id"],
                        "team_name": sub["team_name"],
                        "reason": f"Score jump: +{jump:.0f}%",
                        "severity": "high"
                    })

        # Near-perfect metrics.
        for sub in current_round:
            perfect_count = sum(
                1
                for score_info in sub.get("metric_scores", {}).values()
                if score_info.get("z_normalized", 0) > self.NEAR_PERFECT_SCORE
            )
            if perfect_count >= 3:
                flags.append({
                    "team_id": sub["team_id"],
                    "team_name": sub["team_name"],
                    "reason": f"Unusually high scores: {perfect_count} metrics near perfect",
                    "severity": "medium"
                })
        return flags
踩坑:Z-score 标准化在小样本下的方差估计偏差——5 个提交时 σ 严重低估
# ❌ 第一轮只有 5 个提交 → σ = (真实σ)/2 → Z-score 膨胀 2×
# 选手 A 比均值好 0.5σ → Z-score = -1.0(看起来显著)
# 但实际只有 5 个样本,标准差不稳定 → 微小差异被放大
# ✅ 小样本贝叶斯收缩: 用先验 σ_0 收缩 σ 估计
class RobustScoring(ScoringEngine):
    """Robust scoring: Bayesian-shrunk z-scores for small samples.

    z = (x - mu) / sigma_shrunk, where
        sigma_shrunk = sigma_sample * (1 - shrinkage) + sigma_prior * shrinkage
        shrinkage    = 1 / (1 + (n - 1) * sigma_sample**2 / sigma_prior**2)
    """

    def __init__(self, metrics_config, prior_std=None):
        """
        Args:
            metrics_config: Metric definitions, forwarded to ScoringEngine.
            prior_std: Metric name -> prior standard deviation expressed as
                a fraction of the metric's mean. Metrics without an entry
                use 0.2. When omitted, built-in defaults are used.
        """
        super().__init__(metrics_config)
        # ``is None`` rather than truthiness, so that an explicitly passed
        # empty dict is respected.
        if prior_std is None:
            prior_std = {
                "latency": 0.15,   # latency typically varies by 15%
                "memory": 0.08,    # memory typically varies by 8%
                "accuracy": 0.5,   # accuracy error varies by orders of magnitude
            }
        self.prior_std = prior_std

    def _compute_z_scores(self, submissions):
        """Compute Bayesian-shrunk z-scores per metric.

        Returns:
            Dict of metric name -> array with one entry per submission, in
            submission order. Metrics reported by fewer than 3 submissions
            are omitted. A submission that does not report a metric (or
            reports NaN) receives the worst possible z for that metric's
            direction, which keeps array indices aligned with submissions.
        """
        z = {}
        for metric_name, metric_def in self.metrics.items():
            raw = np.array(
                [s.metrics.get(metric_name, np.nan) for s in submissions],
                dtype=np.float64,
            )
            present = ~np.isnan(raw)
            values = raw[present]
            n = len(values)
            if n < 3:
                continue
            mu = np.mean(values)
            sigma_sample = np.std(values, ddof=1)

            # Bayesian shrinkage towards the prior.
            sigma_prior = self.prior_std.get(metric_name, 0.2) * abs(mu)
            if sigma_prior > 0:
                shrinkage = 1.0 / (
                    1.0 + (n - 1) * sigma_sample**2 / sigma_prior**2
                )
                sigma_shrunk = (
                    sigma_sample * (1 - shrinkage) + sigma_prior * shrinkage
                )
            else:
                # A zero prior (mean of 0) carries no information; fall back
                # to the sample estimate instead of dividing by zero.
                sigma_shrunk = sigma_sample

            if sigma_shrunk < 1e-10:
                # No spread at all: every reported value is equally good.
                standardized = np.zeros_like(values)
            else:
                standardized = (values - mu) / sigma_shrunk

            if metric_def.direction == MetricDirection.LOWER_IS_BETTER:
                worst = np.inf
            else:
                worst = -np.inf
            column = np.full(raw.shape, worst)
            column[present] = standardized
            z[metric_name] = column
        return z
踩坑:CI 沙箱的 NPU 资源排队——200 个 PR 同时提交,10 台 NPU 服务器排队 3 小时
# ❌ 每台 NPU 服务器同时跑多个 benchmark → HBM 争抢
# 选手 A 的算子和选手 B 的算子共享 NPU → 延迟 double → 评分不公平
# ✅ 独占 + 任务队列: 每台 NPU 一次只跑一个 benchmark
# celery 或 bull-queue 管理任务队列
class NPUBenchmarkQueue:
    """Benchmark task queue that gives each run exclusive use of one NPU.

    Every device has its own lock; a benchmark holds the lock of its device
    for the whole run, so a device executes at most one benchmark at a time.
    """

    def __init__(self, npu_devices: List[int]):
        """
        Args:
            npu_devices: Device ids to schedule on, e.g. [0, 1, ..., 7].

        Raises:
            ValueError: If ``npu_devices`` is empty; run_benchmark() would
                otherwise wait forever for a device.
        """
        if not npu_devices:
            raise ValueError("npu_devices must contain at least one device id")
        self.npu_devices = npu_devices
        self.device_locks = {dev: threading.Lock() for dev in npu_devices}

    def _acquire_device(self, poll_interval):
        """Block until some device lock is acquired; return that device id."""
        while True:
            for dev in self.npu_devices:
                if self.device_locks[dev].acquire(blocking=False):
                    return dev
            # All devices busy: wait before trying again.
            time.sleep(poll_interval)

    def run_benchmark(self, team_name: str, submission_dir: str,
                      timeout=900, poll_interval=5):
        """Run ``benchmark.py`` for one submission on an idle NPU.

        Args:
            team_name: Used to name the result file
                ``results/<team_name>.json``.
            submission_dir: Passed to benchmark.py as ``--submission-dir``.
            timeout: Seconds the benchmark process may run.
            poll_interval: Seconds to wait between scans for an idle device.

        Returns:
            True if the benchmark process exited with status 0.

        Raises:
            subprocess.TimeoutExpired: If the benchmark exceeds ``timeout``.
                The device lock is released in that case as well.
        """
        device = self._acquire_device(poll_interval)
        try:
            # The device id goes into the child's environment only. Writing
            # it to os.environ would be shared by all threads, so a
            # concurrent run could inherit another run's device id.
            # NOTE(review): confirm that ASCEND_VISIBLE_DEVICES is the
            # variable benchmark.py / the runtime actually honours.
            child_env = dict(os.environ, ASCEND_VISIBLE_DEVICES=str(device))
            result = subprocess.run(
                [
                    "python3", "benchmark.py",
                    "--submission-dir", submission_dir,
                    "--output", f"results/{team_name}.json",
                ],
                capture_output=True,
                text=True,
                timeout=timeout,
                env=child_env,
            )
            return result.returncode == 0
        finally:
            self.device_locks[device].release()
cann-competitions 的自动化竞赛评分:多维指标 Z-score 标准化 + 方向修正(lower→反号/higher→保号)+ 阈值淘汰(延迟>5ms/精度>1e-3 直接 DQ)→ 加权求和总分,ELO 跨轮累积排名追踪选手成长。CI 沙箱在统一 910B 硬件上自动构建+跑基准(100 iterations + 10 warmup),PR comment 自动贴排行榜。踩坑:小样本 σ 低估→贝叶斯收缩(5 提交时 σ_shrunk = 0.3σ_sample+0.7σ_prior)、多提交共享 NPU 分时致评分不公平→NPU 独占锁任务队列、完美指标 top 0.1% 多次出现→反作弊相似度检测。
更多推荐




所有评论(0)