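Model serving in production is more than feeding in data and reading back results. Suppose 20 clients send requests at the same time, each with a different batch size (1, 4, or 8). Launching a separate kernel for each request leaves GPU utilization at about 8%. Waiting until 32 requests have accumulated before running them together makes the first user wait 200 ms, which is long enough to time out.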

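Triton Inference Server's dynamic batching addresses this. It maintains a queue, and a newly arrived request is not executed immediately: the server waits 100 μs to see whether more requests arrive. Once the batch is full or the delay expires, the pending requests are merged into a single kernel call. Behind it, the GE backend (Graph Engine Backend for Triton) concatenates the input tensors of multiple requests into one large batch, maps it onto AscendCL graph execution on the NPU, then splits the results and returns them to each client.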
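Because each request carries its own batch size (1, 4, or 8 in the example above), the "batch is full" condition is easier to reason about when counted in tensor rows rather than in requests. The following is a minimal sketch of that decision under stated assumptions: `pack_by_rows` and `should_flush` are hypothetical helper names introduced here for illustration, not functions of Triton or the GE backend.

```python
from typing import List, Tuple


def pack_by_rows(pending_rows: List[int], max_batch_size: int) -> Tuple[List[int], int]:
    """Take requests in arrival order until the next one would exceed max_batch_size rows.

    pending_rows[i] is the batch size (row count) of the i-th queued request.
    Returns (indices of the requests taken, total rows taken).
    """
    taken: List[int] = []
    total = 0
    for index, rows in enumerate(pending_rows):
        if total + rows > max_batch_size:
            break  # keep arrival order: stop at the first request that does not fit
        taken.append(index)
        total += rows
    return taken, total


def should_flush(queued_rows: int, waited_us: float,
                 max_batch_size: int, max_queue_delay_us: float) -> bool:
    """Flush when the batch is full or the queue delay has expired (and something is queued)."""
    if queued_rows <= 0:
        return False
    return queued_rows >= max_batch_size or waited_us >= max_queue_delay_us


if __name__ == "__main__":
    taken, total = pack_by_rows([8, 4, 8, 8, 4, 1], max_batch_size=32)
    print(taken, total)
    print(should_flush(total, waited_us=10.0, max_batch_size=32, max_queue_delay_us=100.0))
```

Stopping at the first request that does not fit keeps arrival order intact, at the cost of occasionally leaving a few rows of capacity unused.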

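Queue implementation of Dynamic Batching

# triton-inference-server-ge-backend/src/dynamic_batcher.py
#
# Dynamic Batcher: request queue + micro-batch merging + timeout control
# The GE Backend uses the prefer_batch strategy:
#   new request arrives → enqueue → start a 100 μs timer →
#   timer expires or queue is full → merge all pending requests into one batch → model.run(batched_input)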

import threading
import time
import torch
import torch_npu

from collections import deque
from dataclasses import dataclass
from typing import List, Optional, Dict, Any
from enum import Enum

class BatchStrategy(Enum):
    """Batching strategy"""
    PREFER_BATCH = "prefer_batch"   # wait for as many requests as possible (throughput first)
    PREFER_LATENCY = "prefer_latency" # return as early as possible (latency first)

@dataclass
class InferenceRequest:
    """A single inference request"""
    request_id: str
    inputs: Dict[str, torch.Tensor]  # input_name → tensor
    output_names: List[str]
    response_event: Optional[threading.Event] = None  # signalled when the response is ready
    response: Optional[Dict[str, torch.Tensor]] = None
    arrival_time: float = 0.0
    priority: int = 0  # priority (0 is highest)


class DynamicBatcher:
    """
    Dynamic Batcher for GE Backend

    Core parameters:
    - max_batch_size: maximum batch size of the model (e.g. 32)
    - max_queue_delay_us: maximum queueing delay (e.g. 100 μs)
    - preferred_batch_sizes: list of preferred batch sizes (e.g. [8, 16, 32])

    Workflow:
    1. Request arrives → enqueue (sorted by priority)
    2. Check: queued requests >= a preferred batch size → execute immediately
    3. Otherwise: wait up to max_queue_delay_us → execute all pending requests on timeout
    4. Execute: concatenate tensors → model(**batched_inputs) → split results
    """

    def __init__(
        self,
        model,
        max_batch_size: int = 32,
        max_queue_delay_us: int = 100,
        preferred_batch_sizes: Optional[List[int]] = None,
        strategy: BatchStrategy = BatchStrategy.PREFER_BATCH
    ):
        self.model = model
        self.max_batch_size = max_batch_size
        self.max_queue_delay_us = max_queue_delay_us
        self.preferred_batch_sizes = preferred_batch_sizes or [8, 16, 32]
        self.strategy = strategy

        self.queue = deque()
        self.queue_lock = threading.Lock()
        self.batch_ready = threading.Event()

        # Background thread: keeps draining requests from the queue.
        # Set the running flag before the thread starts so the worker loop sees it.
        self.running = True
        self.worker_thread = threading.Thread(target=self._batch_worker, daemon=True)
        self.worker_thread.start()

    def enqueue(self, request: InferenceRequest) -> Dict[str, torch.Tensor]:
        """
        Synchronous interface: submit a request → block until the result is ready

        Caller: batcher.enqueue(InferenceRequest(...)) → Dict[str, tensor]
        """
        request.arrival_time = time.time()
        request.response_event = threading.Event()

        with self.queue_lock:
            self.queue.append(request)
            # Sort while still holding the lock, otherwise the worker may pop
            # from the queue while it is being rebuilt.
            # Order: higher priority first, then earlier arrival.
            self.queue = deque(sorted(
                self.queue,
                key=lambda r: (r.priority, r.arrival_time)
            ))

        # Notify the worker that a new request is available
        self.batch_ready.set()

        # Block until the result arrives
        request.response_event.wait(timeout=30.0)  # 30 s timeout

        if request.response is None:
            raise TimeoutError(f"Request {request.request_id} timed out")

        return request.response

    def enqueue_async(self, request: InferenceRequest, callback=None):
        """
        Asynchronous interface: submit a request → return immediately → callback(result) is invoked later
        """
        request.arrival_time = time.time()
        if callback:
            request._callback = callback

        with self.queue_lock:
            self.queue.append(request)
            # Sort under the lock to avoid racing with the worker thread
            self.queue = deque(sorted(
                self.queue,
                key=lambda r: (r.priority, r.arrival_time)
            ))
        self.batch_ready.set()

    def _batch_worker(self):
        """Long-running batch worker thread"""
        while self.running:
            # Wait for requests to arrive
            if not self.queue:
                self.batch_ready.wait(timeout=0.5)
                self.batch_ready.clear()
                continue

            # Collect one batch
            batch_requests = self._collect_batch()

            if not batch_requests:
                continue

            # Run inference
            try:
                self._execute_batch(batch_requests)
            except Exception as e:
                # On failure, return the error to every request in the batch
                # instead of silently dropping the batch
                for req in batch_requests:
                    req.response = {"error": str(e)}
                    if req.response_event is not None:
                        req.response_event.set()

    def _collect_batch(self) -> List[InferenceRequest]:
        """
        Collect one batch from the queue.

        Rules (sizes here count queued requests, not tensor rows):
        1. Queue length >= max_batch_size → take max_batch_size requests immediately
        2. Queue length >= a preferred size → take the largest such size immediately
        3. Otherwise: wait up to max_queue_delay_us → take all pending requests
        """
        # Try the largest preferred size first; ignore sizes above max_batch_size
        preferred = sorted(
            (s for s in self.preferred_batch_sizes if s <= self.max_batch_size),
            reverse=True,
        )

        wait_start = time.monotonic()
        while True:
            elapsed_us = (time.monotonic() - wait_start) * 1_000_000

            with self.queue_lock:
                current_size = len(self.queue)

                # Rule 1: queue is full
                if current_size >= self.max_batch_size:
                    return [self.queue.popleft() for _ in range(self.max_batch_size)]

                # Rule 2: a preferred batch size has been reached
                for pref_size in preferred:
                    if current_size >= pref_size:
                        return [self.queue.popleft() for _ in range(pref_size)]

                # Rule 3: delay expired → take everything that is pending.
                # Returning here even when the queue is empty avoids a busy loop;
                # the caller skips empty batches.
                if elapsed_us >= self.max_queue_delay_us:
                    return [self.queue.popleft() for _ in range(current_size)]

            # Wait for the next request, but no longer than the remaining delay
            remaining_us = self.max_queue_delay_us - elapsed_us
            self.batch_ready.wait(timeout=remaining_us / 1_000_000)
            self.batch_ready.clear()

    def _execute_batch(self, requests: List[InferenceRequest]):
        """
        Run batched inference: merge inputs → infer → split outputs
        """
        batch_size = len(requests)

        # Step 1: merge the input tensors
        # For every input_name: list of tensors → cat([B1, B2, ...]) → [total_batch, ...]
        batched_inputs = {}
        # Materialize as a list: dict_keys does not support indexing (used in Step 3)
        input_names = list(requests[0].inputs.keys())

        for name in input_names:
            tensors = [req.inputs[name] for req in requests]

            # All dims except the batch dim (dim 0) must match before concatenation
            shapes = set(tuple(t.shape[1:]) for t in tensors)
            if len(shapes) > 1:
                raise ValueError(
                    f"Input '{name}' shape mismatch: {[t.shape for t in tensors]}"
                )

            # Concatenate along the batch dim
            batched_inputs[name] = torch.cat(tensors, dim=0)  # [sum(B_i), ...]

        # Step 2: inference
        start_time = time.time()
        batched_outputs = self.model(**batched_inputs)
        # NPU execution is asynchronous; synchronize so the timing covers the kernels
        torch.npu.synchronize()
        infer_time = (time.time() - start_time) * 1000  # ms

        # Step 3: split the outputs by each request's row count
        offsets = [0]
        for req in requests:
            offsets.append(offsets[-1] + req.inputs[input_names[0]].shape[0])

        for idx, req in enumerate(requests):
            start = offsets[idx]
            end = offsets[idx + 1]

            req.response = {
                name: output[start:end]
                for name, output in batched_outputs.items()
            }

            # Record performance statistics
            req.wait_time = (start_time - req.arrival_time) * 1000  # ms
            req.infer_time = infer_time / batch_size  # amortized inference time

            # Synchronous request: signal the response event
            if req.response_event is not None:
                req.response_event.set()

            # Asynchronous request: invoke the callback
            callback = getattr(req, "_callback", None)
            if callback:
                callback(req.response)

    def get_stats(self):
        """Return dynamic batching statistics"""
        with self.queue_lock:
            queue_depth = len(self.queue)
        return {
            "queue_depth": queue_depth,
            "strategy": self.strategy.value,
            "max_batch_size": self.max_batch_size,
            "max_queue_delay_us": self.max_queue_delay_us,
        }

    def shutdown(self):
        self.running = False
        self.batch_ready.set()
        self.worker_thread.join(timeout=5.0)
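
The split step in `_execute_batch` depends only on per-request row counts, so it can be checked without an NPU. The sketch below reproduces the offset arithmetic on plain Python lists. `split_offsets` and `split_batched_output` are hypothetical names used for illustration, and a list of rows stands in for the batched output tensor.

```python
from itertools import accumulate
from typing import List, Sequence


def split_offsets(rows_per_request: Sequence[int]) -> List[int]:
    """Prefix sums of the per-request row counts, starting at 0."""
    return [0, *accumulate(rows_per_request)]


def split_batched_output(batched_rows: Sequence, rows_per_request: Sequence[int]) -> List[list]:
    """Slice a batched output back into one chunk per request, in request order."""
    offsets = split_offsets(rows_per_request)
    if offsets[-1] != len(batched_rows):
        raise ValueError(
            f"row count mismatch: inputs had {offsets[-1]} rows, output has {len(batched_rows)}"
        )
    return [list(batched_rows[start:end]) for start, end in zip(offsets, offsets[1:])]


if __name__ == "__main__":
    # Three requests with batch sizes 1, 4 and 8 were merged into 13 rows.
    chunks = split_batched_output(list(range(13)), [1, 4, 8])
    for chunk in chunks:
        print(chunk)
```

Checking the total row count before slicing catches models whose outputs do not keep the batch dimension, which would otherwise hand clients silently truncated results.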

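Model Ensemble: multi-model cascading

# triton-inference-server-ge-backend/src/model_ensemble.py
#
# Model Ensemble: multi-model pipeline
# Example: text classification pipeline
#   Tokenizer(tokenize) → BERT(encode) → Classifier(predict)

from typing import Any, List

import torch
import torch_npu

class ModelEnsemble:
    """
    Multi-model cascade: call several models in a chain and pass intermediate results directly on the NPU

    Pipeline: Tokenizer → BERT → Classifier
    Shared NPU memory: tokenizer_output and bert_output are passed through HBM
    No CPU↔NPU round trips are needed
    """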

    def __init__(self, models: List[Any], pipeline_graph):
        """
        models: [tokenizer_model, bert_model, classifier_model]
        pipeline_graph: DAG description
          {
            "tokenizer": {"inputs": ["text"], "outputs": ["token_ids", "attention_mask"]},
            "bert": {"inputs": ["token_ids", "attention_mask"], "outputs": ["hidden_states"]},
            "classifier": {"inputs": ["hidden_states"], "outputs": ["logits"]}
          }
        """
        self.models = models
        self.pipeline_graph = pipeline_graph

        # Preallocate device memory for intermediate results (avoids dynamic allocation on every call)
        self.intermediate_buffers = {}
        self._preallocate_buffers()

    def _preallocate_buffers(self):
        """
        Preallocate buffers for intermediate results
        Avoids repeated device mallocs inside the pipeline → saves 400-600 μs per step
        """
        for model in self.models:
            if hasattr(model, "output_shape"):
                shape = model.output_shape
                # Preallocate on the NPU
                self.intermediate_buffers[model.name] = torch.empty(
                    shape, dtype=torch.float16, device="npu"
                )

    def infer(self, raw_texts: List[str]) -> torch.Tensor:
        """
        Full pipeline from text to logits

        Everything stays on the NPU:
        tokenizer(text) → BERT(token_ids) → Classifier(hidden) → logits
        """
        # Step 1: Tokenizer (lightweight, can run on CPU or NPU)
        # Fast tokenizer on the NPU: uses a prebuilt vocab hash table
        token_ids, attention_mask = self.models[0].encode(
            raw_texts,
            max_length=512,
            output_buffer_token=self.intermediate_buffers["tokenizer"],
        )
        # Output stays in NPU HBM and is not moved back to the CPU

        # Step 2: BERT encoder (main computation)
        hidden_states = self.models[1](
            input_ids=token_ids,
            attention_mask=attention_mask,
            output_buffer=self.intermediate_buffers["bert"],
        )
        # Output is pooler_output [B, 768], in NPU HBM

        # Step 3: Classifier (linear layer)
        logits = self.models[2](
            hidden_states,
            output_buffer=self.intermediate_buffers["classifier"],
        )
        # Output is [B, num_classes]

        return logits  # still in NPU HBM; moved back only when the caller invokes .cpu()

    def benchmark(self, num_texts=32):
        """Pipeline performance benchmark"""
        dummy_texts = ["电力负荷预测模型部署实战"] * num_texts

        # Warm-up
        for _ in range(5):
            _ = self.infer(dummy_texts)
        torch.npu.synchronize()

        start = torch.npu.Event(enable_timing=True)
        end = torch.npu.Event(enable_timing=True)

        start.record()
        result = self.infer(dummy_texts)
        end.record()
        torch.npu.synchronize()

        elapsed = start.elapsed_time(end)
        print(f"=== Model Ensemble ===")
        print(f"  Texts: {num_texts}")
        print(f"  Latency: {elapsed:.1f} ms")
        print(f"  Per text: {elapsed/num_texts:.1f} ms")


# ====== Usage example ======
# # Start the Triton server.
# # Upstream Triton is launched with the `tritonserver` binary. Dynamic batching is
# # configured per model in config.pbtxt (max_batch_size: 32 and
# # dynamic_batching { max_queue_delay_microseconds: 100 }), not via command-line flags.
# tritonserver \
#     --model-repository=/models \
#     --backend-config=ge,device_id=0
#
# # Client call
# import numpy as np
# import tritonclient.http as httpclient
#
# client = httpclient.InferenceServerClient("localhost:8000")
#
# inputs = [
#     httpclient.InferInput("text", [1], "BYTES"),
# ]
# inputs[0].set_data_from_numpy(np.array(["hello world"], dtype=object))
#
# result = client.infer("text_classifier", inputs)
# logits = result.as_numpy("logits")
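
The `pipeline_graph` dictionary passed to `ModelEnsemble` describes a DAG, yet `infer` hard-codes the stage order. One possible way to derive that order from the graph itself is sketched below with the standard-library `graphlib` module (Python 3.9+). `execution_order` is a hypothetical helper and not part of the backend. It assumes every tensor name is produced by at most one stage.

```python
from graphlib import TopologicalSorter
from typing import Dict, List, Set


def execution_order(pipeline_graph: Dict[str, Dict[str, List[str]]]) -> List[str]:
    """Return stage names in an order where every producer runs before its consumers."""
    # Map each tensor name to the stage that produces it.
    producers: Dict[str, str] = {}
    for stage, spec in pipeline_graph.items():
        for tensor_name in spec["outputs"]:
            producers[tensor_name] = stage

    # A stage depends on the producers of its inputs; inputs nobody produces
    # (such as the raw "text") come from the client and add no dependency.
    dependencies: Dict[str, Set[str]] = {
        stage: {producers[name] for name in spec["inputs"] if name in producers}
        for stage, spec in pipeline_graph.items()
    }
    return list(TopologicalSorter(dependencies).static_order())


if __name__ == "__main__":
    graph = {
        "classifier": {"inputs": ["hidden_states"], "outputs": ["logits"]},
        "tokenizer": {"inputs": ["text"], "outputs": ["token_ids", "attention_mask"]},
        "bert": {"inputs": ["token_ids", "attention_mask"], "outputs": ["hidden_states"]},
    }
    print(execution_order(graph))
```

`TopologicalSorter` raises `graphlib.CycleError` on a cyclic graph, so a misconfigured pipeline fails at load time instead of during inference.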

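Model loading and memory planning in the GE Backend

// triton-inference-server-ge-backend/src/ge_backend.cc
//
// GE Backend: convert a Triton model into a GE Graph → load it onto the NPU
// Core idea: static memory planning, allocating the whole model's HBM + L1 in one go

#include <memory>
#include <string>
#include <vector>

#include "acl/acl.h"
#include "triton/backend/backend_common.h"
#include "ge/ge_ir_build.h"
#include "ge/ge_api.h"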

namespace triton { namespace backend { namespace ge_backend {

TRITONSERVER_Error* GEBackendModel::LoadModel(
    const std::string& model_path,
    const int device_id)
{
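    // Step 1: Locate the offline OM model (a GE Graph already compiled by ATC)
    std::string om_path = model_path + "/model.om";

    // Step 2: Initialize the GE Session
    // Session = one inference context on the NPU
    ge::SessionOptions session_options;
    session_options.device_id = device_id;

    auto session = std::make_shared<ge::Session>(session_options);

    // Step 3: Load the Graph (OM file → GE Graph)
    ge::Graph graph("triton_model");
    auto status = graph.LoadFromFile(om_path);
    if (status != ge::SUCCESS) {
        return TRITONSERVER_ErrorNew(
            TRITONSERVER_ERROR_INTERNAL,
            ("Failed to load OM model: " + om_path).c_str());
    }

    // Step 4: Add the Graph to the Session
    status = session->AddGraph(0, graph);  // graph_id=0
    if (status != ge::SUCCESS) {
        return TRITONSERVER_ErrorNew(
            TRITONSERVER_ERROR_INTERNAL,
            "Failed to add graph to GE session");
    }

    // Step 5: Static memory planning
    // The shape and dtype of every tensor are fixed when the GE Graph is compiled,
    // so all HBM buffers can be allocated once at load time.
    // No malloc at run time → zero dynamic allocation overhead

    size_t static_mem_size = 0;
    size_t weight_mem_size = 0;
    size_t workspace_size = 0;

    status = session->GetMemoryRequirement(
        static_mem_size,   // input/output buffers
        weight_mem_size,   // weight buffer (constants)
        workspace_size     // intermediate result buffer
    );
    if (status != ge::SUCCESS) {
        return TRITONSERVER_ErrorNew(
            TRITONSERVER_ERROR_INTERNAL,
            "Failed to query GE memory requirement");
    }

    // Preallocate (one malloc, reused for the model's lifetime)
    void* static_buf = nullptr;
    void* weight_buf = nullptr;
    void* workspace_buf = nullptr;

    // Releases whatever has been allocated so far; used on every error path below
    auto free_all = [&]() {
        if (static_buf != nullptr) aclrtFree(static_buf);
        if (weight_buf != nullptr) aclrtFree(weight_buf);
        if (workspace_buf != nullptr) aclrtFree(workspace_buf);
    };

    const bool alloc_ok =
        aclrtMalloc(&static_buf, static_mem_size, ACL_MEM_MALLOC_HUGE_FIRST) == ACL_SUCCESS &&
        aclrtMalloc(&weight_buf, weight_mem_size, ACL_MEM_MALLOC_HUGE_FIRST) == ACL_SUCCESS &&
        aclrtMalloc(&workspace_buf, workspace_size, ACL_MEM_MALLOC_HUGE_FIRST) == ACL_SUCCESS;
    if (!alloc_ok) {
        free_all();
        return TRITONSERVER_ErrorNew(
            TRITONSERVER_ERROR_INTERNAL,
            "Failed to allocate NPU memory for the model");
    }

    // Load the weights onto the NPU (deserialized from the OM file)
    status = session->LoadWeights(weight_buf, weight_mem_size);
    if (status != ge::SUCCESS) {
        free_all();
        return TRITONSERVER_ErrorNew(
            TRITONSERVER_ERROR_INTERNAL,
            "Failed to load weights onto the NPU");
    }

    // Step 6: Store the session context
    model_state_ = std::make_unique<GEModelState>();
    model_state_->session = session;
    model_state_->static_buf = static_buf;
    model_state_->weight_buf = weight_buf;
    model_state_->workspace_buf = workspace_buf;

    LOG_MESSAGE(TRITONSERVER_LOG_INFO,
        ("GE Backend loaded: " + model_path +
         ", static=" + std::to_string(static_mem_size / 1024.0 / 1024.0) + "MB"
         ", weights=" + std::to_string(weight_mem_size / 1024.0 / 1024.0) + "MB"
         ", workspace=" + std::to_string(workspace_size / 1024.0 / 1024.0) + "MB"
        ).c_str());

    return nullptr;  // Success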
}


TRITONSERVER_Error* GEBackendInstance::Execute(
    TRITONBACKEND_Request** requests,
    const uint32_t request_count)
{
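    // Batched execution: request_count requests are merged into one batch

    // Step 1: Collect the inputs of all requests and merge them into batch tensors
    std::vector<const char*> input_names;
    std::vector<TRITONSERVER_DataType> input_types;

    // Get the input descriptions from the model config
    RETURN_IF_ERROR(GetModelInputs(input_names, input_types));

    // Step 2: Build the batch tensor for each input
    for (size_t i = 0; i < input_names.size(); ++i) {
        size_t batch_byte_size = GetBatchByteSize(
            requests, request_count, input_names[i]);

        // Write into the preallocated static_buf: no device malloc per batch,
        // although the host → device copy below is still required.
        // input_offset is this input's byte offset inside static_buf, fixed by
        // the load-time memory plan. Cast first: arithmetic on void* is not valid C++.
        char* input_buf =
            static_cast<char*>(model_state_->static_buf) + input_offset;

        // Copy this input from every request into the batch buffer
        size_t offset = 0;
        for (uint32_t r = 0; r < request_count; ++r) {
            TRITONBACKEND_Input* input = nullptr;
            RETURN_IF_ERROR(TRITONBACKEND_RequestInput(
                requests[r], input_names[i], &input));

            // Assumes the input arrives as a single contiguous CPU buffer (index 0)
            const void* req_input = nullptr;
            uint64_t req_size = 0;
            TRITONSERVER_MemoryType memory_type = TRITONSERVER_MEMORY_CPU;
            int64_t memory_type_id = 0;
            RETURN_IF_ERROR(TRITONBACKEND_InputBuffer(
                input, 0, &req_input, &req_size,
                &memory_type, &memory_type_id));

            if (offset + req_size > batch_byte_size ||
                aclrtMemcpy(
                    input_buf + offset, req_size, req_input, req_size,
                    ACL_MEMCPY_HOST_TO_DEVICE) != ACL_SUCCESS) {
                return TRITONSERVER_ErrorNew(
                    TRITONSERVER_ERROR_INTERNAL,
                    "Failed to copy request input into the batch buffer");
            }

            offset += req_size;
        }
    }

    // Step 3: GE Session Run (inference)
    auto status = model_state_->session->Run(
        model_state_->static_buf,
        model_state_->workspace_buf
    );
    if (status != ge::SUCCESS) {
        return TRITONSERVER_ErrorNew(
            TRITONSERVER_ERROR_INTERNAL, "GE session run failed");
    }

    // Step 4: Split the results back to the individual requests
    // Indexing with r * output_size assumes every request has the same output byte size.
    for (uint32_t r = 0; r < request_count; ++r) {
        TRITONBACKEND_Response* response;
        RETURN_IF_ERROR(TRITONBACKEND_ResponseNew(&response, requests[r]));

        // Extract this request's output from the batch output buffer
        TRITONBACKEND_Output* output;
        RETURN_IF_ERROR(TRITONBACKEND_ResponseOutput(
            response, &output, "output",
            TRITONSERVER_TYPE_FP16,
            output_shape, output_dims_count));

        void* output_buf;
        RETURN_IF_ERROR(TRITONBACKEND_OutputBuffer(
            output, &output_buf, output_size,
            &output_memory_type, &output_memory_type_id));

        // Copy from the batch buffer into the response output
        aclrtMemcpy(output_buf, output_size,
                    batch_output_ptr + r * output_size, output_size,
                    ACL_MEMCPY_DEVICE_TO_HOST);

        TRITONBACKEND_ResponseSend(
            response, TRITONSERVER_RESPONSE_COMPLETE_FINAL, nullptr);
    }

    return nullptr;  // Success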
}

}}}  // namespace
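
Static memory planning comes down to assigning every buffer a fixed byte offset inside one preallocated region, which is the role `input_offset` plays in `Execute`. The sketch below illustrates that bookkeeping in Python. The names `align_up` and `plan_static_layout` and the 512-byte alignment are assumptions made for illustration. The real planner lives inside GE and is not shown in the source.

```python
from typing import Dict, Tuple


def align_up(value: int, alignment: int) -> int:
    """Round value up to the next multiple of alignment."""
    return (value + alignment - 1) // alignment * alignment


def plan_static_layout(sizes: Dict[str, int], alignment: int = 512) -> Tuple[Dict[str, int], int]:
    """Assign each named buffer a fixed, aligned offset in one contiguous region.

    sizes maps buffer name → size in bytes (insertion order is the layout order).
    Returns (offsets per buffer, total bytes to allocate once at load time).
    """
    offsets: Dict[str, int] = {}
    cursor = 0
    for name, size in sizes.items():
        offsets[name] = cursor
        cursor = align_up(cursor + size, alignment)
    return offsets, cursor


if __name__ == "__main__":
    # Hypothetical I/O buffers for batch 32: FP16 image input and FP16 logits.
    sizes = {
        "input": 32 * 3 * 224 * 224 * 2,
        "logits": 32 * 1000 * 2,
    }
    offsets, total = plan_static_layout(sizes)
    print(offsets, total)
```

Because the offsets are computed once, the request path only adds a constant to a base pointer. That is why the load-time plan can claim zero allocations at run time.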

Pitfall: tensor shape mismatch in Dynamic Batching, where user A sends [1, 3, 224, 224] and user B sends [1, 3, 336, 336]

# ❌ torch.cat([tensor_A, tensor_B], dim=0) → RuntimeError
# shape[1:] differs: (3,224,224) vs (3,336,336)

# ✅ RaggedBatchHandler: pad variable-size inputs
from typing import List

import torch

class RaggedBatchHandler:
    """
    Ragged Batch: automatic padding for variable-size inputs

    Strategy: find the max shape and pad everything else up to it
    Example: (1,3,224,224) + (1,3,336,336) → (2,3,336,336)
    """

    def merge_variable_shapes(self, tensors: List[torch.Tensor]):
        """Merge variable-size tensors (automatic padding)"""
        if len({t.dim() for t in tensors}) != 1:
            raise ValueError("All tensors must have the same number of dimensions")

        # Max size of every non-batch dim; the batch dim (dim 0) is never padded
        tail_max = [
            max(t.shape[d] for t in tensors)
            for d in range(1, tensors[0].dim())
        ]

        padded = []
        row_counts = []

        for t in tensors:
            # F.pad expects (left, right) pairs starting from the LAST dim
            pad_dims = []
            for d in reversed(range(1, t.dim())):
                pad_dims.extend([0, tail_max[d - 1] - t.shape[d]])

            if any(pad_dims):
                t = torch.nn.functional.pad(t, pad_dims, value=0)
            padded.append(t)

            # Record the number of rows (dim 0) each request contributed
            row_counts.append(t.shape[0])

        merged = torch.cat(padded, dim=0)

        # Build a mask with the same shape as merged:
        # True marks real data, False marks padding the model should ignore
        mask = torch.zeros(merged.shape, dtype=torch.bool, device=merged.device)
        row = 0
        for t in tensors:
            valid = (slice(row, row + t.shape[0]),) + tuple(
                slice(0, s) for s in t.shape[1:]
            )
            mask[valid] = True
            row += t.shape[0]

        return merged, row_counts, mask
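
The same pad-and-mask idea is easier to see on 1-D sequences such as token-id lists. The sketch below uses plain Python lists so it runs without torch. `pad_sequences` is a hypothetical helper introduced for illustration, and a pad value of 0 is an assumption.

```python
from typing import List, Sequence, Tuple


def pad_sequences(
    seqs: Sequence[Sequence[int]], pad_value: int = 0
) -> Tuple[List[List[int]], List[List[int]], List[int]]:
    """Right-pad every sequence to the longest length.

    Returns (padded sequences, mask with 1 for real tokens and 0 for padding,
    original lengths).
    """
    if not seqs:
        return [], [], []
    max_len = max(len(s) for s in seqs)
    padded = [list(s) + [pad_value] * (max_len - len(s)) for s in seqs]
    mask = [[1] * len(s) + [0] * (max_len - len(s)) for s in seqs]
    lengths = [len(s) for s in seqs]
    return padded, mask, lengths


if __name__ == "__main__":
    padded, mask, lengths = pad_sequences([[101, 7, 102], [101, 7, 8, 9, 102]])
    print(padded)
    print(mask)
    print(lengths)
```

Returning the original lengths alongside the mask lets the server trim each response back to its true size before sending it to the client.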

Pitfall: GE Graph static memory planning makes concurrent models fight over HBM when two models load together and each preallocates its own multi-GB workspace

# ❌ Model A (15B params, 8GB workspace) + Model B (8B params, 6GB workspace)
# Loaded together → 14GB of workspace needed → but HBM is only 32GB (weights already take 18GB),
# which leaves no headroom
# → OOM

# ✅ Shared workspace pool: all models share one unified workspace buffer
import threading

import torch
import torch_npu

class SharedWorkspacePool:
    """
    Workspace pool shared by multiple models
    Space is occupied only during inference and released afterwards
    """

    def __init__(self, total_workspace_hbm_gb=12):
        self.total_size = int(total_workspace_hbm_gb * 1024**3)
        self.workspace = torch.empty(
            self.total_size, dtype=torch.uint8, device="npu"
        )

        self.allocations = {}  # model_name → (offset, size)
        self.offset = 0
        self.lock = threading.Lock()

    def allocate(self, model_name: str, size: int) -> int:
        """Allocate (temporary) workspace space for a model; returns the byte offset"""
        with self.lock:
            if self.offset + size > self.total_size:
                raise RuntimeError(
                    f"Workspace OOM: {model_name} needs {size}, "
                    f"only {self.total_size - self.offset} remaining"
                )
            offset = self.offset
            self.offset += size
            self.allocations[model_name] = (offset, size)
            return offset

    def release(self, model_name: str):
        """Release the workspace held by a model"""
        with self.lock:
            self.allocations.pop(model_name, None)

            # Bump allocator: space is reclaimed only once every allocation
            # has been released, at which point the pool resets to offset 0
            if len(self.allocations) == 0:
                self.offset = 0
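
The pool above reclaims space only once every model has released its slice, so a long-running model can keep freed space from being reused. A free-list allocator with first-fit placement and coalescing is one common alternative. The following is a minimal sketch of it, not the backend's implementation. `FreeListPool` is a hypothetical name, it tracks offsets only (no device memory), and it omits locking for brevity.

```python
from typing import Dict, List, Tuple


class FreeListPool:
    """First-fit allocator over a byte range that reuses freed blocks immediately."""

    def __init__(self, total_size: int):
        self.total_size = total_size
        self.free: List[Tuple[int, int]] = [(0, total_size)]  # sorted (offset, size)
        self.used: Dict[str, Tuple[int, int]] = {}            # name → (offset, size)

    def allocate(self, name: str, size: int) -> int:
        if name in self.used:
            raise ValueError(f"{name} already holds an allocation")
        for i, (offset, block) in enumerate(self.free):
            if block >= size:
                if block == size:
                    del self.free[i]
                else:
                    self.free[i] = (offset + size, block - size)
                self.used[name] = (offset, size)
                return offset
        raise MemoryError(f"no free block of {size} bytes for {name}")

    def release(self, name: str) -> None:
        offset, size = self.used.pop(name)
        self.free.append((offset, size))
        self.free.sort()
        # Coalesce neighbouring free blocks so large requests can fit again.
        merged: List[Tuple[int, int]] = []
        for off, sz in self.free:
            if merged and merged[-1][0] + merged[-1][1] == off:
                merged[-1] = (merged[-1][0], merged[-1][1] + sz)
            else:
                merged.append((off, sz))
        self.free = merged


if __name__ == "__main__":
    pool = FreeListPool(12)          # units are arbitrary here, e.g. GB
    print(pool.allocate("A", 8))     # A takes the front of the pool
    pool.release("A")
    print(pool.allocate("B", 6))     # B reuses A's space right away
    print(pool.free)
```

With a 12 GB pool, the 8 GB and 6 GB workspaces from the example still cannot be resident at the same time. The pool makes the second model wait or fail fast instead of exhausting HBM at load time.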

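Deployment recap for triton-inference-server-ge-backend: the Dynamic Batcher maintains the request queue, and preferred_batch_sizes [8, 16, 32] with a 100 μs queue delay trades latency against throughput. The GE Backend statically allocates HBM once at load time (weights + workspace + I/O buffers, with no runtime malloc). Model Ensemble chains multiple models and passes intermediate results directly through HBM, avoiding CPU round trips. Pitfalls: mismatched shapes of variable-size inputs → Ragged Batch automatic padding + mask; workspace contention under multi-model concurrency (14 GB) → a 12 GB shared pool allocated on demand; static shapes that cannot handle dynamic batches → GE dynamic-shape input support.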
