小智接入B站Mcp
·
Bilibili MCP 工具实现详解
一、工具基本信息
1.1 工具名称
bilibili - B站视频播放工具
1.2 提供的 MCP Tools
| Tool 名称 | 功能描述 |
|---|---|
bilibili.search_video |
搜索B站视频,根据关键词返回视频列表 |
bilibili.play_video |
播放B站视频,支持缓存和后台下载 |
bilibili.get_status |
获取播放器当前状态 |
1.3 功能描述
- 搜索视频:通过关键词搜索B站视频,返回视频标题、作者、时长等信息
- 播放视频:一站式播放(搜索+下载+播放),支持本地缓存加速
- 状态查询:查看当前播放状态、下载进度、视频路径等
二、整体架构
2.1 MCP Server 入口
入口文件:/home/user/xiaozhi/src/mcp/mcp_server.py
工具注册入口:McpServer.add_common_tools() 方法(第258行)
2.2 使用的 MCP SDK
自定义实现,不使用官方 MCP SDK。核心类:
McpServer- MCP服务器McpTool- 工具定义PropertyList/Property- 参数定义
2.3 Server 初始化流程
McpServer.__init__()
↓
McpPlugin.setup()
↓
server.add_common_tools()
↓
get_bilibili_tools_manager().init_tools()
↓
player = get_bilibili_player_instance()
关键代码 (manager.py 第23-49行):
def init_tools(self, add_tool, PropertyList, Property, PropertyType):
"""初始化并注册所有B站视频工具."""
# 1. 获取播放器单例
self._player = get_bilibili_player_instance()
# 2. 注册各个工具
self._register_search_video_tool(add_tool, PropertyList, Property, PropertyType)
self._register_play_video_tool(add_tool, PropertyList, Property, PropertyType)
self._register_get_status_tool(add_tool, PropertyList, Property, PropertyType)
# 3. 标记初始化完成
self._initialized = True
2.4 文件结构
src/mcp/tools/bilibili/
├── __init__.py # 包导出
├── manager.py # 工具管理器(注册工具)
├── bilibili_player.py # 核心播放器实现
└── __pycache__/
三、每个 Tool 详细拆解
Tool 1: bilibili.search_video
输入参数
| 参数名 | 类型 | 必填 | 含义 |
|---|---|---|---|
| keyword | string | 是 | 搜索关键词 |
核心逻辑
1. 检查 keyword 是否为空
↓
2. 调用 player.search_videos(keyword)
↓
3. 格式化返回结果为可读文本
↓
4. 返回视频列表字符串
关键代码 (manager.py 第54-75行)
async def search_video_wrapper(args: Dict[str, Any]) -> str:
keyword = args.get("keyword", "")
# 1. 参数校验
if not keyword:
return "请提供搜索关键词,例如:搜索喜羊羊"
# 2. 调用播放器搜索
result = await self._player.search_videos(keyword)
# 3. 格式化结果
if result.get("status") == "success":
videos = result.get("videos", [])
if not videos:
return f"未找到与'{keyword}'相关的视频"
lines = [f"找到 {result['total']} 个视频:"]
for i, video in enumerate(videos[:10], 1):
duration = video.get("duration", "未知")
lines.append(f"{i}. {video['title']} (时长: {duration})")
if len(videos) > 10:
lines.append(f"...还有 {len(videos) - 10} 个视频")
return "\n".join(lines)
else:
return result.get("message", "搜索失败")
调用的外部服务
bilibili_api库的search.search()方法
返回值结构
成功: "找到 N 个视频:\n1. 标题1 (时长: 5:30)\n2. 标题2 (时长: 3:45)\n..."
失败: "搜索失败" 或 "未找到与'xxx'相关的视频"
Tool 2: bilibili.play_video
输入参数
| 参数名 | 类型 | 必填 | 含义 |
|---|---|---|---|
| keyword | string | 是 | 搜索关键词 |
核心逻辑
1. 检查 keyword 是否为空
↓
2. 调用 player.play_video(keyword)
↓ (内部流程)
2.1 search_videos() 搜索视频
2.02. 检查本地缓存
↓ (命中缓存)
直接发送音频到ROS + 调用播放API
↓ (未命中)
启动后台下载任务,立即返回"正在下载"
↓ (后台下载完成后)
发送音频 + 调用播放API
关键代码 (manager.py 第96-107行)
async def play_video_wrapper(args: Dict[str, Any]) -> str:
keyword = args.get("keyword", "")
if not keyword:
return "请提供搜索关键词,例如:播放喜羊羊"
result = await self._player.play_video(keyword)
if result.get("status") == "success":
return result.get("message", "播放成功")
else:
return result.get("message", "播放失败")
play_video 内部流程 (bilibili_player.py 第378-465行)
async def play_video(self, keyword: str) -> Dict[str, Any]:
# 1. 搜索视频
search_result = await self.search_videos(keyword)
# 2. 检查缓存
cached_video, cached_pcm = self._check_cache(title)
# 3a. 缓存命中:直接播放
if cached_video and cached_pcm:
await self._send_audio_to_ros()
await self._play_video_api()
return {"status": "success", "message": "视频已经成功播放啦!"}
# 3b. 缓存未命中:后台下载
asyncio.create_task(self._background_download_and_play(...))
return {"status": "success", "message": "正在下载视频,请稍等~", "background": True}
调用的外部服务
| 服务 | 用途 | 调用位置 |
|---|---|---|
| bilibili_api | 搜索视频 | search_videos() |
| yt-dlp | 下载视频 | download_video() |
| ffmpeg | 提取音频 | _extract_audio() |
| ROS Topic | 发送音频 | _send_audio_to_ros() |
| HTTP API (机器人) | 播放控制 | _play_video_api() |
错误处理
| 场景 | 处理方式 |
|---|---|
| 未找到视频 | 返回 “未找到与’xxx’相关的视频” |
| 下载失败 | 后台任务捕获异常,记录日志 |
| 播放失败 | 返回错误消息 |
返回值结构
成功: {"status": "success", "message": "视频已经成功播放啦!: 标题 (时长: 5:30)"}
后台下载中: {"status": "success", "message": "正在下载视频,请稍等一下哦~", "background": True}
失败: {"status": "error", "message": "错误原因"}
Tool 3: bilibili.get_status
输入参数
无参数
核心逻辑
直接调用 player.get_status() 返回播放器状态
关键代码 (manager.py 第129-131行)
async def get_status_wrapper(args: Dict[str, Any]) -> str:
result = self._player.get_status()
return result.get("message", "获取状态失败")
返回值结构
"当前视频: 标题\nBV号: BVxxxxx\n下载状态: 已完成\n视频路径: /path/to/video.mp4\n音频路径: /path/to/audio.pcm"
四、关键代码片段解读
4.1 后台任务模式 (避免AI超时)
代码位置:bilibili_player.py 第448-461行
# 缓存未命中:启动后台下载,立即返回
# 注意:is_downloading 由后台任务 download_video 内部管理
self.is_downloading = True # 标记开始下载,防止重复触发
self.current_title = title
self.current_bvid = bvid
logger.info(f"[B站] 启动后台下载: {title}")
asyncio.create_task(self._background_download_and_play(bvid, title, duration))
return {
"status": "success",
"message": f"正在下载视频: {title},请稍等一下哦~",
"background": True, # 标记为后台任务
}
解读:
asyncio.create_task()创建异步后台任务- 立即返回给AI,避免超时
- 后台任务完成后再真正播放
为什么这样写:MCP工具有超时限制(通常几秒),下载视频可能需要几分钟,所以必须异步处理。
4.2 单例模式
代码位置:bilibili_player.py 第612-622行
_bilibili_player_instance = None
def get_bilibili_player_instance() -> BilibiliPlayer:
"""获取B站视频播放器单例."""
global _bilibili_player_instance
if _bilibili_player_instance is None:
_bilibili_player_instance = BilibiliPlayer()
logger.info("[BilibiliPlayer] 创建B站视频播放器单例实例")
return _bilibili_player_instance
解读:
- 全局单例避免重复创建
- 保持播放器状态(当前视频、下载状态等)
4.3 缓存检查与命中
代码位置:bilibili_player.py 第209-238行
# 遍历缓存目录
for f in cache_files:
if f.startswith(safe_title) and (f.endswith('.mp4') or f.endswith('.webm')):
self.video_path = str(self.cache_dir / f)
base_name = os.path.splitext(os.path.basename(self.video_path))[0]
self.pcm_path = str(self.cache_dir / f"{base_name}.pcm")
# 检查PCM是否存在
if os.path.exists(self.pcm_path):
logger.info(f"[B站] 命中本地缓存: {self.video_path}")
return {"status": "success", "cached": True, ...}
解读:
- 用文件名匹配缓存(safe_title作为前缀)
- 同时检查 .mp4 和 .pcm 是否都存在
- 命中缓存直接返回,跳过下载
4.4 音频发送到ROS
代码位置:bilibili_player.py 第552-591行
async def _send_audio_to_ros(self) -> Dict[str, Any]:
try:
import rclpy
from rclpy.node import Node
from aimdk_msgs.msg import AudioPlayback
# 初始化 rclpy
if not rclpy.ok():
rclpy.init()
node = rclpy.create_node('bilibili_audio')
pub = node.create_publisher(AudioPlayback, '/aima/hal/audio/playback', 10)
# 读取PCM文件
with open(self.pcm_path, 'rb') as f:
audio_data = f.read()
# 构建消息
msg = AudioPlayback()
msg.info.channels = 1
msg.info.sample_rate = 24000
msg.info.size = len(audio_data)
msg.data.data = list(audio_data)
msg.token_id = f"video_{self.current_bvid}"
pub.publish(msg)
node.destroy_node()
return {"status": "success"}
解读:
- 使用ROS2的rclpy库发布消息
- PCM格式:24kHz采样、单声道、16位
- 通过Topic
/aima/hal/audio/playback发送
4.5 视频编码检查
代码位置:bilibili_player.py 第74-85行
def _needs_transcode(self) -> bool:
"""检查视频是否需要转码(是否已经是H.264编码)."""
try:
cmd = ["ffprobe", "-v", "error", "-select_streams", "v:0",
"-show_entries", "stream=codec_name", "-of", "csv=p=0", self.video_path]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
codec = result.stdout.strip().lower()
return codec not in ("h264", "avc1", "libx264")
except Exception as e:
logger.warning(f"[B站] 检查编码失败,默认转码: {e}")
return True
解读:
- 用 ffprobe 检测视频编码
- H.264编码的视频可以直接播放,无需转码
- 其他编码(如H.265、VP9)需要转码为H.264
4.6 异步任务模式(避免AI超时)
核心问题:MCP 工具调用有超时限制(通常几秒),但下载视频可能需要几分钟。
同步模式的问题:
用户: "播放奥特曼"
AI调用tool → 下载视频(3分钟) → 超时失败 ❌
异步模式(当前实现):
用户: "播放奥特曼"
AI调用tool → 启动后台下载 → 立即返回"正在下载" → AI正常响应 ✅
↓
后台下载完成 → 自动播放
关键代码对比
同步模式(会超时):
async def play_video(self, keyword: str):
# 下载视频可能需要3分钟...
download_result = await self.download_video(bvid, title) # 阻塞!
# 这里永远执行不到,因为超时了
await self._send_audio_to_ros()
await self._play_video_api()
return {"status": "success", "message": "播放成功"}
异步模式(当前实现):
async def play_video(self, keyword: str) -> Dict[str, Any]:
# 1. 搜索视频
search_result = await self.search_videos(keyword)
# 2. 检查缓存
cached_video, cached_pcm = self._check_cache(title)
# 3a. 缓存命中:直接播放(快,直接await)
if cached_video and cached_pcm:
await self._send_audio_to_ros()
await self._play_video_api()
return {"status": "success", "message": "视频已经成功播放啦!"}
# 3b. 缓存未命中:后台下载,立即返回
self.is_downloading = True # 标记,防止重复触发
asyncio.create_task(self._background_download_and_play(bvid, title, duration))
return {"status": "success", "message": "正在下载视频,请稍等~", "background": True}
后台任务实现 (bilibili_player.py 第467-517行)
async def _background_download_and_play(self, bvid: str, title: str, duration: str):
"""
后台执行下载+提取音频+播放全流程,不阻塞 MCP 工具响应.
"""
try:
# 注意:is_downloading 在 play_video 中设为 True
# download_video 内部会检查 is_downloading,所以需要先清除让它正常工作
self.is_downloading = False
# 下载视频(可能耗时几分钟,在这里悄咪咪完成)
download_result = await self.download_video(bvid, title)
if download_result.get("status") != "success":
logger.error(f"后台下载失败: {download_result.get('message')}")
self.is_downloading = False
return
# 下载完成后:发送音频 + 播放视频
await self._send_audio_to_ros()
await self._play_video_api()
except asyncio.CancelledError:
logger.info("[B站] 后台下载播放任务被取消")
except Exception as e:
logger.error(f"后台播放异常: {e}")
finally:
self.is_downloading = False
时序图
时间 →
──────────────────────────────────────────────────────────────────────────►
AI/用户 MCP Server 后台任务
│ │ │
│ play_video(keyword) │ │
│ ───────────────────────► │ │
│ │ asyncio.create_task(...) │
│ │ ─────────────────────────► │
│ │ │
│ "正在下载视频,请稍等~" │ │ (下载视频3分钟)
│ ◄────────────────────── │ │
│ │ │ 下载完成
│ │ │ 提取音频
│ │ │ 发送ROS
│ │ │ 播放视频
│ │ ▼
asyncio.create_task vs asyncio.run
| 方法 | 用途 | 场景 |
|---|---|---|
asyncio.create_task() |
在已有事件循环中创建后台任务,不阻塞当前协程 | MCP工具回调内 |
asyncio.run() |
创建新事件循环并运行(用于入口点) | main函数、初始化 |
为什么不用线程?
| 方案 | 原因 |
|---|---|
| 线程 (threading) | Python GIL限制:线程不能真正并行执行CPU密集型任务 |
| asyncio 协程 | 切换成本低,更适合IO密集型(网络请求、文件读写) |
| 多进程 | 进程间通信复杂,不适合这种场景 |
下载视频主要是IO操作(网络请求+写文件),asyncio足够且更轻量。
五、与 MCP 协议的交互细节
5.1 Tool 注册流程
McpServer.add_tool() # 注册工具到列表
↓
McpServer._handle_tools_list() # 响应 tools/list 请求
↓
返回工具JSON Schema给客户端
关键代码:manager.py 第81-90行
add_tool((
"bilibili.search_video", # 工具名
"【B站视频搜索】...", # 描述(AI会看到)
PropertyList([...]), # 参数定义
search_video_wrapper, # 回调函数
))
5.2 参数校验
位置:PropertyList.parse_arguments()
参数校验在 PropertyList 类中完成:
def parse_arguments(self, arguments: Optional[Dict[str, Any]]) -> Dict[str, Any]:
result = {}
for prop in self.properties:
if arguments and prop.name in arguments:
value = arguments[prop.name]
# 类型检查
if prop.type == PropertyType.BOOLEAN and isinstance(value, bool):
result[prop.name] = value
elif prop.type == PropertyType.INTEGER and isinstance(value, (int, float)):
result[prop.name] = prop.value(int(value))
# ...
else:
raise ValueError(f"Invalid type for property {prop.name}")
elif prop.has_default_value:
result[prop.name] = prop.default_value
else:
raise ValueError(f"Missing required argument: {prop.name}")
return result
5.3 结果返回机制
位置:mcp_server.py 第181-211行
async def call(self, arguments: Dict[str, Any]) -> str:
try:
# 解析参数
parsed_args = self.properties.parse_arguments(arguments)
# 调用回调函数
if asyncio.iscoroutinefunction(self.callback):
result = await self.callback(parsed_args)
else:
result = self.callback(parsed_args)
# 格式化返回值 - 统一转为字符串
if isinstance(result, bool):
text = "true" if result else "false"
elif isinstance(result, int):
text = str(result)
else:
text = str(result)
# 返回JSON-RPC格式
return json.dumps({
"content": [{"type": "text", "text": text}],
"isError": False
})
except Exception as e:
logger.error(f"Error calling tool {self.name}: {e}", exc_info=True)
return json.dumps({
"content": [{"type": "text", "text": str(e)}],
"isError": True
})
六、可复用的模式
6.1 后台任务模式
# 启动后台任务,立即返回
asyncio.create_task(self._background_download_and_play(...))
return {"status": "success", "message": "正在处理...", "background": True}
适用场景:耗时操作(下载、处理文件等)
6.2 单例模式
_instance = None
def get_instance() -> Type:
global _instance
if _instance is None:
_instance = Type()
return _instance
适用场景:需要保持状态的管理器
6.3 缓存模式
# 检查缓存
for f in os.listdir(cache_dir):
if f.startswith(safe_title) and f.endswith('.mp4'):
return cache_hit
# 未命中则下载
download()
适用场景:重复使用的资源
6.4 统一返回格式
# 成功
return {"status": "success", "message": "操作成功", "data": {...}}
# 失败
return {"status": "error", "message": "失败原因"}
6.5 Manager 模式
class XxxManager:
def __init__(self):
self._initialized = False
def init_tools(self, add_tool, PropertyList, Property, PropertyType):
# 注册所有工具
self._register_tool1(add_tool, PropertyList, Property, PropertyType)
self._register_tool2(...)
_manager = None
def get_xxx_manager() -> XxxManager:
global _manager
if _manager is None:
_manager = XxxManager()
return _manager
优点:
- 集中管理同类工具
- 延迟初始化
- 易于扩展
6.6 异步包装器
async def tool_wrapper(args: Dict[str, Any]) -> str:
# 同步调用播放器方法
result = self._player.some_method(keyword)
# 格式化返回
return format_result(result)
作用:将同步的播放器方法包装为异步MCP工具回调
七、架构图
┌─────────────────────────────────────────────────────────────────┐
│ AI / 用户 │
└─────────────────────────┬───────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ xiaozhi (McpServer) │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ add_common_tools() → get_bilibili_tools_manager() │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ BilibiliToolsManager │ │
│ │ - search_video_wrapper() │ │
│ │ - play_video_wrapper() │ │
│ │ - get_status_wrapper() │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ BilibiliPlayer (单例) │ │
│ │ - search_videos() [调用 bilibili_api] │ │
│ │ - play_video() [搜索+缓存检查+后台下载/播放] │ │
│ │ - download_video() [调用 yt-dlp] │ │
│ │ - _extract_audio() [调用 ffmpeg] │ │
│ │ - _send_audio_to_ros() [发布 ROS Topic] │ │
│ │ - _play_video_api() [HTTP 调机器人] │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │
│ 缓存目录: ~/.cache/bilibili/ │
└─────────────────────────────────────────────────────────────────┘
文档生成时间:2026-05-14
适用版本:xiaozhi bilibili tools
更多推荐




所有评论(0)