编程 A2A 协议深度解析:当多 Agent 系统告别「战国时代」——从协议原理到生产级实战完全指南(2026)

2026-06-15 18:20:12 +0800 CST views 6

A2A 协议深度解析:当多 Agent 系统告别「战国时代」——从协议原理到生产级实战完全指南(2026)

引言

2025年4月,Google 在 Google Cloud Next 大会上发布了一个看似低调、实则意义深远的协议——A2A(Agent-to-Agent Protocol)。同年6月,整个项目捐赠给 Linux Foundation,由 AWS、Cisco、Google、Microsoft、Salesforce、SAP 等八大厂商联合维护。截至2026年,已有超过150家组织支持该协议。

但大多数开发者对 A2A 的认知仍然停留在"A2A是给多 Agent 用的协议"这个层面,对它的设计哲学、核心数据模型、与 MCP 的关系、以及如何在生产环境中落地,仍然一头雾水。

本文从协议诞生的深层动机讲起,深入解析 A2A 的五大设计原则、逐字段拆解核心数据模型(Agent Card、Task、Message、Artifact),并用一套可以实际运行的 Python 代码演示完整的 A2A 交互流程。最后,我们会对比 MCP 与 A2A,帮助你理解这两个协议如何互补,共同构成 AI Agent 生态的通信基础设施。


一、为什么需要 A2A?多 Agent 系统的「战国时代」之痛

1.1 碎片化困境

在 A2A 出现之前,企业构建多 Agent 系统时面临一个根本性的碎片化问题。想象这样一个场景:

你有一个部署在 Google Vertex AI 上的"招聘协调 Agent",需要调用三件事:

  • 从 Salesforce 的 Agent 拿候选人列表
  • 从 ServiceNow 的 Agent 安排面试
  • 从第三方背调服务的 Agent 触发背景调查

在 A2A 之前,每一对 Agent 之间都需要写一套定制的"胶水代码":Salesforce Agent 有自己的消息格式,ServiceNow Agent 有自己的 API 风格,背调服务又是另一套。每增加一个新 Agent,就要再写一套新的连接层。团队时间大量消耗在"翻译"上,而不是业务逻辑上。

这像极了 REST API 出现之前的 RPC 时代——每个厂商都有自己的序列化格式和调用约定,互操作性几乎为零。

1.2 MCP 解决了 Agent→工具,那 Agent→Agent 呢?

你可能已经熟悉 MCP(Model Context Protocol)——Anthropic 在 2024年底发布的协议,专门解决 Agent 与外部工具(数据库、搜索、API) 的连接问题。MCP 就像是"AI 世界的 USB 接口",让任何 Agent 都能用标准方式调用工具。

但 MCP 有一个天然局限:它只定义 Agent 如何调用工具,不定义 Agent 如何与其他 Agent 协作。当你在一个复杂系统中需要多个专业 Agent(代码 Agent、测试 Agent、分析 Agent)协同工作时,MCP 就力不从心了。

A2A 解决的就是这个空白:Agent 如何发现彼此、如何交换任务、如何同步状态、如何在长时间运行的工作流中保持通信。

1.3 跨框架互操作:A2A 的核心价值

A2A 用一句话解决碎片化问题:定义一个所有 Agent 都说的公共语言。

只要两个 Agent 都支持 A2A,不管它们:

  • 用什么框架实现(LangChain、AutoGen、CrewAI、自己写的)
  • 部署在哪个平台(AWS、Azure、GCP、私有 IDC)
  • 用什么语言编写(Python、Go、TypeScript、Rust)

都能直接通信。

这意味着企业可以把来自不同供应商的 Agent 组合在一起工作,就像 REST API 让不同公司的服务可以互相调用一样。


二、A2A 的五大设计原则

深入理解 A2A 之前,需要先理解它的设计哲学。协议设计者没有发明新轮子,而是做了大量克制而深思熟虑的选择。

原则一:Agent 是不透明的黑盒(Opaque Agents)

A2A 的设计前提是:你不应该、也不需要知道对方 Agent 的内部实现。

它用什么模型?怎么推理?有没有记忆?代码是什么语言写的?这些都不重要。A2A 只定义两个 Agent 之间交换什么,而不是内部怎么运行。

这与 SOAP/WSDL 时代的设计思路截然不同——WSDL 要求服务提供方暴露完整的接口签名和服务描述,而 A2A 只要求暴露"我能做什么"(能力声明),不需要暴露"我是怎么做的"。

这一点是跨厂商互操作的根本保障。你不需要关心 Salesforce 的 Agent 是用 GPT-4 还是 Claude 实现的,也不需要关心 ServiceNow 用的是什么推理框架。

原则二:建在已有标准上

A2A 刻意没有发明新的传输协议。它直接复用:

技术用途
HTTP/HTTPS传输层
JSON-RPC 2.0消息格式
Server-Sent Events(SSE)流式推送
OAuth 2.0 / OpenID Connect / API Key / mTLS认证

这意味着任何熟悉 Web 开发的工程师都不需要学新东西。HTTP 客户端、SSE 处理、JSON 解析——这些技能可以直接平移到 A2A 开发中。

原则三:任务(Task)第一

A2A 里所有工作都围绕 Task 展开。Agent 之间不是在"聊天",而是在"委托任务"和"完成任务"。

Task 有明确的生命周期状态,可以被查询、取消、订阅更新。这让长时间运行的复杂工作流变得可管理——你不会陷入无限等待,也可以在任何时刻检查进度或主动取消。

原则四:能力通过 Agent Card 公开声明

每个 Agent 在固定路径公开一份 JSON 文档(/.well-known/agent.json),描述自己能做什么、支持哪些输入输出格式、需要什么认证方式。其他 Agent 读取这份文档来决定是否把任务交给它。

这是发现机制,类似 OpenAPI 规范之于 REST API。

原则五:支持多种交互模式

A2A 同时支持四种交互模式,Agent 根据自身能力声明支持哪些,客户端按需选择:

  1. 同步请求-响应:短任务立即返回结果
  2. 异步轮询:发出去后定期来查状态
  3. 流式 SSE:长任务实时推送进度
  4. Push Notification:完成后主动回调

三、核心数据模型:逐字段深度拆解

3.1 Agent Card:Agent 的「名片」

每个遵循 A2A 协议的 Agent 必须在 /.well-known/agent.json 路径上托管一份 Agent Card。这是发现机制的入口。

{
  "name": "财务分析 Agent",
  "description": "专业处理财务数据分析、报表生成和趋势预测任务",
  "version": "1.2.0",
  "url": "https://finance-agent.example.com/a2a",
  "provider": {
    "organization": "FinTech Corp",
    "url": "https://fintechcorp.example.com"
  },
  "capabilities": {
    "streaming": true,
    "pushNotifications": true,
    "stateTransitionHistory": false
  },
  "defaultInputModes": ["text", "data"],
  "defaultOutputModes": ["text", "data", "file"],
  "securitySchemes": {
    "bearerAuth": {
      "type": "http",
      "scheme": "bearer",
      "bearerFormat": "JWT"
    }
  },
  "security": [{ "bearerAuth": [] }],
  "skills": [
    {
      "id": "financial-analysis",
      "name": "财务数据分析",
      "description": "对财务数据进行多维度分析,输出结构化报告",
      "tags": ["finance", "analysis", "reporting"],
      "examples": [
        "分析 Q3 的营收趋势",
        "对比各部门的预算执行情况"
      ],
      "inputModes": ["text", "data"],
      "outputModes": ["text", "data", "file"]
    },
    {
      "id": "forecast",
      "name": "财务预测",
      "description": "基于历史数据进行未来财务预测",
      "tags": ["finance", "forecast", "ml"],
      "inputModes": ["data"],
      "outputModes": ["data", "file"]
    }
  ]
}

逐字段解析:

字段含义开发者关注点
urlAgent 的 A2A 服务端点所有 JSON-RPC 请求都发到这里
capabilities.streaming是否支持 SSE 流式响应true 才可以用 message/stream
capabilities.pushNotifications是否支持完成后主动回调true 才可以用 Push Notification
defaultInputModes支持哪些输入类型text / data / file / audio / video
securitySchemes认证方案格式与 OpenAPI 3.0 Security Scheme 一致
skillsAgent 能力清单每个 skill 有 id、tags、examples,客户端据此匹配任务

为什么 skills 字段如此重要?

你可能有一个"数据分析 Agent",但它的 skills 列表里可能有"SQL查询"、"可视化"、"报告生成"三个不同的能力。客户端不应该把"帮我写一封商务邮件"这种任务交给数据分析 Agent——即使它的名字叫"数据分析 Agent"。通过 skills 的 tags 和 examples,客户端可以精确匹配。

# 伪代码:客户端根据 skills 做任务路由
def should_delegate(task: str, agent_card: AgentCard) -> bool:
    for skill in agent_card.skills:
        # 用简单关键词匹配,实际可用 embedding
        if any(keyword in task for keyword in skill['tags']):
            return True
        if any(example in task for example in skill['examples']):
            return True
    return False

3.2 Task:工作的基本单元

Task 是 A2A 里最核心的对象,贯穿整个交互生命周期:

{
  "id": "363422be-b0f9-4692-a24d-278670e7c7f1",
  "contextId": "c295ea44-7543-4f78-b524-7a38915ad6e4",
  "status": {
    "state": "completed",
    "timestamp": "2025-04-17T17:47:09Z",
    "message": {
      "role": "agent",
      "parts": [
        { "kind": "text", "text": "分析完成,详见 artifact。" }
      ]
    }
  },
  "artifacts": [],
  "history": []
}

核心字段:

  • id:Task 的唯一标识(UUID)。用于后续查询(tasks/get)和取消(tasks/cancel
  • contextId:会话上下文 ID。同一个多轮对话里的所有 Task 共享同一个 contextId。可以把 taskId 理解为"这次具体做的事",contextId 理解为"整个对话脉络"
  • status.state:当前状态,取值如下

Task 状态机(完整生命周期):

submitted ──→ working ──→ completed
                   │
                   ├──→ failed
                   │
                   ├──→ input-required ──→ working ──→ completed
                   │                              │
                   │                              └──→ failed
                   │
                   ├──→ canceled(任意时刻客户端主动取消)
                   └──→ rejected(服务端拒绝执行)

input-required 状态是多轮交互的关键。 Agent 处理到一半发现需要更多信息,就把 Task 切到这个状态,告诉客户端"我还需要你告诉我 X"。客户端补充信息后,用同一个 taskId 继续发送,Task 重新进入 working。

这在很多实际场景中非常有用:

  • 订票 Agent:需要出发地、目的地、日期三个信息,但用户只给了"帮我订票"
  • 代码审查 Agent:需要知道代码是用什么语言、有什么安全约束
  • 报告生成 Agent:需要用户确认报告的受众是谁、语气应该正式还是轻松

3.3 Message:通信的载体

Message 是客户端和 Agent 之间实际传递的内容:

{
  "role": "user",
  "parts": [
    {
      "kind": "text",
      "text": "请分析附件中的 Q3 财务数据,重点看毛利率变化趋势"
    },
    {
      "kind": "file",
      "file": {
        "name": "q3_finance.xlsx",
        "mimeType": "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
        "data": "<base64编码>"
      }
    }
  ],
  "messageId": "9229e770-767c-417b-a0b0-f0741243c589",
  "taskId": "363422be-b0f9-4692-a24d-278670e7c7f1",
  "contextId": "c295ea44-7543-4f78-b524-7a38915ad6e4"
}

Part 的 kind 类型:

kind说明典型场景
text纯文本用户指令、Agent 回复
file文件(含 base64 或 URL)上传代码文件、数据文件、图片
data任意 JSON 结构数据结构化参数、API 响应、查询结果

设计洞察: A2A 的 Part 模型与 Anthropic 的消息格式一脉相承,这意味着 AI Agent 内部的消息可以直接序列化为 A2A Message,无需额外的格式转换。这是一个非常聪明的设计决策。

3.4 Artifact:任务输出物

Artifact 是 Agent 完成任务后交付的成果:

{
  "artifactId": "9b6934dd-37e3-4eb1-8766-962efaab63a1",
  "name": "财务分析报告",
  "description": "Q3毛利率趋势分析结果",
  "parts": [
    {
      "kind": "text",
      "text": "## Q3 毛利率分析\n\n根据数据,Q3 毛利率为 42.3%,环比下降 1.2 个百分点..."
    },
    {
      "kind": "data",
      "data": {
        "grossMargin": 0.423,
        "qoqChange": -0.012,
        "trend": "declining",
        "monthlyBreakdown": [0.435, 0.428, 0.406]
      }
    }
  ]
}

Artifact vs Message 的语义区别:

  • Message:通信过程中的"说话"——发送方在表达、询问、回应
  • Artifact:任务完成后交付的"产品"——有名字、有描述、可被引用、可被下游消费

一个 Task 可以生成多个 Artifact(例如:一份 SQL 报告 + 一个可视化图表 + 一个数据导出文件)。这些 Artifact 全部关联到同一个 taskId,便于客户端一次性获取完整成果。


四、JSON-RPC 方法全景解析

A2A 通过 JSON-RPC 2.0 定义了一套方法,全部 POST 到 Agent 的 url 端点:

4.1 方法总览

方法名用途SSE支持
message/send发送消息,同步等待完整结果
message/stream发送消息,SSE 流式接收进度和结果
tasks/get查询某个 Task 的当前状态和结果-
tasks/list列出所有(或过滤的)Task-
tasks/cancel取消一个正在进行的 Task-
tasks/resubscribe重新订阅某个 Task 的 SSE 流
tasks/pushNotificationConfig/set配置 Task 完成后的推送回调地址-
agent/authenticatedExtendedCard获取需认证才能看到的完整 Agent Card-

JSON-RPC 请求基础结构:

{
  "jsonrpc": "2.0",
  "id": "请求ID(客户端自生成,用于对应响应)",
  "method": "方法名",
  "params": { ... }
}

4.2 同步请求:message/send

最短路径:发一条消息,等 Agent 处理完返回完整结果。

请求示例:

{
  "jsonrpc": "2.0",
  "id": 1,
  "method": "message/send",
  "params": {
    "message": {
      "role": "user",
      "parts": [
        {
          "kind": "text",
          "text": "把这段 Python 代码翻译成 Go:\ndef add(a, b):\n    return a + b"
        }
      ],
      "messageId": "msg-001"
    },
    "configuration": {
      "blocking": true,
      "acceptedOutputModes": ["text"]
    }
  }
}

关键配置字段:

  • blocking: true:客户端愿意一直等到 Task 完成。如果服务端支持就直接返回最终结果;如果不支持,则先返回 Task 初始状态,客户端再轮询
  • acceptedOutputModes:客户端能接受哪些格式的输出

同步响应(Task 已完成):

{
  "jsonrpc": "2.0",
  "id": 1,
  "result": {
    "kind": "task",
    "id": "task-abc-123",
    "contextId": "ctx-xyz-789",
    "status": {
      "state": "completed",
      "timestamp": "2025-04-17T10:30:00Z"
    },
    "artifacts": [
      {
        "artifactId": "artifact-001",
        "name": "Go 代码",
        "parts": [
          {
            "kind": "text",
            "text": "func add(a, b int) int {\n    return a + b\n}"
          }
        ]
      }
    ]
  }
}

4.3 流式响应:message/stream

适合长时间运行的任务。HTTP 响应使用 Content-Type: text/event-stream,内容是一连串 SSE 事件:

// 事件1:Task 被接受
data: {"jsonrpc":"2.0","id":1,"result":{"kind":"task","id":"task-001","status":{"state":"submitted","timestamp":"2025-04-17T10:30:00Z"}}}

// 事件2:进入处理中
data: {"jsonrpc":"2.0","id":1,"result":{"kind":"status-update","taskId":"task-001","status":{"state":"working"}}}

// 事件3:第一个内容块到达(append: false = 新 Artifact 的第一块)
data: {"jsonrpc":"2.0","id":1,"result":{"kind":"artifact-update","taskId":"task-001","artifact":{"artifactId":"art-001","parts":[{"kind":"text","text":"# Q3 财务分析报告\n\n## 摘要\n"}]},"append":false,"lastChunk":false}}

// 事件4:追加内容(append: true = 拼接到同一个 Artifact)
data: {"jsonrpc":"2.0","id":1,"result":{"kind":"artifact-update","taskId":"task-001","artifact":{"artifactId":"art-001","parts":[{"kind":"text","text":"Q3 毛利率为 42.3%,环比下降 1.2 个百分点,主要受原材料成本上升影响..."}]},"append":true,"lastChunk":false}}

// 事件5:最后一块(lastChunk: true)
data: {"jsonrpc":"2.0","id":1,"result":{"kind":"artifact-update","taskId":"task-001","artifact":{"artifactId":"art-001","parts":[{"kind":"text","text":"\n\n## 结论\n建议关注供应链成本优化机会。"}]},"append":true,"lastChunk":true}}

// 事件6:Task 完成,final: true,SSE 连接关闭
data: {"jsonrpc":"2.0","id":1,"result":{"kind":"status-update","taskId":"task-001","status":{"state":"completed","timestamp":"2025-04-17T10:30:45Z"},"final":true}}

客户端处理 SSE 的逻辑:

import sseclient
import requests

def stream_task(a2a_url: str, message: dict) -> list[dict]:
    artifacts = []
    current_artifact = None
    
    with requests.post(f"{a2a_url}/a2a", json=message, stream=True) as r:
        client = sseclient.SSEClient(r)
        for event in client.events():
            data = json.loads(event.data)
            result = data.get("result", {})
            
            kind = result.get("kind")
            
            if kind == "task":
                # 初始化,获取 taskId
                task_id = result["id"]
                
            elif kind == "artifact-update":
                artifact = result["artifact"]
                is_append = result.get("append", False)
                is_last = result.get("lastChunk", False)
                
                if not is_append:
                    # 新 Artifact 开始
                    current_artifact = {
                        "artifactId": artifact["artifactId"],
                        "parts": artifact["parts"]
                    }
                    artifacts.append(current_artifact)
                else:
                    # 追加到当前 Artifact
                    current_artifact["parts"].extend(artifact["parts"])
                    
                if is_last:
                    current_artifact = None
                    
            elif kind == "status-update" and result.get("final"):
                break  # 流结束
    
    return artifacts

4.4 多轮交互:input-required 模式

这是 A2A 最体现"协作"特性的场景:

第一轮:客户端发起,Agent 表示需要补充信息

// 响应
{
  "jsonrpc": "2.0",
  "id": "req-001",
  "result": {
    "kind": "task",
    "id": "task-flight-001",
    "contextId": "ctx-flight-session",
    "status": {
      "state": "input-required",
      "message": {
        "role": "agent",
        "parts": [
          { "kind": "text", "text": "我可以帮你订机票!请告诉我出发城市、目的地和出行日期?" }
        ]
      }
    }
  }
}

第二轮:客户端补充信息,使用同一个 taskId 和 contextId

{
  "jsonrpc": "2.0",
  "id": "req-002",
  "method": "message/send",
  "params": {
    "message": {
      "role": "user",
      "parts": [
        { "kind": "text", "text": "从上海出发飞东京,10月15日去,10月22日回" }
      ],
      "messageId": "user-msg-002",
      "taskId": "task-flight-001",
      "contextId": "ctx-flight-session"
    },
    "configuration": { "blocking": true }
  }
}

关键点: 续接任务时,message 里必须带上 taskIdcontextId,Agent 才能把这条消息关联到正在等待 input-required 的 Task 上继续处理。这与 HTTP 会话的 Cookie 机制有异曲同工之妙。

4.5 错误码体系

A2A 在 JSON-RPC 标准错误码之外定义了协议专属错误:

错误码常量名含义
-32001TaskNotFound任务 ID 不存在
-32002TaskNotCancelable任务已终止,无法取消
-32003PushNotificationNotSupportedAgent 不支持推送通知
-32004UnsupportedOperation不支持该方法
-32005ContentTypeNotSupported不支持该输入/输出格式
-32006InvalidAgentResponseAgent 内部返回了非法数据
-32700ParseErrorJSON 解析失败
-32600InvalidRequest请求格式不合法
-32601MethodNotFound方法不存在
-32602InvalidParams参数不合法

五、安全机制:生产环境必备

A2A 对安全的要求非常明确,不是可选项,而是协议的正式组成部分。

5.1 传输层安全

必须使用 HTTPS,不得在生产环境中使用明文 HTTP。这与 Web 安全的最佳实践一致。

5.2 客户端认证

A2A 完整复用 OpenAPI 3.0 的 Security Scheme 体系,Agent Card 里声明什么认证方式,客户端就按什么方式提供凭证:

"securitySchemes": {
  "bearerAuth": {
    "type": "http",
    "scheme": "bearer",
    "bearerFormat": "JWT"
  }
}

支持的方案包括:

  • API Key:放在 Header 里,最简单
  • HTTP Basic / Bearer:标准 HTTP 认证
  • OAuth 2.0:支持 Authorization Code、Client Credentials、Device Code 三种流程
  • OpenID Connect:带身份验证的 OAuth 2.0 超集
  • mTLS:双向 TLS 证书认证,安全性最高

5.3 Agent Card 签名(v1.0 新特性)

Agent Card 可以被签名,防止中间人篡改。签名格式基于 JWS(JSON Web Signature)。

攻击场景: 攻击者拦截并修改 Agent Card,将 url 指向恶意服务器。客户端基于伪造的 Card 发送请求,导致敏感数据泄露。

防御机制: Agent Card 的发布者用私钥对 Card 内容签名,客户端用公钥验证签名,确认 Card 来自真实的发布者、且内容未被篡改。

5.4 Push Notification 安全

当 Agent 使用 Push Notification 回调客户端时,存在一个固有风险:回调请求可能被伪造。

A2A 的解决方案是:客户端在配置回调地址时,可以同时提供一个 JWKS URL,要求 Agent 对每次回调请求做签名。客户端收到回调后,用 JWKS 中的公钥验证签名,确认通知来自合法的 Agent。

// 客户端设置 Push Notification 配置
{
  "method": "tasks/pushNotificationConfig/set",
  "params": {
    "taskId": "task-abc-123",
    "pushNotificationEndpoint": "https://my-app.example.com/a2a-callback",
    "jwksUri": "https://my-app.example.com/.well-known/jwks.json"
  }
}

这样,即使回调 URL 被泄露,攻击者也无法伪造有效的签名请求。


六、MCP vs A2A:一张图讲清楚

很多人会把 MCP 和 A2A 搞混,觉得"都是 Agent 协议,有什么区别"。其实它们解决的是完全不同的问题:

┌─────────────────────────────────────────────────────────────┐
│                    AI Agent 系统                             │
│                                                             │
│  ┌──────────┐    MCP协议     ┌──────────┐                  │
│  │  Agent   │ ───────────→  │  工具层   │                  │
│  │          │               │ 数据库     │                  │
│  │          │               │ 搜索API   │                  │
│  │          │               │ 文件系统   │                  │
│  └──────────┘               └──────────┘                  │
│       │                                                  │
│       │ A2A协议                                          │
│       ▼                                                  │
│  ┌──────────┐    A2A协议     ┌──────────┐                  │
│  │  Agent A  │ ←──────────→ │  Agent B │                  │
│  │ (招聘协调) │              │ (背调服务) │                  │
│  └──────────┘               └──────────┘                  │
│       │                                                  │
│       │ A2A协议                                          │
│       ▼                                                  │
│  ┌──────────┐    A2A协议     ┌──────────┐                  │
│  │  Agent A  │ ←──────────→ │  Agent C │                  │
│  │ (招聘协调) │              │ (日历服务) │                  │
│  └──────────┘               └──────────┘                  │
└─────────────────────────────────────────────────────────────┘

MCP = "USB 接口":连接 Agent 与外部资源(工具、数据源)
A2A = "网络协议":连接 Agent 与 Agent(协作、工作流)

维度MCPA2A
解决问题Agent → 工具Agent → Agent
发起方Agent 主动调用工具客户端协调或 Agent 间相互委托
通信模式单次请求-响应单次/流式/多轮/异步
状态管理无状态有状态的 Task 生命周期
典型厂商Anthropic(为主)Google(牵头),多家联合维护
核心抽象ToolTask
2025年状态成熟,广泛采用快速演进,v0.3 已发布

6.1 互补使用场景

在实际系统中,MCP 和 A2A 不是互斥的,而是互补的:

场景:一个 AI 代码审查系统

  • A2A 层:协调 Agent(Orchestrator)把代码审查任务分发给代码审查 Agent
  • MCP 层:代码审查 Agent 通过 MCP 调用 Git 工具获取源码、通过 MCP 调用 Linter 执行检查、通过 MCP 调用数据库查询代码库信息
协调 Agent (A2A) ──→ 代码审查 Agent (A2A)
                              │
                              ├── MCP: git工具(获取源码)
                              ├── MCP: linter工具(执行检查)
                              └── MCP: 数据库工具(查询代码库)

这就是 2026 年最流行的 MCP + A2A 双协议架构:A2A 处理 Agent 间的工作流编排,MCP 处理 Agent 的工具调用能力。


七、生产级实战:Python 实现完整 A2A 交互

7.1 项目结构

a2a_demo/
├── agent_card.json    # Agent 名片
├── server.py          # A2A 服务端
├── client.py          # A2A 客户端
└── main.py            # 演示入口

依赖安装:

pip install fastapi uvicorn httpx sseclient-py

7.2 agent_card.json

{
  "name": "代码翻译 Agent",
  "description": "将代码从一种编程语言翻译成另一种语言的专业 Agent",
  "version": "1.0.0",
  "url": "http://localhost:8000/a2a",
  "provider": {
    "organization": "Demo Corp",
    "url": "https://demo.example.com"
  },
  "capabilities": {
    "streaming": false,
    "pushNotifications": false
  },
  "defaultInputModes": ["text"],
  "defaultOutputModes": ["text"],
  "skills": [
    {
      "id": "code-translate",
      "name": "代码翻译",
      "description": "将源代码从一种语言翻译成另一种语言",
      "tags": ["code", "translation", "programming"],
      "examples": [
        "把这段 Python 代码翻译成 Go",
        "将以下 JavaScript 代码转换为 TypeScript"
      ],
      "inputModes": ["text"],
      "outputModes": ["text"]
    }
  ]
}

7.3 server.py:A2A 服务端

"""
A2A 服务端:发布 Agent Card,处理 JSON-RPC 请求
职责最小化:只做协议层,不含业务逻辑
"""

import json
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import Any

from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse

app = FastAPI()

# 任务存储,生产环境替换为数据库或 Redis
_tasks: dict[str, dict] = {}


# ─────────────────────────────────────────
# 业务逻辑占位符(真实场景替换为 LLM 调用)
# ─────────────────────────────────────────
def translate_code(text: str) -> str:
    """模拟代码翻译。真实场景接入 Claude/GPT API。"""
    text_lower = text.lower()
    if "python" in text_lower and ("go" in text_lower or "golang" in text_lower):
        return (
            "// Go 翻译结果\n\n"
            "package main\n\n"
            "func add(a, b int) int {\n"
            "    return a + b\n"
            "}\n\n"
            "func main() {}\n"
        )
    elif "python" in text_lower and "typescript" in text_lower:
        return (
            "// TypeScript 翻译结果\n\n"
            "function add(a: number, b: number): number {\n"
            "    return a + b;\n"
            "}\n"
        )
    else:
        return f"// 翻译完成(输入摘要:{text[:60]}...)"


# ─────────────────────────────────────────
# 端点一:发布 Agent Card
# ─────────────────────────────────────────
@app.get("/.well-known/agent.json")
async def get_agent_card():
    """GET /.well-known/agent.json,对外暴露 Agent 能力"""
    card = json.loads(Path("agent_card.json").read_text(encoding="utf-8"))
    return JSONResponse(content=card)


# ─────────────────────────────────────────
# 端点二:A2A 主入口
# ─────────────────────────────────────────
@app.post("/a2a")
async def a2a_endpoint(request: Request):
    """所有 JSON-RPC 请求的统一入口"""
    body: dict = await request.json()
    method = body.get("method", "")
    req_id = body.get("id")

    if method == "message/send":
        return _handle_message_send(body)
    elif method == "message/stream":
        return _handle_message_stream(body)
    elif method == "tasks/get":
        return _handle_tasks_get(body)
    elif method == "tasks/cancel":
        return _handle_tasks_cancel(body)
    else:
        return _error(req_id, -32601, "Method not found")


# ─────────────────────────────────────────
# 处理器:message/send(同步)
# ─────────────────────────────────────────
def _handle_message_send(body: dict) -> JSONResponse:
    params = body["params"]
    message = params["message"]
    req_id = body["id"]

    # 从 parts 提取文本内容
    user_text = "".join(
        p.get("text", "")
        for p in message.get("parts", [])
        if p.get("kind") == "text"
    )

    # 提取上下文 ID
    task_id = str(uuid.uuid4())
    context_id = message.get("contextId") or str(uuid.uuid4())
    now = datetime.now(timezone.utc).isoformat()

    # 执行翻译(业务逻辑)
    translated = translate_code(user_text)

    # 构建 Task
    task = {
        "kind": "task",
        "id": task_id,
        "contextId": context_id,
        "status": {
            "state": "completed",
            "timestamp": now,
            "message": {
                "role": "agent",
                "parts": [
                    {"kind": "text", "text": "翻译完成。"}
                ]
            }
        },
        "artifacts": [
            {
                "artifactId": str(uuid.uuid4()),
                "name": "翻译结果",
                "parts": [{"kind": "text", "text": translated}]
            }
        ],
        "history": [
            {
                "role": "user",
                "parts": message.get("parts", []),
                "messageId": message.get("messageId", str(uuid.uuid4())),
                "taskId": task_id,
                "contextId": context_id
            }
        ]
    }

    _tasks[task_id] = task

    return JSONResponse(content={
        "jsonrpc": "2.0",
        "id": req_id,
        "result": task
    })


# ─────────────────────────────────────────
# 处理器:message/stream(流式,SSE)
# ─────────────────────────────────────────
def _handle_message_stream(body: dict) -> JSONResponse:
    """演示流式响应,返回 Task 初始状态(含 taskId)
    真实场景用 StreamingResponse + 异步生成器推送 SSE 事件"""
    params = body["params"]
    message = params["message"]
    req_id = body.get("id")

    task_id = str(uuid.uuid4())
    context_id = message.get("contextId") or str(uuid.uuid4())

    # 保存任务(状态为 working)
    _tasks[task_id] = {
        "kind": "task",
        "id": task_id,
        "contextId": context_id,
        "status": {"state": "working", "timestamp": datetime.now(timezone.utc).isoformat()}
    }

    return JSONResponse(content={
        "jsonrpc": "2.0",
        "id": req_id,
        "result": _tasks[task_id]
    })


# ─────────────────────────────────────────
# 处理器:tasks/get(查询状态)
# ─────────────────────────────────────────
def _handle_tasks_get(body: dict) -> JSONResponse:
    req_id = body["id"]
    task_id = body["params"].get("id")

    if task_id not in _tasks:
        return _error(req_id, -32001, "TaskNotFound", {"taskId": task_id})

    return JSONResponse(content={
        "jsonrpc": "2.0",
        "id": req_id,
        "result": _tasks[task_id]
    })


# ─────────────────────────────────────────
# 处理器:tasks/cancel(取消任务)
# ─────────────────────────────────────────
def _handle_tasks_cancel(body: dict) -> JSONResponse:
    req_id = body["id"]
    task_id = body["params"].get("id")

    if task_id not in _tasks:
        return _error(req_id, -32001, "TaskNotFound", {"taskId": task_id})

    state = _tasks[task_id]["status"]["state"]
    if state in ("completed", "failed", "canceled"):
        return _error(req_id, -32002, "TaskNotCancelable", {
            "taskId": task_id,
            "currentState": state
        })

    _tasks[task_id]["status"]["state"] = "canceled"
    return JSONResponse(content={
        "jsonrpc": "2.0",
        "id": req_id,
        "result": _tasks[task_id]
    })


# ─────────────────────────────────────────
# 工具函数
# ─────────────────────────────────────────
def _error(req_id: Any, code: int, message: str, data: dict = None) -> JSONResponse:
    err = {"code": code, "message": message}
    if data:
        err["data"] = data
    return JSONResponse(content={"jsonrpc": "2.0", "id": req_id, "error": err})

7.4 client.py:A2A 客户端

"""
A2A 客户端:封装发现、发送、查询逻辑
对外暴露语义清晰的方法,屏蔽 JSON-RPC 细节
"""

import uuid
import httpx
from typing import AsyncIterator, Optional


class A2AClient:
    """符合 A2A 规范的客户端"""

    def __init__(self, agent_card_url: str):
        self.agent_card_url = agent_card_url  # 例如: http://localhost:8000/.well-known/agent.json
        self.agent_card: Optional[dict] = None
        self.a2a_url: str = ""

    # ─────────────────────────────────
    # 发现:读取 Agent Card
    # ─────────────────────────────────
    async def discover(self) -> dict:
        """GET /.well-known/agent.json,获取对方的能力描述和通信地址"""
        async with httpx.AsyncClient(timeout=30.0) as client:
            response = await client.get(self.agent_card_url)
            response.raise_for_status()
            self.agent_card = response.json()
            # A2A URL 来自 agent_card.url 字段
            self.a2a_url = self.agent_card["url"]
            return self.agent_card

    # ─────────────────────────────────
    # 发送消息(同步,等结果)
    # ─────────────────────────────────
    async def send_message(
        self,
        text: str,
        context_id: Optional[str] = None,
        task_id: Optional[str] = None,
    ) -> dict:
        """发送消息,同步等待完整结果"""
        message_id = str(uuid.uuid4())
        body = {
            "jsonrpc": "2.0",
            "id": message_id,
            "method": "message/send",
            "params": {
                "message": {
                    "role": "user",
                    "parts": [{"kind": "text", "text": text}],
                    "messageId": message_id,
                    "taskId": task_id,
                    "contextId": context_id,
                },
                "configuration": {
                    "blocking": True,
                    "acceptedOutputModes": self.agent_card.get("defaultOutputModes", ["text"])
                }
            }
        }

        async with httpx.AsyncClient(timeout=60.0) as client:
            response = await client.post(self.a2a_url, json=body)
            response.raise_for_status()
            result = response.json()

            if "error" in result:
                raise RuntimeError(f"A2A Error {result['error']['code']}: {result['error']['message']}")

            return result["result"]

    # ─────────────────────────────────
    # 查询任务状态
    # ─────────────────────────────────
    async def get_task(self, task_id: str) -> dict:
        """查询某个 Task 的当前状态"""
        body = {
            "jsonrpc": "2.0",
            "id": str(uuid.uuid4()),
            "method": "tasks/get",
            "params": {"id": task_id}
        }

        async with httpx.AsyncClient(timeout=30.0) as client:
            response = await client.post(self.a2a_url, json=body)
            response.raise_for_status()
            result = response.json()

            if "error" in result:
                raise RuntimeError(f"A2A Error {result['error']['code']}: {result['error']['message']}")

            return result["result"]

    # ─────────────────────────────────
    # 取消任务
    # ─────────────────────────────────
    async def cancel_task(self, task_id: str) -> dict:
        """取消一个正在进行的 Task"""
        body = {
            "jsonrpc": "2.0",
            "id": str(uuid.uuid4()),
            "method": "tasks/cancel",
            "params": {"id": task_id}
        }

        async with httpx.AsyncClient(timeout=30.0) as client:
            response = await client.post(self.a2a_url, json=body)
            response.raise_for_status()
            result = response.json()

            if "error" in result:
                raise RuntimeError(f"A2A Error {result['error']['code']}: {result['error']['message']}")

            return result["result"]

7.5 main.py:演示入口

"""
演示 A2A 完整交互流程
启动方式:uvicorn server:app --reload(另一个终端)
"""

import asyncio
from client import A2AClient


async def demo():
    # 第一步:发现 Agent
    client = A2AClient("http://localhost:8000/.well-known/agent.json")
    card = await client.discover()

    print(f"发现 Agent:{card['name']}")
    print(f"版本:{card['version']}")
    print(f"支持的能力:{card['capabilities']}")
    print(f"可提供服务:{[s['name'] for s in card['skills']]}")
    print("-" * 50)

    # 第二步:同步调用
    result = await client.send_message(
        "把这段 Python 代码翻译成 Go:\ndef add(a, b):\n    return a + b"
    )

    print(f"Task ID:{result['id']}")
    print(f"状态:{result['status']['state']}")
    print(f"输出数量:{len(result['artifacts'])} 个 Artifact")

    for artifact in result["artifacts"]:
        for part in artifact["parts"]:
            if part["kind"] == "text":
                print(f"\n{artifact['name']}:\n{part['text']}")


if __name__ == "__main__":
    asyncio.run(demo())

运行效果:

发现 Agent:代码翻译 Agent
版本:1.0.0
支持的能力:{'streaming': False, 'pushNotifications': False}
可提供服务:['代码翻译']
--------------------------------------------------
Task ID:6f82c1de-3b4a-4e9f-8c2d-1a7b3e5f9c2d
状态:completed
输出数量:1 个 Artifact

翻译结果:
// Go 翻译结果

package main

func add(a, b int) int {
    return a + b
}

func main() {}

八、架构演进:从单体 Agent 到 Agent 协作网络

8.1 单体 Agent 的局限性

传统单体 Agent(如 GPT-4 配合几个 Tool)的局限性在于:

  1. 能力边界固定:一个 Agent 不可能同时成为代码专家、财务分析师、法律顾问
  2. 上下文膨胀:把所有专业知识塞进 Prompt 导致上下文爆炸、推理成本飙升
  3. 单点故障:一个 Agent 崩溃,整个系统崩溃
  4. 升级困难:替换某个能力意味着重新训练或重新配置整个 Agent

8.2 多 Agent 协作网络

A2A 推动的架构是 Agent 协作网络(Agentic Network)

                    ┌─────────────┐
                    │  用户请求    │
                    └──────┬──────┘
                           │
                    ┌──────▼──────┐
                    │ 协调 Agent   │  ← 理解意图,路由任务
                    │ (Orchestrator)│
                    └──────┬──────┘
           ┌──────────────┼──────────────┐
           │              │              │
    ┌──────▼──────┐ ┌─────▼─────┐ ┌─────▼─────┐
    │  招聘 Agent │ │背调 Agent │ │日历 Agent │
    │  (A2A)      │ │  (A2A)    │ │  (A2A)   │
    └──────┬──────┘ └─────┬─────┘ └─────┬─────┘
           │              │              │
    ┌──────▼──────┐ ┌─────▼─────┐ ┌─────▼─────┐
    │ MCP: LinkedIn│ │MCP: 背调API│ │MCP: Google│
    │ MCP: 简历库  │ │MCP: 数据库│ │  Calendar │
    └─────────────┘ └───────────┘ └───────────┘

协调 Agent 不需要知道各个专业 Agent 的内部实现,只需要通过 A2A 委托任务、获取结果。专业 Agent 也不需要知道协调 Agent 是如何做出决策的——它们只关心输入和输出。

8.3 标准化带来的生态效应

当 A2A 成为事实标准后,会催生几个有趣的生态效应:

Agent 市场(Agent Marketplace): 企业可以像采购 SaaS 一样,直接采购专业 Agent——"招聘 Agent"、"CRM Agent"、"数据分析 Agent",插上 A2A 就能用,不需要定制开发。

Agent 组合器(Agent Composer): 低代码/无代码平台提供可视化界面,通过拖拽组合不同的 Agent,配置工作流,自动生成多 Agent 协作系统。

Agent 发现服务(Agent Discovery): 类似 DNS 的 Agent 发现网络,通过 Agent Card 的元数据让 Agent 自动找到其他 Agent,无需手动配置。


九、性能优化与生产实践

9.1 降低发现延迟

Agent Card 应该使用 CDN 缓存或边缘节点,避免每次调用都从源站获取。对于高频调用场景,可以将 Card 内容缓存在客户端本地,定期刷新(TTL 可设为 5-10 分钟)。

from functools import lru_cache
import httpx

@lru_cache(maxsize=128)
def get_agent_card_cached(url: str, ttl_seconds: int = 300):
    """带 TTL 的 Agent Card 缓存"""
    # 实际实现可加时间戳检查
    with httpx.Client(timeout=10.0) as client:
        r = client.get(url)
        r.raise_for_status()
        return r.json()

9.2 Task 超时与重试策略

对于长时间运行的 Task,建议设置合理的超时和重试策略:

import asyncio
from httpx import TimeoutException

async def send_with_retry(client: A2AClient, text: str, max_retries: int = 3):
    for attempt in range(max_retries):
        try:
            return await client.send_message(text)
        except TimeoutException:
            if attempt == max_retries - 1:
                raise
            await asyncio.sleep(2 ** attempt)  # 指数退避

9.3 多轮对话的上下文管理

contextId 的设计让同一个会话的所有 Task 共享上下文。但要注意,上下文信息可能随时间推移变得陈旧。建议定期检查 Task 历史,清理不再需要的中间结果:

# 定期清理过期的中间 Task(保留最终结果)
async def cleanup_intermediate_tasks(context_id: str, keep_final: bool = True):
    tasks = await client.list_tasks(context_id)
    for task in tasks:
        if task["status"]["state"] in ("completed", "failed") and keep_final:
            # 保留最终结果,清理中间步骤的 history
            task["history"] = []  # 释放内存

十、总结与展望

核心要点回顾

  1. A2A 解决的是 Agent→Agent 的通信问题,与 MCP(Agent→工具)形成互补关系,共同构成 AI Agent 生态的双协议栈
  2. Task 是 A2A 的核心抽象,完整的状态机(submitted→working→input-required/completed/failed)覆盖了真实世界协作的各种场景
  3. Agent Card 是发现机制的入口,通过 skills 的声明式描述实现松耦合的任务路由
  4. 基于已有 Web 标准(HTTP/JSON-RPC/SSE) 是 A2A 最重要的设计决策,大幅降低了学习和实现成本
  5. 安全是协议的正式组成部分,包括传输层 TLS、认证方案、Card 签名和 Push Notification 签名

2026 年展望

  • v1.0 稳定版:预计2026年Q4发布,提供向后兼容性保证,企业可以开始大规模采用
  • 生态工具成熟:A2A SDK 会覆盖更多语言(Go、Rust、TypeScript),A2A 调试工具、可视化工作流编辑器会逐步出现
  • 与 MCP 的深度集成:MCP + A2A 双协议架构将成为 2026 年多 Agent 系统的事实标准
  • 企业采用加速:随着 Agent 市场的发展,会有更多企业通过 A2A 组合专业 Agent,而非从头构建自己的多 Agent 系统

A2A 协议的意义,不仅在于让 Agent 能互相通信,更在于它为 AI Agent 生态的"工业化"奠定了基础——就像 REST API 让互联网服务可以互相调用,A2A 让 AI Agent 可以互相协作。当 Agent 协作网络足够丰富,AI 能做的事情将远超今天任何单体 Agent 的能力边界。


参考资源:

  • A2A 协议规范:https://github.com/a2a-protocol/a2a
  • Linux Foundation A2A 项目:https://lf-a2a.io
  • A2A Python SDK(社区):github.com/a2a-protocol/a2a-python
复制全文 生成海报 A2A Agent 多Agent MCP 协议解析 JSON-RPC

推荐文章

Vue中的样式绑定是如何实现的?
2024-11-18 10:52:14 +0800 CST
一个数字时钟的HTML
2024-11-19 07:46:53 +0800 CST
Elasticsearch 条件查询
2024-11-19 06:50:24 +0800 CST
跟着 IP 地址,我能找到你家不?
2024-11-18 12:12:54 +0800 CST
前端开发中常用的设计模式
2024-11-19 07:38:07 +0800 CST
nginx反向代理
2024-11-18 20:44:14 +0800 CST
在 Rust 生产项目中存储数据
2024-11-19 02:35:11 +0800 CST
程序员茄子在线接单