AI 时代的 USB 接口迎来最大升级:MCP 协议 2026-07-28 规范候选版深度解析
45 天后,这些代码将全部失效——从有状态到无状态,远程 MCP Server 必须迁移的七个理由
一、背景:MCP 为什么突然需要「动大手术」?
2026 年 5 月 21 日,Model Context Protocol(MCP)核心维护团队正式发布了 2026-07-28 规范的 Release Candidate(候选版)。官方用了"自 MCP 发布以来最大规模的修订"来形容这次变更——这不是公关话术,而是字面意义上的破坏性更新。
MCP 是什么?它是 Anthropic 在 2025 年底主导推出的开放协议,旨在为 AI 模型提供统一的方式连接外部工具、数据源和服务。如果把 AI 模型比作一台电脑,那 MCP 就是** USB 接口**——无论你用的是 Claude、GPT-5.5 还是 Gemini 2.5,都可以通过同一套协议调用文件系统、GitHub、数据库、企业内部 API 等外部能力。
自发布以来,MCP 的生态扩张速度堪称惊人:
| 维度 | 数据 |
|---|---|
| MCP Server 数量 | 5000+(官方 + 社区) |
| 支持 MCP 的客户端 | Claude Desktop、Cursor、VS Code、JetBrains、Continue |
| 主流云厂商支持 | AWS、Azure、GCP 均已提供官方 MCP Server |
| GitHub Stars(spec 仓库) | 30k+ |
| 规范版本 | 2025-03-26(旧)→ 2026-07-28(新 RC) |
问题在于:早期 MCP 的设计面向本地开发场景,协议层维护了一个有状态会话模型。这个设计在本地用一用没问题,但一旦进入生产环境——要做水平扩展、要上 K8s 集群、要暴露到公网——问题就全来了:
- 水平扩展困难:有状态意味着请求必须路由到同一台服务器,需要 sticky session。
- 基础设施成本高:需要 Redis 或 Memcached 存储会话状态。
- 认证混乱:旧规范对认证几乎没有强制要求,各实现各自为政,安全隐患巨大。
2026-07-28 规范从根本上解决了这三个问题。代价是——破坏性变更。最终规范将于 2026 年 7 月 28 日正式发布,从 RC 到废弃旧规范,你有约 45 天的迁移窗口期(过渡期截至 2026-08-28)。
二、七大破坏性变更:代码级详解
2.1 最核心变更:会话移除(Session Removal)
这是所有变更中最重要的一条。协议层彻底移除了会话概念。
旧规范的工作流程:
// 第一步:初始化握手,建立会话
→ {"jsonrpc": "2.0", "method": "initialize", "id": 1, "params": {
"protocolVersion": "2025-03-26",
"clientInfo": {"name": "cursor", "version": "1.0"}
}}
← {"jsonrpc": "2.0", "result": {
"protocolVersion": "2025-03-26",
"serverInfo": {"name": "github-mcp", "version": "2.1"},
"sessionId": "abc-123-def-456" // ← 会话ID,所有后续请求都要带
}, "id": 1}
// 第二步:调用工具,携带 sessionId
→ {"jsonrpc": "2.0", "method": "tools/call", "id": 2, "params": {
"name": "create_issue",
"arguments": {"title": "Bug report"},
"_sessionId": "abc-123-def-456" // ← 必须带!
}}
新规范的工作流程:
// 没有初始化,没有 sessionId,直接调用
→ {"jsonrpc": "2.0", "method": "tools/call", "id": 1, "params": {
"name": "create_issue",
"arguments": {"title": "Bug report"}
}}
← {"jsonrpc": "2.0", "result": {
"content": [{"type": "text", "text": "Issue created: #42"}]
}, "id": 1}
这对工程实践意味着什么?
首先,你不再需要关心「连接建立」这个概念。每一次请求都是独立的、自包含的,天然适合 HTTP/2 和 HTTP/3 的多路复用。其次,如果你之前的 Server 依赖 session_id 做路由或状态管理,必须重构。
新规范引入了「显式状态句柄」(Explicit State Handles)作为迁移路径:
# Python 实现:无状态 MCP Server 示例
import httpx
import asyncio
from dataclasses import dataclass
from typing import Optional
@dataclass
class ConnectionState:
"""无状态设计中的显式连接状态"""
connection_id: str
last_query: str
user_context: dict
class StatelessMCPServer:
"""
新规范下的无状态 MCP Server。
所有状态通过 state_token 在请求/响应中显式传递,
不再依赖协议层维护的会话。
"""
def __init__(self, connection_pool_size: int = 10):
self.connection_pool: dict[str, object] = {}
self.state_store: dict[str, ConnectionState] = {}
async def handle_tool_call(
self,
request: dict,
state_token: Optional[str] = None
) -> dict:
"""
核心方法:从请求中提取显式状态句柄,
在响应中返回新的状态句柄。
"""
# 从 state_token 恢复状态(如果有的话)
if state_token and state_token in self.state_store:
state = self.state_store[state_token]
else:
# 新请求,创建初始状态
state = ConnectionState(
connection_id=self._generate_conn_id(),
last_query="",
user_context={}
)
# 执行实际的工具调用
method = request.get("method")
params = request.get("params", {})
if method == "tools/call":
result = await self._execute_tool(
name=params.get("name"),
arguments=params.get("arguments", {}),
state=state
)
else:
result = {"error": f"Unknown method: {method}"}
# 更新状态并返回新的 state_token
new_token = self._save_state(state)
return {
"jsonrpc": "2.0",
"result": {"content": [{"type": "text", "text": str(result)}]},
"state_token": new_token # 客户端下次请求需要携带这个
}
async def _execute_tool(self, name: str, arguments: dict, state: ConnectionState) -> str:
"""模拟工具执行"""
if name == "database_query":
sql = arguments.get("sql", "")
state.last_query = sql
return f"Query executed: {sql[:50]}..."
return f"Tool '{name}' executed successfully"
def _generate_conn_id(self) -> str:
import uuid
return str(uuid.uuid4())[:8]
def _save_state(self, state: ConnectionState) -> str:
"""将状态加密序列化,返回 state_token"""
import json
import base64
token_data = json.dumps({
"conn_id": state.connection_id,
"last_query": state.last_query,
"ctx": state.user_context
})
token = base64.urlsafe_b64encode(token_data.encode()).decode()
# 实际生产中应该用 AES-256-GCM 加密
self.state_store[token] = state
return token
架构师视角的思考:无状态设计的最大好处是可观测性提升。每一对请求/响应都包含了完整的上下文,分布式追踪(如 Jaeger、Zipkin)的接入成本大幅降低。不再需要额外的 session 存储,Redis 的使用场景可以减少一个。
2.2 初始化握手移除(Initialize Handshake Removal)
旧规范要求客户端首先调用 initialize 方法,交换协议版本和能力信息。这是每个 MCP Client 启动时必经的第一步。
旧方式(JSON-RPC 初始化):
# Python MCP Client 旧实现
import asyncio
import mcp.protocol as mcp
async def old_initialize():
client = mcp.Client(server_url="https://api.example.com/mcp")
# 第一步:必须先初始化
init_result = await client.request(
method="initialize",
params={
"protocolVersion": "2025-03-26",
"clientInfo": {"name": "my-app", "version": "1.0.0"},
"capabilities": {"tools": {"listChanged": True}}
}
)
# 从响应中提取 session_id
session_id = init_result["sessionId"]
print(f"Session established: {session_id}")
# 第二步:才可以使用工具
tools = await client.list_tools(session_id=session_id)
return tools
新方式(HTTP 头部协商):
# Python MCP Client 新实现
import httpx
class NewMCPClient:
"""
2026-07-28 规范下的 MCP Client。
版本和能力信息通过 HTTP 头部传递,无需初始化握手。
"""
def __init__(self, server_url: str, access_token: str):
self.server_url = server_url
self.headers = {
"Authorization": f"Bearer {access_token}",
"Content-Type": "application/json",
"MCP-Protocol-Version": "2026-07-28",
"MCP-Capabilities": "tools,listChanged"
}
async def call_tool(self, name: str, arguments: dict) -> dict:
"""
直接调用,无需初始化。
头部信息告诉 Server 我们的协议版本和能力。
"""
async with httpx.AsyncClient() as client:
response = await client.post(
self.server_url,
json={
"jsonrpc": "2.0",
"method": "tools/call",
"params": {
"name": name,
"arguments": arguments
},
"id": self._next_id()
},
headers=self.headers
)
return response.json()
async def list_tools(self) -> dict:
"""直接列出工具,无需先建立会话"""
async with httpx.AsyncClient() as client:
response = await client.post(
self.server_url,
json={
"jsonrpc": "2.0",
"method": "tools/list",
"params": {},
"id": self._next_id()
},
headers=self.headers
)
return response.json()
def _next_id(self) -> int:
if not hasattr(self, "_counter"):
self._counter = 0
self._counter += 1
return self._counter
架构影响分析:初始化握手的移除,使得 MCP 的冷启动时间大幅缩短。对于需要频繁创建/销毁连接的边缘计算场景(Cloudflare Workers、AWS Lambda),这是巨大的利好。你不需要再维护一个「连接生命周期」,每一次请求都是完全独立的。
2.3 三个核心功能废弃
| 废弃功能 | 替代方案 | 废弃原因 |
|---|---|---|
resources/list | 统一到 tools/list | 资源本质上是一种只读工具,统一接口减少概念复杂度 |
prompts/list | 扩展机制(Extensions) | Prompts 作为核心功能使用率极低,移至扩展层 |
sampling | 客户端决定 | 采样是客户端的职责,不属于协议层 |
这对大多数开发者影响不大。但如果你的 Server 实现了 prompts/list 接口,需要迁移到新的扩展机制:
# 新规范:Prompts 作为扩展实现
class MCPExtensions:
"""
MCP 2026-07-28 扩展注册表
"""
# 官方扩展
TASKS = "tasks/v1" # 长时间运行的任务
MCP_APPS = "mcp-apps/v1" # 服务端渲染 UI
CUSTOM = "custom/v1" # 自定义扩展
@staticmethod
def register_prompt_extension(server, prompts: list[dict]):
"""将旧的 prompts/list 迁移为扩展"""
@server.route("/extensions/prompts/list")
async def list_prompts(request):
return {"prompts": prompts}
2.4 OAuth 2.1 全面接入(最重要企业级变更)
这是对企业用户影响最大的一条变更。新规范将 MCP Server 明确定义为 OAuth 2.1 资源服务器,MCP Client 作为 OAuth 2.1 客户端。PKCE 成为强制要求,不再是可选的安全加固。
OAuth 2.1 的关键要求:
- PKCE 强制化:Authorization Code Flow 必须使用 code_challenge + code_verifier
- Redirect URI 精确匹配:不允许通配符
- 令牌内省:服务端必须支持令牌撤销
# TypeScript: MCP Server OAuth 2.1 认证实现
import {
AuthorizationCode,
ResourceServer,
generateCodeVerifier,
generateCodeChallenge
} from "oauth2.1";
interface MCPOAuthConfig {
authorizationEndpoint: string;
tokenEndpoint: string;
requirePKCE: true;
supportedScopes: string[];
revocationEndpoint?: string;
}
class MCPOAuthResourceServer implements ResourceServer {
private config: MCPOAuthConfig;
constructor(config: MCPOAuthConfig) {
this.config = config;
}
// 服务端:提供 OAuth 元数据端点(RFC 8414)
async getOAuthMetadata(): Promise<object> {
return {
authorization_endpoint: this.config.authorizationEndpoint,
token_endpoint: this.config.tokenEndpoint,
scopes_supported: this.config.supportedScopes,
grant_types_supported: ["authorization_code"],
code_challenge_methods_supported: ["S256"],
revocation_endpoint: this.config.revocationEndpoint
};
}
// 服务端:授权码发放
async issueAuthorizationCode(
clientId: string,
redirectUri: string,
scope: string
): Promise<string> {
const code = crypto.randomBytes(32).toString("hex");
const pkceVerifier = generateCodeVerifier(); // 服务端也应存储 verifier 的 hash
await this.codeStore.set(code, {
clientId,
redirectUri,
scope,
pkceVerifierHash: await this._hashVerifier(pkceVerifier),
expiresAt: Date.now() + 10 * 60 * 1000 // 10 分钟有效期
});
return code;
}
// 服务端:令牌交换(必须验证 PKCE)
async exchangeCodeForToken(
code: string,
codeVerifier: string,
redirectUri: string
): Promise<{ access_token: string; expires_in: number }> {
const stored = await this.codeStore.get(code);
if (!stored || stored.redirectUri !== redirectUri) {
throw new Error("INVALID_GRANT");
}
// 验证 PKCE - 这是 OAuth 2.1 的核心要求
const verifierHash = await this._hashVerifier(codeVerifier);
if (verifierHash !== stored.pkceVerifierHash) {
throw new Error("INVALID_CODE_VERIFIER"); // 攻击者无法伪造
}
const accessToken = crypto.randomBytes(32).toString("base64url");
const expiresIn = 3600; // 1 小时
await this.tokenStore.set(accessToken, {
clientId: stored.clientId,
scope: stored.scope,
issuedAt: Date.now()
});
// 授权码一次性使用
await this.codeStore.delete(code);
return { access_token: accessToken, expires_in: expiresIn };
}
private async _hashVerifier(verifier: string): Promise<string> {
const encoder = new TextEncoder();
const data = encoder.encode(verifier);
const hash = await crypto.subtle.digest("SHA-256", data);
return Buffer.from(hash).toString("base64url");
}
}
MCP Client 端的 PKCE 流程:
# Python: MCP Client OAuth 2.1 认证(带 PKCE)
import httpx
import asyncio
import secrets
import base64
import hashlib
class MCPOAuthClient:
"""
MCP 2026-07-28 规范下的 OAuth 2.1 Client。
PKCE 是强制要求。
"""
def __init__(self, server_url: str, client_id: str):
self.server_url = server_url
self.client_id = client_id
self.access_token: str | None = None
self.token_expires_at: float = 0
async def authenticate(self) -> str:
"""完整的 OAuth 2.1 + PKCE 认证流程"""
# 第一步:发现 OAuth 元数据
metadata = await self._discover_oauth_metadata()
# 第二步:生成 PKCE 挑战
code_verifier = secrets.token_urlsafe(64)
code_challenge = self._generate_code_challenge(code_verifier)
state = secrets.token_urlsafe(16)
# 第三步:构造授权 URL(这里简化了,实际需要启动本地 HTTP 服务处理回调)
auth_url = metadata["authorization_endpoint"]
auth_url += f"?response_type=code"
auth_url += f"&client_id={self.client_id}"
auth_url += f"&redirect_uri=http://localhost:8765/callback"
auth_url += f"&scope={' '.join(metadata['supported_scopes'])}"
auth_url += f"&state={state}"
auth_url += f"&code_challenge={code_challenge}"
auth_url += f"&code_challenge_method=S256"
print(f"请访问以下地址完成授权:\n{auth_url}")
# 第四步:等待授权码(实际应用中启动本地 HTTP 服务器接收回调)
auth_code = await self._wait_for_auth_code()
# 第五步:交换令牌
token_response = await httpx.AsyncClient().post(
metadata["token_endpoint"],
data={
"grant_type": "authorization_code",
"code": auth_code,
"redirect_uri": "http://localhost:8765/callback",
"client_id": self.client_id,
"code_verifier": code_verifier # PKCE: 发送明文 verifier
}
)
token_data = token_response.json()
self.access_token = token_data["access_token"]
self.token_expires_at = asyncio.get_event_loop().time() + token_data["expires_in"]
return self.access_token
def _generate_code_challenge(self, verifier: str) -> str:
"""S256 PKCE 方法:SHA-256 哈希 + Base64URL 编码"""
digest = hashlib.sha256(verifier.encode()).digest()
return base64.urlsafe_b64encode(digest).decode().rstrip("=")
async def _discover_oauth_metadata(self) -> dict:
"""从 /.well-known/oauth-authorization-server 发现元数据"""
async with httpx.AsyncClient() as client:
response = await client.get(
f"{self.server_url}/.well-known/oauth-authorization-server"
)
return response.json()
async def _wait_for_auth_code(self) -> str:
"""等待授权回调(实际应用中启动 HTTP 服务器)"""
# 这里简化处理,实际需要 asyncio.Event 和本地 HTTP 服务器
await asyncio.sleep(0)
return "PLACEHOLDER_AUTH_CODE"
async def call_tool(self, name: str, arguments: dict) -> dict:
"""带认证的 MCP 调用"""
if not self.access_token or asyncio.get_event_loop().time() >= self.token_expires_at:
await self.authenticate()
async with httpx.AsyncClient() as client:
response = await client.post(
self.server_url,
json={
"jsonrpc": "2.0",
"method": "tools/call",
"params": {"name": name, "arguments": arguments}
},
headers={
"Authorization": f"Bearer {self.access_token}",
"MCP-Protocol-Version": "2026-07-28"
}
)
return response.json()
这条变更的实际意义:旧规范对认证几乎是放任的——很多 MCP Server 根本没有认证,或者用简单的 API Key 认证。现在 OAuth 2.1 强制化,PKCE 强制化,意味着 MCP 可以安全地暴露在公网上了。对企业用户来说,这意味着终于可以在生产环境中放心使用第三方 MCP Server。
2.5 传输层标准化:Streamable HTTP 取代 SSE
旧规范使用 SSE(Server-Sent Events)作为远程传输方式。新规范统一为 Streamable HTTP,兼顾流式响应和水平扩展能力。
# Python: 新规范的 Streamable HTTP Transport
import httpx
import asyncio
from typing import AsyncIterator
class StreamableHTTPTransport:
"""
MCP 2026-07-28 Streamable HTTP Transport。
支持:
- 短请求/响应(无状态)
- 流式响应(Server-Sent Events 替代方案)
- 错误处理
"""
def __init__(
self,
server_url: str,
access_token: str,
protocol_version: str = "2026-07-28"
):
self.server_url = server_url
self.default_headers = {
"Authorization": f"Bearer {access_token}",
"Content-Type": "application/json",
"MCP-Protocol-Version": protocol_version,
"Accept": "application/json, text/event-stream"
}
self._client: httpx.AsyncClient | None = None
@property
def client(self) -> httpx.AsyncClient:
if self._client is None:
self._client = httpx.AsyncClient(
timeout=httpx.Timeout(30.0, connect=5.0),
limits=httpx.Limits(max_connections=100, max_keepalive_connections=20)
)
return self._client
async def request(
self,
method: str,
params: dict,
idempotency_key: str | None = None
) -> dict:
"""
发送短请求,返回完整响应。
用于 tools/call 等同步操作。
"""
headers = {**self.default_headers}
if idempotency_key:
headers["Idempotency-Key"] = idempotency_key
response = await self.client.post(
self.server_url,
json={
"jsonrpc": "2.0",
"method": method,
"params": params
},
headers=headers
)
content_type = response.headers.get("content-type", "")
if "application/json" in content_type:
return response.json()
elif "text/event-stream" in content_type:
# 处理流式响应(累积直到完整)
return await self._collect_sse_response(response)
else:
raise ValueError(f"Unexpected content-type: {content_type}")
async def stream_request(
self,
method: str,
params: dict
) -> AsyncIterator[dict]:
"""
流式请求,用于返回大量数据的场景。
生成器模式,不等待完整响应。
"""
async with self.client.stream(
"POST",
self.server_url,
json={
"jsonrpc": "2.0",
"method": method,
"params": params
},
headers=self.default_headers
) as response:
async for line in response.aiter_lines():
if line.startswith("data: "):
import json
yield json.loads(line[6:])
async def _collect_sse_response(self, response: httpx.Response) -> dict:
"""将 SSE 流累积为完整 JSON 响应"""
chunks = []
async for line in response.aiter_lines():
if line.startswith("data: "):
chunks.append(line[6:])
import json
return json.loads("".join(chunks))
async def close(self):
if self._client:
await self._client.aclose()
self._client = None
2.6 错误码标准化
新规范统一了 JSON-RPC 错误码体系,并新增了认证相关的标准错误码:
# MCP 2026-07-28 标准化错误码
class MCPErrors:
# JSON-RPC 标准错误(保留)
PARSE_ERROR = -32700
INVALID_REQUEST = -32600
METHOD_NOT_FOUND = -32601
INVALID_PARAMS = -32602
INTERNAL_ERROR = -32603
# MCP 专用错误(新增)
AUTHENTICATION_REQUIRED = -32000 # 请求缺少认证信息
AUTHORIZATION_FAILED = -32001 # 认证通过但权限不足
TOKEN_EXPIRED = -32002 # 令牌过期
TOKEN_REVOKED = -32003 # 令牌已被撤销
SCOPE_INSUFFICIENT = -32004 # 令牌 scope 不足以执行该操作
SERVER_UNAVAILABLE = -32005 # 服务暂时不可用(可重试)
# 统一错误响应格式
def create_error_response(error_code: int, message: str, data: dict = None) -> dict:
return {
"jsonrpc": "2.0",
"error": {
"code": error_code,
"message": message,
"data": data or {}
}
}
2.7 扩展机制:MCP Apps 和 Tasks
新规范引入了正式的扩展机制,用于协议核心之外的功能:
# 扩展声明与使用示例
# MCP Apps 扩展:服务端渲染 UI
# 用于让 AI 生成的响应中包含可交互的 UI 组件
async def create_mcp_app(client, app_spec: dict):
"""创建 MCP App(服务端渲染的交互界面)"""
response = await client.request(
method="extensions/mcp-apps/create",
params={
"spec": app_spec, # React/Vue/Svelte 组件定义
"framework": "react",
"styles": {"theme": "dark", "maxWidth": "800px"}
}
)
return response["app_id"] # 返回 App ID,前端渲染该 URL
# Tasks 扩展:长时间运行的任务
# 用于代码重构、大规模分析等耗时操作
async def run_long_task(client, task_type: str, params: dict):
"""创建长时间运行的任务"""
task = await client.request(
method="extensions/tasks/create",
params={
"type": task_type,
"params": params,
"timeout": 3600, # 1 小时超时
"priority": "normal"
}
)
task_id = task["id"]
# 轮询任务状态
while True:
status = await client.request(
method="extensions/tasks/status",
params={"id": task_id}
)
if status["state"] == "completed":
return status["result"]
elif status["state"] == "failed":
raise RuntimeError(f"Task failed: {status['error']}")
await asyncio.sleep(10) # 每 10 秒轮询
三、迁移路径:从有状态到无状态的实战指南
3.1 完整迁移示例:数据库查询 MCP Server
假设你有一个提供数据库查询能力的 MCP Server,旧实现基于有状态会话:
# ========== 旧实现(有状态) ==========
import uuid
from typing import Optional
from dataclasses import dataclass, field
@dataclass
class Session:
db_connection: object
query_history: list[str] = field(default_factory=list)
user: Optional[str] = None
class OldDatabaseMCPServer:
"""旧规范:有状态会话管理"""
def __init__(self, db_url: str):
self.db_url = db_url
self.sessions: dict[str, Session] = {} # ← 内存存储会话
self.db_pool = self._create_pool(db_url)
async def initialize(self, client_info: dict) -> dict:
"""旧规范:初始化建立会话"""
session_id = str(uuid.uuid4())
self.sessions[session_id] = Session(
db_connection=await self.db_pool.get_connection(),
user=client_info.get("name")
)
return {
"sessionId": session_id,
"protocolVersion": "2025-03-26",
"instructions": "所有请求请携带 sessionId"
}
async def execute_query(
self,
session_id: str,
sql: str
) -> dict:
"""旧规范:通过 session_id 查找会话"""
if session_id not in self.sessions:
raise ValueError(f"Invalid session: {session_id}")
session = self.sessions[session_id]
result = await session.db_connection.execute(sql)
session.query_history.append(sql)
return {
"rows": result.rows,
"affected": result.affected,
"history_count": len(session.query_history)
}
迁移到新规范后:
# ========== 新实现(无状态 + OAuth 2.1) ==========
import uuid
import base64
import json
import asyncio
from typing import Optional
from dataclasses import dataclass
@dataclass
class ConnectionState:
"""显式状态:替代旧规范的 Session"""
connection_id: str
user: Optional[str]
query_history: list[str]
last_activity: float
class NewDatabaseMCPServer:
"""
新规范:无状态 + OAuth 2.1
关键变化:
1. 移除 initialize 方法
2. 无 session_id 依赖
3. 状态通过 state_token 显式传递
4. 强制 OAuth 2.1 认证
"""
def __init__(self, db_url: str):
self.db_url = db_url
# 连接池(用于无状态设计中的连接复用)
self._connection_pool: asyncio.Queue = asyncio.Queue()
self._init_pool(db_url)
# 状态存储(可以用 Redis 替代内存)
self._state_store: dict[str, ConnectionState] = {}
def _init_pool(self, db_url: str):
"""初始化连接池"""
# 实际生产中初始化连接...
pass
async def handle_request(
self,
request: dict,
access_token: str,
state_token: Optional[str] = None
) -> dict:
"""统一请求处理入口"""
# 第一步:验证 OAuth 令牌
if not await self._verify_token(access_token):
return self._error_response(-32000, "Authentication required")
# 第二步:从 state_token 恢复状态(或创建新状态)
current_state = self._restore_or_create_state(state_token)
# 第三步:执行方法
method = request.get("method", "")
params = request.get("params", {})
if method == "tools/call":
result = await self._call_tool(params, current_state)
elif method == "tools/list":
result = self._list_tools()
else:
return self._error_response(-32601, f"Unknown method: {method}")
# 第四步:保存状态,返回新的 state_token
new_state_token = self._save_state(current_state)
return {
"jsonrpc": "2.0",
"result": result,
"state_token": new_state_token # ← 显式状态句柄
}
async def _call_tool(self, params: dict, state: ConnectionState) -> dict:
"""执行工具调用"""
tool_name = params.get("name")
arguments = params.get("arguments", {})
if tool_name == "database_query":
sql = arguments.get("sql", "")
# 获取连接(无状态:从池中获取,不依赖会话)
conn = await self._get_connection(state)
try:
result = await conn.execute(sql)
state.query_history.append(sql)
state.last_activity = asyncio.get_event_loop().time()
return {
"content": [{
"type": "text",
"text": json.dumps({
"rows": result.rows[:100], # 限制返回行数
"affected": result.affected,
"total_rows": result.total
}, default=str)
}]
}
finally:
await self._release_connection(conn)
return {"error": f"Unknown tool: {tool_name}"}
async def _get_connection(self, state: ConnectionState):
"""从连接池获取连接"""
try:
conn = await asyncio.wait_for(
self._connection_pool.get(),
timeout=5.0
)
return conn
except asyncio.TimeoutError:
# 超时:创建临时连接
return await self._create_temp_connection()
async def _release_connection(self, conn):
"""归还连接到池"""
if not self._connection_pool.full():
await self._connection_pool.put(conn)
def _list_tools(self) -> dict:
"""列出可用工具(替代旧规范的 resources/list)"""
return {
"tools": [
{
"name": "database_query",
"description": "Execute SQL query against the database",
"inputSchema": {
"type": "object",
"properties": {
"sql": {
"type": "string",
"description": "SQL query to execute"
}
},
"required": ["sql"]
}
}
]
}
def _restore_or_create_state(
self,
state_token: Optional[str]
) -> ConnectionState:
"""从 state_token 恢复状态,或创建新状态"""
if state_token:
try:
# 解码 state_token(实际生产中应该解密)
decoded = base64.urlsafe_b64decode(state_token).decode()
data = json.loads(decoded)
if data.get("conn_id") in self._state_store:
return self._state_store[data["conn_id"]]
except Exception:
pass # token 无效,当作新状态处理
# 创建新状态
return ConnectionState(
connection_id=str(uuid.uuid4())[:8],
user=None,
query_history=[],
last_activity=asyncio.get_event_loop().time()
)
def _save_state(self, state: ConnectionState) -> str:
"""将状态编码为 state_token"""
data = json.dumps({
"conn_id": state.connection_id,
"user": state.user,
"history_len": len(state.query_history)
})
token = base64.urlsafe_b64encode(data.encode()).decode()
self._state_store[state.connection_id] = state
return token
async def _verify_token(self, token: str) -> bool:
"""验证 OAuth 2.1 访问令牌"""
# 实际实现中应该调用令牌内省端点或验证 JWT
return len(token) > 0 # 简化示例
def _error_response(self, code: int, message: str) -> dict:
return {
"jsonrpc": "2.0",
"error": {"code": code, "message": message}
}
def _create_temp_connection(self):
"""创建临时连接(池耗尽时的降级方案)"""
pass
四、连接池设计与无状态架构的权衡
无状态架构带来了一个关键问题:连接谁来管理?
在旧规范中,连接在 initialize 时建立,在整个会话生命周期内复用。这有好处(减少 TCP 握手开销),也有坏处(连接长期占用,资源利用率低)。
新规范下,这个问题变得有趣了:
连接池策略选择
from enum import Enum
from typing import Protocol
import asyncio
class PoolStrategy(Enum):
PER_REQUEST = "per_request" # 每次请求新建连接
POOL_SHARED = "pool_shared" # 全局连接池共享
STATEFUL_AWARE = "stateful_aware" # 按 state_token 亲和
class ConnectionManager(Protocol):
"""连接管理器接口"""
async def acquire(self) -> Connection: ...
async def release(self, conn: Connection) -> None: ...
class PerRequestPool(ConnectionManager):
"""
策略一:每请求一个连接
优点:简单,无状态,适合短生命周期的 Serverless 函数
缺点:TCP 握手开销,数据库连接建立开销
"""
def __init__(self, db_url: str):
self.db_url = db_url
async def acquire(self) -> Connection:
# 每次都创建新连接(实际用数据库驱动)
return await create_connection(self.db_url)
async def release(self, conn: Connection):
await conn.close()
class SharedPool(ConnectionManager):
"""
策略二:全局连接池
优点:高复用率,低延迟,适合高并发场景
缺点:连接数有上限,需要处理池耗尽
"""
def __init__(self, db_url: str, pool_size: int = 20):
self._pool = asyncio.Queue(maxsize=pool_size)
self._semaphore = asyncio.Semaphore(pool_size)
self._db_url = db_url
# 预热连接池
for _ in range(pool_size):
conn = await create_connection(db_url)
await self._pool.put(conn)
async def acquire(self) -> Connection:
await self._semaphore.acquire()
try:
# 带超时的获取
conn = await asyncio.wait_for(
self._pool.get(),
timeout=10.0
)
return conn
except asyncio.TimeoutError:
self._semaphore.release()
raise RuntimeError("Connection pool exhausted")
async def release(self, conn: Connection):
if not self._pool.full():
await self._pool.put(conn)
self._semaphore.release()
class StatefulAwarePool(ConnectionManager):
"""
策略三:状态感知连接池(推荐)
根据 state_token 将请求路由到固定的连接,
兼顾无状态架构的灵活性与连接复用的效率。
工作原理:
1. state_token 中编码了 connection_id
2. 连接池按 connection_id 哈希分配连接
3. 同一 state_token 的请求复用同一连接
4. 不同 state_token 的请求分散到不同连接
"""
def __init__(self, db_url: str, pool_size: int = 50):
self._pools: dict[str, asyncio.Queue] = {}
self._db_url = db_url
self._pool_size = pool_size
self._lock = asyncio.Lock()
async def _get_or_create_pool(self, conn_id: str) -> asyncio.Queue:
if conn_id not in self._pools:
async with self._lock:
if conn_id not in self._pools:
q = asyncio.Queue(maxsize=3) # 每个连接 ID 最多 3 个并发
for _ in range(2):
conn = await create_connection(self._db_url)
await q.put(conn)
self._pools[conn_id] = q
return self._pools[conn_id]
async def acquire(self) -> tuple[str, Connection]:
"""返回 (conn_id, connection)"""
# 这个设计让无状态架构下仍能保持连接亲和性
conn_id = "default" # 实际从 state_token 解析
pool = await self._get_or_create_pool(conn_id)
conn = await pool.get()
return conn_id, conn
async def release(self, conn_id: str, conn: Connection):
pool = self._pools.get(conn_id)
if pool and not pool.full():
await pool.put(conn)
五、OAuth 2.1 企业级集成:真实部署架构
对于企业用户,OAuth 2.1 强制化意味着需要重新设计认证架构。下面是一个基于 Keycloak 的企业级部署方案:
# docker-compose.yml: MCP Server 企业级部署架构
version: '3.9'
services:
# MCP Server(无状态)
mcp-server:
image: your-org/mcp-server:latest
environment:
DB_URL: postgresql://db:5432/mcp
OAUTH_ISSUER: http://keycloak:8080/realms/your-org
MCP_PROTOCOL_VERSION: "2026-07-28"
ports:
- "8080:8080"
deploy:
replicas: 3 # ← 无状态设计支持水平扩展!
depends_on:
- db
- keycloak
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8080/health"]
interval: 30s
timeout: 5s
retries: 3
# PostgreSQL(状态存储,不再存会话)
db:
image: postgres:16
environment:
POSTGRES_DB: mcp
volumes:
- pgdata:/var/lib/postgresql/data
- ./migrations:/docker-entrypoint-initdb.d
command: >
postgres
-c max_connections=200
-c shared_buffers=256MB
-c work_mem=4MB
# Keycloak(OAuth 2.1 授权服务器)
keycloak:
image: quay.io/keycloak/keycloak:24.0
environment:
KEYCLOAK_ADMIN: admin
KEYCLOAK_ADMIN_PASSWORD: ${KEYCLOAK_ADMIN_PASSWORD}
KC_HEALTH_ENABLED: true
KC_METRICS_ENABLED: true
command: start-dev
volumes:
- ./keycloak/themes:/opt/keycloak/themes
ports:
- "8180:8080"
# Redis(状态缓存,可选,用于高性能场景)
redis:
image: redis:7-alpine
command: redis-server --maxmemory 256mb --maxmemory-policy allkeys-lru
# Nginx(负载均衡 + TLS 终止)
nginx:
image: nginx:1.27-alpine
ports:
- "443:443"
- "80:80"
volumes:
- ./nginx.conf:/etc/nginx/nginx.conf:ro
depends_on:
- mcp-server
volumes:
pgdata:
# nginx.conf: 支持 Streamable HTTP 的负载均衡配置
events {
worker_connections 1024;
}
http {
upstream mcp_backend {
least_conn; # 最少连接优先,适合长连接场景
server mcp-server-1:8080;
server mcp-server-2:8080;
server mcp-server-3:8080;
keepalive 32; # 保持长连接复用
}
server {
listen 443 ssl http2;
server_name api.example.com;
ssl_certificate /certs/fullchain.pem;
ssl_certificate_key /certs/privkey.pem;
ssl_protocols TLSv1.2 TLSv1.3;
# OAuth 令牌验证(通过 Nginx Auth Request Module)
auth_request /auth/validate;
auth_request_set $auth_status $upstream_status;
location / {
proxy_pass http://mcp_backend;
# Streamable HTTP 必需的配置
proxy_http_version 1.1;
proxy_set_header Connection "";
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
# 流式响应支持
proxy_buffering off;
proxy_cache off;
# 超时配置
proxy_connect_timeout 10s;
proxy_send_timeout 300s;
proxy_read_timeout 300s;
}
# OAuth 令牌验证端点
location /auth/validate {
internal;
proxy_pass http://keycloak:8080/realms/your-org/protocol/openid-connect/token/introspect;
proxy_method POST;
proxy_pass_request_body off;
proxy_set_header Content-Type "application/x-www-form-urlencoded";
proxy_set_body "token=$http_authorization";
# 验证通过继续,不通过返回 401
proxy_intercept_errors off;
}
}
}
六、迁移检查清单与时间线
| 日期 | 事件 | 行动 |
|---|---|---|
| 2026-05-21 | RC 发布 | 开始评估影响范围 |
| 2026-06-24 | 当前(本文发布) | 开始迁移开发 |
| 2026-07-28 | 正式发布 | 完成迁移,上线测试 |
| 2026-08-28 | 过渡期结束 | 旧规范废弃 |
高优先级(必须完成)
# 1. 检查代码中的废弃 API
grep -rn "initialize\|sessionId\|resources/list\|prompts/list\|sampling" ./mcp_server/
# 2. 检查 SSE 传输实现
grep -rn "EventSource\|sse\|text/event-stream" ./mcp_server/
# 3. 检查认证实现
grep -rn "api_key\|basic_auth" ./mcp_server/
迁移自检脚本
#!/usr/bin/env python3
"""
MCP 2026-07-28 规范迁移自检脚本
运行此脚本检查你的 MCP 实现是否需要迁移
"""
import ast
import sys
from pathlib import Path
from typing import Optional
class MCPMigrationChecker:
"""MCP 2026-07-28 迁移自检工具"""
BREAKING_CHANGES = {
"initialize": {
"severity": "HIGH",
"reason": "初始化握手已移除,改用 HTTP 头部协商",
"fix": "删除 initialize 调用,更新 Client 实现"
},
"sessionId": {
"severity": "HIGH",
"reason": "会话机制已移除,改用显式状态句柄",
"fix": "实现 state_token 编码/解码逻辑"
},
"resources/list": {
"severity": "MEDIUM",
"reason": "已废弃,统一到 tools/list",
"fix": "将 resources/list 端点迁移到 tools/list"
},
"prompts/list": {
"severity": "MEDIUM",
"reason": "已废弃,改用扩展机制",
"fix": "将 prompts 迁移到 MCP Extensions"
},
"sampling": {
"severity": "LOW",
"reason": "采样是客户端职责,不属于协议层",
"fix": "从服务端移除 sampling 相关代码"
},
"EventSource": {
"severity": "MEDIUM",
"reason": "SSE 已废弃,改用 Streamable HTTP",
"fix": "迁移到 httpx/aiohttp 的流式请求"
},
}
def __init__(self, project_path: str):
self.project_path = Path(project_path)
self.issues: list[dict] = []
def scan(self) -> None:
"""扫描项目中的所有 Python 文件"""
for py_file in self.project_path.rglob("*.py"):
if "node_modules" in str(py_file) or ".venv" in str(py_file):
continue
self._scan_file(py_file)
def _scan_file(self, file_path: Path) -> None:
"""扫描单个文件"""
try:
content = file_path.read_text(encoding="utf-8")
tree = ast.parse(content)
except (SyntaxError, UnicodeDecodeError):
return
# 检查字符串字面量
for node in ast.walk(tree):
if isinstance(node, ast.Constant) and isinstance(node.value, str):
self._check_string(file_path, node.value, node.lineno)
def _check_string(self, file_path: Path, content: str, line_no: int):
"""检查字符串内容"""
for key, info in self.BREAKING_CHANGES.items():
if key in content:
self.issues.append({
"file": str(file_path),
"line": line_no,
"pattern": key,
"severity": info["severity"],
"reason": info["reason"],
"fix": info["fix"]
})
def report(self) -> str:
"""生成迁移报告"""
if not self.issues:
return "✅ 未发现问题,你的代码已兼容 MCP 2026-07-28 规范"
# 按严重程度排序
order = {"HIGH": 0, "MEDIUM": 1, "LOW": 2}
self.issues.sort(key=lambda x: order.get(x["severity"], 3))
lines = ["# MCP 2026-07-28 迁移报告", ""]
high = [i for i in self.issues if i["severity"] == "HIGH"]
medium = [i for i in self.issues if i["severity"] == "MEDIUM"]
low = [i for i in self.issues if i["severity"] == "LOW"]
if high:
lines.append(f"## 🔴 高优先级({len(high)} 项)")
for issue in high:
lines.append(f"\n### {issue['file']}:{issue['line']}")
lines.append(f"- **模式**: `{issue['pattern']}`")
lines.append(f"- **原因**: {issue['reason']}")
lines.append(f"- **修复**: {issue['fix']}")
if medium:
lines.append(f"\n## 🟡 中优先级({len(medium)} 项)")
for issue in medium:
lines.append(f"- `{issue['file']}:{issue['line']}` - {issue['pattern']}")
if low:
lines.append(f"\n## 🟢 低优先级({len(low)} 项)")
for issue in low:
lines.append(f"- `{issue['file']}:{issue['line']}` - {issue['pattern']}")
lines.append(f"\n---\n**总计**: {len(high)} 高 + {len(medium)} 中 + {len(low)} 低 = {len(self.issues)} 项待处理")
lines.append(f"**建议**: 在 2026-07-28 正式发布前完成高优先级项的迁移")
return "\n".join(lines)
if __name__ == "__main__":
if len(sys.argv) < 2:
print("用法: python mcp_migration_check.py /path/to/your/mcp/project")
sys.exit(1)
checker = MCPMigrationChecker(sys.argv[1])
checker.scan()
print(checker.report())
七、对 AI 生态的影响与展望
开发者工具的影响
Cursor、Windsurf、Claude Code、VS Code 等主流工具需要更新其 MCP Client 实现。好消息是,新规范中的 stdio 传输方式保持了向后兼容,本地开发场景(通过命令行启动的本地 MCP Server)几乎不受影响。
真正受影响的是:
- 远程 MCP Server 部署:必须重新设计认证和状态管理
- 企业 MCP Gateway:需要升级 OAuth 集成
- MCP SDK 维护者:需要发布兼容新规范的 SDK 版本
云厂商的影响
AWS、Azure、GCP 都已经提供了官方 MCP Server。这些服务需要:
- 升级 OAuth 2.1 实现(PKCE 强制化)
- 从 SSE 迁移到 Streamable HTTP
- 设计无状态的 Serverless 部署方案
这次规范升级的深层意义
MCP 2026-07-28 规范的意义,远不止「修 bug」或「优化性能」。它标志着 MCP 从一个实验性协议正式走向生产级基础设施。
有状态 → 无状态的转变,让 MCP 终于可以:
- 水平扩展:不再需要 sticky session,K8s HPA 自动扩缩容成为可能
- Serverless 部署:Cloudflare Workers、AWS Lambda 可以原生支持 MCP
- 安全合规:OAuth 2.1 强制化,满足企业安全审计要求
- 多租户隔离:每个请求独立验证,无需担心会话混淆
MCP 从「开发者玩具」变成了真正可以承载生产流量的协议。
总结
MCP 2026-07-28 规范是一次彻底的架构升级,带来了七个方向的破坏性变更:
| 变更 | 影响 | 迁移难度 |
|---|---|---|
| 会话移除 | 必须重写状态管理逻辑 | ⭐⭐⭐⭐ |
| 初始化握手移除 | 需要更新 Client SDK | ⭐⭐⭐ |
| 三大功能废弃 | 迁移到统一接口/扩展 | ⭐⭐ |
| OAuth 2.1 强制化 | 企业认证架构需重建 | ⭐⭐⭐⭐ |
| Streamable HTTP | 传输层升级 | ⭐⭐⭐ |
| 错误码标准化 | 错误处理逻辑调整 | ⭐ |
| 扩展机制引入 | 新功能可选接入 | ⭐ |
45 天的迁移窗口期并不宽裕。建议立即行动,按以下顺序推进:
- 第一周:运行迁移自检脚本,评估影响范围
- 第二周:完成 OAuth 2.1 认证改造(最重要)
- 第三周:迁移传输层,删除 initialize 调用
- 第四周:替换废弃方法,全面测试
- 第五周(缓冲):灰度发布,监控异常
现在就去打开你的 MCP 代码,搜索 initialize 和 sessionId,开始这场迟来已久的架构升级吧。
参考资源: