小智mcp实现原理
MCP(Model Context Protocol,模型上下文协议)是一个标准化的协议,允许服务器向语言模型暴露可调用的工具(Tools)。查询数据库调用外部 API执行计算发送邮件控制设备方法方向说明initialize客户端 → 服务器建立连接,交换能力tools/list客户端 → 服务器获取工具列表tools/call客户端 → 服务器调用指定工具。
·
MCP-Calculator 项目实现原理详解
目录
1. 项目概述
1.1 什么是 MCP
MCP(Model Context Protocol,模型上下文协议)是一个标准化的协议,允许服务器向语言模型暴露可调用的工具(Tools)。这些工具使模型能够与外部系统交互:
- 查询数据库
- 调用外部 API
- 执行计算
- 发送邮件
- 控制设备
1.2 项目用途
MCP-Calculator 是 xiaozhi(小智AI)的 MCP 接入点示例项目,展示了如何:
- 创建 MCP 服务器并暴露工具
- 通过
mcp_pipe.py将本地 MCP 服务连接到 xiaozhi 云端 - 让语音终端能够调用这些工具
1.3 项目文件结构
mcp-calculator/
├── calculator.py # MCP 服务器实现(计算器工具)
├── mcp_pipe.py # WebSocket <-> stdio 桥接程序
├── mcp_config.json # 服务器配置文件
├── requirements.txt # 依赖列表
└── README.md # 项目说明
2. 整体架构
2.1 系统架构图
┌─────────────────────────────────────────────────────────────────────────────┐
│ xiaozhi 云端 │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ MCP Client (WebSocket) │ │
│ │ 负责:工具列表查询、工具调用请求、响应返回 │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ ▲ │
│ │ WebSocket (wss://api.xiaozhi.me/) │
│ │ │
└────────────────────────────────────┼────────────────────────────────────┘
│
┌────────────────────────────────────┼────────────────────────────────────┐
│ 本地机器 │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ mcp_pipe.py │ │
│ │ │ │
│ │ 职责: │ │
│ │ 1. 连接 xiaozhi 云端 WebSocket │ │
│ │ 2. 启动并管理 calculator.py 子进程 │ │
│ │ 3. 在 WebSocket 和 stdio 之间转发消息 │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ ▲ │
│ │ stdio (标准输入输出) │
│ │ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ calculator.py │ │
│ │ (MCP Server / FastMCP) │ │
│ │ │ │
│ │ 暴露工具:calculator(python_expression) -> dict │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────┘
2.2 数据流向
语音终端 xiaozhi 云端 mcp_pipe.py
│ │ │
│ "1004乘328等于多少" │ │
│ ──────────────────────────────────► │ │
│ │ JSON-RPC: tools/call │
│ │ ─────────────────────────────────► │
│ │ │
│ │ JSON-RPC over WebSocket │
│ │ ◄─────────────────────────────────► │
│ │ │
│ │ JSON-RPC over stdio │
│ │ ─────────────────────────────────► │
│ │ │
│ │ │ ▼
│ │ calculator.py
│ │ │处理
│ │ │▼
│ │ ◄───────────────────────────────── │
│ "算好了,329312" │ JSON-RPC Response │
│ ◄───────────────────────────────── │ │
3. 核心组件
3.1 calculator.py - MCP 服务器
作用:使用 FastMCP 框架创建一个 MCP 服务器,暴露数学计算工具
关键代码:
from fastmcp import FastMCP
mcp = FastMCP("Calculator") # 创建名为 Calculator 的 MCP 服务器
@mcp.tool() # 装饰器:暴露一个工具
def calculator(python_expression: str) -> dict:
"""For mathamatical calculation, always use this tool..."""
result = eval(python_expression, {"math": math, "random": random})
return {"success": True, "result": result}
mcp.run(transport="stdio") # 使用 stdio 传输模式运行
FastMCP 框架职责:
- 自动处理 MCP 协议的消息解析
- 管理工具注册表
- 处理
initialize、tools/list、tools/call等方法 - 通过 stdio 接收请求、返回响应
3.2 mcp_pipe.py - 桥接代理
作用:将 WebSocket 连接和本地 stdio 进程连接起来
关键职责:
- 连接云端:作为 WebSocket 客户端连接到 xiaozhi
- 启动子进程:启动
calculator.py作为子进程 - 双向转发:
- WebSocket → 子进程 stdin
- 子进程 stdout → WebSocket
- 子进程 stderr → 终端输出
3.3 mcp_config.json - 配置中心
作用:定义多个 MCP 服务器配置
{
"mcpServers": {
"local-stdio-calculator": { // 服务器名称
"type": "stdio", // 传输类型:stdio/sse/http
"command": "python", // 启动命令
"args": ["-m", "calculator"] // 命令参数
},
"remote-sse-server": { // 另一个服务器
"type": "sse",
"url": "https://api.example.com/sse",
"disabled": true // 禁用
}
}
}
4. 通信协议详解
4.1 MCP 协议概述
MCP 基于 JSON-RPC 2.0 协议,有三种核心方法:
| 方法 | 方向 | 说明 |
|---|---|---|
initialize |
客户端 → 服务器 | 建立连接,交换能力 |
tools/list |
客户端 → 服务器 | 获取工具列表 |
tools/call |
客户端 → 服务器 | 调用指定工具 |
4.2 JSON-RPC 消息格式
请求格式:
{
"jsonrpc": "2.0",
"id": 1,
"method": "tools/call",
"params": {
"name": "calculator",
"arguments": {
"python_expression": "1004 * 328"
}
}
}
响应格式:
{
"jsonrpc": "2.0",
"id": 1,
"result": {
"content": [
{
"type": "text",
"text": "{\"success\": true, \"result\": 329312}"
}
],
"isError": false
}
}
错误格式:
{
"jsonrpc": "2.0",
"id": 1,
"error": {
"code": -32600,
"message": "Invalid Request"
}
}
5. calculator.py 实现原理
5.1 代码逐行解析
# 1. 导入 FastMCP 框架
from fastmcp import FastMCP
import sys
import logging
logger = logging.getLogger('Calculator') # 创建日志记录器
# 2. Windows 编码修复(跨平台兼容)
if sys.platform == 'win32':
sys.stderr.reconfigure(encoding='utf-8')
sys.stdout.reconfigure(encoding='utf-8')
# 3. 导入数学库供计算使用
import math
import random
# 4. 创建 MCP 服务器实例
mcp = FastMCP("Calculator")
# 5. 定义工具(使用装饰器)
@mcp.tool()
def calculator(python_expression: str) -> dict:
"""
工具描述文档(docstring)
重要性:AI 根据这个描述决定何时调用工具
"""
# 6. 安全执行用户输入的数学表达式
# 关键:只注入 math 和 random 两个模块,防止恶意代码
result = eval(python_expression, {"math": math, "random": random})
logger.info(f"Calculating formula: {python_expression}, result: {result}")
# 7. 返回结构化结果
return {"success": True, "result": result}
# 8. 启动服务器(stdio 模式)
if __name__ == "__main__":
mcp.run(transport="stdio")
5.2 @mcp.tool() 装饰器原理
用户代码 FastMCP 框架
│ │
▼ │
@mcp.tool() ──────────► 注册到工具表
│ {
│ "calculator": {
│ "name": "calculator",
│ "description": "...",
│ "callback": func,
│ "schema": {...}
│ }
│ }
│ │
▼ │
函数定义 框架自动生成 JSON Schema
5.3 FastMCP 自动生成的工具 JSON
当收到 tools/list 请求时,FastMCP 自动返回:
{
"tools": [
{
"name": "calculator",
"description": "For mathamatical calculation, always use this tool to calculate the result of a python expression. You can use 'math' or 'random' directly, without 'import'.",
"inputSchema": {
"type": "object",
"properties": {
"python_expression": {
"type": "string",
"description": "..."
}
},
"required": ["python_expression"]
}
}
]
}
5.4 stdio 传输模式原理
┌─────────────────────────────────────────────────────────────┐
│ calculator.py │
│ │
│ stdin ◄────────────────────────────────────────────────── │
│ JSON-RPC 请求 (来自 mcp_pipe.py) │
│ │
│ 框架处理 │
│ │ │
│ ▼ │
│ 调用 calculator() 函数 │
│ │ │
│ ▼ │
│ stdout ───────────────────────────────────────────────────► │
│ JSON-RPC 响应 (来自 mcp_pipe.py) │
└─────────────────────────────────────────────────────────────┘
为什么用 stdio:
- 简单:不需要网络编程,子进程通过标准输入输出通信
- 隔离:每个 MCP 服务器是独立进程,互不影响
- 安全:进程级隔离,出了故障只会影响单个工具
6. mcp_pipe.py 实现原理
6.1 核心函数概览
mcp_pipe.py
├── connect_with_retry() # 带重试的 WebSocket 连接
├── connect_to_server() # 连接并管理子进程
├── pipe_websocket_to_process() # WebSocket → stdin
├── pipe_process_to_websocket() # stdout → WebSocket
├── pipe_process_stderr_to_terminal() # stderr → 终端
├── build_server_command() # 构建子进程启动命令
├── load_config() # 加载配置文件
└── signal_handler() # 信号处理(优雅退出)
6.2 主流程图
┌────────────────────────────────────────────────────────────────┐
│ _main() │
│ │
│ 1. 读取 MCP_ENDPOINT 环境变量 │
│ 2. 判断启动模式: │
│ ├─ 有命令行参数 ───► 运行单个脚本 (calculator.py) │
│ └─ 无参数 ───────► 加载 mcp_config.json,启动所有服务 │
│ │
└────────────────────────────────────────────────────────────────┘
│
▼
┌────────────────────────────────────────────────────────────────┐
│ connect_with_retry() │
│ │
│ while True: │
│ try: │
│ connect_to_server(uri, target) ─────────────────────────┐ │
│ except: │ │
│ reconnect_attempt++ │ │ │
│ backoff = min(backoff * 2, MAX_BACKOFF) │ 指数退避 │ │
│ await asyncio.sleep(backoff) ─────────┴─────────────────┘ │
└────────────────────────────────────────────────────────────────┘
│
▼
┌────────────────────────────────────────────────────────────────┐
│ connect_to_server() │
│ │
│ 1. await websockets.connect(uri) ──── WebSocket 客户端 │
│ │
│ 2. subprocess.Popen(cmd) ─────────── 启动子进程 │
│ ├─ stdin = PIPE │
│ ├─ stdout = PIPE │
│ └─ stderr = PIPE │
│ │
│ 3. asyncio.gather( │
│ pipe_websocket_to_process(), ─── WebSocket → stdin │
│ pipe_process_to_websocket(), ─── stdout → WebSocket │
│ pipe_process_stderr_to_terminal() ─── stderr → 终端 │
│ ) │
└────────────────────────────────────────────────────────────────┘
6.3 pipe_websocket_to_process() 详解
作用:读取 WebSocket 消息,转发到子进程 stdin
async def pipe_websocket_to_process(websocket, process, target):
try:
while True:
# 1. 从 WebSocket 接收消息(阻塞)
message = await websocket.recv()
logger.debug(f"[{target}] << {message[:120]}...")
# 2. 字节转字符串(如需要)
if isinstance(message, bytes):
message = message.decode('utf-8')
# 3. 写入子进程 stdin
process.stdin.write(message + '\n') # JSON-RPC 请求
process.stdin.flush() # 立即发送
except Exception as e:
logger.error(f"[{target}] Error in WebSocket to process pipe: {e}")
raise
数据流:
xiaozhi 云端 mcp_pipe.py calculator.py
│ │ │
│ JSON-RPC Request │ │
│ {"jsonrpc":"2.0",...} │ │
│ ──────────────────────────────────► │ │
│ │ │
│ │ message = await websocket.recv()
│ │ process.stdin.write(message)
│ │ ────────────────────────────► │
│ │ │
│ │ │ JSON-RPC Request
│ │ │ {"jsonrpc":"2.0",...}
│ │ │ ▼
│ │ │ FastMCP 框架解析
6.4 pipe_process_to_websocket() 详解
作用:读取子进程 stdout,转发到 WebSocket
async def pipe_process_to_websocket(process, websocket, target):
try:
while True:
# 1. 从子进程 stdout 读取一行(阻塞)
data = await asyncio.to_thread(process.stdout.readline)
if not data: # EOF 表示进程结束
logger.info(f"[{target}] Process has ended output")
break
# 2. 发送到 WebSocket
logger.debug(f"[{target}] >> {data[:120]}...")
await websocket.send(data)
except Exception as e:
logger.error(f"[{target}] Error in process to WebSocket pipe: {e}")
raise
数据流:
calculator.py mcp_pipe.py xiaozhi 云端
│ │ │
│ JSON-RPC Response │ │
│ {"jsonrpc":"2.0",...} │ │
│ ◄────────────────────────────── │ │
│ │ │
│ │ data = process.stdout.readline()
│ │ ◄────────────────────────────── │
│ │ │
│ │ await websocket.send(data) │
│ │ ──────────────────────────────► │
│ │ │
│ │ │ JSON-RPC Response
6.5 重连机制详解
指数退避算法:
INITIAL_BACKOFF = 1 # 初始等待 1 秒
MAX_BACKOFF = 600 # 最大等待 10 分钟
# 退避序列:1s → 2s → 4s → 8s → 16s → 32s → 64s → 128s → 256s → 512s → 600s
触发场景:
- WebSocket 连接被断开
- 子进程异常退出
- 网络抖动
7. MCP 协议消息格式
7.1 initialize - 建立连接
客户端发送:
{
"jsonrpc": "2.0",
"id": 1,
"method": "initialize",
"params": {
"protocolVersion": "2024-11-05",
"capabilities": {},
"clientInfo": {
"name": "xiaozhi-agent",
"version": "1.0.0"
}
}
}
服务器响应:
{
"jsonrpc": "2.0",
"id": 1,
"result": {
"protocolVersion": "2024-11-05",
"capabilities": {"tools": {}},
"serverInfo": {
"name": "Calculator",
"version": "3.2.4"
}
}
}
7.2 tools/list - 获取工具列表
客户端发送:
{
"jsonrpc": "2.0",
"id": 2,
"method": "tools/list",
"params": {}
}
服务器响应:
{
"jsonrpc": "2.0",
"id": 2,
"result": {
"tools": [
{
"name": "calculator",
"description": "For mathamatical calculation...",
"inputSchema": {
"type": "object",
"properties": {
"python_expression": {
"type": "string",
"description": "The Python expression to evaluate"
}
},
"required": ["python_expression"]
}
}
]
}
}
7.3 tools/call - 调用工具
客户端发送:
{
"jsonrpc": "2.0",
"id": 3,
"method": "tools/call",
"params": {
"name": "calculator",
"arguments": {
"python_expression": "1004 * 328"
}
}
}
服务器响应:
{
"jsonrpc": "2.0",
"id": 3,
"result": {
"content": [
{
"type": "text",
"text": "{\"success\": true, \"result\": 329312}"
}
],
"isError": false
}
}
8. 运行流程解析
8.1 完整启动流程
┌──────────────────────────────────────────────────────────────────────────┐
│ 用户执行命令 │
│ python mcp_pipe.py calculator.py │
└──────────────────────────────────────────────────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────────────────────────────┐
│ mcp_pipe.py: main() │
│ │
│ 1. signal.signal(SIGINT, signal_handler) ─── 注册信号处理器 │
│ 2. endpoint_url = os.environ.get('MCP_ENDPOINT') │
│ └─ 获取 wss://api.xiaozhi.me/mcp/?token=xxx │
│ 3. target_arg = sys.argv[1] │
│ └─ "calculator.py" │
│ 4. asyncio.run(_main()) │
└──────────────────────────────────────────────────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────────────────────────────┐
│ _main() │
│ │
│ asyncio.run(connect_with_retry(endpoint_url, "calculator.py")) │
│ │
│ connect_with_retry(uri, target): │
│ while True: │
│ try: │
│ connect_to_server(uri, target) ─── 尝试连接 │
│ except: │
│ reconnect_attempt++ │
│ backoff = min(backoff * 2, MAX_BACKOFF) │
│ await asyncio.sleep(backoff) ─── 等待后重试 │
└──────────────────────────────────────────────────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────────────────────────────┐
│ connect_to_server(uri, target) │
│ │
│ 1. async with websockets.connect(uri) │
│ └─ WebSocket 连接成功 │
│ │
│ 2. cmd, env = build_server_command("calculator.py") │
│ └─ [sys.executable, "calculator.py"], os.environ.copy() │
│ │
│ 3. subprocess.Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE) │
│ └─ 启动 calculator.py 子进程 │
│ │
│ 4. asyncio.gather( │
│ pipe_websocket_to_process(...), ─── 启动 3 个协程 │
│ pipe_process_to_websocket(...), ─── │
│ pipe_process_stderr_to_terminal(...) ─── │
│ ) │
└──────────────────────────────────────────────────────────────────────────┘
8.2 工具调用完整流程
┌─────────────────────────────────────────────────────────────────────────┐
│ Step 1: xiaozhi 云端发送 initialize │
│ │
│ WebSocket ────────────────────────────────────────────────────────────► │
│ {"jsonrpc":"2.0","method":"initialize",...} │
│ │
│ calculator.py (子进程) │
│ │ │
│ ▼ │
│ FastMCP 框架处理 │
│ │ │
│ ▼ │
│ stdout ────────────────────────────────────────────────────────────────► │
│ {"jsonrpc":"2.0","result":{"capabilities":{...}}} │
└─────────────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────────────┐
│ Step 2: xiaozhi 云端发送 tools/list │
│ │
│ WebSocket ────────────────────────────────────────────────────────────► │
│ {"jsonrpc":"2.0","method":"tools/list",...} │
│ │
│ calculator.py │
│ │ │
│ ▼ │
│ FastMCP 返回工具列表 │
│ │ │
│ ▼ │
│ stdout ────────────────────────────────────────────────────────────────► │
│ {"jsonrpc":"2.0","result":{"tools":[...]}} │
└─────────────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────────────┐
│ Step 3: xiaozhi 云端发送 tools/call (计算请求) │
│ │
│ WebSocket ────────────────────────────────────────────────────────────► │
│ {"jsonrpc":"2.0","method":"tools/call", │
│ "params":{"name":"calculator", │
│ "arguments":{"python_expression":"1004*328"}}} │
│ │
│ calculator.py │
│ │ │
│ ▼ │
│ @mcp.tool() decorated function │
│ │ │
│ ▼ │
│ result = eval("1004*328", {...}) = 329312 │
│ │ │
│ ▼ │
│ stdout ────────────────────────────────────────────────────────────────► │
│ {"jsonrpc":"2.0","result":{"content":[{"text":"..."}]}} │
└─────────────────────────────────────────────────────────────────────────┘
9. 配置驱动机制
9.1 mcp_config.json 详解
{
"mcpServers": {
"local-stdio-calculator": { // 服务名称(可自定义)
"type": "stdio", // 传输类型
"command": "python", // 启动命令
"args": ["-m", "calculator"] // 参数(作为模块运行)
},
"remote-sse-server": {
"type": "sse", // SSE 类型的远程服务
"url": "https://api.example.com/sse",
"disabled": true // 禁用(不启动)
},
"remote-http-server": {
"type": "http", // HTTP 类型
"url": "https://api.example.com/mcp",
"disabled": true
}
}
}
9.2 配置优先级
命令行参数 ─────────────► 直接运行脚本
│ │
│ (如 calculator.py) │
▼ ▼
使用 target 作为脚本路径 使用配置文件 mcp_config.json
│
├─► MCP_CONFIG 环境变量指向的文件
│
└─► ./mcp_config.json
9.3 多服务并发启动
当 mcp_pipe.py 不带参数运行时:
cfg = load_config()
servers_cfg = cfg.get("mcpServers", {})
# 启动所有未禁用的服务
tasks = [asyncio.create_task(connect_with_retry(endpoint_url, t))
for t in enabled_servers]
await asyncio.gather(*tasks) # 并发运行所有服务
附录:关键概念速查
| 概念 | 说明 |
|---|---|
| FastMCP | 基于 MCP 协议的 Python 框架,简化工具开发 |
| stdio | 标准输入输出,用于父子进程通信 |
| WebSocket | 双向实时通信协议 |
| JSON-RPC 2.0 | 轻量级远程过程调用协议 |
| mcp_pipe.py | WebSocket ↔ stdio 桥接程序 |
| @mcp.tool() | 装饰器,用于暴露工具函数 |
参考链接
更多推荐




所有评论(0)