Async Patterns
Every time-consuming operation in MinT has an async API. The core principle: submit several calls at once, then await their results. This lets the server process multiple batches in parallel while the client prepares the next batch, which can cut training time dramatically.
Concept
Synchronous (blocking) calls form a bottleneck:

Call 1: forward_backward() --[wait]-- result
Call 2: forward_backward() --[wait]-- result

Asynchronous calls allow pipelining:

Call 1: forward_backward_async() --> [submitted, not waiting]
Call 2: forward_backward_async() --> [submitted, not waiting]
Call 3: forward_backward_async() --> [submitted, not waiting]

All results are then awaited in parallel via asyncio.gather(). This is the single most important performance optimization in MinT training: queuing 3–5 batches before awaiting keeps the GPU saturated while the CPU prepares the next batch of data.
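The timing difference can be demonstrated without MinT at all. The sketch below simulates each forward_backward call with asyncio.sleep; the 0.1 s latency and the fake_forward_backward name are illustrative stand-ins, not part of the MinT API:

```python
import asyncio
import time

async def fake_forward_backward(batch_id: int) -> str:
    # Stand-in for a MinT RPC: ~0.1 s of simulated server-side latency.
    await asyncio.sleep(0.1)
    return f"result-{batch_id}"

async def sequential() -> float:
    # Anti-pattern: await each call before submitting the next (~0.4 s total).
    start = time.perf_counter()
    for i in range(4):
        await fake_forward_backward(i)
    return time.perf_counter() - start

async def pipelined() -> float:
    # Submit all four calls first, then await them together (~0.1 s total).
    start = time.perf_counter()
    tasks = [asyncio.create_task(fake_forward_backward(i)) for i in range(4)]
    await asyncio.gather(*tasks)
    return time.perf_counter() - start

seq_time = asyncio.run(sequential())
pipe_time = asyncio.run(pipelined())
print(f"sequential: {seq_time:.2f}s, pipelined: {pipe_time:.2f}s")
```

With four simulated calls, the pipelined version finishes in roughly the latency of a single call, while the sequential version pays the full latency four times.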
Pattern
import asyncio
import mint
from mint import types

async def train_with_pipelining():
    service_client = mint.ServiceClient()
    training_client = await service_client.create_lora_training_client_async(
        base_model="Qwen/Qwen3-0.6B",
        rank=16,
    )
    tokenizer = training_client.get_tokenizer()
    adam_params = types.AdamParams(learning_rate=5e-5)

    # Prepare the batches
    batches = [
        "Example batch 1 for training.",
        "Example batch 2 for training.",
        "Example batch 3 for training.",
        "Example batch 4 for training.",
    ]

    # Anti-pattern (slow): sequential calls
    # for batch_text in batches:
    #     tokens = tokenizer.encode(batch_text)
    #     datum = types.Datum(...)
    #     result = await training_client.forward_backward_async(...).result_async()
    #     await training_client.optim_step_async(adam_params).result_async()

    # Recommended (fast): pipeline the forward_backward calls
    fb_futures = []
    for batch_text in batches:
        tokens = tokenizer.encode(batch_text)
        model_input = types.ModelInput.from_ints(tokens[:-1])
        target_tokens = tokens[1:]
        weights = [1.0] * len(target_tokens)
        datum = types.Datum(
            model_input=model_input,
            loss_fn_inputs={"target_tokens": target_tokens, "weights": weights},
        )
        # Submit without waiting
        fb_future = training_client.forward_backward_async([datum], loss_fn="cross_entropy")
        fb_futures.append(fb_future)

    # Gather and await all results at once
    fb_results = await asyncio.gather(*[f.result_async() for f in fb_futures])

    # Then take the optimizer step
    for result in fb_results:
        print(f"Loss: {result.loss:.4f}")
    optim_future = training_client.optim_step_async(adam_params)
    await optim_future.result_async()

# Run the async training loop
asyncio.run(train_with_pipelining())

Full source: https://github.com/MindLab-Research/mint-quickstart/blob/main/concepts/async_patterns.py
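To cap how many submissions are outstanding at once, a generic asyncio.Semaphore pattern works. The sketch below is a stand-alone illustration: submit, MAX_IN_FLIGHT, and the simulated latency are placeholders for a real MinT round trip, not part of its API:

```python
import asyncio

MAX_IN_FLIGHT = 4  # keep the pipeline depth in the recommended 3-5 range

async def submit(i: int) -> int:
    # Placeholder for a forward_backward_async(...).result_async() round trip.
    await asyncio.sleep(0.01)
    return i

async def main() -> list[int]:
    sem = asyncio.Semaphore(MAX_IN_FLIGHT)

    async def bounded(i: int) -> int:
        # Blocks whenever MAX_IN_FLIGHT submissions are already in flight.
        async with sem:
            return await submit(i)

    # gather preserves submission order in its result list.
    return await asyncio.gather(*(bounded(i) for i in range(10)))

print(asyncio.run(main()))  # → [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```

The semaphore keeps the client from queuing unbounded work on the server while still overlapping several calls at a time.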
API Surface
Every method on TrainingClient and SamplingClient has an _async variant:
| Sync | Async | Purpose |
|---|---|---|
| forward_backward(...) | forward_backward_async(...) | Compute gradients on a batch |
| optim_step(...) | optim_step_async(...) | Update model weights |
| sample(...) | sample_async(...) | Generate tokens |
| save_weights_for_sampler(...) | save_weights_for_sampler_async(...) | Save a checkpoint |
Async execution:
- All _async() methods return a Future object.
- Call .result_async() to await the result.
- Use asyncio.gather(*futures) to await multiple futures in parallel.
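The submit-then-await split can be mimicked with plain asyncio. The FakeFuture class below is a hypothetical stand-in for MinT's future objects, written only to show the pattern, not the library's actual implementation:

```python
import asyncio

class FakeFuture:
    """Hypothetical stand-in for a MinT future (illustration only)."""

    def __init__(self, coro):
        # Submission starts the work in the background immediately.
        self._task = asyncio.ensure_future(coro)

    async def result_async(self):
        # Awaiting the result is a separate, later step.
        return await self._task

async def work(x: int) -> int:
    await asyncio.sleep(0.01)
    return x * 2

async def main() -> list[int]:
    # Submit three calls without waiting...
    futures = [FakeFuture(work(i)) for i in range(3)]
    # ...then await all results in parallel.
    return await asyncio.gather(*(f.result_async() for f in futures))

print(asyncio.run(main()))  # → [0, 2, 4]
```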
Caveats & Pitfalls
- Degenerating to sequential: never await each async call inside the loop; that re-serializes the pipeline and defeats the point. Collect the futures into a list first, then gather them all at once.
- Pipeline depth: queue 3–5 batches before awaiting. Queuing too many (> 10) can exhaust server memory; too few (1–2) cannot hide the latency.
- Always await results: call .result_async() on every future before using it. Accessing a future that has not been awaited causes race conditions.
- Optimizer step timing: call optim_step_async() only after all forward_backward results have been collected; do not interleave them. Each optimizer step applies the gradients accumulated across the whole batch.
- Error handling: wrap asyncio.gather() in a try-except to catch errors from the parallel calls. Any single failing future raises an exception.
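As a concrete illustration of the error-handling caveat, the self-contained sketch below (flaky and its failure mode are invented for the example) shows how a single failing future aborts the whole gather:

```python
import asyncio

async def flaky(i: int) -> int:
    # Invented failure for the example: batch 2 always errors out.
    if i == 2:
        raise RuntimeError(f"batch {i} failed")
    await asyncio.sleep(0.01)
    return i

async def main() -> str:
    tasks = [asyncio.create_task(flaky(i)) for i in range(4)]
    try:
        results = await asyncio.gather(*tasks)
        return f"ok: {results}"
    except RuntimeError as exc:
        # By default one failing future aborts the whole gather;
        # return_exceptions=True would instead collect errors per future.
        return f"caught: {exc}"

print(asyncio.run(main()))  # → caught: batch 2 failed
```

Passing return_exceptions=True to asyncio.gather() is the alternative when you want the surviving results alongside the per-future errors.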