蛋白质折叠模拟跟踪 10 万个原子的运动轨迹——每个原子都受到周围原子的范德华力、静电力和键结力。牛顿第二定律:每个时间步(1fs = 10^-15 秒)计算所有原子对的作用力 → 更新速度 → 更新位置。100,000 个原子 → 50 亿对相互作用/步 → 1μs 的模拟需要 10 亿步 → 传统 CPU 集群跑一个月。但非键结作用的截断半径通常只有 10-12Å(1.2nm),在 10 万个原子中每个原子只与最近的 200-500 个邻居有实际相互作用——稀疏性是把模拟搬上 NPU 的关键。

mat-chem-sim-pred 仓库用 Verlet 邻接列表(空间哈希 + 截断半径)把 50 亿对缩减到 2000 万对,再用 NPU 的 Cube 单元并行计算所有作用力。

Verlet 邻接列表——空间哈希桶

# mat-chem-sim-pred/md/verlet_list.py
#
# Verlet 邻接列表: 为每个原子维护截断半径内的邻居索引
# 核心: 三维空间哈希 → 每个原子只检查相邻 27 个桶
#
# 空间哈希桶: 边长 = cut_off(12Å)
# 网格: (x // cut_off, y // cut_off, z // cut_off)
# 邻域: 26 个相邻桶 + 自身桶 = 27 桶

import torch
import torch_npu

class VerletNeighborList:
    """
    基于空间哈希的 Verlet 邻接列表

    NPU 加速:
    1. 哈希桶分配: scatter + argsort = O(N log N),在 NPU 上用并行排序
    2. 邻域搜索: 对每个桶,查 27 个邻域桶 → tensordot 批量计算距离
    3. 截断: 距离 > cut_off 的原子对直接 mask
    """

    def __init__(self, cut_off=12.0, skin=2.0):
        """
        cut_off: 截断半径 (Å),非键结作用力在此距离外可忽略
        skin: 缓冲区 (Å),更新邻接列表的频率控制
              邻接列表包含 cut_off + skin 范围内的原子
              更新频率 = skin / (max_velocity * dt)
        """
        self.cut_off = cut_off  # 12Å
        self.skin = skin        # 2Å → 邻接列表实际半径 = 14Å
        self.cell_size = cut_off + skin  # 哈希桶边长 14Å

    def build(self, positions, box_size):
        """
        构建邻接列表

        positions: [N, 3] 原子坐标 (Å)
        box_size: [3] 模拟盒子大小(周期边界)
        returns:
            neighbors: [N, max_neighbors] 邻居索引,-1 表示空位
            n_neighbors: [N] 每原子的邻居数
        """
        N = positions.shape[0]
        device = positions.device

        # Step 1: 三维空间哈希(每个原子的哈希桶)
        cell_idx = (positions / self.cell_size).long()  # [N, 3]
        cell_hash = (
            cell_idx[:, 0] * 73856093 +
            cell_idx[:, 1] * 19349663 +
            cell_idx[:, 2] * 83492791
        ) % (2**31 - 1)  # [N]

        # 按哈希值排序(相同哈希 = 同一桶)
        sorted_hash, sort_idx = cell_hash.sort()
        sorted_positions = positions[sort_idx]  # [N, 3]

        # 桶边界: 找到每个哈希值的起始位置
        hash_diff = sorted_hash[1:] - sorted_hash[:-1]  # [N-1]
        bucket_starts = torch.cat([
            torch.zeros(1, dtype=torch.long, device=device),
            (hash_diff != 0).nonzero(as_tuple=True)[0] + 1,
            torch.tensor([N], dtype=torch.long, device=device)
        ])  # [n_buckets + 1]

        num_buckets = len(bucket_starts) - 1

        # Step 2: 为每个桶构建 27 个邻域偏移
        # 偏移: [-1,0,1]³ = 27 种
        grid_offsets = torch.tensor([
            [dx, dy, dz]
            for dx in [-1, 0, 1]
            for dy in [-1, 0, 1]
            for dz in [-1, 0, 1]
        ], dtype=torch.long, device=device)  # [27, 3]

        # Step 3: 收集所有候选邻居对
        # 最大候选数估算: N * atoms_per_bucket * 27
        max_atoms_per_bucket = 100  # 典型:截断半径球内 200-500,每桶 ~50-150
        max_candidates = N * max_atoms_per_bucket * 27

        pair_i = torch.zeros(max_candidates, dtype=torch.long, device=device)
        pair_j = torch.zeros(max_candidates, dtype=torch.long, device=device)
        n_pairs = 0

        for bid in range(num_buckets):
            b_start = bucket_starts[bid].item()
            b_end = bucket_starts[b_start + 1].item() if b_start + 1 < len(bucket_starts) else N

            atoms_in_bucket = b_end - b_start
            if atoms_in_bucket == 0:
                continue

            atom_indices = sort_idx[b_start:b_end]  # 桶内原子原始索引

            # 当前桶的中心坐标 (cx, cy, cz)
            first_atom_idx = sort_idx[b_start]
            cx = cell_idx[first_atom_idx]

            # 检查 27 个邻域桶
            for offset in grid_offsets:
                neighbor_cell = cx + offset

                # 周期边界
                neighbor_cell = self._wrap_cell(neighbor_cell, box_size / self.cell_size)

                # 查找邻域桶的哈希
                neighbor_hash = (
                    int(neighbor_cell[0].item()) * 73856093 +
                    int(neighbor_cell[1].item()) * 19349663 +
                    int(neighbor_cell[2].item()) * 83492791
                ) % (2**31 - 1)

                # 二分查找邻域桶
                neighbor_bid = self._find_bucket(sorted_hash, bucket_starts, neighbor_hash)
                if neighbor_bid == -1:
                    continue

                nb_start = bucket_starts[neighbor_bid].item()
                nb_end = bucket_starts[neighbor_bid + 1].item() if neighbor_bid + 1 < num_buckets else N

                # 对当前桶中所有原子 × 邻域桶中所有原子 → 候选对
                neighbor_atoms = sort_idx[nb_start:nb_end]
                n_curr = atoms_in_bucket
                n_neigh = nb_end - nb_start

                ii = atom_indices.unsqueeze(1).expand(-1, n_neigh).reshape(-1)
                jj = neighbor_atoms.unsqueeze(0).expand(n_curr, -1).reshape(-1)

                n_new = len(ii)
                if n_pairs + n_new > max_candidates:
                    break

                pair_i[n_pairs:n_pairs+n_new] = ii
                pair_j[n_pairs:n_pairs+n_new] = jj
                n_pairs += n_new

        # Step 4: 距离筛选(截断 cut_off + skin)
        pair_i = pair_i[:n_pairs]
        pair_j = pair_j[:n_pairs]

        # 排除自身配对
        self_mask = pair_i != pair_j
        pair_i = pair_i[self_mask]
        pair_j = pair_j[self_mask]

        # 计算所有候选对的距离
        pos_i = positions[pair_i]  # [n_valid, 3]
        pos_j = positions[pair_j]

        # 周期边界下的最小镜像距离
        delta = pos_i - pos_j
        delta = delta - box_size * torch.round(delta / box_size)

        distances = delta.norm(dim=1)  # [n_valid]

        # 截断: 距离 < cut_off + skin
        within_range = distances < (self.cut_off + self.skin)
        pair_i = pair_i[within_range]
        pair_j = pair_j[within_range]

        return pair_i, pair_j  # [n_pairs], [n_pairs]

    def _wrap_cell(self, cell_idx, n_cells):
        """周期边界包裹"""
        return cell_idx % n_cells.long()

    def _find_bucket(self, sorted_hashes, bucket_starts, target_hash):
        """二分查找哈希值对应的桶索引"""
        # 简化版: 线性搜索
        for i in range(len(bucket_starts) - 1):
            start = bucket_starts[i].item()
            if sorted_hashes[start].item() == target_hash:
                return i
        return -1

Lennard-Jones 势——并行非键结作用力

# mat-chem-sim-pred/md/force_field.py
#
# 力场计算: 对邻接列表中的每对原子计算作用力
# Lennard-Jones 12-6: U(r) = 4ε[(σ/r)^12 - (σ/r)^6]
# 力: F = -∇U(r)

class LennardJonesForce:
    """
    LJ 12-6 势的并行力计算

    NPU: 所有原子对同时计算距离 → 力 → 归约到每个原子
    """

    def __init__(self, epsilon=0.238, sigma=3.4, cut_off=12.0):
        """
        epsilon: 势阱深度 (kcal/mol),Ar 原子的 LJ 参数
        sigma: 零点距离 (Å),U(σ) = 0
        """
        self.epsilon = epsilon  # kcal/mol
        self.sigma = sigma      # Å
        self.cut_off = cut_off  # Å

        # 截断修正: LJ 在 cut_off 处应为零(Weeks-Chandler-Andersen 移位)
        self.U_cut = 4 * epsilon * (
            (sigma / cut_off)**12 - (sigma / cut_off)**6
        )

    def compute_forces(self, positions, pair_i, pair_j, box_size):
        """
        计算所有原子对之间的 LJ 力

        力矢量: F_ij = dU/dr * (r_ij / |r_ij|)
        其中: dU/dr = 24ε/r * [2(σ/r)^12 - (σ/r)^6] + F_shift

        作用力-反作用力: F_i += F_ij, F_j -= F_ij
        """
        N = positions.shape[0]
        n_pairs = len(pair_i)

        # 原子对的位置
        pi = positions[pair_i]  # [n_pairs, 3]
        pj = positions[pair_j]  # [n_pairs, 3]

        # 最小镜像距离矢量
        delta = pi - pj  # [n_pairs, 3]
        delta = delta - box_size.unsqueeze(0) * torch.round(delta / box_size.unsqueeze(0))

        # 距离
        r = delta.norm(dim=1)  # [n_pairs]

        # LJ 力的大小: F(r) = -dU/dr
        # dU/dr = 24ε * (1/r) * [2(σ/r)^12 - (σ/r)^6]
        sigma_r = self.sigma / r  # [n_pairs]
        sigma_r_6 = sigma_r ** 6
        sigma_r_12 = sigma_r_6 ** 2

        # dU/dr = -24ε/r * (2*sigma_r_12 - sigma_r_6)
        # 负号来自 F = -dU/dr
        force_magnitude = 24.0 * self.epsilon / r * (
            2.0 * sigma_r_12 - sigma_r_6
        )  # [n_pairs]

        # 截断附近平滑衰减(shift function)
        # 避免截断处力不连续导致能量漂移
        r_c = self.cut_off
        shift_mask = r > (r_c - 1.0)  # 截断前 1Å 平滑
        if shift_mask.any():
            # 三次样条平滑: F(r) = F_original * S(r/r_c)
            x = (r_c - r[shift_mask])
            shift_factor = -20.0 * x**7 + 70.0 * x**6 - 84.0 * x**5 + 35.0 * x**4
            force_magnitude[shift_mask] *= shift_factor

        # 方向单位矢量
        direction = delta / r.unsqueeze(1)  # [n_pairs, 3]

        # 力矢量
        F_ij = force_magnitude.unsqueeze(1) * direction  # [n_pairs, 3]

        # 归约: F_i += F_ij, F_j -= F_ij
        forces = torch.zeros(N, 3, dtype=torch.float32, device=positions.device)

        # scatter_add: 将 F_ij 加到 atoms i, 将 -F_ij 加到 atoms j
        forces.index_add_(0, pair_i, F_ij)
        forces.index_add_(0, pair_j, -F_ij)

        return forces

    def compute_energy(self, positions, pair_i, pair_j, box_size):
        """计算系统的 LJ 势能"""
        pi = positions[pair_i]
        pj = positions[pair_j]

        delta = pi - pj
        delta = delta - box_size.unsqueeze(0) * torch.round(delta / box_size.unsqueeze(0))

        r = delta.norm(dim=1)

        sigma_r = self.sigma / r
        sigma_r_6 = sigma_r ** 6
        sigma_r_12 = sigma_r_6 ** 2

        U_pair = 4.0 * self.epsilon * (sigma_r_12 - sigma_r_6)

        return U_pair.sum()

Velocity-Verlet 积分——位置和速度的交替推进

# mat-chem-sim-pred/md/integrator.py
#
# Velocity-Verlet: 时间步 dt = 1fs (10^-15 s)
# 1. v(t + dt/2) = v(t) + a(t) * dt/2
# 2. x(t + dt)  = x(t) + v(t + dt/2) * dt
# 3. a(t + dt)  = F(x(t+dt)) / m
# 4. v(t + dt)  = v(t + dt/2) + a(t + dt) * dt/2

class VelocityVerletIntegrator:
    """
    Velocity-Verlet 积分器(NPT 系综,恒温恒压)

    NPU: 所有原子的位置、速度、力同时更新
    """

    def __init__(self, dt=1.0, n_steps=1000000):
        self.dt = dt  # fs (1 fs = 10^-15 s)
        self.n_steps = n_steps

    def step(self, positions, velocities, forces, masses,
             force_engine, pair_i, pair_j, box_size):
        """
        一步 Velocity-Verlet

        positions: [N, 3]  (Å)
        velocities: [N, 3]  (Å/fs)
        forces: [N, 3]  (kcal/(mol·Å) = 原子单位力)
        masses: [N]  (amu = 原子质量单位)
        """
        dt = self.dt
        device = positions.device

        # 质量 → 加速度的转换因子
        # F = m * a → a = F / m
        # 校准: 1 kcal/(mol·Å) / 1 amu = 4184 m/s² = 4.184e-4 Å/fs²
        accel_factor = 4.184e-4  # Å/fs² per kcal/(mol·Å·amu)

        # Step 1: 半步速度
        # v_half = v + a * dt/2
        accelerations = forces / masses.unsqueeze(1) * accel_factor  # [N, 3]
        v_half = velocities + accelerations * (dt / 2.0)

        # Step 2: 更新位置
        positions_new = positions + v_half * dt

        # 周期边界: 原子不能漫游出模拟盒子
        positions_new = positions_new % box_size.unsqueeze(0)

        # Step 3: 计算新位置的力(这是计算最密集的部分)
        # 注: 实际模拟中会检测是否需要重建邻接列表
        forces_new = force_engine.compute_forces(
            positions_new, pair_i, pair_j, box_size
        )

        # Step 4: 更新速度为全步
        accelerations_new = forces_new / masses.unsqueeze(1) * accel_factor
        velocities_new = v_half + accelerations_new * (dt / 2.0)

        return positions_new, velocities_new, forces_new

    def run(self, positions, velocities, masses,
            force_engine, neighbor_list, box_size,
            rebuild_interval=20):
        """
        批量运行模拟

        rebuild_interval: 每 20 步重建邻接列表
        重建间隔 = skin / (max_velocity * dt) ≈ 2Å / (0.1Å/fs * 1fs) = 20 步
        """
        # 初始邻接列表
        pair_i, pair_j = neighbor_list.build(positions, box_size)

        traj = []  # 轨迹存储
        energies = []

        for step in range(self.n_steps):
            # 定期重建邻接列表
            if step % rebuild_interval == 0 and step > 0:
                pair_i, pair_j = neighbor_list.build(positions, box_size)

            positions, velocities, forces = self.step(
                positions, velocities, forces, masses,
                force_engine, pair_i, pair_j, box_size
            )

            # 每隔一定步数记录
            if step % 1000 == 0:
                E_kin = 0.5 * (masses * velocities.pow(2).sum(dim=1)).sum()
                E_pot = force_engine.compute_energy(
                    positions, pair_i, pair_j, box_size
                )
                energies.append((E_kin.item(), E_pot.item()))

                # 温度: T = 2*E_kin / (3*N*k_B)
                N = len(positions)
                k_B = 0.001987  # kcal/(mol·K)
                T = 2.0 * E_kin / (3.0 * N * k_B)
                traj.append(positions.clone())

                if step % 10000 == 0:
                    print(f"Step {step:>8d}: T={T:.1f}K  "
                          f"E_kin={E_kin:.2f}  E_pot={E_pot:.2f}  "
                          f"E_tot={E_kin+E_pot:.2f} kcal/mol")

        return traj, energies

踩坑:邻接列表重建太频繁——截断半径紧挨着 LJ 势的截断,刚超 12Å 就丢失相互作用

# ❌ skin=0(邻接列表半径 = cut_off = 12Å)
# 原子移动超出 12Å → 邻接列表中丢失 → LJ 力计算遗漏 → 能量不守恒
# 但每步重建邻接列表 → 占模拟时间 40%

# ✅ skin=2Å(邻接列表半径 = 14Å)
# 缓冲区内原子即使 12-14Å 也不参与 LJ(force 截断在 12Å)
# 但邻接列表中保留了它们 → 20 步内不会丢失
# 重建间隔 20 → 邻接列表开销 5%

踩坑:NPT 系综的恒压器(Berendsen barostat)在大时间步下盒尺寸振荡

# ❌ Berendsen barostat 直接缩放置信盒尺寸
# box_new = box * (1 - κ * dt/τ_p * (P_target - P_instant))
# dt=2fs(两倍标准步长)→ κ*dt/τ_p 过大 → 盒尺寸振荡 ±15%

# ✅ Parrinello-Rahman barostat: 用扩展拉格朗日量,把盒尺寸作为动力变量
class ParrinelloRahmanBarostat:
    """
    P-R 恒压器: 盒尺寸是动力变量,有惯性,不会振荡
    """

    def __init__(self, P_target=1.0, tau_p=1.0, compressibility=4.5e-5):
        """
        P_target: 目标压力 (atm)
        tau_p: 恒压器时间常数 (ps)
        compressibility: 水的压缩系数 (atm^-1)
        """
        self.P_target = P_target
        self.tau_p = tau_p
        self.beta = compressibility

        # 盒矩阵 h = [a_x, b_x, c_x; a_y, b_y, c_y; a_z, b_z, c_z]
        self.h = None
        self.h_vel = None  # dh/dt, 盒矩阵的速率

    def rescale(self, positions, forces, stress_tensor, dt):
        """
        P-R 恒压器一步

        stress_tensor: [3, 3] 瞬时应力 (atm)
        """
        if self.h is None:
            self.h = 10.0 * torch.eye(3, device=positions.device)

        h = self.h
        V = torch.det(h)  # 盒体积

        # 内能贡献: P_inst = (2*E_kin/3 + sum(r·F))/3V
        # stress_tensor 是从力场计算的 Virial 应力

        # 恒压器力: (P_inst - P_target) → 驱动 h 变化
        P_diff = (stress_tensor.trace() / 3.0 - self.P_target)  # 标量
        W = 3.0 * 300 * 100 * self.tau_p**2  # 盒矩阵的"质量"

        # 盒矩阵的加速度
        h_acc = (V * P_diff / W) * self.beta

        # Verlet 更新盒矩阵
        h_half = self.h_vel + h_acc * dt / 2.0
        self.h = h + h_half * dt

        # 原子坐标缩放: 单位坐标 s_i = h^(-1) * r_i 不变
        s = torch.linalg.solve(h, positions.T).T  # [N, 3] 单位坐标
        positions_new = s @ self.h.T  # 新绝对坐标

        return positions_new

踩坑:index_add_ 在 FP16 下精度累积错误——10 万次迭代后原子飘移 0.3Å

# ❌ forces = forces.half() → index_add_ → FP16 累加
# 每步 2000 万对原子 × 1fs → 100 万步 → FP16 精度不足
# 原子位置误差 0.3Å → 相当于化学键长误差 20%

# ✅ FP32 force buffer: 力的归约用 FP32,位置和速度用 FP16
class MixedPrecisionMD:
    """混合精度 MD: FP16 坐标 + FP32 力积累"""

    def __init__(self, N):
        self.N = N

        # FP16: 坐标和速度(节省 50% HBM,2× 吞吐)
        self.positions = torch.zeros(N, 3, dtype=torch.float16, device="npu")
        self.velocities = torch.zeros(N, 3, dtype=torch.float16, device="npu")

        # FP32: 力积累缓冲区(保证累加精度)
        self.force_buffer = torch.zeros(N, 3, dtype=torch.float32, device="npu")

    def accumulate_force(self, F_ij, pair_i, pair_j):
        """FP32 力归约(每步清零 + index_add)"""
        self.force_buffer.zero_()
        self.force_buffer.index_add_(0, pair_i, F_ij.float())
        self.force_buffer.index_add_(0, pair_j, -F_ij.float())

        # 转 FP16 给速度更新
        return self.force_buffer.half()

mat-chem-sim-pred 的分子动力学方案:空间哈希桶 + 27 邻域搜索把 50 亿对原子缩减到 2000 万对(25×),Lennard-Jones 12-6 势对所有候选对并行计算力矢量(Cube 单元做 r 的乘幂 + direction 归一化),Velocity-Verlet 积分交替更新位置和速度(半步 v → 全步 x → 新力 → 全步 v)。10 万原子 × 1μs 模拟从集群一月压到 NPU 数小时。踩坑:skin=0 导致频繁重建邻接列表(40% 开销)→skin=2Å 缓冲(5%)、Berendsen barostat 振荡→Parrinello-Rahman 盒矩阵动力变量、FP16 index_add_ 原子飘移 0.3Å→FP32 力缓冲区归约。

Logo

作为“人工智能6S店”的官方数字引擎,为AI开发者与企业提供一个覆盖软硬件全栈、一站式门户。

更多推荐