Mind Lab Toolkit (MinT)

MinT Quickstart

What is MinT?

MinT (Mind Lab Toolkit) is an open training infrastructure for language models. It supports:

  • SFT (Supervised Fine-Tuning): learn from labeled examples (input → output pairs)
  • RL (Reinforcement Learning): learn from reward signals (trial and error)

MinT uses LoRA (Low-Rank Adaptation) for efficient fine-tuning, without updating all of the model's parameters.

What This Tutorial Covers

We will train a model to solve multiplication problems, in two stages:

  1. Stage 1 (SFT): teach the model multiplication with labeled examples
  2. Stage 2 (RL): load the SFT model and keep optimizing it with reward signals

Full workflow: SFT → save → load → RL

Prerequisites

  • Python >= 3.11
  • MinT API key

Step 0: Installation

Install the MinT SDK from the git repository:

pip install git+https://github.com/MindLab-Research/mindlab-toolkit.git python-dotenv matplotlib numpy
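
To confirm the install worked, a quick smoke test that just checks the package imports:

python -c "import mint; print('mint imported OK')"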

Step 1: Configure the API Key

MinT requires an API key for authentication. There are two ways to set it:

Option A: Use a .env file (recommended)

Create a .env file in your project directory:

MINT_API_KEY=sk-your-api-key-here

Option B: Set the environment variable directly

import os
os.environ['MINT_API_KEY'] = 'sk-your-api-key-here'

Security note: never commit your API key to version control. Add .env to .gitignore.
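
With Option A, a single python-dotenv call (installed in Step 0) loads the file into the environment; a minimal sketch:

from dotenv import load_dotenv

load_dotenv()  # reads MINT_API_KEY from .env into the process environment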

Using the Tinker SDK

If you have existing code written against the Tinker SDK, you can point it at MinT by setting environment variables:

pip install tinker
TINKER_BASE_URL=<your-region-endpoint>
TINKER_API_KEY=<your-mint-api-key>

Choose the MinT endpoint for your region:

  • Mainland China: https://mint-cn.macaron.xin/
  • International: https://mint.macaron.xin/

Note: use your MinT API key (it starts with sk-). mindlab-toolkit currently depends on tinker>=0.15.0 and applies a MinT compatibility patch when mint is imported. For typical Tinker-style code, the simplest migration is import mint as tinker; if you must keep a literal import tinker, import mint first in the same process before constructing the Tinker client. See Tinker Compatibility for known differences and planned updates.
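
A minimal sketch of that migration path (the ServiceClient call mirrors Step 2 below):

import mint as tinker  # importing mint applies the Tinker compatibility patch

service_client = tinker.ServiceClient()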

For a comparison of SFT vs RL, guidance on endpoint selection, and how to obtain an API key, see the FAQ.

HuggingFace Mirror

If you have network trouble reaching HuggingFace, set a mirror endpoint before importing mint:

import os
os.environ["HF_ENDPOINT"] = "https://hf-mirror.com"

import mint  # must come after HF_ENDPOINT is set

Step 2: Connect to the MinT Server

ServiceClient is the entry point to MinT. It handles:

  • server authentication
  • creating training and sampling clients
  • querying available models

import mint

# create the service client
service_client = mint.ServiceClient()

# list available models
print("Connected to MinT server!")
capabilities = service_client.get_server_capabilities()
for model in capabilities.supported_models:
    print(f"  - {model.model_name}")

Core Concepts

Before training, get to know the key components:

  • TrainingClient: manages the LoRA adapter and handles training operations
  • SamplingClient: generates text from the trained model
  • Datum: a single training example, containing model_input and loss_fn_inputs

Training loop pattern

for each batch:
    forward_backward()  # compute gradients
    optim_step()        # update weights

Loss functions

  • cross_entropy: used for SFT; maximizes the probability of the correct tokens
  • importance_sampling: used for RL; weights each update by advantage

Stage 1: Supervised Fine-Tuning (SFT)

Goal: teach the model two-digit multiplication from labeled examples.

How SFT works

  1. Show the model input-output pairs
  2. The model learns to predict the output from the input
  3. Loss = how "surprised" the model is by the correct answer

Input:  "Question: What is 47 * 83?\nAnswer:"
Output: " 3901"
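
"Surprise" here is the negative log-probability of the target token; a quick numeric illustration:

import math

# cross-entropy loss for a single token the model assigns probability p
for p in (0.9, 0.5, 0.1):
    print(f"p={p}: loss={-math.log(p):.3f}")  # 0.105, 0.693, 2.303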

Step 3: Create the Training Client

We use Qwen/Qwen3-0.6B: small, but sufficient for learning.

LoRA parameters

  • rank=16: size of the low-rank matrices (larger means more capacity, slower training)
  • train_mlp=True: train the feed-forward layers
  • train_attn=True: train the attention layers
  • train_unembed=True: train the output projection layer

from mint import types

BASE_MODEL = "Qwen/Qwen3-0.6B"

# create the training client with the LoRA config
training_client = service_client.create_lora_training_client(
    base_model=BASE_MODEL,
    rank=16,              # LoRA rank - controls adapter capacity
    train_mlp=True,       # train MLP (feed-forward) layers
    train_attn=True,      # train attention layers
    train_unembed=True,   # train the output projection layer
)

# get the tokenizer - converts between text and token IDs
tokenizer = training_client.get_tokenizer()

Step 4: Prepare Training Data

Convert the examples into Datum objects that MinT can process.

Key concept: weights

  • weight=0: no loss is computed for this token (the prompt)
  • weight=1: loss is computed for this token (the answer to learn)

import random
from mint import types

random.seed(42)

def generate_sft_examples(n=100):
    """生成两位数乘法样本。"""
    examples = []
    for _ in range(n):
        a = random.randint(10, 99)
        b = random.randint(10, 99)
        examples.append({
            "question": f"What is {a} * {b}?",
            "answer": str(a * b)
        })
    return examples

def process_sft_example(example: dict, tokenizer) -> types.Datum:
    """将训练样本转换为 Datum。"""
    prompt = f"Question: {example['question']}\nAnswer:"
    completion = f" {example['answer']}"

    # tokenize the prompt and completion separately
    prompt_tokens = tokenizer.encode(prompt, add_special_tokens=True)
    completion_tokens = tokenizer.encode(completion, add_special_tokens=False)

    # append the EOS token so the model learns when to stop
    completion_tokens = completion_tokens + [tokenizer.eos_token_id]

    # build weights: 0 for prompt tokens, 1 for completion tokens
    prompt_weights = [0] * len(prompt_tokens)
    completion_weights = [1] * len(completion_tokens)

    all_tokens = prompt_tokens + completion_tokens
    all_weights = prompt_weights + completion_weights

    # next-token prediction: shift everything by one position
    input_tokens = all_tokens[:-1]
    target_tokens = all_tokens[1:]
    weights = all_weights[1:]

    return types.Datum(
        model_input=types.ModelInput.from_ints(tokens=input_tokens),
        loss_fn_inputs={
            "target_tokens": target_tokens,
            "weights": weights
        }
    )

sft_examples = generate_sft_examples(100)
sft_data = [process_sft_example(ex, tokenizer) for ex in sft_examples]
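
The shift-by-one at the end is the standard next-token-prediction setup; a toy illustration with symbolic tokens:

# a sequence [A, B, C, D] trains as input/target pairs offset by one
toks = ["A", "B", "C", "D"]
print(list(zip(toks[:-1], toks[1:])))  # [('A', 'B'), ('B', 'C'), ('C', 'D')]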

Step 5: Train the Model (SFT)

The training loop:

  1. forward_backward(): computes the loss and gradients
  2. optim_step(): updates the weights with Adam

NUM_SFT_STEPS = 10
SFT_LEARNING_RATE = 5e-5

for step in range(NUM_SFT_STEPS):
    fwdbwd_result = training_client.forward_backward(
        data=sft_data,
        loss_fn="cross_entropy"
    ).result()

    training_client.optim_step(
        types.AdamParams(learning_rate=SFT_LEARNING_RATE)
    ).result()

Step 6: Test the SFT Model

import re

def extract_answer(response: str) -> str | None:
    """从 response 中提取第一个数字答案。"""
    numbers = re.findall(r'\d+', response)
    return numbers[0] if numbers else None

# save weights and create a sampling client
sft_sampling_client = training_client.save_weights_and_get_sampling_client(
    name='arithmetic-sft'
)

# try a test prompt
prompt = "Question: What is 23 * 47?\nAnswer:"
prompt_tokens = types.ModelInput.from_ints(tokenizer.encode(prompt))

result = sft_sampling_client.sample(
    prompt=prompt_tokens,
    num_samples=1,
    sampling_params=types.SamplingParams(
        max_tokens=16,
        temperature=0.0,
        stop_token_ids=[tokenizer.eos_token_id]
    )
).result()

response = tokenizer.decode(result.sequences[0].tokens)
print(f"Q: What is 23 * 47?")
print(f"A: {response.strip()}")

Step 7: Save a Checkpoint

Save the SFT model so it can be loaded later to continue training with RL.

sft_checkpoint = training_client.save_state(name="arithmetic-sft-checkpoint").result()
print(f"Checkpoint saved to: {sft_checkpoint.path}")

Stage 2: Reinforcement Learning (RL)

Goal: load the SFT model and optimize it further with reward signals.

How RL differs from SFT

  • SFT: "here is the correct answer, learn it"
  • RL: "try different answers, and I will tell you which ones are right"

RL workflow

1. Sample several responses per question (exploration)
2. Compute rewards (1.0 = correct, 0.0 = wrong)
3. Compute advantage = reward - mean_reward (see the toy example below)
4. Train with the importance_sampling loss
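
Here is step 3 on a toy group of 8 rewards; correct answers land above the group mean and wrong ones below it:

rewards = [1.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0]
mean_reward = sum(rewards) / len(rewards)        # 0.25
advantages = [r - mean_reward for r in rewards]  # +0.75 if correct, -0.25 if wrong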

Step 8: Continue Training with RL

rl_training_client = training_client  # continue from the SFT client
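
If you are instead resuming in a fresh process, recreate a training client with the same LoRA config and load the Step 7 checkpoint (this mirrors the Checkpoint snippet in the summary below):

# alternative: resume RL from the saved SFT checkpoint
rl_training_client = service_client.create_lora_training_client(
    base_model=BASE_MODEL,
    rank=16,
    train_mlp=True,
    train_attn=True,
    train_unembed=True,
)
rl_training_client.load_state_with_optimizer(sft_checkpoint.path).result()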

Step 9: Define the Reward Function

def generate_rl_problem():
    """生成更难的问题(10-199 vs SFT 的 10-99)。"""
    a = random.randint(10, 199)
    b = random.randint(10, 199)
    return f"What is {a} * {b}?", str(a * b)

def compute_reward(response: str, correct_answer: str) -> float:
    """Reward 函数:正确 1.0,错误 0.0。"""
    extracted = extract_answer(response)
    return 1.0 if extracted == correct_answer else 0.0
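
Two quick sanity checks (extract_answer compares the first number found in the response against the expected answer):

assert compute_reward("The answer is 3901.", "3901") == 1.0
assert compute_reward("I think it's 3900", "3901") == 0.0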

Step 10: RL Training Loop

import torch
from mint import TensorData

NUM_RL_STEPS = 10
BATCH_SIZE = 8
GROUP_SIZE = 8
RL_LEARNING_RATE = 2e-5

for step in range(NUM_RL_STEPS):
    # save the current weights so the sampler can use them
    sampling_path = rl_training_client.save_weights_for_sampler(
        name=f"rl-step-{step}"
    ).result().path

    rl_sampling_client = service_client.create_sampling_client(
        model_path=sampling_path,
        base_model=BASE_MODEL
    )

    problems = [generate_rl_problem() for _ in range(BATCH_SIZE)]
    training_datums = []

    for question, answer in problems:
        prompt_text = f"Question: {question}\nAnswer:"
        prompt_tokens = tokenizer.encode(prompt_text)
        prompt_input = types.ModelInput.from_ints(prompt_tokens)

        # sample multiple responses
        sample_result = rl_sampling_client.sample(
            prompt=prompt_input,
            num_samples=GROUP_SIZE,
            sampling_params=types.SamplingParams(
                max_tokens=16,
                temperature=0.7,
                stop_token_ids=[tokenizer.eos_token_id]
            )
        ).result()

        # compute rewards and advantages
        group_rewards = []
        for seq in sample_result.sequences:
            response_text = tokenizer.decode(seq.tokens)
            reward = compute_reward(response_text, answer)
            group_rewards.append(reward)

        mean_reward = sum(group_rewards) / len(group_rewards)
        advantages = [r - mean_reward for r in group_rewards]

        # build training datums carrying the advantages
        for seq, adv in zip(sample_result.sequences, advantages):
            if len(seq.tokens) == 0 or adv == 0:
                continue

            full_tokens = prompt_tokens + list(seq.tokens)
            input_tokens = full_tokens[:-1]
            target_tokens = full_tokens[1:]

            weights = [0.0] * (len(prompt_tokens) - 1) + [1.0] * len(seq.tokens)
            logprobs = [0.0] * (len(prompt_tokens) - 1) + list(seq.logprobs or [0.0] * len(seq.tokens))
            full_advantages = [0.0] * (len(prompt_tokens) - 1) + [adv] * len(seq.tokens)

            datum = types.Datum(
                model_input=types.ModelInput.from_ints(tokens=input_tokens),
                loss_fn_inputs={
                    "target_tokens": TensorData.from_torch(torch.tensor(target_tokens, dtype=torch.int64)),
                    "weights": TensorData.from_torch(torch.tensor(weights, dtype=torch.float32)),
                    "logprobs": TensorData.from_torch(torch.tensor(logprobs, dtype=torch.float32)),
                    "advantages": TensorData.from_torch(torch.tensor(full_advantages, dtype=torch.float32)),
                },
            )
            training_datums.append(datum)

    # train on the collected datums
    if training_datums:
        rl_training_client.forward_backward(
            training_datums,
            loss_fn="importance_sampling"
        ).result()

        rl_training_client.optim_step(
            types.AdamParams(learning_rate=RL_LEARNING_RATE)
        ).result()
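
Step 0 also installed matplotlib; one optional way to use it is to track training progress. A sketch, assuming you collect a (hypothetical) reward_history list inside the loop above, appending the mean of all group_rewards once per step:

import matplotlib.pyplot as plt

# reward_history is assumed to be filled inside the RL loop, one value per step
plt.plot(reward_history, marker="o")
plt.xlabel("RL step")
plt.ylabel("mean reward")
plt.title("Mean reward per RL step")
plt.savefig("rl_reward.png")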

Step 11: Test the Final Model

final_path = rl_training_client.save_weights_for_sampler(name="arithmetic-rl-final").result().path
final_client = service_client.create_sampling_client(
    model_path=final_path,
    base_model=BASE_MODEL
)

# test on harder problems
test_problems = [
    ("What is 123 * 45?", "5535"),
    ("What is 67 * 189?", "12663"),
]

for question, correct in test_problems:
    prompt = f"Question: {question}\nAnswer:"
    prompt_input = types.ModelInput.from_ints(tokenizer.encode(prompt))

    result = final_client.sample(
        prompt=prompt_input,
        num_samples=1,
        sampling_params=types.SamplingParams(
            max_tokens=16,
            temperature=0.0,
            stop_token_ids=[tokenizer.eos_token_id]
        )
    ).result()

    response = tokenizer.decode(result.sequences[0].tokens)
    extracted = extract_answer(response)
    print(f"Q: {question} → A: {response.strip()} (correct: {correct})")

Summary

Training pipeline

  • Stage 1: SFT with the cross_entropy loss, to teach multiplication from labeled examples
  • Stage 2: RL with the importance_sampling loss, to refine the model with reward signals

Core API

# initialization
service_client = mint.ServiceClient()
training_client = service_client.create_lora_training_client(base_model=...)

# training
training_client.forward_backward(data, loss_fn)  # compute gradients
training_client.optim_step(adam_params)          # update weights

# Checkpoint
checkpoint = training_client.save_state(name)
resumed = service_client.create_lora_training_client(
    base_model=BASE_MODEL,
    rank=16,
    train_mlp=True,
    train_attn=True,
    train_unembed=True,
)
resumed.load_state_with_optimizer(checkpoint.path).result()

# inference
sampling_client = training_client.save_weights_and_get_sampling_client(name)
sampling_client.sample(prompt, num_samples, sampling_params)

Next Steps

  • For a standalone example of client-side custom rewards with partial credit, see Custom Reward and quickstart/custom_reward.py
  • For pairwise preference training built on forward_backward_custom, see Loss Functions and quickstart/custom_loss.py
