高级主题
队列状态
这一页对应 mint-quickstart 中的 advanced/queue_status.py。
这个 demo 做什么
- 使用底层 `AsyncTinker` 客户端创建采样会话。
- 提交带 `X-Tinker-Sampling-Backpressure: 1` header 的采样请求。
- 循环调用 `futures.with_raw_response.retrieve()` 轮询。
- 收到 HTTP 408(请求排队中)时,从 body 和 header 读取队列状态:
  - Body 字段:`queue_state`、`queue_position`、`queue_depth`、`estimated_wait_s`、`progress`
  - Header:`Retry-After`、`X-Queue-Depth`、`X-Queue-Position`、`X-Queue-ETA-S`、`X-Queue-Status`
- 收到 HTTP 200 时,打印结果并退出。
使用场景
- 需要监控长时间采样请求的队列位置。
- 构建显示 "队列位置" 或 "预计等待时间" 的 UI。
- 调试生产部署中的背压行为。
期望输出
```text
sampling_session_created sampling_session_id=abc123
request_submitted request_id=req_456
queue_fields request_id='req_456' type='pending' status='queued' queue_state='waiting' ...
queue_fields request_id='req_456' type='pending' status='queued' queue_state='processing' ...
result_received request_id=req_456 keys=['sequences', 'request_id', ...]
```
前置条件
- Python >= 3.11
- 设置 `MINT_API_KEY` 或 `TINKER_API_KEY`(如果服务端关闭了认证则可选)
运行方式
```bash
export MINT_API_KEY=sk-...
python advanced/queue_status.py
```
参数(环境变量)
- `MINT_API_KEY` 或 `TINKER_API_KEY`:认证用 API key
- `MINT_BASE_URL` 或 `TINKER_BASE_URL`:服务端地址(默认:https://mint.macaron.xin/)
- `MINT_BASE_MODEL`:默认 `Qwen/Qwen3-0.6B`
完整脚本
#!/usr/bin/env python3
"""Queue status polling — read pending queue fields on 408 via raw response.
Demonstrates the backpressure API: submit a sample request, then poll
with `futures.with_raw_response.retrieve()` to read queue depth,
position, ETA, and status from both the response body and headers.
Env:
MINT_API_KEY or TINKER_API_KEY (optional if server auth disabled)
MINT_BASE_URL or TINKER_BASE_URL (optional, default mint)
MINT_BASE_MODEL (optional, default Qwen/Qwen3-0.6B)
Run:
python advanced/queue_status.py
"""
from __future__ import annotations
import asyncio
import inspect
import os
import sys
import time
from pathlib import Path
def load_env_file(path: Path) -> None:
    """Load ``KEY=VALUE`` lines from a dotenv-style file into ``os.environ``.

    Parsing rules:
    - blank lines and lines starting with ``#`` are skipped;
    - an optional leading ``export`` keyword is accepted;
    - lines without ``=`` are ignored; only the first ``=`` splits key/value;
    - surrounding whitespace is trimmed from key and value, and a value wrapped
      in one matching pair of single or double quotes has that pair removed.

    Variables already present in the environment are never overwritten, so
    real environment settings take precedence over the file. A missing file
    is silently ignored.

    Args:
        path: Location of the dotenv file.
    """
    if not path.exists():
        return
    for line in path.read_text(encoding="utf-8").splitlines():
        stripped = line.strip()
        if not stripped or stripped.startswith("#"):
            continue
        if stripped.startswith("export "):
            stripped = stripped[len("export "):].lstrip()
        key, sep, value = stripped.partition("=")
        if not sep:
            continue
        key = key.strip()
        value = value.strip()
        # Remove exactly one pair of *matching* surrounding quotes. Blindly
        # stripping every quote character from both ends would also eat
        # unbalanced quotes that belong to the value (e.g. ``KEY=abc"``).
        if len(value) >= 2 and value[0] == value[-1] and value[0] in "\"'":
            value = value[1:-1]
        if key and key not in os.environ:
            os.environ[key] = value
# Repository root: the parent of the directory that contains this script
# (the script is run as ``advanced/queue_status.py``).
REPO_ROOT = Path(__file__).resolve().parents[1]
load_env_file(REPO_ROOT / ".env")

# Make a local mint toolkit checkout importable ahead of any installed copy.
# Candidates are searched in priority order: a sibling of the repo before one
# inside it, and the ``-alpha`` checkout before the regular one. Stop at the
# FIRST checkout that exists, even when it is already on ``sys.path``.
# Continuing the search in that case would prepend a lower-priority checkout
# that then shadows the preferred one.
for base_dir in (REPO_ROOT.parent, REPO_ROOT):
    for src_dir in ("mindlab-toolkit-alpha/src", "mindlab-toolkit/src"):
        mint_src = base_dir / src_dir
        if mint_src.exists():
            if str(mint_src) not in sys.path:
                sys.path.insert(0, str(mint_src))
            break
    else:
        continue  # nothing under this base_dir; try the next one
    break  # a checkout was found; stop searching
import mint as _mint # triggers env sync (MINT_* -> TINKER_*)
import tinker
from tinker._client import AsyncTinker
from tinker import types
def _int_header(headers, name: str) -> int:
value = headers.get(name)
if value is None:
raise RuntimeError(f"missing header {name}")
try:
return int(value)
except Exception as exc:
raise RuntimeError(f"header {name} is not int: {value!r}") from exc
def _maybe_int_header(headers, name: str) -> int | None:
value = headers.get(name)
if value is None:
return None
try:
return int(value)
except Exception as exc:
raise RuntimeError(f"header {name} is not int: {value!r}") from exc
def _maybe_float_header(headers, name: str) -> float | None:
value = headers.get(name)
if value is None:
return None
try:
return float(value)
except Exception as exc:
raise RuntimeError(f"header {name} is not float: {value!r}") from exc
def _status_error_parts(e: tinker.APIStatusError) -> tuple[int, object, object]:
    """Convert an ``APIStatusError`` into ``(status_code, headers, body)``.

    The body is taken from ``e.body`` when it is a dict. Otherwise the
    response JSON is decoded on a best-effort basis, falling back to
    ``{"error": str(e)}``.
    """
    response = e.response
    headers = response.headers if response is not None else {}
    if isinstance(e.body, dict):
        return e.status_code, headers, e.body
    if response is None:
        return e.status_code, headers, {"error": str(e)}
    try:
        body = response.json()
    except Exception:  # best-effort: non-JSON error bodies fall back to text
        body = {"error": str(e)}
    return e.status_code, headers, body


async def _retrieve_once(client: AsyncTinker, request_id: str) -> tuple[int, object, object]:
    """Poll the future for *request_id* once; return ``(status_code, headers, body)``.

    ``body`` is the decoded JSON payload. Non-2xx responses raised as
    ``tinker.APIStatusError`` are converted into the same triple, so the caller
    can inspect a 408 the same way as a 200.
    """
    try:
        raw = await client.futures.with_raw_response.retrieve(
            request=types.FutureRetrieveRequest(
                request_id=request_id,
            )
        )
    except tinker.APIStatusError as e:
        return _status_error_parts(e)
    body = raw.json()
    # The raw-response wrapper may expose ``json()`` as a coroutine on the
    # async client; await only when needed so either variant works.
    if inspect.isawaitable(body):
        body = await body
    return raw.status_code, raw.headers, body


async def main() -> None:
    """Submit one sample request with backpressure enabled and poll it to completion.

    Creates a sampling session for ``MINT_BASE_MODEL`` (default
    ``Qwen/Qwen3-0.6B``). It then submits a dummy prompt of 256 copies of
    token id 1 with the ``X-Tinker-Sampling-Backpressure: 1`` header, and polls
    the request's future.

    - HTTP 408: prints the queue fields from the body and the ``X-Queue-*`` /
      ``Retry-After`` headers, then sleeps ``Retry-After`` seconds (at least 1).
    - HTTP 200: prints the first result keys and returns.

    Raises:
        RuntimeError: on any status other than 200/408, on a missing or
            non-integer ``Retry-After`` header, on a malformed ``X-Queue-*``
            header, or when the 408 body reports a different ``request_id``.
    """
    base_model = os.environ.get("MINT_BASE_MODEL", "Qwen/Qwen3-0.6B")
    session_id = f"queue_status_{int(time.time())}"
    async with AsyncTinker() as client:
        create_req = types.CreateSamplingSessionRequest(
            session_id=session_id,
            sampling_session_seq_id=0,
            base_model=base_model,
        )
        create_resp = await client.service.create_sampling_session(request=create_req)
        sampling_session_id = create_resp.sampling_session_id
        print(
            "sampling_session_created",
            f"sampling_session_id={sampling_session_id}",
            flush=True,
        )

        sample_req = types.SampleRequest(
            sampling_session_id=sampling_session_id,
            seq_id=1,
            num_samples=1,
            prompt=types.ModelInput.from_ints([1] * 256),
            sampling_params=types.SamplingParams(
                max_tokens=128,
                temperature=0.7,
                top_k=-1,
                top_p=1.0,
            ),
        )
        # Opt in to backpressure so a queued request is reported with HTTP 408
        # plus queue information instead of holding the connection open.
        future = await client.sampling.asample(
            request=sample_req,
            extra_headers={"X-Tinker-Sampling-Backpressure": "1"},
        )
        request_id = future.request_id
        print("request_submitted", f"request_id={request_id}", flush=True)

        # NOTE(review): if AsyncTinker follows the common generated-SDK retry
        # policy, 408 responses may be retried internally before they surface
        # here. Confirm whether client retries should be disabled for this demo.
        while True:
            status_code, headers, body = await _retrieve_once(client, request_id)

            if status_code == 200:
                if isinstance(body, dict):
                    print(
                        "result_received",
                        f"request_id={request_id}",
                        f"keys={list(body)[:8]}",
                        flush=True,
                    )
                else:
                    print("result_received", f"request_id={request_id}", flush=True)
                return

            if status_code != 408:
                raise RuntimeError(
                    f"unexpected status {status_code} body={str(body)[:500]!r}"
                )

            # The 408 body should be a JSON object. Tolerate anything else so a
            # malformed payload prints ``None`` fields instead of crashing on ``.get``.
            fields = body if isinstance(body, dict) else {}

            retry_after_s = _int_header(headers, "Retry-After")
            queue_depth = _maybe_int_header(headers, "X-Queue-Depth")
            queue_position = _maybe_int_header(headers, "X-Queue-Position")
            queue_eta_s = _maybe_float_header(headers, "X-Queue-ETA-S")
            queue_status = headers.get("X-Queue-Status")
            print(
                "queue_fields",
                f"request_id={fields.get('request_id')!r}",
                f"type={fields.get('type')!r}",
                f"status={fields.get('status')!r}",
                f"queue_state={fields.get('queue_state')!r}",
                f"queue_state_reason={fields.get('queue_state_reason')!r}",
                f"queue_depth={fields.get('queue_depth')!r}",
                f"queue_position={fields.get('queue_position')!r}",
                f"estimated_wait_s={fields.get('estimated_wait_s')!r}",
                f"progress={fields.get('progress')!r}",
                f"retry_after_s={fields.get('retry_after_s')!r}",
                f"x_queue_depth={queue_depth!r}",
                f"x_queue_position={queue_position!r}",
                f"x_queue_status={queue_status!r}",
                f"x_queue_eta_s={queue_eta_s!r}",
                f"retry_after={retry_after_s!r}",
                flush=True,
            )

            # Sanity check: the pending body must describe the request we polled.
            body_request_id = fields.get("request_id")
            if (
                isinstance(body_request_id, str)
                and body_request_id
                and body_request_id != request_id
            ):
                raise RuntimeError(
                    f"request_id mismatch: body={body_request_id!r} "
                    f"expected {request_id!r}"
                )
            # Never poll faster than once per second, even if Retry-After is 0.
            await asyncio.sleep(max(1, retry_after_s))
if __name__ == "__main__":
    asyncio.run(main())
关键概念
背压 Header
设置 X-Tinker-Sampling-Backpressure: 1 告诉服务端:请求排队时返回 408 + 队列信息,而不是阻塞连接。
队列字段(408 Body)
| 字段 | 类型 | 说明 |
|---|---|---|
| queue_state | string | 当前状态:waiting、processing 等 |
| queue_position | int | 队列中的位置(0 起始) |
| queue_depth | int | 队列总请求数 |
| estimated_wait_s | float | 预计等待秒数 |
| retry_after_s | float | 建议的轮询间隔 |
队列 Header
| Header | 类型 | 说明 |
|---|---|---|
| Retry-After | int | 下次轮询前等待秒数 |
| X-Queue-Depth | int | 队列总深度 |
| X-Queue-Position | int | 队列位置 |
| X-Queue-ETA-S | float | 预计等待秒数 |
| X-Queue-Status | string | 队列状态字符串 |