什么是 A2A 协议

A2A(Agent-to-Agent)是一种智能体通信协议,基于 JSON-RPC 2.0 格式,通过统一的端点路由所有方法,内置鉴权和会话管理机制。

在前面的章节中,我们配置了智能体的 URL、会话维持方式和认证信息。接下来,我们将通过代码实现可以与之通信的 A2A API 服务,完整演示鉴权、会话分配和消息处理三个核心流程。

alt text

搭建 A2A API 服务

下面我们一步步搭建一个 A2A API 服务,完整演示鉴权、会话分配和消息处理三个核心流程。

第一步:创建项目并安装依赖

mkdir a2a-demo && cd a2a-demo
pip install fastapi uvicorn a2a-sdk

a2a-sdk 是 Google 官方提供的 A2A Python SDK,包含 TaskStateTaskStatusMessage 等核心类型,保证响应数据结构与 A2A 规范一致。

第二步:保存 API 密钥和会话数据

创建 main.py,先定义 API 密钥和会话存储。这一步相当于搭建服务的基础设施。

import uuid
from fastapi import FastAPI, Request, Header
from fastapi.responses import JSONResponse

app = FastAPI(title="A2A Demo")

# 演示用 API 密钥(生产环境应写入配置文件或数据库)
API_KEY = "sk-test-api-key"

# 会话存储(生产环境应使用 Redis)
sessions: dict[str, dict] = {}

代码说明

  • API_KEY:服务端预设的 API 密钥,客户端通过 Authorization: Bearer 头部传递
  • sessions:存储已初始化的会话,key 为 agentSessionId,value 为会话信息

第三步:实现鉴权验证

鉴权是 A2A 协议的第一道关卡。客户端在请求头的 Authorization 字段中携带 Bearer Token,服务端需要验证这个 Token 是否合法。

def verify_auth(request: Request) -> bool:
    auth = request.headers.get("Authorization", "")
    return auth == f"Bearer {API_KEY}"

代码说明

  • 从请求 Header 中获取 Authorization 字段
  • 判断值是否为 Bearer {预设密钥},匹配则通过,否则拒绝

第四步:创建统一消息入口

A2A 协议的所有方法都通过 POST /agent/message 端点处理。这里我们编写入口函数,串联整个请求处理流程。

@app.post("/agent/message")
async def handle_message(
    request: Request,
    agent_session_id: str | None = Header(None, alias="agent-session-id"),
):
    # 1. 鉴权
    if not verify_auth(request):
        return JSONResponse(
            status_code=401,
            content={"jsonrpc": "2.0", "id": None, "error": {"code": 401, "message": "Invalid auth"}},
        )

    body = await request.json()
    method = body.get("method")
    req_id = body.get("id")

    # 2. 分配 Session
    if method == "initialize":
        return await handle_initialize(req_id)

    # 3. 验证 Session(initialize 之外的请求都需要)
    if not agent_session_id or agent_session_id not in sessions:
        return JSONResponse(
            status_code=401,
            content={"jsonrpc": "2.0", "id": req_id, "error": {"code": 401, "message": "Invalid session"}},
        )

    # 4. 处理消息
    if method == "message/stream":
        return await handle_message_stream(body, req_id)

    return JSONResponse(
        status_code=400,
        content={"jsonrpc": "2.0", "id": req_id, "error": {"code": -32601, "message": f"Unknown method: {method}"}},
    )

代码说明

  • agent_session_id:从 Header 中提取,initialize 方法不需要此参数
  • 通过 body.get("method") 判断请求类型,路由到不同的处理方法
  • 所有响应遵循 JSON-RPC 2.0 格式

第五步:实现会话分配

当鸿蒙智能体首次连接时,会调用 initialize 方法获取会话 ID。服务端生成一个唯一 ID 并返回,后续请求都需要携带这个 ID。

async def handle_initialize(req_id: int | str | None):
    session_id = uuid.uuid4().hex
    sessions[session_id] = {"status": "active"}
    return {
        "jsonrpc": "2.0",
        "id": req_id,
        "result": {
            "agentSessionId": session_id,
            "agentSessionTtl": 604800,  # 会话有效期:7 天
        },
    }

代码说明

  • uuid.uuid4().hex:生成 32 位随机字符串作为会话 ID
  • agentSessionTtl:告知客户端该会话的有效期(秒),客户端需要在过期前重新初始化
  • 会话信息存入 sessions 字典,供后续请求验证

第六步:实现消息处理

会话建立后,鸿蒙智能体会发送 message/stream 请求,将用户输入的消息体传给我们。我们从请求中提取用户文本,构造 A2A 规范的响应返回。

from a2a.types import TaskState, TaskStatus, Message, TextPart


async def handle_message_stream(body: dict, req_id: int | str | None):
    # 从请求中提取用户输入
    params = body.get("params", {})
    task_id = params.get("id")
    user_text = (
        params.get("message", {})
        .get("parts", [{}])[0]
        .get("text", "")
    )

    # 使用 a2a-sdk 构建消息
    message = Message(
        role="agent",
        parts=[TextPart(text=f"你说了:{user_text}")],
    )

    return {
        "jsonrpc": "2.0",
        "id": req_id,
        "result": {
            "taskId": task_id,
            "kind": "status-update",
            "final": True,
            "status": TaskStatus(state=TaskState.completed, message=message).model_dump(),
        },
    }

代码说明

  • params.message.parts[0].text 路径提取用户的文本输入
  • TaskState.completed:SDK 提供的任务状态枚举,表示处理完成
  • TaskStatus:SDK 提供的状态模型,包含 statemessagetimestamp 字段
  • Message + TextPart:SDK 提供的消息模型,role="agent" 表示由智能体回复

完整代码

将以上所有步骤组合在一起,就是完整的 main.py

import uuid
from fastapi import FastAPI, Request, Header
from fastapi.responses import JSONResponse
from a2a.types import TaskState, TaskStatus, Message, TextPart

app = FastAPI(title="A2A Demo")

API_KEY = "sk-test-api-key"
sessions: dict[str, dict] = {}


def verify_auth(request: Request) -> bool:
    auth = request.headers.get("Authorization", "")
    return auth == f"Bearer {API_KEY}"


async def handle_initialize(req_id: int | str | None):
    session_id = uuid.uuid4().hex
    sessions[session_id] = {"status": "active"}
    return {
        "jsonrpc": "2.0",
        "id": req_id,
        "result": {
            "agentSessionId": session_id,
            "agentSessionTtl": 604800,
        },
    }


async def handle_message_stream(body: dict, req_id: int | str | None):
    params = body.get("params", {})
    user_text = (
        params.get("message", {})
        .get("parts", [{}])[0]
        .get("text", "")
    )
    message = Message(
        role="agent",
        parts=[TextPart(text=f"你说了:{user_text}")],
    )
    return {
        "jsonrpc": "2.0",
        "id": req_id,
        "result": {
            "taskId": params.get("id"),
            "kind": "status-update",
            "final": True,
            "status": TaskStatus(state=TaskState.completed, message=message).model_dump(),
        },
    }


@app.post("/agent/message")
async def handle_message(
    request: Request,
    agent_session_id: str | None = Header(None, alias="agent-session-id"),
):
    if not verify_auth(request):
        return JSONResponse(
            status_code=401,
            content={"jsonrpc": "2.0", "id": None, "error": {"code": 401, "message": "Invalid auth"}},
        )

    body = await request.json()
    method = body.get("method")
    req_id = body.get("id")

    if method == "initialize":
        return await handle_initialize(req_id)

    if not agent_session_id or agent_session_id not in sessions:
        return JSONResponse(
            status_code=401,
            content={"jsonrpc": "2.0", "id": req_id, "error": {"code": 401, "message": "Invalid session"}},
        )

    if method == "message/stream":
        return await handle_message_stream(body, req_id)

    return JSONResponse(
        status_code=400,
        content={"jsonrpc": "2.0", "id": req_id, "error": {"code": -32601, "message": f"Unknown method: {method}"}},
    )

运行测试

启动服务:

python main.py

打开新终端,依次执行以下命令,验证三个核心流程:

1. 不携带密钥 → 鉴权失败

curl -X POST http://localhost:8080/agent/message \
  -H "Content-Type: application/json" \
  -d '{"jsonrpc":"2.0","id":1,"method":"initialize"}'

预期返回 401,提示 Invalid auth

2. 携带密钥调用 initialize → 获取会话 ID

curl -X POST http://localhost:8080/agent/message \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer sk-test-api-key" \
  -d '{"jsonrpc":"2.0","id":1,"method":"initialize"}'

预期返回 agentSessionIdagentSessionTtl

3. 携带 Session ID 发送消息 → 得到回复

SESSION_ID="替换为上面返回的 agentSessionId"
curl -X POST http://localhost:8080/agent/message \
  -H "Content-Type: application/json" \
  -H "agent-session-id: $SESSION_ID" \
  -H "Authorization: Bearer sk-test-api-key" \
  -d '{
    "jsonrpc": "2.0",
    "id": 2,
    "method": "message/stream",
    "params": {
      "id": "task-001",
      "sessionId": "session-001",
      "message": {
        "role": "user",
        "parts": [{"kind": "text", "text": "你好"}]
      }
    }
  }'

预期返回 state: completed,消息内容为 你说了:你好

测试运行截图:

alt text

总结

本文从零开始,分六步搭建了一个完整的 A2A API 服务。每个步骤对应 A2A 协议的一个核心环节:

步骤 对应代码 说明
鉴权 verify_auth() 验证 Authorization: Bearer Token
消息入口 handle_message() 统一端点路由分发
会话分配 handle_initialize() 生成并返回 agentSessionId
消息处理 handle_message_stream() 解析用户输入,返回处理结果

使用 a2a-sdk 提供的 TaskStateTaskStatusMessageTextPart 等类型构建响应,保证了数据结构与 A2A 规范的一致性。下一篇文章将详细介绍 message/stream 的 SSE 流式输出和 tasks/cancel 等进阶方法。

Logo

作为“人工智能6S店”的官方数字引擎,为AI开发者与企业提供一个覆盖软硬件全栈、一站式门户。

更多推荐