作者:昇腾实战派

背景

在深度学习和强化学习领域,高效的大规模训练框架对于提升模型性能至关重要。GLM-5 在推理、编码和智能体任务上取得了全球所有开源模型中的最佳性能,显著缩小了与前沿模型的差距。这背后,Slime 作为其强化学习基础设施,通过优化训练吞吐量和效率,受到了广泛关注。

昇腾平台已对slime 训练框架进行了兼容性适配

在这里插入图片描述

整体设计

Slime 采用 Ray 作为单控制器(Single Controller)进行调度,基于 **SGLang **和 Megatron LM 作为唯一后端,构建了一个简洁且支持大规模 RL 的框架。其设计目标是减少训练与推理之间数据传递的开销,并使所有环节尽可能对齐真实生产环境中的组件,从而实现灵活的大规模 RL 训练。

slime 采用分离式架构,将 RLHF 训练流程分解为三个独立协作的模块:

  • Training (Megatron): 负责主训练流程,支持完整的 5D 并行(TP/PP/DP/CP/EP)
  • Rollout (SGLang): 使用 SGLang router 进行请求分发,生成新数据,基于 SGLang 的采样逻辑;
  • Data Buffer: 管理数据流和自定义生成逻辑

在这里插入图片描述
基于前卫的设计,slime 的自由度很高:

  1. 资源调度自由:支持 co-locatedis-aggregate 两种部署策略;在 rollout 和 training 上各自支持 DP/TP/PP/EP;具体实现见 slime/ray/placement_group.py
  2. 训练方式自由:支持同步训练异步训练两种模式;具体实现见 slime/train.py 和 slime/train_async.py;注意,后者需要在 dis-aggregate 架构下进行训推分离的异步训练,rollout 始终领先 train 一个 step,也即 one-step off-policy
  3. 采样方式自由:支持用户自定义复杂的采样流程,包括多轮工具调用、奖励模型集成、自定义验证器等;具体实现见 slime_plugins/rollout_buffer/
  4. 模型支持自由:支持 Dense 和 MoE 模型;具体脚本可参考 slime/scripts/run-qwen3-4B.sh 和 slime/scripts/run-deepseek-r1.sh

在这里插入图片描述

整体控制流程

在这里插入图片描述

训练主流程如下:

# train.py
def train(args):
    # 1. 创建 Placement Groups
    pgs = create_placement_groups(args)

    # 2. 创建 RolloutManager(包含 SGLang 引擎)
    rollout_manager, num_rollout_per_epoch = create_rollout_manager(args, pgs["rollout"])

    # 3. 创建 Actor 和 Critic 模型
    actor_model, critic_model = create_training_models(args, pgs, rollout_manager)

    # 4. 训练循环
    for rollout_id in range(args.start_rollout_id, args.num_rollout):
        # 生成样本
        rollout_data_ref = rollout_manager.generate.remote(rollout_id)

        # 模型训练
        actor_model.async_train(rollout_id, rollout_data_ref)

        # 权重同步
        actor_model.update_weights()

同步训练 train.py

for rollout_id in range(args.start_rollout_id, args.num_rollout):
    # 阻塞等待当前轮 rollout 数据生成完毕
    rollout_data_ref = ray.get(rollout_manager.generate.remote(rollout_id))

    # 将 rollout 模型从 GPU 卸载以释放显存
    if args.offload_rollout:
        ray.get(rollout_manager.offload.remote())

    # 使用当前 rollout 数据执行一步训练
    ray.get(actor_model.async_train(rollout_id, rollout_data_ref))

    # 将 rollout 模型权重重新加载回 GPU
    if args.offload_rollout:
        ray.get(rollout_manager.onload_weights.remote())

    # 将最新训练权重同步到 rollout 模型
    actor_model.update_weights()

    # 权重更新完成后重新加载 KV cache
    if args.offload_rollout:
        ray.get(rollout_manager.onload_kv.remote())

异步训练 train_async.py

# 预先启动第一轮 rollout,使其在训练循环开始前就在后台运行
rollout_data_next_future = rollout_manager.generate.remote(args.start_rollout_id)

for rollout_id in range(args.start_rollout_id, args.num_rollout):
    # 等待上一轮预生成的 rollout 数据就绪
    if rollout_data_next_future is not None:
        rollout_data_curr_ref = ray.get(rollout_data_next_future)

    # 提前异步启动下一轮 rollout,与当前训练并行执行
    if rollout_id + 1 < args.num_rollout:
        rollout_data_next_future = rollout_manager.generate.remote(rollout_id + 1)

    # 执行当前轮训练,此时下一轮 rollout 正在后台并行生成
    ray.get(actor_model.async_train(rollout_id, rollout_data_curr_ref))

    # 权重更新前等待并缓存正在进行的 rollout,同步屏障保证策略一致性
    if (rollout_id + 1) % args.update_weights_interval == 0:
        rollout_data_curr_ref = ray.get(x) if (x := rollout_data_next_future) is not None else None
        rollout_data_next_future = None

训练初始化

Placement Group 组创建

Slime 使用 Ray 作为其资源细粒度调度的框架,以支持分离(dis-aggregate)或统一(co-located)的资源放置策略。

首先,系统根据当前模式(traindebug_train_onlydebug_rollout_only)创建 Placement Group (PG):

  • Co-located 模式: rolloutactor (训练) 指向同一个 PG,实现资源共享。
  • Debug 模式: actorrollout 可以独占资源,允许隔离调试 RL 系统的某个组件。

Ray 的 Placement Group Scheduling 机制是实现这一点的关键。它通过分配逻辑上的虚拟资源来管理 GPU。例如,可以分配给 train group 中的每个 actor 0.4 个 GPU,然后 rollout manager 中的每个 engine 再要求 0.2 个 GPU。Ray 本身不限制 GPU 的实际使用,而是按需进行逻辑分配。

# slime/ray/placement_group.py
def allocate_train_group(args, num_nodes, num_gpus_per_node, pg):
    return RayTrainGroup(
        args=args,
        num_nodes=num_nodes,
        num_gpus_per_node=num_gpus_per_node,
        pg=pg,
        num_gpus_per_actor=0.4,  # 训练 Actor 请求 0.4 GPU
    )

# slime/ray/rollout.py
def init_rollout_engines(args, pg, all_rollout_engines):
    # ...
    for i in range(num_engines):
        num_gpus = 0.2  # 推理引擎请求 0.2 GPU
        num_cpus = num_gpus

        scheduling_strategy = PlacementGroupSchedulingStrategy(
            placement_group=pg,
            placement_group_capture_child_tasks=True,
            placement_group_bundle_index=reordered_bundle_indices[i * num_gpu_per_engine],
        )

        rollout_engine = RolloutRayActor.options(
            num_cpus=num_cpus,
            num_gpus=num_gpus,
            scheduling_strategy=scheduling_strategy,
        ).remote(args, rank=i, worker_type=worker_type, base_gpu_id=base_gpu_id)

对于非 co-located 的情况,由于初始创建的 PG 就是分离的,actor 和 rollout 会自然地被调度到各自指定的资源上。

# slime/ray/placement_group.py
def create_placement_groups(args):
    num_gpus = 0
    if args.debug_train_only:
        num_gpus = args.actor_num_nodes * args.actor_num_gpus_per_node
        rollout_offset = 0
        # ...
    elif args.debug_rollout_only:
        num_gpus = args.rollout_num_gpus
        rollout_offset = 0
    elif args.colocate:
        num_gpus = args.actor_num_nodes * args.actor_num_gpus_per_node
        rollout_offset = 0  # actor 和 rollout 共享同一组 GPU
        # ...
    else:
        # 分离模式:rollout 从 actor 之后开始
        num_gpus = args.actor_num_nodes * args.actor_num_gpus_per_node + args.rollout_num_gpus
        rollout_offset = args.actor_num_nodes * args.actor_num_gpus_per_node
        # ...

    # 返回包含 actor、critic、rollout 的 PG 字典
    return {
        "actor": (pg, actor_pg_reordered_bundle_indices, actor_pg_reordered_gpu_ids),
        "critic": (pg, critic_pg_reordered_bundle_indices, critic_pg_reordered_gpu_ids) if args.use_critic else None,
        "rollout": (pg, rollout_pg_reordered_bundle_indices, rollout_pg_reordered_gpu_ids),
    }

RayTrainGroup / Actor 初始化

理解这个逻辑可以分为三层:

  1. 顶层 (RayTrainGroup): 负责 TrainRayActor 实例的创建、初始化和资源分配。它接收顶层指令(如 train.py),并将其广播到其管理的所有 TrainRayActor 实例。
  2. 中层 (TrainRayActor): 作为分布式训练的基本执行单元,每个 TrainRayActor 负责一部分计算任务,并与 SGLang 引擎建立连接以进行权重同步。
  3. 底层 (Megatron-LM): 通过后端 megatron_utils 封装的核心训练逻辑。

RayTrainGroup 通过 self._actor_handlers 列表持有多个 TrainRayActor 实例

RayTrainGroup 中的异步方法都使用 def 定义。这是因为在 Ray 中,调用 .remote() 方法会立即同步返回一个句柄对象(类似 Future),而真正的异步任务执行发生在后台。只有当显式调用 ray.get() 时,才会阻塞并获取结果。

# slime/ray/actor_group.py
class RayTrainGroup:
    def async_init(self, args, role, with_ref=False):
        return [actor.init.remote(args, role, with_ref=with_ref) for actor in self._actor_handlers]

    def async_train(self, rollout_id, rollout_data_ref):
        return [actor.train.remote(rollout_id, rollout_data_ref) for actor in self._actor_handlers]

    def update_weights(self):
        return ray.get([actor.update_weights.remote() for actor in self._actor_handlers])

MegatronTrainRayActor 中有几个重要的设计点:

  • 权重管理: 使用 torch_memory_saver 进行显存管理,支持通过标签(tag)定向地卸载(offload)或加载(onload)参数。
# slime/backends/megatron_utils/actor.py
class MegatronTrainRayActor(TrainRayActor):
    def init(self, args, role, with_ref=False):
        # ...
        if args.offload_train:
            if (x := args.train_memory_margin_bytes) > 0:
                torch_memory_saver.memory_margin_bytes = x

        (self.model, self.optimizer, self.opt_param_scheduler, loaded_rollout_id) = initialize_model_and_optimizer(
            args, role
        )
  • 模型 On/Offload: 通过 torch_memory_saver 实现模型的休眠和唤醒:
# slime/backends/megatron_utils/actor.py
def sleep(self) -> None:
    assert self.args.offload_train
    clear_memory(clear_host_memory=True)
    print_memory("before offload model")
    destroy_process_groups()
    torch_memory_saver.pause()
    print_memory("after offload model")

def wake_up(self) -> None:
    assert self.args.offload_train
    print_memory("before wake_up model")
    torch_memory_saver.resume()
    clear_memory()
    reload_process_groups()
    print_memory("after wake_up model")

RolloutManager / SGLang Engine 初始化

同样可以分为三层来理解:

  1. 顶层 (RolloutManager): 负责宏观的生命周期管理,包括启动 SGLang 负载均衡器、管理数据源以及实例化一组 SGLang 引擎。
  2. 中层 (SGLangEngine): 作为面向 Ray 分布式环境的接口,封装了核心的 SGLang 推理引擎。
  3. 底层 (SGLang HTTP Server): 包含了 SGLang 的具体实现,负责实际的文本生成任务。

RolloutManager 是一个 @ray.remote 装饰的 Ray Actor,通过 self.all_rollout_engines 列表持有多个 SGLangEngine 实例

RolloutManager 的初始化核心职责包括:

在这里插入图片描述

  1. 启动 Router (_start_router): 通过 multiprocessing.Process 启动一个独立的 sglang_router 进程或自定义的 slime_router 进程。这个路由器作为负载均衡器,接收推理请求并分发给后端的 SGLang 服务。
# slime/ray/rollout.py
def _start_router(args):
    if args.sglang_router_ip is not None:
        return

    args.sglang_router_ip = _wrap_ipv6(get_host_info()[1])
    if args.sglang_router_port is None:
        args.sglang_router_port = find_available_port(random.randint(3000, 4000))

    if args.use_slime_router:
        from slime.router.router import run_router
        router_args = args
    else:
        from sglang_router.launch_router import RouterArgs
        from slime.utils.http_utils import run_router
        router_args = RouterArgs.from_cli_args(args, use_router_prefix=True)
        router_args.host = args.sglang_router_ip
        router_args.port = args.sglang_router_port
        # ...

    # 启动路由器进程
    process = multiprocessing.Process(target=run_router, args=(router_args,))
    process.daemon = True
    process.start()
  1. 加载数据源和 Rollout 函数: 动态加载用户指定的数据源和 rollout 函数,实现高度可插拔的设计:
# slime/ray/rollout.py
class RolloutManager:
    def __init__(self, args, pg):
        # ...
        data_source_cls = load_function(self.args.data_source_path)
        self.data_source = data_source_cls(args)

        self.generate_rollout = load_function(self.args.rollout_function_path)
        self.eval_generate_rollout = load_function(self.args.eval_function_path)
  1. 创建 Rollout Engines (init_rollout_engines): 创建 SGLang 引擎集群,并为每个引擎分配网络端口(HTTP、NCCL 等):
# slime/ray/rollout.py
def init_rollout_engines(args, pg, all_rollout_engines):
    RolloutRayActor = ray.remote(SGLangEngine)

    for i in range(num_engines):
        rollout_engine = RolloutRayActor.options(
            num_cpus=num_cpus,
            num_gpus=num_gpus,
            scheduling_strategy=scheduling_strategy,
        ).remote(args, rank=i, worker_type=worker_type, base_gpu_id=base_gpu_id)

        all_rollout_engines[i] = rollout_engine
  1. 创建 Lock Actor 和 Health Monitor: 创建一个分布式锁 Actor,用于在权重更新时防止数据竞争。同时,如果启用了容错机制,会创建 RolloutHealthMonitor 来监控引擎健康状态。
# slime/ray/rollout.py
class RolloutManager:
    def __init__(self, args, pg):
        # ...
        self.rollout_engine_lock = Lock.options(num_cpus=1, num_gpus=0).remote()

        self._health_monitor = None
        if self.args.use_fault_tolerance:
            self._health_monitor = RolloutHealthMonitor(self, args)
            self._health_monitor.start()

模型与参数同步

模型同步初始化

MegatronTrainRayActorinit 方法完成分布式环境和模型的初始化。

  1. 分布式环境初始化: init(args) 会初始化 torch.distributed,并设置 Megatron-LM 的模型并行/流水线并行组(MPU)。
# slime/backends/megatron_utils/initialize.py
def init(args):
    set_args(args)
    _initialize_distributed(args)
    _set_random_seed(args.seed, args.data_parallel_random_init, ...)
    _build_tokenizer(args)
  1. 分词器串行加载: 为了避免多个 rank 同时读写 Hugging Face 缓存目录导致文件损坏,配置和分词器采用串行加载(rank by rank)的方式。
# slime/backends/megatron_utils/actor.py
for i in range(dist.get_world_size()):
    if i == dist.get_rank():
        self.hf_config = AutoConfig.from_pretrained(args.hf_checkpoint, trust_remote_code=True)
        self.tokenizer = AutoTokenizer.from_pretrained(self.args.hf_checkpoint, trust_remote_code=True)
    dist.barrier(group=get_gloo_group())
  1. 模型和优化器初始化: 模型初始化遵循 Megatron 的实践,直接提供 GPTModelmodel_provider
# slime/backends/megatron_utils/model.py
def initialize_model_and_optimizer(args, role="actor"):
    model, optimizer, opt_param_scheduler = setup_model_and_optimizer(args, role)
    model[0].role = role
    iteration, _ = load_checkpoint(model, optimizer, opt_param_scheduler, ...)
    return model, optimizer, opt_param_scheduler, iteration
  1. Reference Model 初始化: 加载与主模型相同的初始权重,但在训练中保持不变,作为计算 KL 散度的基准。
参数同步连接

权重同步相关的代码(转换、通信协议、连接建立)在大规模框架中通常占很大比重。

通过 set_rollout_manager 方法建立训练 Actor 与 Rollout Manager 的连接:

# slime/ray/actor_group.py
class RayTrainGroup:
    def set_rollout_manager(self, rollout_manager):
        return ray.get([actor.set_rollout_manager.remote(rollout_manager) for actor in self._actor_handlers])

在执行层,根据 colocate 配置,采用两种不同策略建立通信组:

  • 策略1: Co-located (Gloo IPC): 当训练和推理 Actor 在同一批 GPU 上时,使用 UpdateWeightFromTensor 类。它创建 Gloo 通信组,通过序列化和 Ray IPC 传输权重数据。
# slime/backends/megatron_utils/update_weight/update_weight_from_tensor.py
class UpdateWeightFromTensor:
    def __init__(self, args, model, weights_getter, ...):
        # 创建 Gloo 通信组
        for start_rank in range(0, dist.get_world_size(), self.args.rollout_num_gpus_per_engine):
            group_ranks = list(range(start_rank, end_rank))
            new_group = dist.new_group(ranks=group_ranks, backend="gloo")
  • 策略2: Distributed (NCCL): 当 Actor 分布在不同机器时,使用 UpdateWeightFromDistributed 类。它创建 NCCL 通信组,通过 dist.broadcast 广播权重。
# slime/backends/megatron_utils/update_weight/update_weight_from_distributed.py
class UpdateWeightFromDistributed:
    def connect_rollout_engines(self, rollout_engines, rollout_engine_lock):
        # 创建 NCCL 通信组
        model_update_groups = init_process_group(
            backend="nccl",
            init_method=f"tcp://{master_address}:{master_port}",
            world_size=world_size,
            rank=0,
            group_name=group_name,
        )

由于所有 Megatron 权重最终都转换为 HF 格式供 rollout 侧消费,框架通过 megatron_to_hf 模块支持多种模型的权重转换(LLaMA、Qwen、GLM、DeepSeek 等)。

训练主流程

所有初始化和配置工作都是为了训练能够高效稳定地运行。主流程核心包括:样本生成 -> 模型训练 -> 权重更新

训练流程 — 生成样本

RolloutManager 直接管理生成任务:

# slime/ray/rollout.py
class RolloutManager:
    def generate(self, rollout_id):
        start_time = time.time()
        self.rollout_id = rollout_id
        data, metrics = self._get_rollout_data(rollout_id=rollout_id)
        data = self._convert_samples_to_train_data(data)
        return self._split_train_data_by_dp(data, self.train_parallel_config["dp_size"])

数据生成逻辑通过 call_rollout_fn 调用用户自定义的 rollout 函数,这使得数据生成逻辑完全可插拔:

def _get_rollout_data(self, rollout_id):
    data = call_rollout_fn(self.generate_rollout, self.args, rollout_id, self.data_source, evaluation=False)
    metrics = data.metrics
    data = data.samples
    # flatten the data if it is a list of lists
    while isinstance(data[0], list):
        data = list(itertools.chain.from_iterable(data))

    if not self.args.disable_rollout_trim_samples:
        global_batch_size = self.args.global_batch_size
        if self.args.use_dynamic_global_batch_size:
            logger.info(f"Collected {len(data)} samples from rollout to train with dynamic global batch size")
            # TODO: this is a temporary solution, we should directly save dynamic_global_batch_size to rollout data
            self._dynamic_global_batch_size = self._compute_dynamic_global_batch_size(len(data))
            global_batch_size = self._dynamic_global_batch_size

        if len(data) % global_batch_size != 0:
            trim_len = (len(data) // global_batch_size) * global_batch_size
            if trim_len == 0:
                raise ValueError(f"Not enough samples {len(data)} for global_batch_size {global_batch_size}")
            origin_data_length = len(data)
            data = data[:trim_len]
            logger.info(f"trim number of samples from {origin_data_length} to {trim_len}")
        logger.info(f"Final collected {len(data)} samples from rollout to train")

return data, metrics

# slime/rollout/base_types.py
def call_rollout_fn(fn, *args, evaluation: bool, **kwargs):
    output = fn(*args, **kwargs, evaluation=evaluation)
    if not isinstance(output, (RolloutFnTrainOutput, RolloutFnEvalOutput)):
        output = RolloutFnEvalOutput(data=output) if evaluation else RolloutFnTrainOutput(samples=output)
    return output

数据流: RolloutManager -> Ray Object Store -> Train Actor

Rollout 过程产生的经验(Sample dataclass)被转换为标准的 Python 字典格式(train_data),通过 Ray 对象存储高效地传输给训练 Actor。

训练流程 — 模型训练

# slime/backends/megatron_utils/actor.py
class MegatronTrainRayActor:
    def train_actor(self, rollout_id, rollout_data):
        # 1. 创建数据迭代器
        data_iterator, num_microbatches = get_data_iterator(self.args, self.model, rollout_data)

        # 2. Routing Replay 填充(如果启用)
        if self.args.use_rollout_routing_replay:
            self.fill_routing_replay(data_iterator, num_microbatches, rollout_data)

        # 3. 计算 Reference Log Probs(如果需要)
        if "ref" in self.weights_backuper.backup_tags:
            self._switch_model("ref")
            rollout_data.update(self.compute_log_prob(data_iterator, num_microbatches, store_prefix="ref_"))

        # 4. 计算 Actor Log Probs
        self._switch_model("old_actor" if self.args.keep_old_actor else "actor")
        if not self.args.use_rollout_logprobs or self.args.get_mismatch_metrics:
            rollout_data.update(self.compute_log_prob(data_iterator, num_microbatches, store_prefix=""))

        # 5. 计算 Advantages 和 Returns
        compute_advantages_and_returns(self.args, rollout_data)

        # 6. 执行训练步骤
        train(rollout_id, self.model, self.optimizer, self.opt_param_scheduler, data_iterator, num_microbatches)

训练流程的关键步骤:

  1. 获取数据: TrainRayActor 根据 DP rank 从 Ray Object Store 获取对应的数据分片(分片已在 RolloutManager 侧预完成)。
  2. 计算 Log Probs: 分别使用参考模型和策略模型进行前向传播,计算 log probabilities。
  3. 计算优势函数: 从 rollout 数据中取出 log probs 和 rewards,计算 KL 散度,并根据 GRPO/GSPO/PPO/REINFORCE++ 等算法公式计算 advantagesreturns
  4. 执行训练步骤: 调用 Megatron 的 forward_backward_func 执行前向传播计算 loss -> 反向传播 -> 梯度裁剪 -> 参数更新。

在这里插入图片描述

训练流程 — 权重同步

训练完成后,需要将更新后的权重同步到 SGLang 推理引擎。

路径一:Co-located (Gloo IPC)

# slime/backends/megatron_utils/update_weight/update_weight_from_tensor.py
class UpdateWeightFromTensor:
    def update_weights(self):
        self.weight_version += 1

        # 1. 暂停推理
        ray.get([engine.pause_generation.remote() for engine in self.rollout_engines])
        ray.get([engine.flush_cache.remote() for engine in self.rollout_engines])

        # 2. 获取权重并转换为 HF 格式
        megatron_local_weights = self.weights_getter()

        # 3. 通过 Gloo gather + Ray IPC 发送
        for hf_named_tensors in self._hf_weight_iterator.get_hf_weight_chunks(megatron_local_weights):
            refs, long_lived_tensors = self._send_hf_params(hf_named_tensors)
            ray.get(refs)

        # 4. 恢复推理
        ray.get([engine.continue_generation.remote() for engine in self.rollout_engines])

路径二:Distributed (NCCL Broadcast)

# slime/backends/megatron_utils/update_weight/update_weight_from_distributed.py
class UpdateWeightFromDistributed:
    def update_weights(self):
        self.weight_version += 1

        # 1. 暂停推理
        ray.get([engine.pause_generation.remote() for engine in self.rollout_engines])
        ray.get([engine.flush_cache.remote() for engine in self.rollout_engines])

        # 2. 非专家参数:gather TP -> 转换 HF -> broadcast
        for name, param in named_params_and_buffers(self.args, self.model):
            if ".experts." not in name:
                param = all_gather_param(name, param)
                # 转换并广播

        # 3. 专家参数:gather EP -> 转换 HF -> broadcast
        for name, param in named_params_and_buffers(self.args, self.model):
            if ".experts." in name:
                # EP gather + broadcast

        # 4. 恢复推理
        ray.get([engine.continue_generation.remote() for engine in self.rollout_engines])

数据源管理机制

Slime 的数据管理采用分层设计,从数据源获取、推理生成到训练消费,形成完整的数据流闭环。

数据源架构

数据源通过 DataSource 抽象类定义,主要有两种实现:

在这里插入图片描述

# slime/rollout/data_source.py
class DataSource(abc.ABC):
    @abc.abstractmethod
    def get_samples(self, num_samples: int) -> list[list[Sample]]:
        """获取指定数量的样本"""

    @abc.abstractmethod
    def add_samples(self, samples: list[list[Sample]]):
        """添加样本到数据源"""

    @abc.abstractmethod
    def save(self, rollout_id):
        """保存数据源状态"""

    @abc.abstractmethod
    def load(self, rollout_id=None):
        """加载数据源状态"""
1. RolloutDataSource(只读数据源)

从全局数据集读取 prompt,支持 shuffle 和断点续训:

# slime/rollout/data_source.py
class RolloutDataSource(DataSource):
    def __init__(self, args):
        self.epoch_id = 0
        self.sample_offset = 0

        if args.rollout_global_dataset:
            self.dataset = Dataset(
                args.prompt_data,
                tokenizer=tokenizer,
                processor=processor,
                max_length=args.rollout_max_prompt_len,
            )

    def get_samples(self, num_samples):
        # 从数据集获取 prompt 样本
        prompt_samples = self.dataset.samples[self.sample_offset : self.sample_offset + num_samples]
        self.sample_offset += num_samples

        # 为每个 prompt 创建 n_samples_per_prompt 个副本
        samples = []
        for prompt_sample in prompt_samples:
            group = []
            for _ in range(self.args.n_samples_per_prompt):
                sample = copy.deepcopy(prompt_sample)
                sample.group_index = self.sample_group_index
                sample.index = self.sample_index
                self.sample_index += 1
                group.append(sample)
            self.sample_group_index += 1
            samples.append(group)
        return samples
2. RolloutDataSourceWithBuffer(带缓冲区数据源)

在只读数据源基础上增加缓冲区,支持将中断的样本重新加入队列:

# slime/rollout/data_source.py
class RolloutDataSourceWithBuffer(RolloutDataSource):
    def __init__(self, args):
        super().__init__(args)
        self.buffer = []  # 缓冲区存储未完成的样本组

    def get_samples(self, num_samples):
        # 优先从缓冲区获取
        samples = self._get_samples_from_buffer(num_samples)
        num_samples -= len(samples)

        # 不足部分从数据集获取
        if num_samples > 0:
            samples += super().get_samples(num_samples=num_samples)
        return samples

    def add_samples(self, samples):
        # 将样本组添加到缓冲区
        for group in samples:
            self.buffer.append(group)

Sample 数据结构

Sample 是贯穿整个数据流的核心数据结构:

# slime/utils/types.py
@dataclass
class Sample:
    # 标识
    group_index: int | None = None      # 同一 prompt 的样本组索引
    index: int | None = None            # 全局唯一索引

    # Prompt 相关
    prompt: str | list[dict] = ""       # 原始 prompt
    tokens: list[int] = field(default_factory=list)  # 完整的 token 序列
    multimodal_inputs: dict = None      # 多模态输入

    # Response 相关
    response: str = ""                  # 生成的响应
    response_length: int = 0            # 响应长度
    reward: float | dict = None         # 奖励值
    loss_mask: list[int] = None         # loss 掩码

    # 状态
    status: Status = Status.PENDING    # PENDING/COMPLETED/TRUNCATED/ABORTED

    # 元数据
    metadata: dict = field(default_factory=dict)
    rollout_log_probs: list[float] = None  # 推理时的 log probs

推理侧:数据生成与存储

推理侧的数据流分为三个阶段:

  1. Prompt 获取
#train.py
# train loop.
# note that for async training, one can change the position of the sync operation(ray.get).
for rollout_id in range(args.start_rollout_id, args.num_rollout):
   if args.eval_interval is not None and rollout_id == 0 and not args.skip_eval_before_train:
       ray.get(rollout_manager.eval.remote(rollout_id))
   # 远程调用generate
   rollout_data_ref = ray.get(rollout_manager.generate.remote(rollout_id))

# slime/ray/rollout.py
class RolloutManager:
    def generate(self, rollout_id):
        # 调用用户自定义的 rollout 函数
        data = call_rollout_fn(
            self.generate_rollout, 
            self.args, 
            rollout_id, 
            self.data_source,  # 传入数据源
            evaluation=False
        )

# slime/rollout/sglang_rollout.py
def generate_rollout(
    args: Namespace, rollout_id: int, data_source: Any, evaluation: bool = False
) -> RolloutFnTrainOutput | RolloutFnEvalOutput:

    assert args.rollout_global_dataset
    if evaluation:
        output, _ = run(eval_rollout(args, rollout_id))
        return output

    output, aborted_samples = run(generate_rollout_async(args, rollout_id, data_source.get_samples))
    data_source.add_samples(aborted_samples)
    return output

用户自定义的 rollout 函数从数据源获取 prompt:

# slime/rollout/sglang_rollout.py
async def generate_rollout_async(args, rollout_id, data_source):
    target_data_size = args.rollout_batch_size
    data = []

    while len(data) < target_data_size:
        # 从数据源获取样本
        samples = data_source(args.over_sampling_batch_size)

        # 提交生成任务
        state.submit_generate_tasks(samples)

        # 等待生成完成
        done, state.pendings = await asyncio.wait(state.pendings, ...)
        for task in done:
            group = task.result()
            data.append(group)

    return RolloutFnTrainOutput(samples=data)
  1. 推理生成
# slime/rollout/sglang_rollout.py
def submit_generate_tasks(self, samples: list[list[Sample]]) -> None:
    for group in samples:
        self.pendings.add(
            asyncio.create_task(
                # submit a group of samples as a single task.
                generate_and_rm_group(
                    self.args,
                    group,
                    sampling_params=self.sampling_params.copy(),
                    evaluation=False,
                )
            )
        )
    self.remaining_batch_size += len(samples)

async def generate_and_rm(args, sample, sampling_params):
    # 1. 调用 SGLang 进行生成
    sample = await generate(args, sample, sampling_params)

    # 2. 计算 reward
    if sample.reward is None:
        sample.reward = await async_rm(args, sample)

    return sample

async def generate(args, sample, sampling_params):
    url = f"http://{args.sglang_router_ip}:{args.sglang_router_port}/generate"

    # 准备请求
    payload = {
        "input_ids": prompt_ids,
        "sampling_params": sampling_params,
        "return_logprob": True,  # 返回 log probs
    }

    # 发送请求
    output = await post(url, payload)

    # 更新 sample
    sample.tokens = sample.tokens + new_response_tokens
    sample.response_length += len(new_response_tokens)
    sample.response += output["text"]
    sample.rollout_log_probs += new_response_log_probs
    sample.update_from_meta_info(args, output["meta_info"])

    return sample

generate_and_rm_group :对样本组进行生成和奖励模型评估,并发生成所有样本
generate_and_rm:单个样本的生成和奖励模型评估

  1. 数据转换与分片

推理完成后,RolloutManagerSample 列表转换为训练数据格式:

# slime/ray/rollout.py
class RolloutManager:
    def _convert_samples_to_train_data(self, samples):
        # 后处理 reward(如 GRPO 的 group normalization)
        raw_rewards, rewards = self._post_process_rewards(samples)

        # 构建训练数据字典
        train_data = {
            "tokens": [sample.tokens for sample in samples],
            "response_lengths": [sample.response_length for sample in samples],
            "rewards": rewards,
            "raw_reward": raw_rewards,
            "loss_masks": [sample.loss_mask for sample in samples],
            "truncated": [1 if sample.status == Sample.Status.TRUNCATED else 0 for sample in samples],
            "sample_indices": [sample.index for sample in samples],
        }

        # 可选字段
        if samples[0].rollout_log_probs is not None:
            train_data["rollout_log_probs"] = [sample.rollout_log_probs for sample in samples]

        return train_data

    def _split_train_data_by_dp(self, data, dp_size):
        """按 DP size 分片数据"""
        total_lengths = [len(t) for t in data["tokens"]]
        data["total_lengths"] = total_lengths

        # 计算分片策略(支持序列长度均衡)
        if self.args.balance_data:
            partitions = get_seqlen_balanced_partitions(total_lengths, dp_size, equal_size=True)
        else:
            partitions = [range(i, len(total_lengths), dp_size) for i in range(dp_size)]

        # 为每个 DP rank 创建数据分片
        rollout_data_refs = []
        for i in range(dp_size):
            rollout_data = {}
            partition = partitions[i]
            for key in ["tokens", "rewards", "loss_masks", ...]:
                rollout_data[key] = [data[key][j] for j in partition]
            # 放入 Ray Object Store
            rollout_data_refs.append(Box(ray.put(rollout_data)))

        return rollout_data_refs

训练侧:数据获取与消费

训练侧从 Ray Object Store 获取数据:

# slime/utils/data.py
def process_rollout_data(args, rollout_data_ref, dp_rank, dp_size):
    # 从 Ray Object Store 获取数据
    rollout_data = ray.get(rollout_data_ref[dp_rank].inner)

    # 获取分片信息
    partition = rollout_data.pop("partition")
    total_lengths = rollout_data["total_lengths"]

    # 保存序列长度(用于计时)
    Timer().seq_lens = total_lengths
    rollout_data["total_lengths"] = [total_lengths[i] for i in partition]

    return rollout_data

# slime/backends/megatron_utils/actor.py
class MegatronTrainRayActor:
    def _get_rollout_data(self, rollout_data_ref):
        rollout_data = process_rollout_data(
            self.args,
            rollout_data_ref,
            mpu.get_data_parallel_rank(with_context_parallel=False),
            mpu.get_data_parallel_world_size(with_context_parallel=False),
        )

        # 将 tokens 移动到 GPU
        rollout_data["tokens"] = [
            torch.tensor(t, dtype=torch.long, device=torch.cuda.current_device()) 
            for t in rollout_data["tokens"]
        ]

        return rollout_data

数据流完整路径

┌─────────────────────────────────────────────────────────────────────────────┐
│                           数据流完整路径                                      │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│  ┌──────────────────────────────────────────────────────────────────┐      │
│  │                    RolloutManager.generate()                      │      │
│  │                              │                                    │      │
│  │                              │ call_rollout_fn()                  │      │
│  │                              ▼                                    │      │
│  │  ┌──────────────────────────────────────────────────────────┐    │      │
│  │  │              generate_rollout (用户自定义函数)            │    │      │
│  │  │                                                          │    │      │
│  │  │  ┌──────────────┐                     ┌──────────────┐  │    │      │
│  │  │  │ DataSource   │  get_samples()      │ Sample       │  │    │      │
│  │  │  │ (prompt数据) │ ─────────────────>  │ (prompt)     │  │    │      │
│  │  │  └──────────────┘                     └──────────────┘  │    │      │
│  │  │                                              │           │    │      │
│  │  │                                              │ submit_generate_tasks()
│  │  │                                              ▼           │    │      │
│  │  │         ┌─────────────────────┐              ┌─────────────────────┐
│  │  │         │ SGLang Engine       │  生成完成     │ Sample (completed)  │
│  │  │         │ (推理生成)          │ ──────────>  │ (tokens, response,  │
│  │  │         └─────────────────────┘              │  reward, log_probs) │
│  │  │                                              └─────────────────────┘
│  │  └──────────────────────────────────────────────────────────────────┘    │
│  │                              │                                    │      │
│  │                              │ 返回 RolloutFnTrainOutput           │      │
│  │                              ▼                                    │      │
│  │  ┌─────────────────────────────────────────────────────────────┐ │      │
│  │  │ _convert_samples_to_train_data()                            │ │      │
│  │  │  - reward 后处理(group normalization)                      │ │      │
│  │  │  - 构建 train_data 字典                                     │ │      │
│  │  └─────────────────────────────────────────────────────────────┘ │      │
│  │  ┌─────────────────────────────────────────────────────────────┐ │      │
│  │  │ _split_train_data_by_dp()                                   │ │      │
│  │  │  - 按序列长度均衡分片                                        │ │      │
│  │  │  - ray.put() 放入 Object Store                              │ │      │
│  │  └─────────────────────────────────────────────────────────────┘ │      │
│  └──────────────────────────────────────────────────────────────────┘      │
│         │                                                                   │
│         │ ray.put() -> List[ObjectRef]                                     │
│         ▼                                                                   │
│  ┌──────────────────────────────────────────────────────────────────┐      │
│  │                    Ray Object Store                               │      │
│  │  ┌────────────┐ ┌────────────┐ ┌────────────┐ ┌────────────┐    │      │
│  │  │  DP rank 0 │ │  DP rank 1 │ │  DP rank 2 │ │  DP rank N │    │      │
│  │  └────────────┘ └────────────┘ └────────────┘ └────────────┘    │      │
│  └──────────────────────────────────────────────────────────────────┘      │
│         │                                                                   │
│         │ ray.get() per DP rank                                            │
│         ▼                                                                   │
│  ┌──────────────────────────────────────────────────────────────────┐      │
│  │                    TrainRayActor (per DP rank)                    │      │
│  │  ┌─────────────────────────────────────────────────────────────┐ │      │
│  │  │ _get_rollout_data()                                         │ │      │
│  │  │  - 从 Object Store 获取数据                                  │ │      │
│  │  │  - tokens 移动到 GPU                                         │ │      │
│  │  └─────────────────────────────────────────────────────────────┘ │      │
│  │  ┌─────────────────────────────────────────────────────────────┐ │      │
│  │  │ train_actor()                                               │ │      │
│  │  │  - 创建 DataIterator                                        │ │      │
│  │  │  - 计算 log_probs (ref + actor)                             │ │      │
│  │  │  - 计算 advantages/returns                                  │ │      │
│  │  │  - 执行训练步骤                                              │ │      │
│  │  └─────────────────────────────────────────────────────────────┘ │      │
│  └──────────────────────────────────────────────────────────────────┘      │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘

高级特性

slime支持以下高级特性(仅列举一部分):

  • 在策略蒸馏 (On-Policy Distillation):在策略蒸馏 (OPD) 让学生模型在自己的 rollout 数据上训练,同时匹配教师模型的 token 级 log-probability,从而实现从大模型到小模型的知识传递。
  • 投机采样:加速 rollout 的重要优化手段。推理过程中不再让昂贵的 Target Model 逐个 token 进行 decode,而是先由一个轻量级的 draft model 先进行 decode,生成多个 token 后,再由大模型进行批量验证。
  • 低精度训练:FP8 推理与 BF16 训练、FP8 推理与 FP8 训练、INT4 QAT 训练
  • 可重现性 (Reproducibility):通过结合 SGLang 的确定性推理和 Megatron-LM 的确定性模式,slime 支持按位实验重现。
  • rollout 容灾:slime 会在 rollout 过程中,定期向所有 SGLang server 发送心跳请求(/health_generate),如果心跳超时,则会停止这个 SGLang server。并在这轮 rollout 完成之后进行重启和正确的参数更新。
  • PD 分离:可以通过设置 --prefill-num-servers 参数来指定用于 Prefill 的服务器数量。
  • Full Async:创建一个全局工作线程,在后台保持运行,不断拉取 Prompt 并启动生成任务。
  • Retool: from SFT to RL,使用 retool 功能进行支持工具的语言模型生成。
  • Multi-Agent RL

核心优势

  • 工业级的"训推一体"架构: 通过 Ray Actor 模型,将专为大规模设计的离线训练引擎(Megatron-LM)和专为低延迟、高吞吐量设计的在线推理引擎(SGLang)无缝地"粘合"在一起。

  • 高效的"知识"流动闭环: 框架实现了 RL 的核心飞轮:生成 -> 训练 -> 同步

    • 数据流: 通过 RolloutManager 和 Ray Object Store,实现了从"探索"到"学习"的高效数据回传。
    • 权重流: 通过自适应的权重同步机制(Co-located Gloo IPC 和 Distributed NCCL Broadcast),实现了从"学习"到"行动"的低延迟知识更新。
  • 高度的灵活性与可扩展性:

    • 可插拔的探索与奖励: 通过配置文件指定 rollout_function_pathcustom_rm_path,用户可以轻松自定义 Agent 的探索行为和价值判断标准。
    • 模型无关性: 通过 slime_pluginsmegatron_to_hf 模块,支持 LLaMA、Qwen、GLM、DeepSeek 等主流模型。
  • 完善的容错机制: 通过 RolloutHealthMonitor 实现推理引擎的健康监控和自动恢复,保证大规模训练的稳定性。

  • 丰富的优化技术:

    • Routing Replay: 复用推理阶段的 MoE 专家路由决策
    • True On-Policy Mode: 消除训练和推理之间的数值差异
    • Off-Policy 修正: 支持 TIS、OPSM、ICEPOP 等技术
    • Speculative Decoding: 推测解码支持和统计

NPU 适配

Slime

  1. common.py 新增 is_npu 函数,用于判断是否是 NPU,走入特定 NPU 处理。
  2. Ray 的资源分配:由于不支持 ray.remote(num_gpus=1 ...) 的形式,需要改造资源分配的方式,通过入参 resources 来分配资源。
  3. Ray 不支持 get_gpu_ids() 函数:修改为等价实现,由于修改后返回值是 str 类型,因此还要进行 int 类型转换。
  4. 手动转换部分
    • hccl 手动修改 nccl
    • cuda 手动修改 npu
    • CUDA_VISIBLE_DEVICES 修改为 ASCEND_RT_VISIBLE_DEVICES
  5. Megatron 要用上 mindspeed 的插件仓,添加相关代码。
  6. megatron_bridge 的 model_provider 需要注入 mindspeed 的额外参数:使用 megatron_bridge 加载权重的方式,model_provider 如果没有额外注入,无法识别到 mindspeed 的参数,因此需要额外逻辑注入参数。

Megatron

  1. @jit_fuser 修改:将@jit_fuser 修饰符全部去除。
  2. 手动转换,修改 cuda 为 npu。

Megatron-Bridge

  1. Mindspeed 对 te 模块打 patch 有额外前缀,导致分支未走入,后续会去掉前缀,目前 PR 已合入。
  2. 修 megatron-bridge 的重计算 bug

MindSpeed

  1. mindspeed 落后 megatron 版本,入参有对不齐的问题。
  2. slime 传入 args 格式与 mindspeed 的格式不符,需要额外转换一遍。

Sglang 精度修复

  1. 修改了 qwen3_vl.py 中 vision model 中 fast_pos_embed_interpolate
  2. 回退了 input_deepstack_embeds 的修改
  3. attention_backend 里面走 fia 算子

参考

Logo

作为“人工智能6S店”的官方数字引擎,为AI开发者与企业提供一个覆盖软硬件全栈、一站式门户。

更多推荐