Mind Lab Toolkit (MinT)

Async Patterns

Every time-consuming operation in MinT has an async API. The core principle: submit several calls up front, then await the results. This lets the server process multiple batches in parallel while the client prepares the next batch, which can cut training time substantially.

Concept

Synchronous (blocking) calls form a bottleneck:

Call 1: forward_backward() --[wait]-- result
Call 2: forward_backward() --[wait]-- result

Asynchronous calls allow pipelining:

Call 1: forward_backward_async() --> [submitted, not waiting]
Call 2: forward_backward_async() --> [submitted, not waiting]
Call 3: forward_backward_async() --> [submitted, not waiting]
Then await all results in parallel via asyncio.gather()

This is the single most important performance optimization in MinT training. Queuing 3-5 batches before awaiting keeps the GPU saturated while the CPU concurrently prepares the next batch of data.
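The timing effect of pipelining can be demonstrated with plain asyncio, independent of MinT. The `process` coroutine below is a hypothetical stand-in for a server-side call with fixed latency: awaiting sequentially costs the sum of the latencies, while submitting everything first and gathering costs roughly the maximum.

```python
import asyncio
import time

async def process(batch_id: int) -> str:
    # Stand-in for a server-side call with ~0.1 s latency.
    await asyncio.sleep(0.1)
    return f"result-{batch_id}"

async def sequential() -> float:
    start = time.perf_counter()
    for i in range(4):
        await process(i)  # blocks: next call starts only after this one finishes
    return time.perf_counter() - start

async def pipelined() -> float:
    start = time.perf_counter()
    tasks = [asyncio.create_task(process(i)) for i in range(4)]  # submit all first
    await asyncio.gather(*tasks)                                 # then await together
    return time.perf_counter() - start

seq = asyncio.run(sequential())   # latencies add up (~4 x 0.1 s)
pipe = asyncio.run(pipelined())   # calls overlap (~0.1 s)
print(f"sequential: {seq:.2f}s, pipelined: {pipe:.2f}s")
```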

Pattern

import asyncio
import mint
from mint import types

async def train_with_pipelining():
    service_client = mint.ServiceClient()
    training_client = await service_client.create_lora_training_client_async(
        base_model="Qwen/Qwen3-0.6B",
        rank=16,
    )
    tokenizer = training_client.get_tokenizer()
    adam_params = types.AdamParams(learning_rate=5e-5)
    
    # Prepare batches
    batches = [
        "Example batch 1 for training.",
        "Example batch 2 for training.",
        "Example batch 3 for training.",
        "Example batch 4 for training.",
    ]
    
    # Anti-pattern (slow): sequential calls
    # for batch_text in batches:
    #     tokens = tokenizer.encode(batch_text)
    #     datum = types.Datum(...)
    #     result = await training_client.forward_backward_async(...).result_async()
    #     await training_client.optim_step_async(adam_params).result_async()
    
    # Recommended (fast): pipeline multiple forward_backward calls
    fb_futures = []
    for batch_text in batches:
        tokens = tokenizer.encode(batch_text)
        model_input = types.ModelInput.from_ints(tokens[:-1])
        target_tokens = tokens[1:]
        weights = [1.0] * len(target_tokens)
        
        datum = types.Datum(
            model_input=model_input,
            loss_fn_inputs={"target_tokens": target_tokens, "weights": weights},
        )
        
        # Submit without waiting
        fb_future = training_client.forward_backward_async([datum], loss_fn="cross_entropy")
        fb_futures.append(fb_future)
    
    # Gather and await all results at once
    fb_results = await asyncio.gather(*[f.result_async() for f in fb_futures])
    
    # Then run the optimizer step
    for result in fb_results:
        print(f"Loss: {result.loss:.4f}")
    
    optim_future = training_client.optim_step_async(adam_params)
    await optim_future.result_async()

# Run the async training loop
asyncio.run(train_with_pipelining())

Full source: https://github.com/MindLab-Research/mint-quickstart/blob/main/concepts/async_patterns.py

API Surface

Every method on TrainingClient and SamplingClient has an _async variant:

  • forward_backward(...) / forward_backward_async(...): compute gradients on one batch
  • optim_step(...) / optim_step_async(...): update model weights
  • sample(...) / sample_async(...): generate tokens
  • save_weights_for_sampler(...) / save_weights_for_sampler_async(...): save a checkpoint

Async execution:

  • Every _async() method returns a Future object.
  • Call .result_async() to await the result.
  • Use asyncio.gather(*futures) to await multiple futures in parallel.
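This execution model (submit on call, await later) can be mocked in a few lines. The `SubmittedFuture` class and `fake_forward_backward` below are illustrative stand-ins, not MinT's real implementation; they only show why work starts at submission time rather than at await time.

```python
import asyncio

class SubmittedFuture:
    """Illustrative stand-in for the future returned by an _async() method."""
    def __init__(self, coro):
        # Work is scheduled as soon as the call is submitted, not when awaited.
        self._task = asyncio.ensure_future(coro)

    async def result_async(self):
        return await self._task

async def fake_forward_backward(batch_id: int) -> dict:
    await asyncio.sleep(0.05)  # simulated server-side work
    return {"batch": batch_id, "loss": 1.0 / (batch_id + 1)}

async def main():
    # Submit every call first ...
    futures = [SubmittedFuture(fake_forward_backward(i)) for i in range(3)]
    # ... then await all results in parallel; order of results matches submission.
    return await asyncio.gather(*[f.result_async() for f in futures])

results = asyncio.run(main())
print(results)
```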

Caveats & Pitfalls

  • Degrading to sequential: never await the async call inside the loop body, which re-serializes the pipeline and defeats its purpose. Collect the futures in a list first, then gather them together.
  • Pipeline depth: queue 3-5 batches before awaiting. Too many (> 10) can exhaust server memory; too few (1-2) cannot hide the latency.
  • Always await results: call .result_async() on every future before using it. Accessing a future that has not been awaited causes race conditions.
  • Optimizer step timing: call optim_step_async() only after all forward_backward results have been collected; do not interleave them. Each optimizer step applies to the gradients accumulated over the entire batch.
  • Error handling: wrap asyncio.gather() in try-except to catch errors from the parallel calls. If any single future fails, the gather raises.
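The depth and error-handling caveats can be combined in one sketch: submit at most a fixed window of batches, gather each window inside try-except, and cancel in-flight work on failure. The `submit` coroutine is a hypothetical stand-in for forward_backward_async plus result_async.

```python
import asyncio

PIPELINE_DEPTH = 4  # keep 3-5 batches in flight; deeper risks server memory

async def submit(batch_id: int) -> dict:
    # Hypothetical stand-in for forward_backward_async(...).result_async().
    await asyncio.sleep(0.05)
    return {"batch": batch_id}

async def run_pipelined(batch_ids):
    results = []
    for start in range(0, len(batch_ids), PIPELINE_DEPTH):
        window = batch_ids[start:start + PIPELINE_DEPTH]
        tasks = [asyncio.create_task(submit(b)) for b in window]
        try:
            # One failed future fails the whole gather.
            results.extend(await asyncio.gather(*tasks))
        except Exception:
            for t in tasks:
                t.cancel()  # don't leave orphaned work in flight
            raise
    return results

out = asyncio.run(run_pipelined(list(range(10))))
print(len(out))
```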
