MCP-Calculator 项目实现原理详解

目录

  1. 项目概述
  2. 整体架构
  3. 核心组件
  4. 通信协议详解
  5. calculator.py 实现原理
  6. mcp_pipe.py 实现原理
  7. MCP 协议消息格式
  8. 运行流程解析
  9. 配置驱动机制

1. 项目概述

1.1 什么是 MCP

MCP(Model Context Protocol,模型上下文协议)是一个标准化的协议,允许服务器向语言模型暴露可调用的工具(Tools)。这些工具使模型能够与外部系统交互:

  • 查询数据库
  • 调用外部 API
  • 执行计算
  • 发送邮件
  • 控制设备

1.2 项目用途

MCP-Calculator 是 xiaozhi(小智AI)的 MCP 接入点示例项目,展示了如何:

  1. 创建 MCP 服务器并暴露工具
  2. 通过 mcp_pipe.py 将本地 MCP 服务连接到 xiaozhi 云端
  3. 让语音终端能够调用这些工具

1.3 项目文件结构

mcp-calculator/
├── calculator.py        # MCP 服务器实现(计算器工具)
├── mcp_pipe.py         # WebSocket <-> stdio 桥接程序
├── mcp_config.json     # 服务器配置文件
├── requirements.txt    # 依赖列表
└── README.md          # 项目说明

2. 整体架构

2.1 系统架构图

┌─────────────────────────────────────────────────────────────────────────────┐
│                              xiaozhi 云端                                 │
│  ┌─────────────────────────────────────────────────────────────────────┐   │
│  │                    MCP Client (WebSocket)                          │   │
│  │         负责:工具列表查询、工具调用请求、响应返回                   │   │
│  └─────────────────────────────────────────────────────────────────────┘   │
│                                    ▲                                     │
│                                    │ WebSocket (wss://api.xiaozhi.me/)    │
│                                    │                                     │
└────────────────────────────────────┼────────────────────────────────────┘
                                     │
┌────────────────────────────────────┼────────────────────────────────────┐
│           本地机器                  │                                     │
│                                    ▼                                     │
│  ┌─────────────────────────────────────────────────────────────────────┐   │
│  │                         mcp_pipe.py                                  │   │
│  │                                                                      │   │
│  │  职责:                                                               │   │
│  │  1. 连接 xiaozhi 云端 WebSocket                                        │   │
│  │  2. 启动并管理 calculator.py 子进程                                     │   │
│  │  3. 在 WebSocket 和 stdio 之间转发消息                                  │   │
│  └─────────────────────────────────────────────────────────────────────┘   │
│                                    ▲                                     │
│                                    │ stdio (标准输入输出)                  │
│                                    │                                     │
│  ┌─────────────────────────────────────────────────────────────────────┐   │
│  │                      calculator.py                                   │   │
│  │                   (MCP Server / FastMCP)                             │   │
│  │                                                                      │   │
│  │  暴露工具:calculator(python_expression) -> dict                    │   │
│  └─────────────────────────────────────────────────────────────────────┘   │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘

2.2 数据流向

语音终端                             xiaozhi 云端                          mcp_pipe.py
   │                                    │                                     │
   │  "1004乘328等于多少"                │                                     │
   │ ──────────────────────────────────► │                                     │
   │                                    │ JSON-RPC: tools/call                │
   │                                    │ ─────────────────────────────────► │
   │                                    │                                     │
   │                                    │        JSON-RPC over WebSocket      │
   │                                    │ ◄─────────────────────────────────► │
   │                                    │                                     │
   │                                    │        JSON-RPC over stdio          │
   │                                    │ ─────────────────────────────────► │
   │                                    │                                     │
   │                                    │                                     │ ▼
   │                                    │                              calculator.py
   │                                    │                                     │处理
   │                                    │                                     │▼
   │                                    │ ◄───────────────────────────────── │
   │  "算好了,329312"                  │ JSON-RPC Response                   │
   │ ◄───────────────────────────────── │                                     │

3. 核心组件

3.1 calculator.py - MCP 服务器

作用:使用 FastMCP 框架创建一个 MCP 服务器,暴露数学计算工具

关键代码

from fastmcp import FastMCP

mcp = FastMCP("Calculator")  # 创建名为 Calculator 的 MCP 服务器

@mcp.tool()  # 装饰器:暴露一个工具
def calculator(python_expression: str) -> dict:
    """For mathamatical calculation, always use this tool..."""
    result = eval(python_expression, {"math": math, "random": random})
    return {"success": True, "result": result}

mcp.run(transport="stdio")  # 使用 stdio 传输模式运行

FastMCP 框架职责

  1. 自动处理 MCP 协议的消息解析
  2. 管理工具注册表
  3. 处理 initializetools/listtools/call 等方法
  4. 通过 stdio 接收请求、返回响应

3.2 mcp_pipe.py - 桥接代理

作用:将 WebSocket 连接和本地 stdio 进程连接起来

关键职责

  1. 连接云端:作为 WebSocket 客户端连接到 xiaozhi
  2. 启动子进程:启动 calculator.py 作为子进程
  3. 双向转发
    • WebSocket → 子进程 stdin
    • 子进程 stdout → WebSocket
    • 子进程 stderr → 终端输出

3.3 mcp_config.json - 配置中心

作用:定义多个 MCP 服务器配置

{
  "mcpServers": {
    "local-stdio-calculator": {  // 服务器名称
      "type": "stdio",           // 传输类型:stdio/sse/http
      "command": "python",       // 启动命令
      "args": ["-m", "calculator"]  // 命令参数
    },
    "remote-sse-server": {      // 另一个服务器
      "type": "sse",
      "url": "https://api.example.com/sse",
      "disabled": true           // 禁用
    }
  }
}

4. 通信协议详解

4.1 MCP 协议概述

MCP 基于 JSON-RPC 2.0 协议,有三种核心方法:

方法 方向 说明
initialize 客户端 → 服务器 建立连接,交换能力
tools/list 客户端 → 服务器 获取工具列表
tools/call 客户端 → 服务器 调用指定工具

4.2 JSON-RPC 消息格式

请求格式

{
  "jsonrpc": "2.0",
  "id": 1,
  "method": "tools/call",
  "params": {
    "name": "calculator",
    "arguments": {
      "python_expression": "1004 * 328"
    }
  }
}

响应格式

{
  "jsonrpc": "2.0",
  "id": 1,
  "result": {
    "content": [
      {
        "type": "text",
        "text": "{\"success\": true, \"result\": 329312}"
      }
    ],
    "isError": false
  }
}

错误格式

{
  "jsonrpc": "2.0",
  "id": 1,
  "error": {
    "code": -32600,
    "message": "Invalid Request"
  }
}

5. calculator.py 实现原理

5.1 代码逐行解析

# 1. 导入 FastMCP 框架
from fastmcp import FastMCP
import sys
import logging

logger = logging.getLogger('Calculator')  # 创建日志记录器

# 2. Windows 编码修复(跨平台兼容)
if sys.platform == 'win32':
    sys.stderr.reconfigure(encoding='utf-8')
    sys.stdout.reconfigure(encoding='utf-8')

# 3. 导入数学库供计算使用
import math
import random

# 4. 创建 MCP 服务器实例
mcp = FastMCP("Calculator")

# 5. 定义工具(使用装饰器)
@mcp.tool()
def calculator(python_expression: str) -> dict:
    """
    工具描述文档(docstring)
    重要性:AI 根据这个描述决定何时调用工具
    """
    # 6. 安全执行用户输入的数学表达式
    # 关键:只注入 math 和 random 两个模块,防止恶意代码
    result = eval(python_expression, {"math": math, "random": random})

    logger.info(f"Calculating formula: {python_expression}, result: {result}")

    # 7. 返回结构化结果
    return {"success": True, "result": result}

# 8. 启动服务器(stdio 模式)
if __name__ == "__main__":
    mcp.run(transport="stdio")

5.2 @mcp.tool() 装饰器原理

用户代码                    FastMCP 框架
   │                            │
   ▼                            │
@mcp.tool() ──────────► 注册到工具表
   │                  {
   │                    "calculator": {
   │                      "name": "calculator",
   │                      "description": "...",
   │                      "callback": func,
   │                      "schema": {...}
   │                    }
   │                  }
   │                            │
   ▼                            │
函数定义              框架自动生成 JSON Schema

5.3 FastMCP 自动生成的工具 JSON

当收到 tools/list 请求时,FastMCP 自动返回:

{
  "tools": [
    {
      "name": "calculator",
      "description": "For mathamatical calculation, always use this tool to calculate the result of a python expression. You can use 'math' or 'random' directly, without 'import'.",
      "inputSchema": {
        "type": "object",
        "properties": {
          "python_expression": {
            "type": "string",
            "description": "..."
          }
        },
        "required": ["python_expression"]
      }
    }
  ]
}

5.4 stdio 传输模式原理

┌─────────────────────────────────────────────────────────────┐
│                      calculator.py                         │
│                                                              │
│  stdin  ◄────────────────────────────────────────────────── │
│          JSON-RPC 请求 (来自 mcp_pipe.py)                     │
│                                                              │
│  框架处理                                                    │
│    │                                                        │
│    ▼                                                        │
│  调用 calculator() 函数                                      │
│    │                                                        │
│    ▼                                                        │
│  stdout ───────────────────────────────────────────────────► │
│          JSON-RPC 响应 (来自 mcp_pipe.py)                     │
└─────────────────────────────────────────────────────────────┘

为什么用 stdio

  1. 简单:不需要网络编程,子进程通过标准输入输出通信
  2. 隔离:每个 MCP 服务器是独立进程,互不影响
  3. 安全:进程级隔离,出了故障只会影响单个工具

6. mcp_pipe.py 实现原理

6.1 核心函数概览

mcp_pipe.py
├── connect_with_retry()      # 带重试的 WebSocket 连接
├── connect_to_server()        # 连接并管理子进程
├── pipe_websocket_to_process()  # WebSocket → stdin
├── pipe_process_to_websocket()   # stdout → WebSocket
├── pipe_process_stderr_to_terminal()  # stderr → 终端
├── build_server_command()    # 构建子进程启动命令
├── load_config()             # 加载配置文件
└── signal_handler()          # 信号处理(优雅退出)

6.2 主流程图

┌────────────────────────────────────────────────────────────────┐
│                         _main()                                 │
│                                                                  │
│  1. 读取 MCP_ENDPOINT 环境变量                                    │
│  2. 判断启动模式:                                                │
│     ├─ 有命令行参数 ───► 运行单个脚本 (calculator.py)              │
│     └─ 无参数 ───────► 加载 mcp_config.json,启动所有服务         │
│                                                                  │
└────────────────────────────────────────────────────────────────┘
                              │
                              ▼
┌────────────────────────────────────────────────────────────────┐
│                   connect_with_retry()                           │
│                                                                  │
│  while True:                                                    │
│    try:                                                         │
│      connect_to_server(uri, target)  ─────────────────────────┐ │
│    except:                                                     │ │
│      reconnect_attempt++                    │                   │ │
│      backoff = min(backoff * 2, MAX_BACKOFF) │ 指数退避        │ │
│      await asyncio.sleep(backoff) ─────────┴─────────────────┘ │
└────────────────────────────────────────────────────────────────┘
                              │
                              ▼
┌────────────────────────────────────────────────────────────────┐
│                      connect_to_server()                         │
│                                                                  │
│  1. await websockets.connect(uri)  ──── WebSocket 客户端         │
│                                                                  │
│  2. subprocess.Popen(cmd)  ─────────── 启动子进程               │
│     ├─ stdin = PIPE                                        │
│     ├─ stdout = PIPE                                       │
│     └─ stderr = PIPE                                       │
│                                                                  │
│  3. asyncio.gather(                                           │
│       pipe_websocket_to_process(),  ─── WebSocket → stdin       │
│       pipe_process_to_websocket(),   ─── stdout → WebSocket     │
│       pipe_process_stderr_to_terminal()  ─── stderr → 终端      │
│     )                                                          │
└────────────────────────────────────────────────────────────────┘

6.3 pipe_websocket_to_process() 详解

作用:读取 WebSocket 消息,转发到子进程 stdin

async def pipe_websocket_to_process(websocket, process, target):
    try:
        while True:
            # 1. 从 WebSocket 接收消息(阻塞)
            message = await websocket.recv()
            logger.debug(f"[{target}] << {message[:120]}...")

            # 2. 字节转字符串(如需要)
            if isinstance(message, bytes):
                message = message.decode('utf-8')

            # 3. 写入子进程 stdin
            process.stdin.write(message + '\n')  # JSON-RPC 请求
            process.stdin.flush()               # 立即发送
    except Exception as e:
        logger.error(f"[{target}] Error in WebSocket to process pipe: {e}")
        raise

数据流

xiaozhi 云端                          mcp_pipe.py                    calculator.py
   │                                    │                              │
   │ JSON-RPC Request                   │                              │
   │ {"jsonrpc":"2.0",...}              │                              │
   │ ──────────────────────────────────► │                              │
   │                                    │                              │
   │                                    │ message = await websocket.recv()
   │                                    │ process.stdin.write(message)
   │                                    │ ────────────────────────────► │
   │                                    │                              │
   │                                    │                              │ JSON-RPC Request
   │                                    │                              │ {"jsonrpc":"2.0",...}
   │                                    │                              │ ▼
   │                                    │                              │ FastMCP 框架解析

6.4 pipe_process_to_websocket() 详解

作用:读取子进程 stdout,转发到 WebSocket

async def pipe_process_to_websocket(process, websocket, target):
    try:
        while True:
            # 1. 从子进程 stdout 读取一行(阻塞)
            data = await asyncio.to_thread(process.stdout.readline)

            if not data:  # EOF 表示进程结束
                logger.info(f"[{target}] Process has ended output")
                break

            # 2. 发送到 WebSocket
            logger.debug(f"[{target}] >> {data[:120]}...")
            await websocket.send(data)
    except Exception as e:
        logger.error(f"[{target}] Error in process to WebSocket pipe: {e}")
        raise

数据流

calculator.py                     mcp_pipe.py                    xiaozhi 云端
   │                               │                              │
   │ JSON-RPC Response             │                              │
   │ {"jsonrpc":"2.0",...}          │                              │
   │ ◄────────────────────────────── │                              │
   │                               │                              │
   │                               │ data = process.stdout.readline()
   │                               │ ◄────────────────────────────── │
   │                               │                              │
   │                               │ await websocket.send(data)     │
   │                               │ ──────────────────────────────► │
   │                               │                              │
   │                               │                              │ JSON-RPC Response

6.5 重连机制详解

指数退避算法

INITIAL_BACKOFF = 1    # 初始等待 1 秒
MAX_BACKOFF = 600      # 最大等待 10 分钟

# 退避序列:1s → 2s → 4s → 8s → 16s → 32s → 64s → 128s → 256s → 512s → 600s

触发场景

  1. WebSocket 连接被断开
  2. 子进程异常退出
  3. 网络抖动

7. MCP 协议消息格式

7.1 initialize - 建立连接

客户端发送

{
  "jsonrpc": "2.0",
  "id": 1,
  "method": "initialize",
  "params": {
    "protocolVersion": "2024-11-05",
    "capabilities": {},
    "clientInfo": {
      "name": "xiaozhi-agent",
      "version": "1.0.0"
    }
  }
}

服务器响应

{
  "jsonrpc": "2.0",
  "id": 1,
  "result": {
    "protocolVersion": "2024-11-05",
    "capabilities": {"tools": {}},
    "serverInfo": {
      "name": "Calculator",
      "version": "3.2.4"
    }
  }
}

7.2 tools/list - 获取工具列表

客户端发送

{
  "jsonrpc": "2.0",
  "id": 2,
  "method": "tools/list",
  "params": {}
}

服务器响应

{
  "jsonrpc": "2.0",
  "id": 2,
  "result": {
    "tools": [
      {
        "name": "calculator",
        "description": "For mathamatical calculation...",
        "inputSchema": {
          "type": "object",
          "properties": {
            "python_expression": {
              "type": "string",
              "description": "The Python expression to evaluate"
            }
          },
          "required": ["python_expression"]
        }
      }
    ]
  }
}

7.3 tools/call - 调用工具

客户端发送

{
  "jsonrpc": "2.0",
  "id": 3,
  "method": "tools/call",
  "params": {
    "name": "calculator",
    "arguments": {
      "python_expression": "1004 * 328"
    }
  }
}

服务器响应

{
  "jsonrpc": "2.0",
  "id": 3,
  "result": {
    "content": [
      {
        "type": "text",
        "text": "{\"success\": true, \"result\": 329312}"
      }
    ],
    "isError": false
  }
}

8. 运行流程解析

8.1 完整启动流程

┌──────────────────────────────────────────────────────────────────────────┐
│                         用户执行命令                                      │
│                    python mcp_pipe.py calculator.py                       │
└──────────────────────────────────────────────────────────────────────────┘
                                    │
                                    ▼
┌──────────────────────────────────────────────────────────────────────────┐
│  mcp_pipe.py: main()                                                     │
│                                                                          │
│  1. signal.signal(SIGINT, signal_handler)  ─── 注册信号处理器             │
│  2. endpoint_url = os.environ.get('MCP_ENDPOINT')                       │
│     └─ 获取 wss://api.xiaozhi.me/mcp/?token=xxx                          │
│  3. target_arg = sys.argv[1]                                            │
│     └─ "calculator.py"                                                  │
│  4. asyncio.run(_main())                                                │
└──────────────────────────────────────────────────────────────────────────┘
                                    │
                                    ▼
┌──────────────────────────────────────────────────────────────────────────┐
│  _main()                                                                 │
│                                                                          │
│  asyncio.run(connect_with_retry(endpoint_url, "calculator.py"))          │
│                                                                          │
│  connect_with_retry(uri, target):                                       │
│    while True:                                                           │
│      try:                                                                │
│        connect_to_server(uri, target)  ─── 尝试连接                       │
│      except:                                                             │
│        reconnect_attempt++                                               │
│        backoff = min(backoff * 2, MAX_BACKOFF)                           │
│        await asyncio.sleep(backoff)  ─── 等待后重试                       │
└──────────────────────────────────────────────────────────────────────────┘
                                    │
                                    ▼
┌──────────────────────────────────────────────────────────────────────────┐
│  connect_to_server(uri, target)                                        │
│                                                                          │
│  1. async with websockets.connect(uri)                                 │
│     └─ WebSocket 连接成功                                               │
│                                                                          │
│  2. cmd, env = build_server_command("calculator.py")                    │
│     └─ [sys.executable, "calculator.py"], os.environ.copy()             │
│                                                                          │
│  3. subprocess.Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE)         │
│     └─ 启动 calculator.py 子进程                                         │
│                                                                          │
│  4. asyncio.gather(                                                    │
│       pipe_websocket_to_process(...),  ─── 启动 3 个协程                 │
│       pipe_process_to_websocket(...),   ───                              │
│       pipe_process_stderr_to_terminal(...)  ───                         │
│     )                                                                   │
└──────────────────────────────────────────────────────────────────────────┘

8.2 工具调用完整流程

┌─────────────────────────────────────────────────────────────────────────┐
│ Step 1: xiaozhi 云端发送 initialize                                      │
│                                                                         │
│ WebSocket ────────────────────────────────────────────────────────────► │
│            {"jsonrpc":"2.0","method":"initialize",...}                  │
│                                                                         │
│                      calculator.py (子进程)                             │
│                              │                                          │
│                              ▼                                          │
│                      FastMCP 框架处理                                   │
│                              │                                          │
│                              ▼                                          │
│ stdout ────────────────────────────────────────────────────────────────► │
│         {"jsonrpc":"2.0","result":{"capabilities":{...}}}              │
└─────────────────────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────────────────┐
│ Step 2: xiaozhi 云端发送 tools/list                                    │
│                                                                         │
│ WebSocket ────────────────────────────────────────────────────────────► │
│            {"jsonrpc":"2.0","method":"tools/list",...}                 │
│                                                                         │
│                      calculator.py                                     │
│                              │                                          │
│                              ▼                                          │
│                      FastMCP 返回工具列表                              │
│                              │                                          │
│                              ▼                                          │
│ stdout ────────────────────────────────────────────────────────────────► │
│         {"jsonrpc":"2.0","result":{"tools":[...]}}                    │
└─────────────────────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────────────────┐
│ Step 3: xiaozhi 云端发送 tools/call (计算请求)                         │
│                                                                         │
│ WebSocket ────────────────────────────────────────────────────────────► │
│            {"jsonrpc":"2.0","method":"tools/call",                     │
│             "params":{"name":"calculator",                              │
│                       "arguments":{"python_expression":"1004*328"}}}   │
│                                                                         │
│                      calculator.py                                     │
│                              │                                          │
│                              ▼                                          │
│                      @mcp.tool() decorated function                     │
│                              │                                          │
│                              ▼                                          │
│                      result = eval("1004*328", {...}) = 329312         │
│                              │                                          │
│                              ▼                                          │
│ stdout ────────────────────────────────────────────────────────────────► │
│         {"jsonrpc":"2.0","result":{"content":[{"text":"..."}]}}        │
└─────────────────────────────────────────────────────────────────────────┘

9. 配置驱动机制

9.1 mcp_config.json 详解

{
  "mcpServers": {
    "local-stdio-calculator": {   // 服务名称(可自定义)
      "type": "stdio",            // 传输类型
      "command": "python",        // 启动命令
      "args": ["-m", "calculator"] // 参数(作为模块运行)
    },
    "remote-sse-server": {
      "type": "sse",              // SSE 类型的远程服务
      "url": "https://api.example.com/sse",
      "disabled": true            // 禁用(不启动)
    },
    "remote-http-server": {
      "type": "http",             // HTTP 类型
      "url": "https://api.example.com/mcp",
      "disabled": true
    }
  }
}

9.2 配置优先级

命令行参数 ─────────────► 直接运行脚本
    │                         │
    │ (如 calculator.py)       │
    ▼                         ▼
   使用 target 作为脚本路径   使用配置文件 mcp_config.json
                              │
                              ├─► MCP_CONFIG 环境变量指向的文件
                              │
                              └─► ./mcp_config.json

9.3 多服务并发启动

mcp_pipe.py 不带参数运行时:

cfg = load_config()
servers_cfg = cfg.get("mcpServers", {})

# 启动所有未禁用的服务
tasks = [asyncio.create_task(connect_with_retry(endpoint_url, t))
         for t in enabled_servers]

await asyncio.gather(*tasks)  # 并发运行所有服务


附录:关键概念速查

概念 说明
FastMCP 基于 MCP 协议的 Python 框架,简化工具开发
stdio 标准输入输出,用于父子进程通信
WebSocket 双向实时通信协议
JSON-RPC 2.0 轻量级远程过程调用协议
mcp_pipe.py WebSocket ↔ stdio 桥接程序
@mcp.tool() 装饰器,用于暴露工具函数

参考链接

Logo

作为“人工智能6S店”的官方数字引擎,为AI开发者与企业提供一个覆盖软硬件全栈、一站式门户。

更多推荐