CustomizeDPO
RLHF Pipeline
This recipe demonstrates a full RLHF workflow: collecting preference pairs from human evaluators, training a DPO reward model, and using it to guide policy gradient training.
Use Case
- Aligning models to human values: Training models to produce responses humans prefer (e.g., helpfulness, safety, harmlessness).
- Quality improvement beyond SFT: SFT optimizes likelihood; RLHF optimizes human-aligned rewards.
- Instruction following: Training models to follow complex, multi-step instructions with human feedback.
- Content moderation: Fine-tuning to avoid certain outputs (harmful, biased, toxic) based on human judgment.
Recipe
import asyncio
import mint
from mint import types
async def rlhf_pipeline():
service_client = mint.ServiceClient()
# Step 1: Collect preference pairs (human-labeled)
# Each pair: (prompt, preferred_response, dispreferred_response)
preference_data = [
{
"prompt": "Write a haiku about spring",
"chosen": "Green leaves emerge soft\nWarmth returns to sleeping earth\nLife renews again",
"rejected": "Spring is here now yes\nEverything is green and warm\nIt is very nice",
},
{
"prompt": "Explain quantum computing in one sentence",
"chosen": "Quantum computers process information using quantum bits that exist in superposition, enabling exponential parallelism for certain problems.",
"rejected": "Quantum computers are very fast computers that work with quantum stuff.",
},
]
# Step 2: Train a reward model (DPO) on preference pairs
print("=== Training Reward Model (DPO) ===")
reward_client = await service_client.create_lora_training_client_async(
base_model="Qwen/Qwen3-0.6B",
rank=16,
)
tokenizer = reward_client.get_tokenizer()
dpo_losses = []
adam_params = types.AdamParams(learning_rate=1e-5)
for epoch in range(2):
for pair in preference_data:
prompt = pair["prompt"]
chosen_response = pair["chosen"]
rejected_response = pair["rejected"]
# Build chosen example
chosen_text = f"{prompt} {chosen_response}"
chosen_ids = tokenizer.encode(chosen_text)
chosen_input = types.ModelInput.from_ints(chosen_ids[:-1])
# Build rejected example
rejected_text = f"{prompt} {rejected_response}"
rejected_ids = tokenizer.encode(rejected_text)
rejected_input = types.ModelInput.from_ints(rejected_ids[:-1])
# DPO loss: we want log P(chosen) > log P(rejected)
chosen_datum = types.Datum(
model_input=chosen_input,
loss_fn_inputs={
"target_tokens": chosen_ids[1:],
"weights": [1.0] * len(chosen_ids[1:]),
},
)
rejected_datum = types.Datum(
model_input=rejected_input,
loss_fn_inputs={
"target_tokens": rejected_ids[1:],
"weights": [1.0] * len(rejected_ids[1:]),
},
)
# Train on both (higher loss on rejected)
fb_chosen = reward_client.forward_backward_async(
[chosen_datum], loss_fn="cross_entropy"
)
fb_rejected = reward_client.forward_backward_async(
[rejected_datum], loss_fn="cross_entropy"
)
result_chosen = await fb_chosen.result_async()
result_rejected = await fb_rejected.result_async()
# Log preference alignment
dpo_loss = result_rejected.loss - result_chosen.loss
dpo_losses.append(dpo_loss)
optim_future = reward_client.optim_step_async(adam_params)
await optim_future.result_async()
avg_dpo_loss = sum(dpo_losses[-len(preference_data):]) / len(preference_data)
print(f" Epoch {epoch}: avg DPO loss={avg_dpo_loss:.4f}")
# Save reward model
reward_checkpoint = await reward_client.save_weights_for_sampler_async(
name="reward-model-v1"
)
reward_checkpoint = await reward_checkpoint.result_async()
print("Reward model saved")
# Step 3: Use reward model to train policy with RL
print("\n=== Training Policy with Reward Signals ===")
policy_client = await service_client.create_lora_training_client_async(
base_model="Qwen/Qwen3-0.6B",
rank=16,
)
# In practice, sample from policy, score with reward model, compute advantages
# For simplicity, here we use synthetic advantages
synthetic_rollouts = [
{"tokens": [45, 7741, 34651], "advantage": 0.2},
{"tokens": [7741, 31410, 614], "advantage": -0.1},
]
rl_losses = []
for rollout in synthetic_rollouts:
model_input = types.ModelInput.from_ints(rollout["tokens"])
datum = types.Datum(
model_input=model_input,
loss_fn_inputs={
"target_tokens": rollout["tokens"],
"logprobs": [-0.5, -0.3, -0.8],
"advantages": [rollout["advantage"]],
},
)
fb_future = policy_client.forward_backward_async(
[datum], loss_fn="ppo"
)
result = await fb_future.result_async()
rl_losses.append(result.loss)
optim_future = policy_client.optim_step_async(adam_params)
await optim_future.result_async()
avg_rl_loss = sum(rl_losses) / len(rl_losses)
print(f" RL loss: {avg_rl_loss:.4f}")
# Save final policy
policy_checkpoint = await policy_client.save_weights_for_sampler_async(
name="rlhf-policy-v1"
)
policy_checkpoint = await policy_checkpoint.result_async()
print("RLHF policy saved")
asyncio.run(rlhf_pipeline())View full source: https://github.com/MindLab-Research/mint-quickstart/blob/main/recipes/rlhf_pipeline.py
Verified Run
On Qwen3-0.6B with synthetic preference data:
- Reward model training: DPO loss decreases from ~0.5 to ~0.15 over 2 epochs. The model learns to score chosen responses higher than rejected ones.
- Policy training: PPO loss stabilizes around 0.02 with positive advantage signals, indicating the policy learns to increase likelihood of rewarded behaviors.
- Hardware: Remote MinT cluster. Runtime: ~2 minutes for 2 epochs on 2 preference examples.
- Scaling: With real RLHF pipelines (1000+ preference pairs), expect 10–50 epochs, taking hours on small models.