昇腾CANN triton-inference-server-ge-backend 实战:Triton 推理服务的动态批处理与模型级联
生产环境的模型服务不是灌数据拿结果那么简单。20 个客户端同时发来请求,每个请求的 batch size 不同(1/4/8)。如果为每个请求各自启动一个 kernel → NPU 利用率只有 8%;如果等到攒够 32 个再一起跑 → 第一个用户要等 200ms,请求已经超时。
Triton Inference Server 的 dynamic batching 解决了这个问题:维护一个队列,新请求到达时不立刻执行,等待 100μs 看有没有更多请求到来 → 攒够 batch 或超时 → 合并为一个 kernel 调用。背后的 ge backend(Graph Engine Backend for Triton)负责把多个请求的输入 tensor 拼接为一个大 batch → 映射到 NPU 的 AscendCL 图执行 → 拆分结果返回各客户端。
Dynamic Batching 的队列实现
# triton-inference-server-ge-backend/src/dynamic_batcher.py
#
# Dynamic Batcher: 请求队列 + 微批合并 + 超时控制
# GE Backend 采用 prefer_batch 策略:
# 新请求到达 → 加入队列 → 启动 100μs 超时 →
# 超时到期或队列满 → 合并所有 pending 请求为一个 batch → model.run(batched_input)
import threading
import time
from collections import deque
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, List, Optional

import torch
import torch_npu
class BatchStrategy(Enum):
    """Batching strategy selectable on the dynamic batcher.

    NOTE(review): in the code visible here the chosen value is only stored by
    DynamicBatcher and echoed by get_stats(); no scheduling branch reads it.
    """
    # Intent: wait for more requests so batches are larger (favours throughput).
    PREFER_BATCH = "prefer_batch"
    # Intent: return results as early as possible (favours latency).
    PREFER_LATENCY = "prefer_latency"
@dataclass
class InferenceRequest:
    """A single inference request handed to the dynamic batcher.

    Only ``request_id``, ``inputs`` and ``output_names`` have to be supplied.
    ``response_event`` defaults to a fresh, unset ``threading.Event`` so that
    asynchronous callers do not need to build one and so that the batcher can
    always signal completion; passing an explicit event (positionally or by
    keyword) still works as before.
    """
    request_id: str
    inputs: Dict[str, torch.Tensor]  # input_name -> tensor
    output_names: List[str]
    # Set by the batcher once ``response`` has been filled in.
    response_event: threading.Event = field(default_factory=threading.Event)
    # Filled in by the batcher; None until the request completes.
    response: Optional[Dict[str, torch.Tensor]] = None
    # Timestamp (time.time()) stamped by the batcher on submission.
    arrival_time: float = 0.0
    # Scheduling priority; 0 is the highest.
    priority: int = 0
    # Per-request statistics in milliseconds, written by the batcher after
    # the batch has run. Declared here so they can be read at any time.
    wait_time: float = 0.0
    infer_time: float = 0.0
class DynamicBatcher:
    """Request queue with micro-batch merging and a bounded wait window.

    Core parameters:
        max_batch_size: maximum number of requests merged into one model
            call (e.g. 32). The limit counts requests, not tensor rows: a
            request whose tensors carry several rows still counts as one.
        max_queue_delay_us: how long, in microseconds, the worker waits for
            further requests before dispatching a partial batch (e.g. 100).
        preferred_batch_sizes: request counts at which a batch is dispatched
            immediately (e.g. [8, 16, 32]).
        strategy: stored and reported by get_stats(); it does not alter
            scheduling in this class.

    Workflow:
        1. A request arrives and is queued, ordered by (priority, arrival).
        2. If the queue holds at least max_batch_size requests, or at least
           one preferred size, a batch is dispatched immediately.
        3. Otherwise the worker waits up to max_queue_delay_us and then
           dispatches whatever is pending.
        4. Dispatch = concatenate inputs on dim 0, call the model once,
           slice the outputs back per request.
    """

    def __init__(
        self,
        model,
        max_batch_size: int = 32,
        max_queue_delay_us: int = 100,
        preferred_batch_sizes: Optional[List[int]] = None,
        strategy: BatchStrategy = BatchStrategy.PREFER_BATCH
    ):
        """Store the configuration and start the background worker thread.

        Raises:
            ValueError: if ``max_batch_size`` is smaller than 1 (the worker
                could never form a batch and would spin).
        """
        if max_batch_size < 1:
            raise ValueError("max_batch_size must be >= 1")
        self.model = model
        self.max_batch_size = max_batch_size
        self.max_queue_delay_us = max_queue_delay_us
        self.preferred_batch_sizes = preferred_batch_sizes or [8, 16, 32]
        self.strategy = strategy
        self.queue = deque()
        self.queue_lock = threading.Lock()
        self.batch_ready = threading.Event()
        # ``running`` must be set before the thread starts: the worker loop
        # reads it on its first iteration.
        self.running = True
        self.worker_thread = threading.Thread(target=self._batch_worker, daemon=True)
        self.worker_thread.start()

    def enqueue(self, request: InferenceRequest) -> Dict[str, torch.Tensor]:
        """Submit ``request`` and block until its response is available.

        Usage: ``batcher.enqueue(InferenceRequest(...))``.

        Returns:
            The per-request output dict. If the batch failed, the dict is
            ``{"error": <message>}``.

        Raises:
            TimeoutError: if no response arrived within 30 seconds.
        """
        # A fresh event per submission so a reused request object cannot be
        # woken by a stale signal.
        request.response_event = threading.Event()
        self._submit(request)
        request.response_event.wait(timeout=30.0)  # 30 s timeout
        if request.response is None:
            # Drop the abandoned request so the worker does not spend a model
            # call on a result nobody will read. Filter by identity: the
            # dataclass-generated ``==`` would compare tensor dicts.
            with self.queue_lock:
                self.queue = deque(r for r in self.queue if r is not request)
            raise TimeoutError(f"Request {request.request_id} timed out")
        return request.response

    def enqueue_async(self, request: InferenceRequest, callback=None):
        """Submit ``request`` and return immediately.

        ``callback(response)`` is invoked from the worker thread once the
        request completes; on batch failure it receives
        ``{"error": <message>}``.
        """
        # Always (re)assign so a reused request does not fire a stale callback.
        request._callback = callback
        self._submit(request)

    def _submit(self, request: InferenceRequest) -> None:
        """Timestamp ``request``, insert it in priority order, wake the worker."""
        request.arrival_time = time.time()
        with self.queue_lock:
            self.queue.append(request)
            # Higher priority (lower number) first, then earlier arrival.
            self.queue = deque(sorted(
                self.queue,
                key=lambda r: (r.priority, r.arrival_time)
            ))
        self.batch_ready.set()

    def _batch_worker(self):
        """Worker loop: wait for requests, collect a batch, execute it."""
        while self.running:
            if not self.queue:
                self.batch_ready.wait(timeout=0.5)
                self.batch_ready.clear()
                continue
            batch_requests = self._collect_batch()
            if not batch_requests:
                continue
            try:
                self._execute_batch(batch_requests)
            except Exception as e:
                # Thread boundary: report the failure to every request of the
                # batch instead of letting the worker die. _execute_batch
                # delivers nothing until all outputs are split, so no request
                # has received a successful response at this point.
                for req in batch_requests:
                    self._complete(req, {"error": str(e)})

    def _preferred_size(self, pending: int) -> int:
        """Return the largest usable preferred batch size for ``pending`` requests, or 0."""
        limit = min(pending, self.max_batch_size)
        return max(
            (size for size in self.preferred_batch_sizes if 1 <= size <= limit),
            default=0,
        )

    def _pop_front(self, count: int) -> List[InferenceRequest]:
        """Remove and return the first ``count`` queued requests (caller holds the lock)."""
        return [self.queue.popleft() for _ in range(count)]

    def _collect_batch(self) -> List[InferenceRequest]:
        """Take one batch from the queue.

        Rules, evaluated each time the queue changes:
            1. queue >= max_batch_size: take max_batch_size requests.
            2. queue >= some preferred size: take the largest such size.
            3. otherwise wait; once max_queue_delay_us has elapsed (or the
               batcher is shutting down) take everything pending.

        Returns:
            The requests of the batch; an empty list if nothing was pending
            when the window closed.
        """
        deadline = time.monotonic() + self.max_queue_delay_us / 1_000_000
        while True:
            with self.queue_lock:
                pending = len(self.queue)
                if pending >= self.max_batch_size:
                    return self._pop_front(self.max_batch_size)
                preferred = self._preferred_size(pending)
                if preferred:
                    return self._pop_front(preferred)
                remaining = deadline - time.monotonic()
                if remaining <= 0 or not self.running:
                    # Window closed: dispatch the partial batch. Returning
                    # (possibly empty) here also prevents a busy loop.
                    return self._pop_front(pending)
            # Sleep outside the lock until a new request arrives or the
            # window closes, then re-evaluate the rules.
            self.batch_ready.wait(timeout=remaining)
            self.batch_ready.clear()

    def _execute_batch(self, requests: List[InferenceRequest]):
        """Run one merged inference: concatenate inputs, infer, split outputs.

        Raises:
            ValueError: if the requests carry no inputs or if an input's
                non-batch dimensions differ between requests.
        """
        if not requests:
            return
        batch_size = len(requests)

        # Step 1: merge inputs. For every input name, concatenate the
        # per-request tensors on dim 0 -> [sum(B_i), ...].
        input_names = list(requests[0].inputs)
        if not input_names:
            raise ValueError("Requests carry no inputs")
        batched_inputs = {}
        for name in input_names:
            tensors = [req.inputs[name] for req in requests]
            # Everything except the batch dimension must match to concatenate.
            shapes = set(t.shape[1:] for t in tensors)
            if len(shapes) > 1:
                raise ValueError(
                    f"Input '{name}' shape mismatch: {[t.shape for t in tensors]}"
                )
            batched_inputs[name] = torch.cat(tensors, dim=0)

        # Step 2: inference. time.time() is used for start_time because it
        # is compared against arrival_time, which is also time.time().
        start_time = time.time()
        batched_outputs = self.model(**batched_inputs)
        infer_time = (time.time() - start_time) * 1000  # ms

        # Step 3: split outputs. Each request owns as many rows as its first
        # input contributed. Build every response before delivering any, so a
        # failure here cannot leave the batch half-answered.
        responses = []
        row_start = 0
        for req in requests:
            row_end = row_start + req.inputs[input_names[0]].shape[0]
            responses.append({
                name: output[row_start:row_end]
                for name, output in batched_outputs.items()
            })
            row_start = row_end

        for req, response in zip(requests, responses):
            req.wait_time = (start_time - req.arrival_time) * 1000  # ms
            req.infer_time = infer_time / batch_size  # amortised per request
            self._complete(req, response)

    def _complete(self, req: InferenceRequest, response) -> None:
        """Publish ``response`` on ``req``: wake a sync waiter, run any callback."""
        req.response = response
        event = getattr(req, "response_event", None)
        if event:
            event.set()
        callback = getattr(req, "_callback", None)
        if callback:
            try:
                callback(response)
            except Exception:
                # A faulty user callback must not kill the worker thread or
                # block delivery to the other requests of the batch.
                import logging
                logging.getLogger(__name__).exception(
                    "Callback failed for request %s", req.request_id
                )

    def get_stats(self):
        """Return the current queue depth and the static batching configuration."""
        with self.queue_lock:
            queue_depth = len(self.queue)
        return {
            "queue_depth": queue_depth,
            "strategy": self.strategy.value,
            "max_batch_size": self.max_batch_size,
            "max_queue_delay_us": self.max_queue_delay_us,
        }

    def shutdown(self):
        """Stop the worker loop and wait up to 5 seconds for the thread to exit."""
        self.running = False
        # Wake the worker in case it is blocked waiting for requests.
        self.batch_ready.set()
        self.worker_thread.join(timeout=5.0)
Model Ensemble——多模型级联
# triton-inference-server-ge-backend/src/model_ensemble.py
#
# Model Ensemble: 多模型流水线
# 例: 文本分类 pipeline
# Tokenizer(tokenize) → BERT(encode) → Classifier(predict)
class ModelEnsemble:
    """Chain several models, passing intermediate results between them directly.

    Pipeline: Tokenizer -> BERT -> Classifier. The models are called in list
    order; the intermediate tensors are handed from one model to the next
    without being moved by this class.
    """

    def __init__(self, models: List[Any], pipeline_graph):
        """Store the models and preallocate intermediate buffers where possible.

        Args:
            models: [tokenizer_model, bert_model, classifier_model].
            pipeline_graph: DAG description, e.g.
                {
                  "tokenizer": {"inputs": ["text"], "outputs": ["token_ids", "attention_mask"]},
                  "bert": {"inputs": ["token_ids", "attention_mask"], "outputs": ["hidden_states"]},
                  "classifier": {"inputs": ["hidden_states"], "outputs": ["logits"]}
                }
                It is stored but not interpreted by this class.
        """
        self.models = models
        self.pipeline_graph = pipeline_graph
        # name -> preallocated output buffer (avoids per-call allocation).
        self.intermediate_buffers = {}
        self._preallocate_buffers()

    def _preallocate_buffers(self):
        """Allocate one float16 NPU buffer per model that declares its output shape.

        Models without both ``output_shape`` and ``name`` are skipped; infer()
        then passes ``None`` as their output buffer.
        """
        for model in self.models:
            shape = getattr(model, "output_shape", None)
            name = getattr(model, "name", None)
            if shape is None or name is None:
                continue
            self.intermediate_buffers[name] = torch.empty(
                shape, dtype=torch.float16, device="npu"
            )

    def infer(self, raw_texts: List[str]) -> torch.Tensor:
        """Run the full text -> logits pipeline.

        tokenizer(text) -> BERT(token_ids) -> Classifier(hidden) -> logits.
        Each stage receives its preallocated buffer, or ``None`` when no
        buffer was preallocated for it.

        Returns:
            Whatever the classifier returns; nothing is moved to the CPU
            here, so the caller decides when to call ``.cpu()``.
        """
        buffers = self.intermediate_buffers

        # Step 1: tokenizer.
        token_ids, attention_mask = self.models[0].encode(
            raw_texts,
            max_length=512,
            output_buffer_token=buffers.get("tokenizer"),
        )

        # Step 2: BERT encoder (main compute).
        hidden_states = self.models[1](
            input_ids=token_ids,
            attention_mask=attention_mask,
            output_buffer=buffers.get("bert"),
        )

        # Step 3: classifier head.
        logits = self.models[2](
            hidden_states,
            output_buffer=buffers.get("classifier"),
        )
        return logits

    def benchmark(self, num_texts=32):
        """Time one pipeline call on ``num_texts`` dummy texts and print the result.

        Returns:
            The measured latency in milliseconds.

        Raises:
            ValueError: if ``num_texts`` is smaller than 1.
        """
        if num_texts < 1:
            raise ValueError("num_texts must be >= 1")
        dummy_texts = ["电力负荷预测模型部署实战"] * num_texts

        # Warm-up runs so one-time initialisation is not measured.
        for _ in range(5):
            self.infer(dummy_texts)
        torch.npu.synchronize()

        start = torch.npu.Event(enable_timing=True)
        end = torch.npu.Event(enable_timing=True)
        start.record()
        self.infer(dummy_texts)
        end.record()
        # Wait for the device before reading the elapsed time.
        torch.npu.synchronize()
        elapsed = start.elapsed_time(end)

        print("=== Model Ensemble ===")
        print(f" Texts: {num_texts}")
        print(f" Latency: {elapsed:.1f} ms")
        print(f" Per text: {elapsed/num_texts:.1f} ms")
        return elapsed
# ====== 使用示例 ======
# # 启动 Triton 服务
# python -m triton_server \
# --model-repository=/models \
# --backend-config=ge,device_id=0 \
# --dynamic-batching \
# --max-batch-size=32 \
# --max-queue-delay-us=100
#
# # 客户端调用
# import tritonclient.http as httpclient
#
# client = httpclient.InferenceServerClient("localhost:8000")
#
# inputs = [
# httpclient.InferInput("text", [1], "BYTES"),
# ]
# inputs[0].set_data_from_numpy(np.array(["hello world"], dtype=object))
#
# result = client.infer("text_classifier", inputs)
# logits = result.as_numpy("logits")
GE Backend 的模型加载与内存规划
// triton-inference-server-ge-backend/src/ge_backend.cc
//
// GE Backend: 将 Triton 模型转换为 GE Graph → 加载到 NPU
// 核心:静态内存规划,一次分配整个模型的 HBM + L1
#include "triton/backend/backend_common.h"
#include "ge/ge_ir_build.h"
#include "ge/ge_api.h"
namespace triton { namespace backend { namespace ge_backend {
// Load the offline model found under `model_path` into a GE session on
// `device_id`, allocate the device buffers it needs once, and store
// everything in `model_state_`.
//
// Returns nullptr on success, otherwise a TRITONSERVER_Error describing the
// step that failed. On failure no device buffer allocated here is leaked and
// `model_state_` is left untouched.
TRITONSERVER_Error* GEBackendModel::LoadModel(
    const std::string& model_path,
    const int device_id)
{
  // Step 1: locate the OM offline model (a GE graph compiled by ATC).
  std::string om_path = model_path + "/model.om";

  // Step 2: create the GE session (the inference context on the NPU).
  ge::SessionOptions session_options;
  session_options.device_id = device_id;
  auto session = std::make_shared<ge::Session>(session_options);

  // Step 3: load the graph from the OM file.
  ge::Graph graph("triton_model");
  auto status = graph.LoadFromFile(om_path);
  if (status != ge::SUCCESS) {
    return TRITONSERVER_ErrorNew(
        TRITONSERVER_ERROR_INTERNAL,
        ("Failed to load OM model: " + om_path).c_str());
  }

  // Step 4: register the graph with the session (graph_id = 0).
  status = session->AddGraph(0, graph);
  if (status != ge::SUCCESS) {
    return TRITONSERVER_ErrorNew(
        TRITONSERVER_ERROR_INTERNAL,
        "Failed to add graph to GE session");
  }

  // Step 5: static memory planning. The sizes are queried once and the
  // buffers are allocated once, so no allocation happens at run time.
  size_t static_mem_size = 0;  // input/output buffers
  size_t weight_mem_size = 0;  // weights (constants)
  size_t workspace_size = 0;   // intermediate results
  status = session->GetMemoryRequirement(
      static_mem_size, weight_mem_size, workspace_size);
  if (status != ge::SUCCESS) {
    // Allocating with the zero-initialised sizes would silently produce
    // unusable buffers, so stop here.
    return TRITONSERVER_ErrorNew(
        TRITONSERVER_ERROR_INTERNAL,
        "Failed to query GE memory requirement");
  }

  void* static_buf = nullptr;
  void* weight_buf = nullptr;
  void* workspace_buf = nullptr;

  // Frees whichever buffers have been allocated so far; used on every
  // failure path below so a partial allocation is not leaked.
  auto release_buffers = [&]() {
    if (workspace_buf != nullptr) {
      aclrtFree(workspace_buf);
      workspace_buf = nullptr;
    }
    if (weight_buf != nullptr) {
      aclrtFree(weight_buf);
      weight_buf = nullptr;
    }
    if (static_buf != nullptr) {
      aclrtFree(static_buf);
      static_buf = nullptr;
    }
  };

  // && short-circuits, so allocation stops at the first failure.
  const bool allocated =
      aclrtMalloc(&static_buf, static_mem_size, ACL_MEM_MALLOC_HUGE_FIRST) == ACL_SUCCESS &&
      aclrtMalloc(&weight_buf, weight_mem_size, ACL_MEM_MALLOC_HUGE_FIRST) == ACL_SUCCESS &&
      aclrtMalloc(&workspace_buf, workspace_size, ACL_MEM_MALLOC_HUGE_FIRST) == ACL_SUCCESS;
  if (!allocated) {
    release_buffers();
    return TRITONSERVER_ErrorNew(
        TRITONSERVER_ERROR_INTERNAL,
        ("Failed to allocate device memory for model: " + model_path).c_str());
  }

  // Deserialize the weights from the OM file into the weight buffer.
  status = session->LoadWeights(weight_buf, weight_mem_size);
  if (status != ge::SUCCESS) {
    release_buffers();
    return TRITONSERVER_ErrorNew(
        TRITONSERVER_ERROR_INTERNAL,
        ("Failed to load weights for model: " + model_path).c_str());
  }

  // Step 6: publish the session and its buffers only after every step has
  // succeeded.
  model_state_ = std::make_unique<GEModelState>();
  model_state_->session = session;
  model_state_->static_buf = static_buf;
  model_state_->weight_buf = weight_buf;
  model_state_->workspace_buf = workspace_buf;

  LOG_MESSAGE(TRITONSERVER_LOG_INFO,
      ("GE Backend loaded: " + model_path +
       ", static=" + std::to_string(static_mem_size / 1024.0 / 1024.0) + "MB"
       ", weights=" + std::to_string(weight_mem_size / 1024.0 / 1024.0) + "MB"
       ", workspace=" + std::to_string(workspace_size / 1024.0 / 1024.0) + "MB"
      ).c_str());
  return nullptr;  // Success
}
// Execute `request_count` requests as one batch: copy every request's inputs
// into the preallocated device buffer, run the GE session once, then copy one
// output slice back per request and send its response.
//
// Returns nullptr on the path shown here; the only early return visible is
// the RETURN_IF_ERROR around GetModelInputs.
//
// NOTE(review): input_offset, output_shape, output_dims_count, output_size,
// output_memory_type, output_memory_type_id and batch_output_ptr are not
// declared in this function; presumably they are members or defined
// elsewhere - confirm before relying on this listing.
TRITONSERVER_Error* GEBackendInstance::Execute(
    TRITONBACKEND_Request** requests,
    const uint32_t request_count)
{
  // Batched execution: the request_count requests are merged into one batch.
  // Step 1: collect the input descriptions used to build the batch tensors.
  std::vector<const char*> input_names;
  std::vector<TRITONSERVER_DataType> input_types;
  // Fetch the input descriptions from the model config.
  RETURN_IF_ERROR(GetModelInputs(input_names, input_types));
  // Step 2: build one batch tensor per input.
  for (size_t i = 0; i < input_names.size(); ++i) {
    // NOTE(review): batch_byte_size is computed but not used below.
    size_t batch_byte_size = GetBatchByteSize(
        requests, request_count, input_names[i]);
    // Write directly into the preallocated static_buf.
    // NOTE(review): if static_buf is a void* (as it is when assigned in
    // LoadModel), pointer arithmetic on it is not valid C++; input_offset is
    // also never advanced in the code shown, so every input would start at
    // the same address - confirm.
    void* input_buf = model_state_->static_buf + input_offset;
    // Copy this input from every request into the batch buffer, back to back.
    size_t offset = 0;
    for (uint32_t r = 0; r < request_count; ++r) {
      size_t req_size;
      const void* req_input;
      // NOTE(review): input_names holds `const char*`, so `.c_str()` will not
      // compile as written; also verify this call against the Triton backend
      // API signature.
      TRITONBACKEND_RequestInput(
          requests[r], input_names[i].c_str(),
          &req_input, &req_size);
      // NOTE(review): the return value of the copy is ignored.
      aclrtMemcpy(
          static_cast<char*>(input_buf) + offset,
          req_size, req_input, req_size,
          ACL_MEMCPY_HOST_TO_DEVICE);
      offset += req_size;
    }
  }
  // Step 3: run the GE session (inference).
  // NOTE(review): `status` is never inspected afterwards, so a failed run
  // still sends responses.
  auto status = model_state_->session->Run(
      model_state_->static_buf,
      model_state_->workspace_buf
  );
  // Step 4: split the result back into one response per request.
  for (uint32_t r = 0; r < request_count; ++r) {
    TRITONBACKEND_Response* response;
    TRITONBACKEND_ResponseNew(&response, requests[r]);
    // Declare the output tensor of this response.
    TRITONBACKEND_Output* output;
    TRITONBACKEND_ResponseOutput(
        response, &output, "output",
        TRITONSERVER_TYPE_FP16,
        output_shape, output_dims_count);
    void* output_buf;
    TRITONBACKEND_OutputBuffer(
        output, &output_buf, output_size,
        &output_memory_type, &output_memory_type_id);
    // Copy request r's slice out of the batch output buffer; the indexing
    // assumes every request produces exactly output_size bytes.
    aclrtMemcpy(output_buf, output_size,
        batch_output_ptr + r * output_size, output_size,
        ACL_MEMCPY_DEVICE_TO_HOST);
    TRITONBACKEND_ResponseSend(
        response, TRITONSERVER_RESPONSE_COMPLETE_FINAL, nullptr);
  }
  return nullptr; // Success
}
}}} // namespace
踩坑:Dynamic Batching 的 Tensor shape 不匹配——用户 A 发来 [1, 3, 224, 224],用户 B 发来 [1, 3, 336, 336]
# ❌ torch.cat([tensor_A, tensor_B], dim=0) → RuntimeError
# shape[1:] 不一致: (3,224,224) vs (3,336,336)
# ✅ RaggedBatchHandler(变长 batch 处理器): 对变长输入做 padding
class RaggedBatchHandler:
    """Merge variable-shaped inputs into one batch by zero padding.

    Strategy: find the largest extent of every non-batch dimension and pad
    the smaller tensors up to it. The batch dimension (dim 0) is never
    padded, so no fake samples are created.
    Example: (1,3,224,224) + (1,3,336,336) -> (2,3,336,336)
    """

    def merge_variable_shapes(self, tensors: List[torch.Tensor]):
        """Pad ``tensors`` to a common shape and concatenate them on dim 0.

        Args:
            tensors: tensors of equal rank (>= 1); dim 0 is the batch
                dimension and may differ freely between tensors.

        Returns:
            A tuple ``(merged, seq_lens, mask)``:
            merged: ``[sum(B_i), *max_shape[1:]]``, zero padded at the end
                of every non-batch dimension.
            seq_lens: the number of rows (``shape[0]``) each input tensor
                contributes to ``merged``, in input order.
            mask: same shape as ``merged``; 1 where ``merged`` holds real
                data, 0 where it holds padding.

        Raises:
            ValueError: if ``tensors`` is empty, contains a 0-dim tensor, or
                mixes tensors of different rank.
        """
        if not tensors:
            raise ValueError("merge_variable_shapes() requires at least one tensor")
        rank = tensors[0].dim()
        if rank == 0 or any(t.dim() != rank for t in tensors):
            raise ValueError(
                f"All tensors must share one rank >= 1, got {[t.dim() for t in tensors]}"
            )

        # Largest extent per dimension across all inputs.
        max_shape = [max(t.shape[d] for t in tensors) for d in range(rank)]

        padded = []
        masks = []
        seq_lens = []
        for t in tensors:
            # F.pad reads its pad list starting from the LAST dimension:
            # (last_left, last_right, second_last_left, ...). Dim 0 is left
            # out on purpose so the batch dimension is never padded.
            pad_dims = []
            for d in range(rank - 1, 0, -1):
                pad_dims.extend([0, max_shape[d] - t.shape[d]])

            valid = torch.ones(t.shape, device=t.device)
            seq_lens.append(t.shape[0])
            if any(pad_dims):
                t = torch.nn.functional.pad(t, pad_dims, value=0)
                # Padding the all-ones tensor the same way yields the mask.
                valid = torch.nn.functional.pad(valid, pad_dims, value=0)
            padded.append(t)
            masks.append(valid)

        merged = torch.cat(padded, dim=0)
        mask = torch.cat(masks, dim=0)
        return merged, seq_lens, mask
踩坑:GE Graph 静态内存规划导致多模型并发时 HBM 争抢——两个模型同时 load,分别预分配了 8GB 和 6GB workspace(合计 14GB)
# ❌ 模型 A (15B params, 8GB workspace) + 模型 B (8B params, 6GB workspace)
# 同时加载 → 需要 14GB workspace → 但 HBM 只有 32GB(权重已占 18GB)
# → OOM
# ✅ 共享 workspace pool: 所有模型共享一个统一的 workspace buffer
class SharedWorkspacePool:
    """One workspace buffer shared by several models.

    A model holds a region only between allocate() and release(). Freed
    regions are reused by later allocations (first fit), so the pool does
    not have to be completely empty before space becomes available again.
    """

    def __init__(self, total_workspace_hbm_gb=12):
        """Allocate the backing buffer of ``total_workspace_hbm_gb`` GiB on the NPU."""
        self.total_size = int(total_workspace_hbm_gb * 1024**3)
        self.workspace = torch.empty(
            self.total_size, dtype=torch.uint8, device="npu"
        )
        self.allocations = {}  # model_name -> (offset, size)
        # End of the highest live allocation (0 when the pool is empty).
        self.offset = 0
        self.lock = threading.Lock()

    def allocate(self, model_name: str, size: int) -> int:
        """Reserve ``size`` bytes for ``model_name`` and return the byte offset.

        If ``model_name`` already holds a region, that region is replaced by
        the new one instead of being leaked; on failure the old region is
        kept.

        Raises:
            ValueError: if ``size`` is negative.
            RuntimeError: if no free gap of ``size`` bytes exists.
        """
        if size < 0:
            raise ValueError(f"Workspace size must be >= 0, got {size}")
        with self.lock:
            # Regions owned by other models; the caller's own previous
            # region counts as free space for the new request.
            others = sorted(
                region for name, region in self.allocations.items()
                if name != model_name
            )
            # First fit: walk the live regions in address order and take the
            # first gap that is large enough.
            cursor = 0
            for start, length in others:
                if start - cursor >= size:
                    break
                cursor = max(cursor, start + length)
            else:
                # No gap between regions fits; try the tail of the buffer.
                if cursor + size > self.total_size:
                    raise RuntimeError(
                        f"Workspace OOM: {model_name} needs {size}, "
                        f"only {self.total_size - cursor} remaining"
                    )
            self.allocations[model_name] = (cursor, size)
            self._refresh_offset()
            return cursor

    def release(self, model_name: str):
        """Free the region held by ``model_name``; unknown names are ignored."""
        with self.lock:
            if self.allocations.pop(model_name, None) is not None:
                self._refresh_offset()

    def _refresh_offset(self) -> None:
        """Recompute ``offset`` as the end of the highest live region (caller holds the lock)."""
        self.offset = max(
            (start + length for start, length in self.allocations.values()),
            default=0,
        )
triton-inference-server-ge-backend 的部署方案:Dynamic Batcher 维护请求队列,preferred_batch_sizes [8,16,32] 策略在延迟和吞吐间权衡(100μs 超时攒 batch),GE Backend 在 load 阶段一次性静态分配 HBM(weight + workspace + io buffer,零运行时 malloc),Model Ensemble 多模型级联中间结果 HBM 直传省 CPU 往返。踩坑:变长输入 shape 不匹配→Ragged Batch 自动 padding + mask、多模型并发 workspace 争抢 14GB→共享池 12GB 按需分配、静态 shape 限制无法处理动态 batch→GE 动态 shape 输入支持。
更多推荐




所有评论(0)