Mind Lab Toolkit (MinT)
Advanced Topics

Queue Status

This page covers advanced/queue_status.py from mint-quickstart.

What this demo does

  • Creates a sampling session with the low-level AsyncTinker client.
  • Submits a sampling request with the X-Tinker-Sampling-Backpressure: 1 header.
  • Polls in a loop with futures.with_raw_response.retrieve().
  • On HTTP 408 (request still queued), reads the queue status from both the body and the headers:
    • Body fields: queue_state, queue_position, queue_depth, estimated_wait_s, progress
    • Headers: Retry-After, X-Queue-Depth, X-Queue-Position, X-Queue-ETA-S, X-Queue-Status
  • On HTTP 200, prints the result and exits.
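The polling steps above reduce to a small loop that does not depend on the SDK. This is a minimal sketch, not the demo's actual code: `poll_until_done` and the `fetch` callable it takes are hypothetical, and `fetch` is assumed to perform one `futures.with_raw_response.retrieve()` call and return `(status_code, headers, body)`. Like the demo, it returns on 200, waits `Retry-After` seconds (at least 1) on 408, and raises on any other status.

```python
from __future__ import annotations

import asyncio
from typing import Any, Awaitable, Callable, Mapping

# Hypothetical: one retrieve call returning (status_code, headers, body).
Fetch = Callable[[], Awaitable[tuple[int, Mapping[str, str], Any]]]


async def poll_until_done(
    fetch: Fetch,
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
    on_queued: Callable[[Mapping[str, str], Any], None] | None = None,
) -> Any:
    """Poll until the request finishes, honoring Retry-After on HTTP 408."""
    while True:
        status, headers, body = await fetch()
        if status == 200:
            return body  # result is ready
        if status != 408:
            # Anything other than "done" or "still queued" is treated as an error.
            raise RuntimeError(f"unexpected status {status}: {str(body)[:200]!r}")
        if on_queued is not None:
            on_queued(headers, body)  # e.g. show queue_position / estimated_wait_s
        # Still queued: wait Retry-After seconds, but at least 1 s, like the demo.
        await sleep(max(1, int(headers.get("Retry-After", "1"))))
```

Passing `sleep` in as a parameter makes the loop easy to test with a fake clock, and `on_queued` is the natural place for a logger or UI to read the queue fields.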

Use cases

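  • Monitoring the queue position of long-running sampling requests.
  • Building a UI that shows "position in queue" or "estimated wait time".
  • Debugging backpressure behavior in a production deployment.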
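For the UI use case, the 408 body fields can be turned into a single status line. The helper below is hypothetical; it assumes the body field names listed on this page and that `queue_position` is 0-based (as the field table on this page states), so it adds 1 for display.

```python
from __future__ import annotations

from typing import Any, Mapping


def format_queue_status(body: Mapping[str, Any]) -> str:
    """Turn 408 queue fields into a short, human-readable status line."""
    if body.get("queue_state") == "processing":
        return "Processing..."

    parts: list[str] = []
    position = body.get("queue_position")
    depth = body.get("queue_depth")
    if isinstance(position, int):
        # queue_position is 0-based; show it 1-based to users.
        if isinstance(depth, int):
            parts.append(f"position {position + 1} of {depth}")
        else:
            parts.append(f"position {position + 1}")

    eta = body.get("estimated_wait_s")
    if isinstance(eta, (int, float)):
        parts.append(f"about {round(eta)}s remaining")

    return ("Queued: " + ", ".join(parts)) if parts else "Queued"


print(format_queue_status({
    "queue_state": "waiting",
    "queue_position": 2,
    "queue_depth": 10,
    "estimated_wait_s": 12.4,
}))  # Queued: position 3 of 10, about 12s remaining
```

Missing fields are simply skipped, so the same function works for sparse bodies.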

Expected output

sampling_session_created sampling_session_id=abc123
request_submitted request_id=req_456
queue_fields request_id='req_456' type='pending' status='queued' queue_state='waiting' ...
queue_fields request_id='req_456' type='pending' status='queued' queue_state='processing' ...
result_received request_id=req_456 keys=['sequences', 'request_id', ...]

Prerequisites

  • Python >= 3.11
  • Set MINT_API_KEY or TINKER_API_KEY (optional if the server has authentication disabled)

How to run

export MINT_API_KEY=sk-...
python advanced/queue_status.py

Parameters (environment variables)

  • MINT_API_KEY or TINKER_API_KEY: API key for authentication
  • MINT_BASE_URL or TINKER_BASE_URL: server URL (default: https://mint.macaron.xin/)
  • MINT_BASE_MODEL: base model to sample from (default: Qwen/Qwen3-0.6B)
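In the script, importing `mint` is what syncs `MINT_*` variables onto their `TINKER_*` counterparts. The function below is a hypothetical stand-in for that lookup, handy for checking which values a run would pick up. It assumes `MINT_*` takes precedence over `TINKER_*` when both are set, which this page does not state.

```python
from __future__ import annotations

from collections.abc import Mapping

DEFAULT_BASE_URL = "https://mint.macaron.xin/"
DEFAULT_BASE_MODEL = "Qwen/Qwen3-0.6B"


def resolve_config(env: Mapping[str, str]) -> dict[str, str | None]:
    """Resolve the demo's settings from an environment mapping such as os.environ."""

    def first(*names: str) -> str | None:
        # Assumption: MINT_* takes precedence over TINKER_* when both are set.
        for name in names:
            value = env.get(name)
            if value:
                return value
        return None

    return {
        # None is acceptable when the server has authentication disabled.
        "api_key": first("MINT_API_KEY", "TINKER_API_KEY"),
        "base_url": first("MINT_BASE_URL", "TINKER_BASE_URL") or DEFAULT_BASE_URL,
        "base_model": first("MINT_BASE_MODEL") or DEFAULT_BASE_MODEL,
    }
```

Calling `resolve_config(os.environ)` before a run shows the effective settings; avoid logging `api_key` in full.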

Full script

#!/usr/bin/env python3
"""Queue status polling — read pending queue fields on 408 via raw response.

Demonstrates the backpressure API: submit a sample request, then poll
with `futures.with_raw_response.retrieve()` to read queue depth,
position, ETA, and status from both the response body and headers.

Env:
  MINT_API_KEY or TINKER_API_KEY     (optional if server auth disabled)
  MINT_BASE_URL or TINKER_BASE_URL   (optional, default mint)
  MINT_BASE_MODEL                    (optional, default Qwen/Qwen3-0.6B)

Run:
  python advanced/queue_status.py
"""

from __future__ import annotations

import asyncio
import inspect
import os
import sys
import time
from pathlib import Path


def load_env_file(path: Path) -> None:
    if not path.exists():
        return
    for line in path.read_text(encoding="utf-8").splitlines():
        stripped = line.strip()
        if not stripped or stripped.startswith("#"):
            continue
        if stripped.startswith("export "):
            stripped = stripped[len("export "):].lstrip()
        if "=" not in stripped:
            continue
        key, value = stripped.split("=", 1)
        key = key.strip()
        value = value.strip().strip('"').strip("'")
        if key and key not in os.environ:
            os.environ[key] = value


REPO_ROOT = Path(__file__).resolve().parents[1]
load_env_file(REPO_ROOT / ".env")

for base_dir in (REPO_ROOT.parent, REPO_ROOT):
    for src_dir in ("mindlab-toolkit-alpha/src", "mindlab-toolkit/src"):
        mint_src = base_dir / src_dir
        if mint_src.exists() and str(mint_src) not in sys.path:
            sys.path.insert(0, str(mint_src))
            break
    else:
        continue
    break

import mint as _mint  # triggers env sync (MINT_* -> TINKER_*)
import tinker
from tinker._client import AsyncTinker
from tinker import types


def _int_header(headers, name: str) -> int:
    value = headers.get(name)
    if value is None:
        raise RuntimeError(f"missing header {name}")
    try:
        return int(value)
    except Exception as exc:
        raise RuntimeError(f"header {name} is not int: {value!r}") from exc


def _maybe_int_header(headers, name: str) -> int | None:
    value = headers.get(name)
    if value is None:
        return None
    try:
        return int(value)
    except Exception as exc:
        raise RuntimeError(f"header {name} is not int: {value!r}") from exc


def _maybe_float_header(headers, name: str) -> float | None:
    value = headers.get(name)
    if value is None:
        return None
    try:
        return float(value)
    except Exception as exc:
        raise RuntimeError(f"header {name} is not float: {value!r}") from exc


async def main() -> None:
    base_model = os.environ.get("MINT_BASE_MODEL", "Qwen/Qwen3-0.6B")
    session_id = f"queue_status_{int(time.time())}"

    async with AsyncTinker() as client:
        create_req = types.CreateSamplingSessionRequest(
            session_id=session_id,
            sampling_session_seq_id=0,
            base_model=base_model,
        )
        create_resp = await client.service.create_sampling_session(request=create_req)
        sampling_session_id = create_resp.sampling_session_id
        print(
            "sampling_session_created",
            f"sampling_session_id={sampling_session_id}",
            flush=True,
        )

        sample_req = types.SampleRequest(
            sampling_session_id=sampling_session_id,
            seq_id=1,
            num_samples=1,
            prompt=types.ModelInput.from_ints([1] * 256),
            sampling_params=types.SamplingParams(
                max_tokens=128,
                temperature=0.7,
                top_k=-1,
                top_p=1.0,
            ),
        )
        future = await client.sampling.asample(
            request=sample_req,
            extra_headers={"X-Tinker-Sampling-Backpressure": "1"},
        )

        request_id = future.request_id
        print("request_submitted", f"request_id={request_id}", flush=True)

        while True:
            try:
                raw = await client.futures.with_raw_response.retrieve(
                    request=types.FutureRetrieveRequest(
                        request_id=request_id,
                    )
                )
                status_code = raw.status_code
                body = raw.json()
                if inspect.isawaitable(body):
                    body = await body
                headers = raw.headers
            except tinker.APIStatusError as e:
                status_code = e.status_code
                headers = e.response.headers if e.response is not None else {}
                if isinstance(e.body, dict):
                    body = e.body
                else:
                    try:
                        body = e.response.json()
                    except Exception:
                        body = {"error": str(e)}

            if status_code == 200:
                if isinstance(body, dict):
                    print(
                        "result_received",
                        f"request_id={request_id}",
                        f"keys={list(body)[:8]}",
                        flush=True,
                    )
                else:
                    print("result_received", f"request_id={request_id}", flush=True)
                return

            if status_code != 408:
                raise RuntimeError(
                    f"unexpected status {status_code} body={str(body)[:500]!r}"
                )
            if not isinstance(body, dict):
                body = {}  # non-JSON 408 body: only the headers are usable

            retry_after_s = _int_header(headers, "Retry-After")
            queue_depth = _maybe_int_header(headers, "X-Queue-Depth")
            queue_position = _maybe_int_header(headers, "X-Queue-Position")
            queue_eta_s = _maybe_float_header(headers, "X-Queue-ETA-S")
            queue_status = headers.get("X-Queue-Status")

            print(
                "queue_fields",
                f"request_id={body.get('request_id')!r}",
                f"type={body.get('type')!r}",
                f"status={body.get('status')!r}",
                f"queue_state={body.get('queue_state')!r}",
                f"queue_state_reason={body.get('queue_state_reason')!r}",
                f"queue_depth={body.get('queue_depth')!r}",
                f"queue_position={body.get('queue_position')!r}",
                f"estimated_wait_s={body.get('estimated_wait_s')!r}",
                f"progress={body.get('progress')!r}",
                f"retry_after_s={body.get('retry_after_s')!r}",
                f"x_queue_depth={queue_depth!r}",
                f"x_queue_position={queue_position!r}",
                f"x_queue_status={queue_status!r}",
                f"x_queue_eta_s={queue_eta_s!r}",
                f"retry_after={retry_after_s!r}",
                flush=True,
            )

            body_request_id = body.get("request_id")
            if (
                isinstance(body_request_id, str)
                and body_request_id
                and body_request_id != request_id
            ):
                raise RuntimeError(
                    f"request_id mismatch: body={body_request_id!r} "
                    f"expected {request_id!r}"
                )

            await asyncio.sleep(max(1, int(retry_after_s)))


if __name__ == "__main__":
    asyncio.run(main())
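The script reads `Retry-After` with `_int_header`, which only accepts the delta-seconds form. HTTP also allows `Retry-After` to carry an HTTP-date. This page does not say whether the MinT server ever sends that form, so the following is a defensive sketch: `parse_retry_after` is a hypothetical helper that accepts either form and falls back to a default when the value is missing or malformed.

```python
from __future__ import annotations

from datetime import datetime, timezone
from email.utils import parsedate_to_datetime


def parse_retry_after(
    value: str | None,
    now: datetime | None = None,
    default: float = 1.0,
) -> float:
    """Seconds to wait, from a Retry-After value in delta-seconds or HTTP-date form."""
    if value is None:
        return default
    value = value.strip()
    try:
        # delta-seconds form, e.g. "5"
        return max(0.0, float(int(value)))
    except ValueError:
        pass
    try:
        # HTTP-date form, e.g. "Wed, 21 Oct 2015 07:28:00 GMT"
        when = parsedate_to_datetime(value)
    except (TypeError, ValueError):
        return default
    if when.tzinfo is None:
        when = when.replace(tzinfo=timezone.utc)  # treat naive dates as UTC
    if now is None:
        now = datetime.now(timezone.utc)
    return max(0.0, (when - now).total_seconds())
```

To use it in the demo, the `retry_after_s = _int_header(...)` line could become `retry_after_s = parse_retry_after(headers.get("Retry-After"))`; the existing `max(1, int(retry_after_s))` floor before sleeping still applies.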

Key concepts

Backpressure header

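Setting X-Tinker-Sampling-Backpressure: 1 tells the server to answer with HTTP 408 plus queue information while the request is still queued, rather than blocking the connection until the result is ready.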

Queue fields (408 body)

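| Field | Type | Description |
|---|---|---|
| queue_state | string | Current state: waiting or processing |
| queue_position | int | Position in the queue (0-based) |
| queue_depth | int | Total number of requests in the queue |
| estimated_wait_s | float | Estimated wait in seconds |
| retry_after_s | float | Suggested polling interval in seconds |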
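Because any body field may be absent (the demo reads them all with `body.get(...)`), a typed wrapper that tolerates missing or mistyped values keeps UI code simple. `QueueFields` below is a hypothetical sketch covering the fields in the table; `progress` is left out because its type is not documented here.

```python
from __future__ import annotations

from collections.abc import Mapping
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class QueueFields:
    """Typed view of a 408 body; every field is optional."""

    queue_state: str | None = None
    queue_position: int | None = None  # 0-based
    queue_depth: int | None = None
    estimated_wait_s: float | None = None
    retry_after_s: float | None = None

    @classmethod
    def from_body(cls, body: Any) -> QueueFields:
        if not isinstance(body, Mapping):
            return cls()  # non-JSON or error body: nothing to read

        def as_int(v: Any) -> int | None:
            # bool is a subclass of int; reject it explicitly.
            return v if isinstance(v, int) and not isinstance(v, bool) else None

        def as_float(v: Any) -> float | None:
            if isinstance(v, (int, float)) and not isinstance(v, bool):
                return float(v)
            return None

        state = body.get("queue_state")
        return cls(
            queue_state=state if isinstance(state, str) else None,
            queue_position=as_int(body.get("queue_position")),
            queue_depth=as_int(body.get("queue_depth")),
            estimated_wait_s=as_float(body.get("estimated_wait_s")),
            retry_after_s=as_float(body.get("retry_after_s")),
        )
```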

Queue headers

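| Header | Type | Description |
|---|---|---|
| Retry-After | int | Seconds to wait before the next poll |
| X-Queue-Depth | int | Total queue depth |
| X-Queue-Position | int | Position in the queue |
| X-Queue-ETA-S | float | Estimated wait in seconds |
| X-Queue-Status | string | Queue status string |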
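The body fields and headers carry overlapping information. The sketch below is a hypothetical `queue_view` helper that prefers the body value and falls back to the matching header when the body omits it. The body-to-header pairing (`queue_position`/`X-Queue-Position`, `queue_depth`/`X-Queue-Depth`, `estimated_wait_s`/`X-Queue-ETA-S`, `retry_after_s`/`Retry-After`) is an assumption based on the names; `X-Queue-Status` is returned as-is because this page does not say which body field it mirrors. Header names are matched case-insensitively, as HTTP requires.

```python
from __future__ import annotations

from collections.abc import Mapping
from typing import Any, Callable


def queue_view(body: Any, headers: Mapping[str, str]) -> dict[str, Any]:
    """Merge 408 body fields with queue headers; the body wins when both are set."""
    lowered = {name.lower(): value for name, value in headers.items()}
    fields = body if isinstance(body, Mapping) else {}

    def header(name: str, cast: Callable[[str], Any]) -> Any:
        raw = lowered.get(name.lower())
        if raw is None:
            return None
        try:
            return cast(raw)
        except ValueError:
            return None  # malformed header: treat it as absent

    def pick(key: str, header_name: str, cast: Callable[[str], Any]) -> Any:
        # Assumed pairing of body field and header, based on their names.
        value = fields.get(key)
        return value if value is not None else header(header_name, cast)

    return {
        "queue_state": fields.get("queue_state"),  # body only
        "queue_position": pick("queue_position", "X-Queue-Position", int),
        "queue_depth": pick("queue_depth", "X-Queue-Depth", int),
        "estimated_wait_s": pick("estimated_wait_s", "X-Queue-ETA-S", float),
        "retry_after_s": pick("retry_after_s", "Retry-After", float),
        "queue_status": lowered.get("x-queue-status"),  # header only
    }
```

Unlike the script's `_maybe_int_header`, this version treats a malformed header as missing instead of raising, which suits a UI better than a debugging tool.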
