鸿蒙智能体开发实战:5.搭建A2A API服务
什么是 A2A 协议
A2A(Agent-to-Agent)是一种智能体通信协议,基于 JSON-RPC 2.0 格式,通过统一的端点路由所有方法,内置鉴权和会话管理机制。
在前面的章节中,我们配置了智能体的 URL、会话维持方式和认证信息。接下来,我们将通过代码实现可以与之通信的 A2A API 服务,完整演示鉴权、会话分配和消息处理三个核心流程。

搭建 A2A API 服务
下面我们一步步搭建一个 A2A API 服务,完整演示鉴权、会话分配和消息处理三个核心流程。
第一步:创建项目并安装依赖
mkdir a2a-demo && cd a2a-demo
pip install fastapi uvicorn a2a-sdk
a2a-sdk 是 Google 官方提供的 A2A Python SDK,包含 TaskState、TaskStatus、Message 等核心类型,保证响应数据结构与 A2A 规范一致。
第二步:保存 API 密钥和会话数据
创建 main.py,先定义 API 密钥和会话存储。这一步相当于搭建服务的基础设施。
import uuid
from fastapi import FastAPI, Request, Header
from fastapi.responses import JSONResponse
app = FastAPI(title="A2A Demo")
# 演示用 API 密钥(生产环境应写入配置文件或数据库)
API_KEY = "sk-test-api-key"
# 会话存储(生产环境应使用 Redis)
sessions: dict[str, dict] = {}
代码说明:
API_KEY:服务端预设的 API 密钥,客户端通过Authorization: Bearer头部传递sessions:存储已初始化的会话,key 为agentSessionId,value 为会话信息
第三步:实现鉴权验证
鉴权是 A2A 协议的第一道关卡。客户端在请求头的 Authorization 字段中携带 Bearer Token,服务端需要验证这个 Token 是否合法。
def verify_auth(request: Request) -> bool:
auth = request.headers.get("Authorization", "")
return auth == f"Bearer {API_KEY}"
代码说明:
- 从请求 Header 中获取
Authorization字段 - 判断值是否为
Bearer {预设密钥},匹配则通过,否则拒绝
第四步:创建统一消息入口
A2A 协议的所有方法都通过 POST /agent/message 端点处理。这里我们编写入口函数,串联整个请求处理流程。
@app.post("/agent/message")
async def handle_message(
request: Request,
agent_session_id: str | None = Header(None, alias="agent-session-id"),
):
# 1. 鉴权
if not verify_auth(request):
return JSONResponse(
status_code=401,
content={"jsonrpc": "2.0", "id": None, "error": {"code": 401, "message": "Invalid auth"}},
)
body = await request.json()
method = body.get("method")
req_id = body.get("id")
# 2. 分配 Session
if method == "initialize":
return await handle_initialize(req_id)
# 3. 验证 Session(initialize 之外的请求都需要)
if not agent_session_id or agent_session_id not in sessions:
return JSONResponse(
status_code=401,
content={"jsonrpc": "2.0", "id": req_id, "error": {"code": 401, "message": "Invalid session"}},
)
# 4. 处理消息
if method == "message/stream":
return await handle_message_stream(body, req_id)
return JSONResponse(
status_code=400,
content={"jsonrpc": "2.0", "id": req_id, "error": {"code": -32601, "message": f"Unknown method: {method}"}},
)
代码说明:
agent_session_id:从 Header 中提取,initialize方法不需要此参数- 通过
body.get("method")判断请求类型,路由到不同的处理方法 - 所有响应遵循 JSON-RPC 2.0 格式
第五步:实现会话分配
当鸿蒙智能体首次连接时,会调用 initialize 方法获取会话 ID。服务端生成一个唯一 ID 并返回,后续请求都需要携带这个 ID。
async def handle_initialize(req_id: int | str | None):
session_id = uuid.uuid4().hex
sessions[session_id] = {"status": "active"}
return {
"jsonrpc": "2.0",
"id": req_id,
"result": {
"agentSessionId": session_id,
"agentSessionTtl": 604800, # 会话有效期:7 天
},
}
代码说明:
uuid.uuid4().hex:生成 32 位随机字符串作为会话 IDagentSessionTtl:告知客户端该会话的有效期(秒),客户端需要在过期前重新初始化- 会话信息存入
sessions字典,供后续请求验证
第六步:实现消息处理
会话建立后,鸿蒙智能体会发送 message/stream 请求,将用户输入的消息体传给我们。我们从请求中提取用户文本,构造 A2A 规范的响应返回。
from a2a.types import TaskState, TaskStatus, Message, TextPart
async def handle_message_stream(body: dict, req_id: int | str | None):
# 从请求中提取用户输入
params = body.get("params", {})
task_id = params.get("id")
user_text = (
params.get("message", {})
.get("parts", [{}])[0]
.get("text", "")
)
# 使用 a2a-sdk 构建消息
message = Message(
role="agent",
parts=[TextPart(text=f"你说了:{user_text}")],
)
return {
"jsonrpc": "2.0",
"id": req_id,
"result": {
"taskId": task_id,
"kind": "status-update",
"final": True,
"status": TaskStatus(state=TaskState.completed, message=message).model_dump(),
},
}
代码说明:
- 从
params.message.parts[0].text路径提取用户的文本输入 TaskState.completed:SDK 提供的任务状态枚举,表示处理完成TaskStatus:SDK 提供的状态模型,包含state、message、timestamp字段Message+TextPart:SDK 提供的消息模型,role="agent"表示由智能体回复
完整代码
将以上所有步骤组合在一起,就是完整的 main.py:
import uuid
from fastapi import FastAPI, Request, Header
from fastapi.responses import JSONResponse
from a2a.types import TaskState, TaskStatus, Message, TextPart
app = FastAPI(title="A2A Demo")
API_KEY = "sk-test-api-key"
sessions: dict[str, dict] = {}
def verify_auth(request: Request) -> bool:
auth = request.headers.get("Authorization", "")
return auth == f"Bearer {API_KEY}"
async def handle_initialize(req_id: int | str | None):
session_id = uuid.uuid4().hex
sessions[session_id] = {"status": "active"}
return {
"jsonrpc": "2.0",
"id": req_id,
"result": {
"agentSessionId": session_id,
"agentSessionTtl": 604800,
},
}
async def handle_message_stream(body: dict, req_id: int | str | None):
params = body.get("params", {})
user_text = (
params.get("message", {})
.get("parts", [{}])[0]
.get("text", "")
)
message = Message(
role="agent",
parts=[TextPart(text=f"你说了:{user_text}")],
)
return {
"jsonrpc": "2.0",
"id": req_id,
"result": {
"taskId": params.get("id"),
"kind": "status-update",
"final": True,
"status": TaskStatus(state=TaskState.completed, message=message).model_dump(),
},
}
@app.post("/agent/message")
async def handle_message(
request: Request,
agent_session_id: str | None = Header(None, alias="agent-session-id"),
):
if not verify_auth(request):
return JSONResponse(
status_code=401,
content={"jsonrpc": "2.0", "id": None, "error": {"code": 401, "message": "Invalid auth"}},
)
body = await request.json()
method = body.get("method")
req_id = body.get("id")
if method == "initialize":
return await handle_initialize(req_id)
if not agent_session_id or agent_session_id not in sessions:
return JSONResponse(
status_code=401,
content={"jsonrpc": "2.0", "id": req_id, "error": {"code": 401, "message": "Invalid session"}},
)
if method == "message/stream":
return await handle_message_stream(body, req_id)
return JSONResponse(
status_code=400,
content={"jsonrpc": "2.0", "id": req_id, "error": {"code": -32601, "message": f"Unknown method: {method}"}},
)
运行测试
启动服务:
python main.py
打开新终端,依次执行以下命令,验证三个核心流程:
1. 不携带密钥 → 鉴权失败
curl -X POST http://localhost:8080/agent/message \
-H "Content-Type: application/json" \
-d '{"jsonrpc":"2.0","id":1,"method":"initialize"}'
预期返回 401,提示 Invalid auth。
2. 携带密钥调用 initialize → 获取会话 ID
curl -X POST http://localhost:8080/agent/message \
-H "Content-Type: application/json" \
-H "Authorization: Bearer sk-test-api-key" \
-d '{"jsonrpc":"2.0","id":1,"method":"initialize"}'
预期返回 agentSessionId 和 agentSessionTtl。
3. 携带 Session ID 发送消息 → 得到回复
SESSION_ID="替换为上面返回的 agentSessionId"
curl -X POST http://localhost:8080/agent/message \
-H "Content-Type: application/json" \
-H "agent-session-id: $SESSION_ID" \
-H "Authorization: Bearer sk-test-api-key" \
-d '{
"jsonrpc": "2.0",
"id": 2,
"method": "message/stream",
"params": {
"id": "task-001",
"sessionId": "session-001",
"message": {
"role": "user",
"parts": [{"kind": "text", "text": "你好"}]
}
}
}'
预期返回 state: completed,消息内容为 你说了:你好。
测试运行截图:

总结
本文从零开始,分六步搭建了一个完整的 A2A API 服务。每个步骤对应 A2A 协议的一个核心环节:
| 步骤 | 对应代码 | 说明 |
|---|---|---|
| 鉴权 | verify_auth() |
验证 Authorization: Bearer Token |
| 消息入口 | handle_message() |
统一端点路由分发 |
| 会话分配 | handle_initialize() |
生成并返回 agentSessionId |
| 消息处理 | handle_message_stream() |
解析用户输入,返回处理结果 |
使用 a2a-sdk 提供的 TaskState、TaskStatus、Message、TextPart 等类型构建响应,保证了数据结构与 A2A 规范的一致性。下一篇文章将详细介绍 message/stream 的 SSE 流式输出和 tasks/cancel 等进阶方法。
更多推荐




所有评论(0)