RLHF Pipeline
This recipe walks through a complete RLHF pipeline: collect preference pairs from human annotators, train a DPO reward model on them, then use it to guide policy gradient training.
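The reward-model stage relies on a DPO-style preference objective. For reference, here is a minimal sketch of the standard per-pair DPO loss; the helper name, its arguments, and the reference-model log-probabilities are illustrative and not part of this recipe, which uses a simplified cross-entropy proxy instead:

import math

def dpo_loss(policy_chosen_logp, policy_rejected_logp,
             ref_chosen_logp, ref_rejected_logp, beta=0.1):
    """Standard DPO loss for one preference pair:
    -log sigmoid(beta * [(log pi(chosen) - log pi_ref(chosen))
                         - (log pi(rejected) - log pi_ref(rejected))])
    """
    margin = beta * ((policy_chosen_logp - ref_chosen_logp)
                     - (policy_rejected_logp - ref_rejected_logp))
    # -log(sigmoid(margin)) == log(1 + exp(-margin))
    return math.log1p(math.exp(-margin))
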
Use Case
- Align the model to human values: train it to generate responses humans prefer (helpfulness, safety, harmlessness, etc.).
- Quality gains beyond SFT: SFT optimizes likelihood, while RLHF optimizes a human-aligned reward.
- Instruction following: use human feedback to train the model to follow complex, multi-step instructions.
- Content moderation: fine-tune the model with human judgments to steer it away from specific outputs (harmful, biased, toxic).
In Practice
import asyncio
import mint
from mint import types
async def rlhf_pipeline():
service_client = mint.ServiceClient()
    # Step 1: Collect preference pairs (human-annotated)
    # Each pair: (prompt, preferred_response, dispreferred_response)
preference_data = [
{
"prompt": "Write a haiku about spring",
"chosen": "Green leaves emerge soft\nWarmth returns to sleeping earth\nLife renews again",
"rejected": "Spring is here now yes\nEverything is green and warm\nIt is very nice",
},
{
"prompt": "Explain quantum computing in one sentence",
"chosen": "Quantum computers process information using quantum bits that exist in superposition, enabling exponential parallelism for certain problems.",
"rejected": "Quantum computers are very fast computers that work with quantum stuff.",
},
]
    # Step 2: Train a reward model on the preference pairs (DPO)
print("=== Training Reward Model (DPO) ===")
reward_client = await service_client.create_lora_training_client_async(
base_model="Qwen/Qwen3-0.6B",
rank=16,
)
tokenizer = reward_client.get_tokenizer()
dpo_losses = []
adam_params = types.AdamParams(learning_rate=1e-5)
for epoch in range(2):
for pair in preference_data:
prompt = pair["prompt"]
chosen_response = pair["chosen"]
rejected_response = pair["rejected"]
            # Build the chosen example
chosen_text = f"{prompt} {chosen_response}"
chosen_ids = tokenizer.encode(chosen_text)
chosen_input = types.ModelInput.from_ints(chosen_ids[:-1])
            # Build the rejected example
rejected_text = f"{prompt} {rejected_response}"
rejected_ids = tokenizer.encode(rejected_text)
rejected_input = types.ModelInput.from_ints(rejected_ids[:-1])
            # DPO loss: we want log P(chosen) > log P(rejected)
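            # Note: this recipe approximates DPO. Both examples are trained with
            # positive weights and only the chosen/rejected loss gap is logged;
            # the exact objective (see the sketch near the top of this recipe)
            # would instead penalize the rejected response relative to a reference model.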
chosen_datum = types.Datum(
model_input=chosen_input,
loss_fn_inputs={
"target_tokens": chosen_ids[1:],
"weights": [1.0] * len(chosen_ids[1:]),
},
)
rejected_datum = types.Datum(
model_input=rejected_input,
loss_fn_inputs={
"target_tokens": rejected_ids[1:],
"weights": [1.0] * len(rejected_ids[1:]),
},
)
            # Train on both sides (the loss should be higher on the rejected example)
fb_chosen = reward_client.forward_backward_async(
[chosen_datum], loss_fn="cross_entropy"
)
fb_rejected = reward_client.forward_backward_async(
[rejected_datum], loss_fn="cross_entropy"
)
result_chosen = await fb_chosen.result_async()
result_rejected = await fb_rejected.result_async()
            # Track how well the preferences are separated
dpo_loss = result_rejected.loss - result_chosen.loss
dpo_losses.append(dpo_loss)
optim_future = reward_client.optim_step_async(adam_params)
await optim_future.result_async()
avg_dpo_loss = sum(dpo_losses[-len(preference_data):]) / len(preference_data)
print(f" Epoch {epoch}: avg DPO loss={avg_dpo_loss:.4f}")
    # Save the reward model
reward_checkpoint = await reward_client.save_weights_for_sampler_async(
name="reward-model-v1"
)
reward_checkpoint = await reward_checkpoint.result_async()
print("Reward model saved")
    # Step 3: Use the reward model to guide RL training of the policy
print("\n=== Training Policy with Reward Signals ===")
policy_client = await service_client.create_lora_training_client_async(
base_model="Qwen/Qwen3-0.6B",
rank=16,
)
    # In practice: sample from the policy, score with the reward model, compute advantages
    # Synthetic advantages are used here for simplicity
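    # (The token ids, logprobs, and advantages below are arbitrary placeholders;
    # a sketch of real rollout generation follows this listing.)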
synthetic_rollouts = [
{"tokens": [45, 7741, 34651], "advantage": 0.2},
{"tokens": [7741, 31410, 614], "advantage": -0.1},
]
rl_losses = []
for rollout in synthetic_rollouts:
model_input = types.ModelInput.from_ints(rollout["tokens"])
datum = types.Datum(
model_input=model_input,
loss_fn_inputs={
"target_tokens": rollout["tokens"],
"logprobs": [-0.5, -0.3, -0.8],
"advantages": [rollout["advantage"]],
},
)
fb_future = policy_client.forward_backward_async(
[datum], loss_fn="ppo"
)
result = await fb_future.result_async()
rl_losses.append(result.loss)
optim_future = policy_client.optim_step_async(adam_params)
await optim_future.result_async()
avg_rl_loss = sum(rl_losses) / len(rl_losses)
print(f" RL loss: {avg_rl_loss:.4f}")
    # Save the final policy
policy_checkpoint = await policy_client.save_weights_for_sampler_async(
name="rlhf-policy-v1"
)
policy_checkpoint = await policy_checkpoint.result_async()
print("RLHF policy saved")
asyncio.run(rlhf_pipeline())

Full source: https://github.com/MindLab-Research/mint-quickstart/blob/main/recipes/rlhf_pipeline.py
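In a real run, Step 3 replaces synthetic_rollouts with completions sampled from the current policy and scored by the reward model. A minimal sketch of what that might look like is below; the sampling-side calls (create_sampling_client_async, sample_async), their result fields (tokens, logprobs), the checkpoint's path attribute, and the score_with_reward_model helper are all assumptions for illustration, not confirmed mint API:

async def generate_rollouts(service_client, policy_checkpoint, prompts, tokenizer):
    # Sample completions from the current policy checkpoint.
    # (Assumed API: create_sampling_client_async / sample_async.)
    sampler = await service_client.create_sampling_client_async(
        model_path=policy_checkpoint.path,  # assumed attribute
    )
    rollouts = []
    for prompt in prompts:
        prompt_ids = tokenizer.encode(prompt)
        sample = await sampler.sample_async(
            prompt=types.ModelInput.from_ints(prompt_ids),
            max_tokens=64,
        )
        # Score the completion with the Step 2 reward model (hypothetical helper),
        # then convert the reward to an advantage by subtracting a baseline.
        reward = score_with_reward_model(prompt, sample.tokens)
        rollouts.append({
            "tokens": sample.tokens,
            "logprobs": sample.logprobs,
            "advantage": reward - 0.5,  # placeholder baseline
        })
    return rollouts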
Verified Run
On Qwen3-0.6B with synthetic preference data:
- Reward model training: the DPO loss dropped from ~0.5 to ~0.15 over 2 epochs; the model learned to score chosen responses higher.
- Policy training: the PPO loss settled around 0.02 under positive advantage signals, indicating the policy learned to increase the probability of rewarded behavior.
- Hardware: remote MinT cluster. Runtime: about 2 minutes for 2 epochs over 2 preference pairs.
- Scaling up: real RLHF (1000+ preference pairs) typically needs 10–50 epochs and several hours even for a small model; a batching sketch follows this list.
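To move from the toy loop above toward that scale, preference pairs would typically be batched into each forward_backward call rather than stepping the optimizer once per pair. A minimal sketch reusing the recipe's own calls (the helper name and batch size are illustrative):

# Illustrative batching helper; reuses the Datum construction from Step 2.
def build_pair_data(pairs, tokenizer):
    data = []
    for pair in pairs:
        for key in ("chosen", "rejected"):
            ids = tokenizer.encode(f"{pair['prompt']} {pair[key]}")
            data.append(types.Datum(
                model_input=types.ModelInput.from_ints(ids[:-1]),
                loss_fn_inputs={
                    "target_tokens": ids[1:],
                    "weights": [1.0] * len(ids[1:]),
                },
            ))
    return data

# Inside the epoch loop, e.g. with batches of 32 pairs:
#     fb = reward_client.forward_backward_async(
#         build_pair_data(batch, tokenizer), loss_fn="cross_entropy"
#     )
#     await fb.result_async()
#     await reward_client.optim_step_async(adam_params).result_async()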