小智AI模型接入MCP
ESP32接入小智AI的MCP架构。
·
ESP32接入小智AI的MCP架构
- 整体架构图
ESP32设备 ←→ MCP服务器 ←→ 小智AI云端 ←→ 用户语音终端
↑ ↑ ↑ ↑
本地GPIO 工具转换 AI模型理解 语音识别
音量控制 协议适配 工具调用 语音合成 - MCP服务器实现
基于小智AI的要求,需要创建一个MCP服务器来桥接ESP32和小智AI:
esp32_mcp_server.py
# esp32_mcp_server.py
from mcp.server.fastmcp import FastMCP
import logging
import asyncio
import websockets
import json
from typing import Dict, Any
logger = logging.getLogger('esp32_mcp')
# 创建MCP服务器
mcp = FastMCP("ESP32_Device_Controller")
# ESP32设备连接管理
class ESP32Manager:
def __init__(self):
self.devices = {} # device_id -> websocket连接
async def connect_device(self, device_id: str, websocket_url: str):
"""连接到ESP32设备"""
try:
websocket = await websockets.connect(websocket_url)
self.devices[device_id] = websocket
logger.info(f"Connected to ESP32 device: {device_id}")
return True
except Exception as e:
logger.error(f"Failed to connect to device {device_id}: {e}")
return False
async def send_mcp_request(self, device_id: str, method: str, params: Dict = None) -> Dict:
"""向ESP32发送MCP请求"""
if device_id not in self.devices:
raise Exception(f"Device {device_id} not connected")
websocket = self.devices[device_id]
request = {
"jsonrpc": "2.0",
"id": 1,
"method": method,
"params": params or {}
}
await websocket.send(json.dumps(request))
response = await websocket.recv()
return json.loads(response)
# 全局设备管理器
esp32_manager = ESP32Manager()
# 在启动时连接到ESP32设备
async def initialize_devices():
# 这里配置您的ESP32设备WebSocket地址
await esp32_manager.connect_device("esp32_001", "ws://192.168.1.100:80/mcp")
@mcp.tool()
def get_device_status(device_id: str = "esp32_001") -> dict:
"""Get the current status of ESP32 device including volume, GPIO states, battery, network info. Use this tool to check device condition before controlling it."""
try:
loop = asyncio.get_event_loop()
response = loop.run_until_complete(
esp32_manager.send_mcp_request(device_id, "tools/call", {
"name": "self.get_device_status",
"arguments": {}
})
)
logger.info(f"Device status: {response}")
return {"success": True, "status": response.get("result", {})}
except Exception as e:
logger.error(f"Failed to get device status: {e}")
return {"success": False, "error": str(e)}
@mcp.tool()
def set_audio_volume(volume: int, device_id: str = "esp32_001") -> dict:
"""Set the audio speaker volume of ESP32 device. Volume should be between 0 and 100."""
try:
loop = asyncio.get_event_loop()
response = loop.run_until_complete(
esp32_manager.send_mcp_request(device_id, "tools/call", {
"name": "self.audio_speaker.set_volume",
"arguments": {"volume": volume}
})
)
logger.info(f"Set volume to {volume}: {response}")
return {"success": True, "volume": volume}
except Exception as e:
logger.error(f"Failed to set volume: {e}")
return {"success": False, "error": str(e)}
@mcp.tool()
def control_gpio_output(pin_number: int, state: bool, device_id: str = "esp32_001") -> dict:
"""Control GPIO pin output state on ESP32 device. Set pin to high (true) or low (false) level."""
try:
loop = asyncio.get_event_loop()
response = loop.run_until_complete(
esp32_manager.send_mcp_request(device_id, "tools/call", {
"name": "self.gpio.set_output",
"arguments": {"pin": pin_number, "state": state}
})
)
logger.info(f"Set GPIO{pin_number} to {'HIGH' if state else 'LOW'}: {response}")
return {"success": True, "pin": pin_number, "state": state}
except Exception as e:
logger.error(f"Failed to control GPIO: {e}")
return {"success": False, "error": str(e)}
@mcp.tool()
def read_gpio_input(pin_number: int, pull_mode: str = "none", device_id: str = "esp32_001") -> dict:
"""Read GPIO pin input state from ESP32 device. Pull mode can be 'none', 'up', or 'down'."""
try:
loop = asyncio.get_event_loop()
response = loop.run_until_complete(
esp32_manager.send_mcp_request(device_id, "tools/call", {
"name": "self.gpio.get_input",
"arguments": {"pin": pin_number, "pull_mode": pull_mode}
})
)
logger.info(f"Read GPIO{pin_number}: {response}")
return {"success": True, "pin": pin_number, "state": response.get("result", {})}
except Exception as e:
logger.error(f"Failed to read GPIO: {e}")
return {"success": False, "error": str(e)}
@mcp.tool()
def control_led(action: str, device_id: str = "esp32_001") -> dict:
"""Control the built-in LED on ESP32 device. Action can be 'on', 'off', 'toggle', or 'status'."""
try:
loop = asyncio.get_event_loop()
response = loop.run_until_complete(
esp32_manager.send_mcp_request(device_id, "tools/call", {
"name": "self.led.control",
"arguments": {"action": action}
})
)
logger.info(f"LED {action}: {response}")
return {"success": True, "action": action, "result": response.get("result", {})}
except Exception as e:
logger.error(f"Failed to control LED: {e}")
return {"success": False, "error": str(e)}
@mcp.tool()
def set_pwm_brightness(brightness: int, device_id: str = "esp32_001") -> dict:
"""Set PWM brightness for dimmable devices like LEDs. Brightness should be between 0 and 100 percent."""
try:
loop = asyncio.get_event_loop()
response = loop.run_until_complete(
esp32_manager.send_mcp_request(device_id, "tools/call", {
"name": "self.pwm.set_brightness",
"arguments": {"brightness": brightness}
})
)
logger.info(f"Set PWM brightness to {brightness}%: {response}")
return {"success": True, "brightness": brightness}
except Exception as e:
logger.error(f"Failed to set PWM brightness: {e}")
return {"success": False, "error": str(e)}
# 启动时初始化设备连接
if __name__ == "__main__":
# 初始化设备连接
asyncio.run(initialize_devices())
# 启动MCP服务器
mcp.run(transport="stdio")
- MCP管道脚本
创建连接小智AI的管道脚本:
# mcp_pipe.py
import asyncio
import websockets
import json
import subprocess
import sys
import os
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger('MCP_PIPE')
class MCPPipe:
def __init__(self, mcp_script_path: str, endpoint: str):
self.mcp_script_path = mcp_script_path
self.endpoint = endpoint
self.process = None
self.websocket = None
async def start_mcp_process(self):
"""启动MCP服务器进程"""
self.process = await asyncio.create_subprocess_exec(
sys.executable, self.mcp_script_path,
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
logger.info(f"Started {self.mcp_script_path} process")
async def connect_websocket(self):
"""连接到小智AI的WebSocket端点"""
logger.info("Connecting to WebSocket server...")
self.websocket = await websockets.connect(self.endpoint)
logger.info("Successfully connected to WebSocket server")
async def handle_websocket_message(self, message):
"""处理来自小智AI的消息"""
try:
# 转发消息到MCP进程
self.process.stdin.write(message.encode() + b'\n')
await self.process.stdin.drain()
# 读取MCP进程的响应
response = await self.process.stdout.readline()
# 转发响应到小智AI
await self.websocket.send(response.decode().strip())
except Exception as e:
logger.error(f"Error handling message: {e}")
async def run(self):
"""运行MCP管道"""
await self.start_mcp_process()
await self.connect_websocket()
try:
async for message in self.websocket:
await self.handle_websocket_message(message)
except websockets.exceptions.ConnectionClosed:
logger.info("WebSocket connection closed")
finally:
if self.process:
self.process.terminate()
await self.process.wait()
async def main():
endpoint = os.getenv('MCP_ENDPOINT')
if not endpoint:
print("Please set MCP_ENDPOINT environment variable")
sys.exit(1)
if len(sys.argv) != 2:
print("Usage: python mcp_pipe.py <mcp_script.py>")
sys.exit(1)
mcp_script = sys.argv[1]
pipe = MCPPipe(mcp_script, endpoint)
await pipe.run()
if __name__ == "__main__":
asyncio.run(main())
- 部署和使用
A. 安装依赖
pip install mcp websockets asyncio
B. 配置环境变量
设置小智AI提供的MCP接入点
export MCP_ENDPOINT=“wss://your-xiaozhi-mcp-endpoint”
C. 启动服务
启动ESP32 MCP服务器
python mcp_pipe.py esp32_mcp_server.py
5. ESP32端配置
确保ESP32设备:
启用MCP协议: CONFIG_IOT_PROTOCOL_MCP=y
配置WebSocket服务器: 提供MCP接口
网络连接: 能够被MCP服务器访问
6. 语音控制示例
用户通过小智AI语音终端说话:
用户语音 小智AI理解 MCP工具调用 ESP32执行
“调大音量” 音量控制 set_audio_volume(volume=80) 设置音量到80
“打开LED” LED控制 control_led(action=“on”) GPIO设置LED为高电平
“GPIO2设为高电平” GPIO控制 control_gpio_output(pin_number=2, state=true) 设置GPIO2输出高电平
“检查设备状态” 状态查询 get_device_status() 返回设备完整状态
7. 注意事项
网络配置: 确保MCP服务器能访问ESP32设备
错误处理: 添加设备离线、网络中断的处理逻辑
安全性: 考虑添加设备认证和访问控制
性能优化: 对于频繁操作,考虑连接池和缓存
日志监控: 完善日志记录,便于调试和监控
这样就实现了ESP32设备通过MCP协议接入小智AI的完整方案!
更多推荐




所有评论(0)