Mind Lab Toolkit (MinT)

Async Patterns

MinT supports async APIs for all long-running operations. The key principle: submit several calls back-to-back, then await their results. The server can work through the queued batches while your client prepares the next one, which can cut training time substantially.

Concept

Synchronous calls (blocking) create a bottleneck:

Call 1: forward_backward() --[wait]-- result
Call 2: forward_backward() --[wait]-- result

Async calls allow pipelining:

Call 1: forward_backward_async() --> [submitted, not waiting]
Call 2: forward_backward_async() --> [submitted, not waiting]
Call 3: forward_backward_async() --> [submitted, not waiting]
Await all results in parallel via asyncio.gather()

Pipelining is the single most effective performance optimization for MinT training. With 3–5 batches queued before you await, the GPU stays busy while your CPU prepares the next batch.
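The difference between the two diagrams can be reproduced without a server. The sketch below is a simulation only: `fake_forward_backward` is a hypothetical stand-in that sleeps instead of computing gradients, and it assumes the server handles submitted calls concurrently. It does not use the MinT API.

```python
import asyncio
import time


async def fake_forward_backward(batch_id: int, server_time: float = 0.05) -> int:
    """Hypothetical stand-in for one server-side call; sleeps instead of computing."""
    await asyncio.sleep(server_time)
    return batch_id


async def run_sequential(n: int) -> float:
    """Await each call before submitting the next one."""
    start = time.perf_counter()
    for i in range(n):
        await fake_forward_backward(i)
    return time.perf_counter() - start


async def run_pipelined(n: int) -> float:
    """Submit every call first, then await all of them together."""
    start = time.perf_counter()
    # create_task schedules each call immediately, without waiting for it.
    tasks = [asyncio.create_task(fake_forward_backward(i)) for i in range(n)]
    await asyncio.gather(*tasks)
    return time.perf_counter() - start


async def compare(n: int = 4) -> tuple[float, float]:
    """Return (sequential_seconds, pipelined_seconds) for n simulated calls."""
    return await run_sequential(n), await run_pipelined(n)


if __name__ == "__main__":
    seq, pipe = asyncio.run(compare())
    print(f"sequential: {seq:.2f}s, pipelined: {pipe:.2f}s")
```

In this simulation the sequential version takes roughly `n` times the per-call latency, while the pipelined version takes roughly one call's latency. Real speedups depend on how much of each call the server can actually overlap.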

Pattern

import asyncio
import mint
from mint import types

async def train_with_pipelining():
    service_client = mint.ServiceClient()
    training_client = await service_client.create_lora_training_client_async(
        base_model="Qwen/Qwen3-0.6B",
        rank=16,
    )
    tokenizer = training_client.get_tokenizer()
    adam_params = types.AdamParams(learning_rate=5e-5)
    
    # Prepare batches
    batches = [
        "Example batch 1 for training.",
        "Example batch 2 for training.",
        "Example batch 3 for training.",
        "Example batch 4 for training.",
    ]
    
    # Anti-pattern (slow): sequential calls
    # for batch_text in batches:
    #     tokens = tokenizer.encode(batch_text)
    #     datum = types.Datum(...)
    #     result = await training_client.forward_backward_async(...).result_async()
    #     await training_client.optim_step_async(adam_params).result_async()
    
    # Pattern (fast): pipeline multiple forward_backward calls
    fb_futures = []
    for batch_text in batches:
        tokens = tokenizer.encode(batch_text)
        model_input = types.ModelInput.from_ints(tokens[:-1])
        target_tokens = tokens[1:]
        weights = [1.0] * len(target_tokens)
        
        datum = types.Datum(
            model_input=model_input,
            loss_fn_inputs={"target_tokens": target_tokens, "weights": weights},
        )
        
        # Submit without waiting
        fb_future = training_client.forward_backward_async([datum], loss_fn="cross_entropy")
        fb_futures.append(fb_future)
    
    # Await all forward_backward results concurrently
    fb_results = await asyncio.gather(*[f.result_async() for f in fb_futures])
    
    # Report the loss for each batch
    for result in fb_results:
        print(f"Loss: {result.loss:.4f}")
    
    # Then apply one optimizer step to the accumulated gradients
    optim_future = training_client.optim_step_async(adam_params)
    await optim_future.result_async()

# Run the async training
asyncio.run(train_with_pipelining())

View full source: https://github.com/MindLab-Research/mint-quickstart/blob/main/concepts/async_patterns.py

API Surface

All TrainingClient and SamplingClient methods have _async variants:

| Sync | Async | Purpose |
| --- | --- | --- |
| forward_backward(...) | forward_backward_async(...) | Compute gradients on a batch |
| optim_step(...) | optim_step_async(...) | Update model weights |
| sample(...) | sample_async(...) | Generate tokens |
| save_weights_for_sampler(...) | save_weights_for_sampler_async(...) | Save checkpoint |

Async execution:

  • All _async() methods return a Future object.
  • Call .result_async() to await the result.
  • Use asyncio.gather(*[f.result_async() for f in futures]) to await multiple futures concurrently, as in the pattern above.
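The submit, `.result_async()`, gather protocol described in this list can be illustrated with a mock. In the sketch below, `FakeFuture` and `submit` are hypothetical stand-ins written for illustration; they are not MinT classes, and the real Future may behave differently. The sketch shows one property of `asyncio.gather()` that matters when pairing results with batches: results come back in submission order, regardless of which call finishes first.

```python
import asyncio


class FakeFuture:
    """Hypothetical stand-in for the Future returned by an _async() method."""

    def __init__(self, value: str, delay: float) -> None:
        # The simulated work starts at submission time, not when it is awaited.
        self._task = asyncio.create_task(self._work(value, delay))

    @staticmethod
    async def _work(value: str, delay: float) -> str:
        await asyncio.sleep(delay)
        return value

    async def result_async(self) -> str:
        """Wait for the simulated work to finish and return its value."""
        return await self._task


def submit(value: str, delay: float) -> FakeFuture:
    """Hypothetical submit call: returns immediately with a future."""
    return FakeFuture(value, delay)


async def demo() -> list[str]:
    # Submit the slowest call first; none of these lines waits.
    futures = [submit("a", 0.03), submit("b", 0.02), submit("c", 0.01)]
    # gather() returns results in the order the awaitables were passed in.
    return await asyncio.gather(*(f.result_async() for f in futures))


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Because the order is preserved, `zip(batches, results)` is a safe way to match each result to the batch that produced it.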

Caveats & Pitfalls

  • Sequential fallback: Never await each call's result inside the loop that submits it. Doing so serializes the calls and defeats the purpose. Instead, collect the futures in a list, then gather them.
  • Pipeline depth: Queue 3–5 batches before awaiting. Queueing too many (> 10) may exceed server memory; queueing too few (1–2) does not hide latency.
  • Result awaiting: Always await .result_async() on each future before using the result. The future is only a handle to a pending operation, not the result itself.
  • Optimizer steps: optim_step_async() should come after all forward_backward results are collected, not interleaved with them. Each optimizer step must apply to the accumulated gradients from the batch.
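  • Error handling: Wrap asyncio.gather() in a try-except to catch errors from any of the parallel calls. By default, gather() raises the first exception from any of its awaitables; pass return_exceptions=True to receive exceptions in the results list instead.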
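The pipeline-depth and error-handling caveats can be combined in one loop. The sketch below is a simulation under stated assumptions: `fake_call` is a hypothetical stand-in for awaiting one future's `.result_async()`, the failing batch is contrived, and the windowing is a simple fixed-size chunking (each window is fully awaited before the next is submitted). It does not use the MinT API.

```python
import asyncio


async def fake_call(batch_id: int) -> float:
    """Hypothetical stand-in for awaiting one future's result_async()."""
    await asyncio.sleep(0.01)
    if batch_id == 2:
        # Contrived failure so the error path is exercised.
        raise RuntimeError(f"batch {batch_id} failed")
    return 1.0 / (batch_id + 1)  # pretend loss value


async def run_in_windows(batch_ids: list[int], depth: int = 4):
    """Submit at most `depth` calls at a time and collect failures separately."""
    losses: dict[int, float] = {}
    failures: dict[int, Exception] = {}
    for start in range(0, len(batch_ids), depth):
        window = batch_ids[start:start + depth]
        # Submit the whole window before awaiting anything.
        tasks = [asyncio.create_task(fake_call(b)) for b in window]
        # return_exceptions=True keeps one failure from hiding the other results.
        outcomes = await asyncio.gather(*tasks, return_exceptions=True)
        for batch_id, outcome in zip(window, outcomes):
            if isinstance(outcome, Exception):
                failures[batch_id] = outcome
            else:
                losses[batch_id] = outcome
    return losses, failures


if __name__ == "__main__":
    ok, failed = asyncio.run(run_in_windows(list(range(6)), depth=4))
    print(f"succeeded: {sorted(ok)}, failed: {sorted(failed)}")
```

How to react to a failed batch (retry it, skip the optimizer step, or abort) is a policy decision the sketch leaves open.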
