Mind Lab Toolkit (MinT)

RLHF Pipeline

This recipe walks through a complete RLHF pipeline: collect preference pairs from human annotators, train a DPO-style reward model on them, then use it to guide policy-gradient training.

Use Case

  • Aligning a model to human values: train the model to generate responses humans prefer (helpfulness, safety, harmlessness, and so on).
  • Quality gains beyond SFT: SFT optimizes likelihood, while RLHF optimizes a human-aligned reward.
  • Instruction following: use human feedback to train the model to follow complex, multi-step instructions.
  • Content moderation: fine-tune the model with human judgments to avoid specific outputs (harmful, biased, toxic).

In Practice

import asyncio
import mint
from mint import types

async def rlhf_pipeline():
    service_client = mint.ServiceClient()
    
    # Step 1: collect preference pairs (human annotation)
    # Each pair: (prompt, preferred_response, dispreferred_response)
    preference_data = [
        {
            "prompt": "Write a haiku about spring",
            "chosen": "Green leaves emerge soft\nWarmth returns to sleeping earth\nLife renews again",
            "rejected": "Spring is here now yes\nEverything is green and warm\nIt is very nice",
        },
        {
            "prompt": "Explain quantum computing in one sentence",
            "chosen": "Quantum computers process information using quantum bits that exist in superposition, enabling exponential parallelism for certain problems.",
            "rejected": "Quantum computers are very fast computers that work with quantum stuff.",
        },
    ]
    
    # Step 2: train a reward model on the preference pairs (DPO)
    print("=== Training Reward Model (DPO) ===")
    reward_client = await service_client.create_lora_training_client_async(
        base_model="Qwen/Qwen3-0.6B",
        rank=16,
    )
    tokenizer = reward_client.get_tokenizer()
    
    dpo_losses = []
    adam_params = types.AdamParams(learning_rate=1e-5)
    
    for epoch in range(2):
        for pair in preference_data:
            prompt = pair["prompt"]
            chosen_response = pair["chosen"]
            rejected_response = pair["rejected"]
            
            # Build the chosen example
            chosen_text = f"{prompt} {chosen_response}"
            chosen_ids = tokenizer.encode(chosen_text)
            chosen_input = types.ModelInput.from_ints(chosen_ids[:-1])
            
            # Build the rejected example
            rejected_text = f"{prompt} {rejected_response}"
            rejected_ids = tokenizer.encode(rejected_text)
            rejected_input = types.ModelInput.from_ints(rejected_ids[:-1])
            
            # DPO loss: we want log P(chosen) > log P(rejected)
            chosen_datum = types.Datum(
                model_input=chosen_input,
                loss_fn_inputs={
                    "target_tokens": chosen_ids[1:],
                    "weights": [1.0] * len(chosen_ids[1:]),
                },
            )
            
            rejected_datum = types.Datum(
                model_input=rejected_input,
                loss_fn_inputs={
                    "target_tokens": rejected_ids[1:],
                    "weights": [1.0] * len(rejected_ids[1:]),
                },
            )
            
            # Run forward/backward on both sides (loss should stay higher on rejected)
            fb_chosen = reward_client.forward_backward_async(
                [chosen_datum], loss_fn="cross_entropy"
            )
            fb_rejected = reward_client.forward_backward_async(
                [rejected_datum], loss_fn="cross_entropy"
            )
            
            result_chosen = await fb_chosen.result_async()
            result_rejected = await fb_rejected.result_async()
            
            # Track the preference-alignment margin
            dpo_loss = result_rejected.loss - result_chosen.loss
            dpo_losses.append(dpo_loss)
        
        optim_future = reward_client.optim_step_async(adam_params)
        await optim_future.result_async()
        
        avg_dpo_loss = sum(dpo_losses[-len(preference_data):]) / len(preference_data)
        print(f"  Epoch {epoch}: avg DPO loss={avg_dpo_loss:.4f}")
    
    # Save the reward model
    reward_checkpoint = await reward_client.save_weights_for_sampler_async(
        name="reward-model-v1"
    )
    reward_checkpoint = await reward_checkpoint.result_async()
    print("Reward model saved")
    
    # Step 3: use the reward model to guide RL training of the policy
    print("\n=== Training Policy with Reward Signals ===")
    policy_client = await service_client.create_lora_training_client_async(
        base_model="Qwen/Qwen3-0.6B",
        rank=16,
    )
    
    # In practice: sample from the policy, score with the reward model, compute advantages
    # Synthetic advantages are used here for simplicity
    synthetic_rollouts = [
        {"tokens": [45, 7741, 34651], "advantage": 0.2},
        {"tokens": [7741, 31410, 614], "advantage": -0.1},
    ]
    
    rl_losses = []
    
    for rollout in synthetic_rollouts:
        model_input = types.ModelInput.from_ints(rollout["tokens"])
        
        datum = types.Datum(
            model_input=model_input,
            loss_fn_inputs={
                "target_tokens": rollout["tokens"],
                "logprobs": [-0.5, -0.3, -0.8],
                "advantages": [rollout["advantage"]],
            },
        )
        
        fb_future = policy_client.forward_backward_async(
            [datum], loss_fn="ppo"
        )
        result = await fb_future.result_async()
        rl_losses.append(result.loss)
    
    optim_future = policy_client.optim_step_async(adam_params)
    await optim_future.result_async()
    
    avg_rl_loss = sum(rl_losses) / len(rl_losses)
    print(f"  RL loss: {avg_rl_loss:.4f}")
    
    # Save the final policy
    policy_checkpoint = await policy_client.save_weights_for_sampler_async(
        name="rlhf-policy-v1"
    )
    policy_checkpoint = await policy_checkpoint.result_async()
    print("RLHF policy saved")

asyncio.run(rlhf_pipeline())
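
The recipe above tracks a simple loss gap (rejected minus chosen) as its preference-alignment signal. For reference, the full DPO objective instead compares policy and reference log-probabilities per pair; a minimal plain-Python sketch follows (the `beta` value and the sequence log-probabilities are illustrative inputs, not part of the MinT API):

```python
import math

def dpo_pair_loss(policy_chosen_lp: float, policy_rejected_lp: float,
                  ref_chosen_lp: float, ref_rejected_lp: float,
                  beta: float = 0.1) -> float:
    """DPO loss for one preference pair:
    -log sigmoid(beta * [(pi_chosen - ref_chosen) - (pi_rejected - ref_rejected)]),
    where each term is a summed sequence log-probability."""
    margin = (policy_chosen_lp - ref_chosen_lp) - (policy_rejected_lp - ref_rejected_lp)
    return -math.log(1.0 / (1.0 + math.exp(-beta * margin)))

# When the policy has not moved from the reference, margin = 0 and loss = log 2.
print(round(dpo_pair_loss(-12.0, -15.0, -12.0, -15.0), 4))  # 0.6931
```

The loss falls below log 2 exactly when the policy widens its chosen-over-rejected margin relative to the reference model, which is what the epoch-level loss gap in the recipe is a proxy for.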

Full source: https://github.com/MindLab-Research/mint-quickstart/blob/main/recipes/rlhf_pipeline.py

Verified Run

On Qwen3-0.6B with synthetic preference data:

  • Reward model training: the DPO loss drops from ~0.5 to ~0.15 over 2 epochs. The model learns to score chosen responses higher.
  • Policy training: the PPO loss stabilizes around 0.02 under positive advantage signals, indicating the policy learns to raise the probability of rewarded behavior.
  • Hardware: remote MinT cluster. Runtime: about 2 minutes for 2 epochs over 2 preference pairs.
  • Scaling up: real RLHF (1000+ preference pairs) typically takes 10–50 epochs, which is several hours even for small models.
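
Step 3 in the recipe hard-codes synthetic advantages. When scaling up, reward-model scores are usually converted to advantages by normalizing against a batch baseline; here is a minimal sketch of one common scheme (mean-centering plus standard-deviation scaling — a conventional choice, not something prescribed by MinT):

```python
import statistics

def rewards_to_advantages(rewards: list[float]) -> list[float]:
    """Center rewards on the batch mean and scale by the standard deviation,
    so roughly half the rollouts receive positive advantages."""
    baseline = statistics.mean(rewards)
    spread = statistics.pstdev(rewards)
    if spread == 0.0:
        # All rollouts scored equally: no learning signal this batch.
        return [0.0 for _ in rewards]
    return [(r - baseline) / spread for r in rewards]

print(rewards_to_advantages([1.0, 2.0, 3.0]))  # roughly [-1.22, 0.0, 1.22]
```

Centering keeps roughly half the batch positive and half negative, so the PPO step pushes probability toward above-average rollouts rather than toward everything that was sampled.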
