MinT 快速上手
MinT 是什么?
MinT(Mind Lab Toolkit) 是一套开放的语言模型训练基础设施,支持:
- SFT(Supervised Fine-Tuning):从标注样本学习(输入 → 输出对)
- RL(Reinforcement Learning):从 reward 信号学习(试错)
MinT 使用 LoRA(Low-Rank Adaptation) 进行高效 fine-tune,无需更新全部模型参数。
本教程内容
我们将训练一个模型来解决乘法问题,分两个阶段:
- 阶段 1(SFT):用标注样本教模型做乘法
- 阶段 2(RL):加载 SFT 模型,用 reward 信号继续优化
完整工作流:SFT → 保存 → 加载 → RL
前置条件
- Python >= 3.11
- MinT API key
Step 0:安装
从 git 仓库安装 MinT SDK:
pip install git+https://github.com/MindLab-Research/mindlab-toolkit.git python-dotenv matplotlib numpyStep 1:配置 API Key
MinT 需要 API key 进行认证。两种配置方式:
方式 A:使用 .env 文件(推荐)
在项目目录创建 .env 文件:
MINT_API_KEY=sk-your-api-key-here方式 B:直接设置环境变量
import os
os.environ['MINT_API_KEY'] = 'sk-your-api-key-here'安全提示:不要将 API key 提交到版本控制。将 .env 加入 .gitignore。
使用 Tinker SDK
如果你有使用 Tinker SDK 的现有代码,可以通过设置环境变量连接到 MinT:
pip install tinkerTINKER_BASE_URL=<your-region-endpoint>
TINKER_API_KEY=<your-mint-api-key>按所在区域选择 MinT 域名:
- 境内:
https://mint-cn.macaron.xin/ - 境外:
https://mint.macaron.xin/
注意:使用你的 MinT API key(以 sk- 开头)。当前 mindlab-toolkit 依赖 tinker>=0.15.0,并会在 import mint 时打上 MinT 兼容补丁。对普通 Tinker 风格代码,最省事的迁移方式是 import mint as tinker;如果必须保留原样的 import tinker,请先在同一进程里 import mint,再构造 Tinker client。已知差异与计划更新请见 Tinker 兼容性。
如果你想看 SFT vs RL、域名选择和 API key 获取方式,见 FAQ。
HuggingFace 镜像
如果访问 HuggingFace 有网络问题,在 import mint 之前设置镜像端点:
import os
os.environ["HF_ENDPOINT"] = "https://hf-mirror.com"
import mint # 必须在设置 HF_ENDPOINT 之后Step 2:连接 MinT Server
ServiceClient 是 MinT 的入口,负责:
- Server 认证
- 创建 training 和 sampling client
- 查询可用模型
import mint
# 创建 service client
service_client = mint.ServiceClient()
# 列出可用模型
print("Connected to MinT server!")
capabilities = service_client.get_server_capabilities()
for model in capabilities.supported_models:
print(f" - {model.model_name}")核心概念
训练前,先了解关键组件:
| 组件 | 用途 |
|---|---|
TrainingClient | 管理 LoRA adapter,处理训练操作 |
SamplingClient | 从训练后的模型生成文本 |
Datum | 单个训练样本,包含 model_input 和 loss_fn_inputs |
训练循环模式
for each batch:
forward_backward() # 计算梯度
optim_step() # 更新 weightLoss 函数
- cross_entropy:用于 SFT,最大化正确 token 的概率
- importance_sampling:用于 RL,按 advantage 加权更新
阶段 1:监督 Fine-Tuning(SFT)
目标:用标注样本教模型做两位数乘法。
SFT 原理:
- 展示输入-输出对
- 模型学习根据输入预测输出
- Loss = 模型对正确答案的"惊讶程度"
输入: "Question: What is 47 * 83?\nAnswer:"
输出: " 3901"Step 3:创建 Training Client
使用 Qwen/Qwen3-0.6B——小巧但足够用于学习。
LoRA 参数:
rank=16:低秩矩阵大小(越大容量越大,速度越慢)train_mlp=True:训练 feed-forward 层train_attn=True:训练 attention 层train_unembed=True:训练输出投影层
from mint import types
BASE_MODEL = "Qwen/Qwen3-0.6B"
# 创建 training client,配置 LoRA
training_client = service_client.create_lora_training_client(
base_model=BASE_MODEL,
rank=16, # LoRA rank - 控制 adapter 容量
train_mlp=True, # 训练 MLP(feed-forward)层
train_attn=True, # 训练 attention 层
train_unembed=True, # 训练输出投影层
)
# 获取 tokenizer - 文本与 token ID 互转
tokenizer = training_client.get_tokenizer()Step 4:准备训练数据
将样本转换为 MinT 可处理的 Datum 对象。
关键概念 - Weights:
weight=0:不计算该 token 的 loss(prompt 部分)weight=1:计算该 token 的 loss(需要学习的答案)
import random
from mint import types
random.seed(42)
def generate_sft_examples(n=100):
"""生成两位数乘法样本。"""
examples = []
for _ in range(n):
a = random.randint(10, 99)
b = random.randint(10, 99)
examples.append({
"question": f"What is {a} * {b}?",
"answer": str(a * b)
})
return examples
def process_sft_example(example: dict, tokenizer) -> types.Datum:
"""将训练样本转换为 Datum。"""
prompt = f"Question: {example['question']}\nAnswer:"
completion = f" {example['answer']}"
# 分别 tokenize prompt 和 completion
prompt_tokens = tokenizer.encode(prompt, add_special_tokens=True)
completion_tokens = tokenizer.encode(completion, add_special_tokens=False)
# 添加 EOS token,让模型学会何时停止
completion_tokens = completion_tokens + [tokenizer.eos_token_id]
# 创建 weights:prompt 为 0,completion 为 1
prompt_weights = [0] * len(prompt_tokens)
completion_weights = [1] * len(completion_tokens)
all_tokens = prompt_tokens + completion_tokens
all_weights = prompt_weights + completion_weights
# next-token prediction:偏移 1 位
input_tokens = all_tokens[:-1]
target_tokens = all_tokens[1:]
weights = all_weights[1:]
return types.Datum(
model_input=types.ModelInput.from_ints(tokens=input_tokens),
loss_fn_inputs={
"target_tokens": target_tokens,
"weights": weights
}
)
sft_examples = generate_sft_examples(100)
sft_data = [process_sft_example(ex, tokenizer) for ex in sft_examples]Step 5:训练模型(SFT)
训练循环:
- forward_backward():计算 loss 和梯度
- optim_step():用 Adam 更新 weight
NUM_SFT_STEPS = 10
SFT_LEARNING_RATE = 5e-5
for step in range(NUM_SFT_STEPS):
fwdbwd_result = training_client.forward_backward(
data=sft_data,
loss_fn="cross_entropy"
).result()
training_client.optim_step(
types.AdamParams(learning_rate=SFT_LEARNING_RATE)
).result()Step 6:测试 SFT 模型
import re
def extract_answer(response: str) -> str | None:
"""从 response 中提取第一个数字答案。"""
numbers = re.findall(r'\d+', response)
return numbers[0] if numbers else None
# 保存 weight 并创建 sampling client
sft_sampling_client = training_client.save_weights_and_get_sampling_client(
name='arithmetic-sft'
)
# 测试
prompt = "Question: What is 23 * 47?\nAnswer:"
prompt_tokens = types.ModelInput.from_ints(tokenizer.encode(prompt))
result = sft_sampling_client.sample(
prompt=prompt_tokens,
num_samples=1,
sampling_params=types.SamplingParams(
max_tokens=16,
temperature=0.0,
stop_token_ids=[tokenizer.eos_token_id]
)
).result()
response = tokenizer.decode(result.sequences[0].tokens)
print(f"Q: What is 23 * 47?")
print(f"A: {response.strip()}")Step 7:保存 Checkpoint
保存 SFT 模型,以便加载后用 RL 继续训练。
sft_checkpoint = training_client.save_state(name="arithmetic-sft-checkpoint").result()
print(f"Checkpoint saved to: {sft_checkpoint.path}")阶段 2:强化学习(RL)
目标:加载 SFT 模型,用 reward 信号进一步优化。
RL 与 SFT 的区别:
- SFT:"这是正确答案,学它"
- RL:"尝试不同答案,我告诉你对不对"
RL 工作流:
1. 每个问题采样多个 response(探索)
2. 计算 reward(1.0 = 正确,0.0 = 错误)
3. 计算 advantage = reward - mean_reward
4. 用 importance_sampling loss 训练Step 8:继续用 RL 训练
rl_training_client = training_client # 从 SFT 继续Step 9:定义 Reward 函数
def generate_rl_problem():
"""生成更难的问题(10-199 vs SFT 的 10-99)。"""
a = random.randint(10, 199)
b = random.randint(10, 199)
return f"What is {a} * {b}?", str(a * b)
def compute_reward(response: str, correct_answer: str) -> float:
"""Reward 函数:正确 1.0,错误 0.0。"""
extracted = extract_answer(response)
return 1.0 if extracted == correct_answer else 0.0Step 10:RL 训练循环
import torch
from mint import TensorData
NUM_RL_STEPS = 10
BATCH_SIZE = 8
GROUP_SIZE = 8
RL_LEARNING_RATE = 2e-5
for step in range(NUM_RL_STEPS):
# 保存 weight 用于采样
sampling_path = rl_training_client.save_weights_for_sampler(
name=f"rl-step-{step}"
).result().path
rl_sampling_client = service_client.create_sampling_client(
model_path=sampling_path,
base_model=BASE_MODEL
)
problems = [generate_rl_problem() for _ in range(BATCH_SIZE)]
training_datums = []
for question, answer in problems:
prompt_text = f"Question: {question}\nAnswer:"
prompt_tokens = tokenizer.encode(prompt_text)
prompt_input = types.ModelInput.from_ints(prompt_tokens)
# 采样多个 response
sample_result = rl_sampling_client.sample(
prompt=prompt_input,
num_samples=GROUP_SIZE,
sampling_params=types.SamplingParams(
max_tokens=16,
temperature=0.7,
stop_token_ids=[tokenizer.eos_token_id]
)
).result()
# 计算 reward 和 advantage
group_rewards = []
for seq in sample_result.sequences:
response_text = tokenizer.decode(seq.tokens)
reward = compute_reward(response_text, answer)
group_rewards.append(reward)
mean_reward = sum(group_rewards) / len(group_rewards)
advantages = [r - mean_reward for r in group_rewards]
# 创建带 advantage 的训练 datum
for seq, adv in zip(sample_result.sequences, advantages):
if len(seq.tokens) == 0 or adv == 0:
continue
full_tokens = prompt_tokens + list(seq.tokens)
input_tokens = full_tokens[:-1]
target_tokens = full_tokens[1:]
weights = [0.0] * (len(prompt_tokens) - 1) + [1.0] * len(seq.tokens)
logprobs = [0.0] * (len(prompt_tokens) - 1) + list(seq.logprobs or [0.0] * len(seq.tokens))
full_advantages = [0.0] * (len(prompt_tokens) - 1) + [adv] * len(seq.tokens)
datum = types.Datum(
model_input=types.ModelInput.from_ints(tokens=input_tokens),
loss_fn_inputs={
"target_tokens": TensorData.from_torch(torch.tensor(target_tokens, dtype=torch.int64)),
"weights": TensorData.from_torch(torch.tensor(weights, dtype=torch.float32)),
"logprobs": TensorData.from_torch(torch.tensor(logprobs, dtype=torch.float32)),
"advantages": TensorData.from_torch(torch.tensor(full_advantages, dtype=torch.float32)),
},
)
training_datums.append(datum)
# 训练
if training_datums:
rl_training_client.forward_backward(
training_datums,
loss_fn="importance_sampling"
).result()
rl_training_client.optim_step(
types.AdamParams(learning_rate=RL_LEARNING_RATE)
).result()Step 11:测试最终模型
final_path = rl_training_client.save_weights_for_sampler(name="arithmetic-rl-final").result().path
final_client = service_client.create_sampling_client(
model_path=final_path,
base_model=BASE_MODEL
)
# 在更难的问题上测试
test_problems = [
("What is 123 * 45?", "5535"),
("What is 67 * 189?", "12663"),
]
for question, correct in test_problems:
prompt = f"Question: {question}\nAnswer:"
prompt_input = types.ModelInput.from_ints(tokenizer.encode(prompt))
result = final_client.sample(
prompt=prompt_input,
num_samples=1,
sampling_params=types.SamplingParams(
max_tokens=16,
temperature=0.0,
stop_token_ids=[tokenizer.eos_token_id]
)
).result()
response = tokenizer.decode(result.sequences[0].tokens)
extracted = extract_answer(response)
print(f"Q: {question} → A: {response.strip()} (correct: {correct})")总结
训练 Pipeline
| 阶段 | 方法 | Loss 函数 | 用途 |
|---|---|---|---|
| 1 | SFT | cross_entropy | 用标注样本教乘法 |
| 2 | RL | importance_sampling | 用 reward 信号优化 |
核心 API
# 初始化
service_client = mint.ServiceClient()
training_client = service_client.create_lora_training_client(base_model=...)
# 训练
training_client.forward_backward(data, loss_fn) # 计算梯度
training_client.optim_step(adam_params) # 更新 weight
# Checkpoint
checkpoint = training_client.save_state(name)
resumed = service_client.create_lora_training_client(
base_model=BASE_MODEL,
rank=16,
train_mlp=True,
train_attn=True,
train_unembed=True,
)
resumed.load_state_with_optimizer(checkpoint.path).result()
# 推理
sampling_client = training_client.save_weights_and_get_sampling_client(name)
sampling_client.sample(prompt, num_samples, sampling_params)下一步
- 如果你想看“客户端自定义 reward + partial credit”的独立示例,见 Custom Reward 和
quickstart/custom_reward.py。 - 如果你想看基于
forward_backward_custom的成对偏好训练,见 Loss Functions 和quickstart/custom_loss.py。