MCP(Model Context Protocol)深度解析:Anthropic 推出的 AI 工具集成标准协议——从原理到生产级实战的完整技术内幕
2024 年 11 月,Anthropic 发布了 MCP(Model Context Protocol,模型上下文协议)。这个看似简单的开源协议,在 2026 年已经成为 AI 工具集成的事实标准。本文将从协议设计、架构原理、代码实战、生产部署四个维度,深度剖析 MCP 的技术内幕。
目录
- 为什么需要 MCP?——AI 工具集成的混沌时代
- MCP 协议核心概念
- 架构深度剖析:Client-Server 模型与通信机制
- 快速实战:用 Python 从零搭建 MCP Server
- TypeScript 实战:构建生产级 MCP Server
- Transport 层深度解析:stdio / SSE / WebSocket
- 核心组件详解:Tools、Resources、Prompts
- 生产级 MCP Server 设计模式
- 安全机制:认证、授权与沙箱隔离
- 性能优化:并发、缓存与连接池
- MCP 生态与多语言 SDK
- 从 MCP 到 A2A:AI Agent 协议全景
- 实战案例:构建企业级文档检索 MCP Server
- 部署与运维:容器化、监控与可观测性
- 总结与展望
为什么需要 MCP?——AI 工具集成的混沌时代
混沌时代:每个工具一套接口
在 MCP 出现之前,让 AI Agent 调用外部工具是一场噩梦。
假设你正在构建一个能够查询天气、查股票、读日历、发邮件的 AI 助手。在没有统一协议的情况下,你需要为每一个工具单独编写集成代码:
# 没有 MCP 的黑暗时代 —— 每个工具一套接口
import requests
import json
def call_weather_api(city: str, unit: str = "celsius") -> dict:
"""调用天气 API —— 一套接口风格"""
response = requests.get(
"https://api.weather.com/v1/current",
headers={"Authorization": f"Bearer {WEATHER_API_KEY}"},
params={"city": city, "unit": unit}
)
return response.json()
def call_stock_api(symbol: str) -> dict:
"""调用股票 API —— 完全不同的接口风格"""
response = requests.post(
"https://api.stock.com/graphql",
headers={"X-API-Key": STOCK_API_KEY},
json={"query": f'{{ quote(symbol: "{symbol}") {{ price change }} }}'}
)
return response.json()
def read_google_calendar() -> dict:
"""读取 Google 日历 —— 又是另一种风格"""
from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build
creds = Credentials(token=GOOGLE_ACCESS_TOKEN)
service = build('calendar', 'v3', credentials=creds)
events = service.events().list(calendarId='primary').execute()
return events
# 问题:每个工具的:
# 1. 认证方式不同(Bearer Token / API Key / OAuth2 / Cookie)
# 2. 请求格式不同(REST / GraphQL / gRPC / 私有协议)
# 3. 错误格式不同(HTTP 状态码 / 业务错误码 / 异常)
# 4. 参数校验方式不同
# 5. 文档质量参差不齐
这带来几个致命问题:
- 开发成本高:每个工具都要手写集成代码,重复劳动
- 维护成本高:API 升级、认证过期、接口变更,处处是坑
- AI 理解困难:每个工具的描述方式不同,AI 难以统一理解工具的能力
- 安全隐患:直接把 API Key 暴露给 AI,缺乏权限控制和审计
MCP 的解决方案:AI 世界的"USB-C 接口"
Anthropic 提出 MCP 的核心思路非常简单:定义一套标准协议,让 AI 应用(Client)和工具提供方(Server)都遵循这套协议。
类比 USB-C 接口:
- 在 USB-C 出现之前,每个设备都需要专用的充电器(类似每个工具一套接口)
- USB-C 统一了充电和数据传输标准(类似 MCP 统一了工具调用标准)
- 现在你只需要一根 USB-C 线,就能给手机、电脑、平板充电(类似 AI 只需要支持 MCP,就能调用所有 MCP Server 提供的工具)
graph LR
A[AI Application<br/>Claude Desktop / Cursor / OpenClaw] --> B[MCP Client]
B -->|MCP Protocol<br/>JSON-RPC 2.0| C[MCP Server: Weather]
B -->|MCP Protocol| D[MCP Server: Database]
B -->|MCP Protocol| E[MCP Server: GitHub]
B -->|MCP Protocol| F[MCP Server: FileSystem]
C --> G[Weather API]
D --> H[PostgreSQL]
E --> I[GitHub API]
F --> J[Local Files]
MCP 的核心价值:
- 标准化:统一的工具描述格式,AI 能自动理解工具能力
- 解耦:AI 应用和工具实现完全解耦,互不依赖
- 安全:统一的认证、授权、审计机制
- 可扩展:新增工具只需要新增 MCP Server,无需修改 AI 应用
MCP 协议核心概念
协议层:JSON-RPC 2.0
MCP 基于 JSON-RPC 2.0 协议,这是一个轻量级的远程过程调用(RPC)协议。
// MCP 请求示例
{
"jsonrpc": "2.0",
"id": 1,
"method": "tools/call",
"params": {
"name": "query_weather",
"arguments": {
"city": "Beijing",
"unit": "celsius"
}
}
}
// MCP 响应示例
{
"jsonrpc": "2.0",
"id": 1,
"result": {
"content": [
{
"type": "text",
"text": "北京当前气温 23°C,晴转多云。"
}
]
}
}
三种核心能力
MCP Server 可以提供三种核心能力:
| 能力 | 说明 | 类比 |
|---|---|---|
| Tools(工具) | AI 可以调用的函数,用于执行操作 | 就像给 AI 提供了可以调用的 API |
| Resources(资源) | 可以被 AI 读取的数据,只读 | 就像文件系统、数据库、网页内容 |
| Prompts(提示模板) | 预定义的提示词模板,用户可以快速调用 | 就像 IDE 的代码片段(Snippet) |
graph TB
Client[MCP Client<br/>AI Application] -->|1. Initialize| Server[MCP Server]
Server -->|2. Return Capabilities| Client
Client -->|3. List Tools| Server
Server -->|4. Return Tool List| Client
Client -->|5. Call Tool| Server
Server -->|6. Execute & Return Result| Client
Client -->|7. Read Resource| Server
Server -->|8. Return Resource Content| Client
Client -->|9. Get Prompt| Server
Server -->|10. Return Prompt Template| Client
架构深度剖析:Client-Server 模型与通信机制
架构组件
MCP 采用经典的 Client-Server 架构,但针对 AI 场景做了特殊优化:
┌─────────────────────────────────────────────────────────────┐
│ MCP Host (宿主应用) │
│ Claude Desktop / Cursor / OpenClaw / VS Code │
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ MCP Client 1 │ │ MCP Client 2 │ │ MCP Client 3 │ │
│ │ (Weather) │ │ (Database) │ │ (GitHub) │ │
│ └──────┬───────┘ └──────┬───────┘ └──────┬───────┘ │
│ │ │ │ │
└─────────┼──────────────────┼──────────────────┼──────────────┘
│ │ │
│ JSON-RPC 2.0 │ JSON-RPC 2.0 │ JSON-RPC 2.0
│ │ │
┌─────────▼──────────────────▼──────────────────▼──────────────┐
│ Transport Layer │
│ stdio / SSE / WebSocket / HTTP │
└─────────────────────────────────────────────────────────────┘
│ │ │
┌─────────▼──────────────────▼──────────────────▼──────────────┐
│ MCP Server Process │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Weather MCP │ │ Database │ │ GitHub MCP │ │
│ │ Server │ │ Server │ │ Server │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
└─────────────────────────────────────────────────────────────┘
生命周期管理
MCP 连接有一个严格的生命周期管理协议:
sequenceDiagram
participant C as MCP Client
participant S as MCP Server
Note over C,S: 1. Initialization Phase
C->>S: initialize request<br/>{protocolVersion, capabilities, clientInfo}
S->>C: initialize response<br/>{protocolVersion, capabilities, serverInfo}
C->>S: initialized notification
Note over C,S: 2. Discovery Phase
C->>S: tools/list request
S->>C: tools/list response<br/>[Tool1, Tool2, ...]
C->>S: resources/list request
S->>C: resources/list response<br/>[Resource1, Resource2, ...]
Note over C,S: 3. Operational Phase
loop AI 需要调用工具时
C->>S: tools/call request<br/>{name, arguments}
S->>C: tools/call response<br/>{content: [{type, text}]}
end
Note over C,S: 4. Shutdown Phase
C->>S: shutdown request
S->>C: shutdown response
C->>S: exit notification
协议版本兼容
MCP 协议版本号格式为 YYYY-MM-DD,例如 2024-11-05。
# MCP Client 初始化时的版本协商
initialize_request = {
"jsonrpc": "2.0",
"id": 1,
"method": "initialize",
"params": {
"protocolVersion": "2024-11-05", # Client 支持的最新版本
"capabilities": {
"roots": {"listChanged": True},
"sampling": {}
},
"clientInfo": {
"name": "claude-desktop",
"version": "1.2.0"
}
}
}
# Server 会返回它支持的版本(可能低于 Client 请求的版本)
initialize_response = {
"jsonrpc": "2.0",
"id": 1,
"result": {
"protocolVersion": "2024-11-05", # Server 实际使用的版本
"capabilities": {
"tools": {"listChanged": True},
"resources": {"subscribe": True, "listChanged": True}
},
"serverInfo": {
"name": "weather-mcp-server",
"version": "0.1.0"
}
}
}
快速实战:用 Python 从零搭建 MCP Server
环境准备
# 安装官方 Python SDK
pip install mcp
# 或者使用 uv(推荐,更快)
uv pip install mcp
最小可用 MCP Server(天气查询)
#!/usr/bin/env python3
"""
最小可用 MCP Server 示例:天气查询工具
运行方式:python weather_server.py
"""
import asyncio
import httpx
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import (
Tool,
TextContent,
CallToolResult,
ListToolsResult,
)
import mcp.server.models as models
import mcp.types as types
# 初始化 MCP Server
server = Server("weather-mcp-server")
@server.list_tools()
async def list_tools() -> ListToolsResult:
"""
声明这个 Server 提供哪些工具。
AI 会看到这个列表,并根据描述决定是否调用。
"""
return ListToolsResult(
tools=[
Tool(
name="query_weather",
description="查询指定城市的当前天气。输入城市名称(中文或英文),返回温度、天气状况、湿度等信息。",
inputSchema={
"type": "object",
"properties": {
"city": {
"type": "string",
"description": "城市名称,例如:Beijing、Shanghai、New York"
},
"unit": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "温度单位,默认 celsius(摄氏度)"
}
},
"required": ["city"]
}
)
]
)
@server.call_tool()
async def call_tool(name: str, arguments: dict) -> CallToolResult:
"""
处理工具调用请求。
当 AI 决定调用某个工具时,会触发这个函数。
"""
if name == "query_weather":
city = arguments.get("city")
unit = arguments.get("unit", "celsius")
# 调用真实的天气 API(这里用模拟数据)
weather_data = await fetch_weather(city, unit)
return CallToolResult(
content=[
TextContent(
type="text",
text=weather_data
)
]
)
raise ValueError(f"Unknown tool: {name}")
async def fetch_weather(city: str, unit: str) -> str:
"""
调用天气 API 获取真实数据。
这里使用 wttr.in(免费、无需 API Key 的天气服务)。
"""
async with httpx.AsyncClient() as client:
response = await client.get(f"https://wttr.in/{city}?format=j1")
data = response.json()
current = data["current_condition"][0]
temp = current["temp_C"] if unit == "celsius" else current["temp_F"]
desc = current["weatherDesc"][0]["value"]
humidity = current["humidity"]
return f"{city} 当前天气:{desc},温度 {temp}°{'C' if unit == 'celsius' else 'F'},湿度 {humidity}%"
async def main():
"""
启动 MCP Server,使用 stdio 作为传输层。
stdio 表示:从 stdin 读取请求,将响应写入 stdout。
"""
async with stdio_server() as (read_stream, write_stream):
await server.run(
read_stream,
write_stream,
server.create_initialization_options()
)
if __name__ == "__main__":
asyncio.run(main())
测试 MCP Server
# 1. 保存上面的代码为 weather_server.py
# 2. 使用 MCP Inspector 调试(官方提供的可视化调试工具)
npx @modelcontextprotocol/inspector
# 3. 在 Inspector 中配置:
# - Transport Type: stdio
# - Command: python
# - Arguments: weather_server.py
# - Working Directory: (你的项目目录)
# 4. 点击 "Connect",你就能看到工具列表,并手动调用测试
TypeScript 实战:构建生产级 MCP Server
Python 适合快速原型,但在生产环境中,TypeScript 的类型安全和生态更具优势。
项目初始化
# 使用官方 TypeScript SDK
mkdir document-mcp-server && cd document-mcp-server
npm init -y
npm install @modelcontextprotocol/sdk
npm install typescript @types/node tsx --save-dev
# 配置 TypeScript
npx tsc --init
生产级 MCP Server:文档检索工具
#!/usr/bin/env node
/**
* 生产级 MCP Server:文档检索工具
*
* 功能:
* 1. 支持多种文档格式(PDF、Markdown、Word)
* 2. 语义搜索(集成向量数据库)
* 3. 文档分块与嵌入
* 4. 并发请求处理
* 5. 完整的错误处理和日志
*/
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import {
CallToolRequestSchema,
ListToolsRequestSchema,
ListResourcesRequestSchema,
ReadResourceRequestSchema,
ErrorCode,
McpError,
} from "@modelcontextprotocol/sdk/types.js";
import fs from "fs/promises";
import path from "path";
import { fileURLToPath } from "url";
// 获取当前文件目录
const __filename = fileURLToPath(import.meta.url);
const __dirname = path.dirname(__filename);
// 初始化 MCP Server
const server = new Server(
{
name: "document-retrieval-mcp-server",
version: "1.0.0",
},
{
capabilities: {
tools: {},
resources: {},
},
}
);
// 内存中的文档索引(生产环境应使用向量数据库)
interface DocumentChunk {
id: string;
filePath: string;
content: string;
metadata: {
title: string;
author?: string;
createdAt?: string;
};
}
const documentIndex: Map<string, DocumentChunk> = new Map();
/**
* 工具 1:index_document
* 将文档添加到索引中
*/
server.setRequestHandler(ListToolsRequestSchema, async () => {
return {
tools: [
{
name: "index_document",
description: "将文档(PDF、Markdown、Word)添加到检索索引中。支持批量添加。",
inputSchema: {
type: "object",
properties: {
file_paths: {
type: "array",
items: { type: "string" },
description: "文档文件路径列表",
},
},
required: ["file_paths"],
},
},
{
name: "search_documents",
description: "在已索引的文档中搜索相关内容。返回相关度最高的前 5 个片段。",
inputSchema: {
type: "object",
properties: {
query: {
type: "string",
description: "搜索关键词或问题",
},
top_k: {
type: "number",
description: "返回结果数量,默认 5",
default: 5,
},
},
required: ["query"],
},
},
{
name: "list_indexed_documents",
description: "列出所有已索引的文档",
inputSchema: {
type: "object",
properties: {},
},
},
],
};
});
/**
* 处理工具调用
*/
server.setRequestHandler(CallToolRequestSchema, async (request) => {
const { name, arguments: args } = request.params;
try {
switch (name) {
case "index_document":
return await indexDocument(args);
case "search_documents":
return await searchDocuments(args);
case "list_indexed_documents":
return await listIndexedDocuments();
default:
throw new McpError(ErrorCode.MethodNotFound, `Unknown tool: ${name}`);
}
} catch (error) {
if (error instanceof McpError) {
throw error;
}
throw new McpError(
ErrorCode.InternalError,
`Tool execution failed: ${error instanceof Error ? error.message : String(error)}`
);
}
});
/**
* 索引文档的实现
*/
async function indexDocument(args: any) {
const filePaths: string[] = args.file_paths;
const results: string[] = [];
for (const filePath of filePaths) {
try {
const absolutePath = path.resolve(filePath);
const content = await readDocument(absolutePath);
const chunks = chunkDocument(content, 500); // 每块 500 字符
for (let i = 0; i < chunks.length; i++) {
const chunk: DocumentChunk = {
id: `${absolutePath}:${i}`,
filePath: absolutePath,
content: chunks[i],
metadata: {
title: path.basename(filePath),
},
};
documentIndex.set(chunk.id, chunk);
}
results.push(`✅ 已索引:${filePath}(${chunks.length} 个片段)`);
} catch (error) {
results.push(`❌ 失败:${filePath} - ${error instanceof Error ? error.message : String(error)}`);
}
}
return {
content: [
{
type: "text",
text: results.join("\n"),
},
],
};
}
/**
* 搜索文档的实现(简化版:基于关键词匹配)
* 生产环境应使用向量相似度搜索
*/
async function searchDocuments(args: any) {
const query: string = args.query;
const topK: number = args.top_k || 5;
// 简化搜索:计算关键词重叠度
const queryWords = query.toLowerCase().split(/\s+/);
const scores: Array<[DocumentChunk, number]> = [];
for (const chunk of documentIndex.values()) {
const chunkWords = chunk.content.toLowerCase().split(/\s+/);
const overlap = queryWords.filter((w) => chunkWords.includes(w)).length;
const score = overlap / Math.max(queryWords.length, 1);
if (score > 0) {
scores.push([chunk, score]);
}
}
// 按相关度排序
scores.sort((a, b) => b[1] - a[1]);
const topResults = scores.slice(0, topK);
if (topResults.length === 0) {
return {
content: [{ type: "text", text: "未找到相关文档片段。" }],
};
}
const resultText = topResults
.map(([chunk, score], idx) => {
const preview = chunk.content.slice(0, 200);
return `[${idx + 1}] ${chunk.metadata.title}(相关度:${(score * 100).toFixed(1)}%)\n${preview}...`;
})
.join("\n\n");
return {
content: [{ type: "text", text: resultText }],
};
}
/**
* 列出已索引的文档
*/
async function listIndexedDocuments() {
const fileSet = new Set<string>();
for (const chunk of documentIndex.values()) {
fileSet.add(chunk.filePath);
}
const fileList = Array.from(fileSet)
.map((fp, idx) => `${idx + 1}. ${path.basename(fp)}(${fp})`)
.join("\n");
return {
content: [
{
type: "text",
text: `已索引 ${fileSet.size} 个文档:\n\n${fileList}`,
},
],
};
}
/**
* 读取文档内容(简化版)
*/
async function readDocument(filePath: string): Promise<string> {
const ext = path.extname(filePath).toLowerCase();
if (ext === ".txt" || ext === ".md") {
return await fs.readFile(filePath, "utf-8");
}
if (ext === ".pdf") {
// 生产环境应使用 pdf-parse 库
throw new Error("PDF 解析需要安装 pdf-parse 库");
}
if (ext === ".docx") {
// 生产环境应使用 mammoth 库
throw new Error("Word 解析需要安装 mammoth 库");
}
throw new Error(`不支持的文件格式:${ext}`);
}
/**
* 文档分块
*/
function chunkDocument(content: string, chunkSize: number): string[] {
const chunks: string[] = [];
for (let i = 0; i < content.length; i += chunkSize) {
chunks.push(content.slice(i, i + chunkSize));
}
return chunks;
}
/**
* 启动 Server
*/
async function main() {
const transport = new StdioServerTransport();
await server.connect(transport);
console.error("Document Retrieval MCP Server 已启动");
}
main().catch((error) => {
console.error("Server 启动失败:", error);
process.exit(1);
});
Transport 层深度解析:stdio / SSE / WebSocket
MCP 支持多种传输层协议,适应不同场景:
1. stdio(标准输入输出)
适用场景:本地进程通信,最常用。
AI Application (MCP Client) MCP Server
│ │
│ 启动子进程 │
│─────────────────────────────────>│
│ │
│ JSON-RPC 请求 (stdin) │
│─────────────────────────────────>│
│ │
│ JSON-RPC 响应 (stdout) │
│<──────────────────────────────────│
│ │
优点:
- 简单,无需网络配置
- 安全,进程隔离
- 适合本地工具
缺点:
- 只能本机通信
- 不支持多客户端
2. SSE(Server-Sent Events)
适用场景:远程访问,AI Application 通过网络连接 MCP Server。
// Server 端(使用 Express + SSE)
import express from "express";
import { SSEServerTransport } from "@modelcontextprotocol/sdk/server/sse.js";
const app = express();
app.get("/sse", async (req, res) => {
const transport = new SSEServerTransport("/message", res);
await server.connect(transport);
});
app.post("/message", async (req, res) => {
// 处理客户端发来的消息
});
app.listen(3000);
优点:
- 支持远程访问
- 跨平台
- 防火墙友好(基于 HTTP)
缺点:
- 需要身份验证
- 网络延迟
3. WebSocket
适用场景:需要双向实时通信。
import { WebSocketServer } from "ws";
import { WebSocketTransport } from "@modelcontextprotocol/sdk/server/websocket.js";
const wss = new WebSocketServer({ port: 8080 });
wss.on("connection", async (ws) => {
const transport = new WebSocketTransport(ws);
await server.connect(transport);
});
核心组件详解:Tools、Resources、Prompts
Tools(工具)
Tools 是 MCP 最核心的能力,让 AI 能够执行操作。
// 完整的 Tool 定义
const tool: Tool = {
name: "execute_sql",
description: "执行 SQL 查询。仅支持 SELECT 语句,禁止 WRITE 操作。",
inputSchema: {
type: "object",
properties: {
query: {
type: "string",
description: "SQL 查询语句(仅 SELECT)"
},
database: {
type: "string",
description: "数据库名称",
enum: ["production", "staging", "development"]
}
},
required: ["query", "database"]
}
};
// Tool 调用结果
const result: CallToolResult = {
content: [
{
type: "text",
text: JSON.stringify([
{ id: 1, name: "Alice", email: "alice@example.com" },
{ id: 2, name: "Bob", email: "bob@example.com" }
])
}
],
isError: false // 如果是错误结果,设为 true
};
Resources(资源)
Resources 让 AI 能够读取数据,类似于文件系统。
// Resource 定义
const resource: Resource = {
uri: "file:///project/README.md",
name: "README.md",
description: "项目说明文档",
mimeType: "text/markdown"
};
// 读取 Resource
server.setRequestHandler(ReadResourceRequestSchema, async (request) => {
const { uri } = request.params;
if (uri.startsWith("file://")) {
const filePath = uriToPath(uri);
const content = await fs.readFile(filePath, "utf-8");
return {
contents: [
{
uri,
mimeType: "text/plain",
text: content
}
]
};
}
throw new McpError(ErrorCode.InvalidRequest, `Unsupported URI: ${uri}`);
});
Prompts(提示模板)
Prompts 提供预定义的提示词模板,用户可以快速调用。
server.setRequestHandler(ListPromptsRequestSchema, async () => {
return {
prompts: [
{
name: "code_review",
description: "代码审查模板",
arguments: [
{
name: "file_path",
description: "要审查的文件路径",
required: true
},
{
name: "focus_area",
description: "重点审查领域(安全、性能、可读性)",
required: false
}
]
}
]
};
});
server.setRequestHandler(GetPromptRequestSchema, async (request) => {
const { name, arguments: args } = request.params;
if (name === "code_review") {
const filePath = args?.file_path;
const focusArea = args?.focus_area || "通用";
return {
description: `审查 ${filePath}`,
messages: [
{
role: "user",
content: {
type: "text",
text: `请审查以下文件:${filePath}\n\n重点:${focusArea}\n\n要求:\n1. 指出潜在 Bug\n2. 评估代码质量\n3. 提出改进建议`
}
}
]
};
}
throw new McpError(ErrorCode.InvalidRequest, `Unknown prompt: ${name}`);
});
生产级 MCP Server 设计模式
1. 连接池模式(数据库连接)
"""
生产级数据库 MCP Server:使用连接池
"""
import asyncio
import asyncpg
from mcp.server import Server
class DatabaseMCPPool:
def __init__(self, dsn: str, min_size: int = 1, max_size: int = 10):
self.dsn = dsn
self.min_size = min_size
self.max_size = max_size
self.pool = None
async def initialize(self):
"""初始化连接池"""
self.pool = await asyncpg.create_pool(
dsn=self.dsn,
min_size=self.min_size,
max_size=self.max_size,
command_timeout=60 # 60 秒超时
)
async def execute_query(self, query: str) -> str:
"""执行查询(只读)"""
if not query.strip().lower().startswith("select"):
raise ValueError("仅支持 SELECT 查询")
async with self.pool.acquire() as conn:
try:
rows = await conn.fetch(query)
return json.dumps([dict(row) for row in rows], indent=2, ensure_ascii=False)
except Exception as e:
return f"查询失败:{e}"
# 使用
db_pool = DatabaseMCPPool("postgresql://user:pass@localhost/mydb")
@server.on_initialized()
async def on_init():
await db_pool.initialize()
2. 缓存模式(减少重复计算)
/**
* 带缓存的 MCP Tool:查询结果缓存 5 分钟
*/
class CachedToolExecutor {
private cache: Map<string, { result: string; expiresAt: number }> = new Map();
private readonly TTL = 5 * 60 * 1000; // 5 分钟
async executeWithCache(toolName: string, args: any): Promise<string> {
const cacheKey = `${toolName}:${JSON.stringify(args)}`;
const cached = this.cache.get(cacheKey);
if (cached && cached.expiresAt > Date.now()) {
console.error(`[Cache] Hit: ${cacheKey}`);
return cached.result;
}
console.error(`[Cache] Miss: ${cacheKey}`);
const result = await this.executeTool(toolName, args);
this.cache.set(cacheKey, {
result,
expiresAt: Date.now() + this.TTL
});
return result;
}
private async executeTool(toolName: string, args: any): Promise<string> {
// 实际执行逻辑
// ...
}
}
3. 限流模式(防止滥用)
"""
生产级限流:令牌桶算法
"""
import time
import asyncio
from collections import defaultdict
class RateLimiter:
def __init__(self, max_calls: int, period: float):
self.max_calls = max_calls # 时间段内最大调用次数
self.period = period # 时间段(秒)
self.calls = defaultdict(list)
async def acquire(self, client_id: str) -> bool:
"""尝试获取调用权限"""
now = time.time()
client_calls = self.calls[client_id]
# 清理过期记录
self.calls[client_id] = [
t for t in client_calls if now - t < self.period
]
if len(self.calls[client_id]) >= self.max_calls:
return False # 被限流
self.calls[client_id].append(now)
return True
# 使用
rate_limiter = RateLimiter(max_calls=100, period=60.0) # 每分钟最多 100 次
@server.call_tool()
async def call_tool(name: str, arguments: dict):
client_id = get_client_id() # 从请求中获取客户端 ID
if not await rate_limiter.acquire(client_id):
raise Exception("Rate limit exceeded. Please retry later.")
# 正常处理逻辑
# ...
安全机制:认证、授权与沙箱隔离
1. 认证(Authentication)
/**
* 带 API Key 认证的 MCP Server
*/
class AuthenticatedMCPServer {
private validApiKeys: Set<string>;
constructor(validApiKeys: string[]) {
this.validApiKeys = new Set(validApiKeys);
}
async handleRequest(req: any): Promise<any> {
// 从请求头中获取 API Key
const apiKey = req.headers["x-api-key"];
if (!apiKey || !this.validApiKeys.has(apiKey)) {
throw new McpError(
ErrorCode.InvalidRequest,
"Authentication failed: Invalid or missing API key"
);
}
// 继续处理请求
return this.processRequest(req);
}
}
2. 授权(Authorization)
"""
基于 RBAC(Role-Based Access Control)的授权
"""
from enum import Enum
from typing import Dict, Set
class Permission(Enum):
READ = "read"
WRITE = "write"
EXECUTE = "execute"
ADMIN = "admin"
class Role:
def __init__(self, name: str, permissions: Set[Permission]):
self.name = name
self.permissions = permissions
# 定义角色
ROLES = {
"viewer": Role("viewer", {Permission.READ}),
"editor": Role("editor", {Permission.READ, Permission.WRITE}),
"admin": Role("admin", {Permission.READ, Permission.WRITE, Permission.EXECUTE, Permission.ADMIN})
}
class AuthorizationManager:
def __init__(self):
self.user_roles: Dict[str, str] = {} # user_id -> role_name
def check_permission(self, user_id: str, required: Permission) -> bool:
role_name = self.user_roles.get(user_id, "viewer")
role = ROLES.get(role_name, ROLES["viewer"])
return required in role.permissions
# 使用
auth_manager = AuthorizationManager()
@server.call_tool()
async def call_tool(name: str, arguments: dict):
user_id = get_user_id()
if name.startswith("write_"):
if not auth_manager.check_permission(user_id, Permission.WRITE):
raise Exception("Permission denied: WRITE required")
# 继续执行
3. 沙箱隔离(Sandboxing)
"""
使用 Docker 沙箱执行不受信任的代码
"""
import docker
import tempfile
import(os)
class SandboxExecutor:
def __init__(self):
self.client = docker.from_env()
async def execute_in_sandbox(self, code: str, timeout: int = 30) -> str:
"""在 Docker 容器中执行代码"""
with tempfile.TemporaryDirectory() as tmpdir:
# 将代码写入临时文件
code_path = os.path.join(tmpdir, "code.py")
with open(code_path, "w") as f:
f.write(code)
# 启动容器执行
container = self.client.containers.run(
image="python:3.12-slim",
command=f"timeout {timeout}s python /code/code.py",
volumes={tmpdir: {"bind": "/code", "mode": "ro"}},
mem_limit="128m",
cpu_quota=50000, # 50% CPU
network_disabled=True, # 禁用网络
detach=True,
remove=True
)
# 等待执行完成
result = container.wait(timeout=timeout + 5)
logs = container.logs().decode("utf-8")
if result["StatusCode"] == 124:
return "执行超时"
return logs
性能优化:并发、缓存与连接池
1. 并发工具调用
/**
* 并发执行多个工具调用
*/
async function executeToolsConcurrently(calls: ToolCall[]): Promise<ToolResult[]> {
// 使用 Promise.all 并发执行
const results = await Promise.all(
calls.map(call =>
executeTool(call.name, call.arguments).catch(error => ({
error: error instanceof Error ? error.message : String(error)
}))
)
);
return results;
}
// 示例:AI 同时需要查询天气和股票价格
const calls = [
{ name: "query_weather", arguments: { city: "Beijing" } },
{ name: "query_stock", arguments: { symbol: "AAPL" } }
];
const [weatherResult, stockResult] = await executeToolsConcurrently(calls);
2. 流式响应(Streaming)
"""
支持流式返回结果(适用于长时间运行的工具)
"""
from mcp.types import CallToolResult, TextContent, LoggingMessageNotification
@server.call_tool()
async def call_tool(name: str, arguments: dict):
if name == "generate_report":
# 发送进度通知
await server.send_logging_message(
level="info",
message="正在查询数据..."
)
data = await fetch_data()
await server.send_logging_message(
level="info",
message="正在生成报告..."
)
report = await generate_report(data)
return CallToolResult(
content=[TextContent(type="text", text=report)]
)
MCP 生态与多语言 SDK
| 语言 | SDK | 维护方 | 推荐度 |
|---|---|---|---|
| TypeScript/JavaScript | @modelcontextprotocol/sdk | Anthropic 官方 | ⭐⭐⭐⭐⭐ |
| Python | mcp | Anthropic 官方 | ⭐⭐⭐⭐⭐ |
| Go | go-mcp | 社区维护 | ⭐⭐⭐⭐ |
| Rust | rmcp | 社区维护 | ⭐⭐⭐⭐ |
| Java/Kotlin | mcp-java-sdk | 社区维护 | ⭐⭐⭐ |
常用 MCP Server 推荐
# 官方维护的参考实现
npm install @modelcontextprotocol/server-filesystem # 文件系统访问
npm install @modelcontextprotocol/server-sqlite # SQLite 数据库
npm install @modelcontextprotocol/server-github # GitHub API
npm install @modelcontextprotocol/server-google-drive # Google Drive
# 社区热门 Server
npm install @mcp-servers/server-postgres # PostgreSQL
npm install @mcp-servers/server-redis # Redis
npm install @mcp-servers/server-brave-search # Brave 搜索
从 MCP 到 A2A:AI Agent 协议全景
2026 年,除了 MCP,还有一个重要的协议:A2A(Agent2Agent Protocol)。
| 协议 | 提出方 | 目的 | 适用场景 |
|---|---|---|---|
| MCP | Anthropic | AI 调用工具的标准接口 | AI ↔ 工具 |
| A2A | AI Agent 之间的协作 | AI ↔ AI | |
| LangChain Tools | LangChain | LangChain 生态的工具调用 | LangChain 内部 |
| Function Calling | OpenAI | OpenAI 模型的工具调用 | OpenAI 模型专用 |
graph TB
User[用户] --> Agent1[Agent 1<br/>Orchestrator]
Agent1 -->|MCP| Tool1[Tool: Database]
Agent1 -->|MCP| Tool2[Tool: Search]
Agent1 -->|A2A| Agent2[Agent 2<br/>Specialist]
Agent2 -->|MCP| Tool3[Tool: Code Execution]
Agent2 -->|A2A| Agent3[Agent 3<br/>Reviewer]
实战案例:构建企业级文档检索 MCP Server
需求分析
构建一个企业级文档检索系统,要求:
- 支持多种文档格式(PDF、Word、Markdown)
- 语义搜索(向量相似度)
- 权限控制(基于用户角色)
- 审计日志
- 高并发支持
完整实现
"""
企业级文档检索 MCP Server
"""
import asyncio
import json
import logging
from datetime import datetime
from typing import List, Dict, Any
import hashlib
from mcp.server import Server
from mcp.types import Tool, CallToolResult, TextContent
# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("EnterpriseDocMCP")
class EnterpriseDocumentMCP:
def __init__(self):
self.server = Server("enterprise-doc-mcp")
self.documents: Dict[str, Dict] = {} # doc_id -> doc_info
self.vector_store = SimpleVectorStore() # 简化版向量存储
self.audit_log: List[Dict] = []
self._register_handlers()
def _register_handlers(self):
@self.server.list_tools()
async def list_tools():
return {
"tools": [
{
"name": "upload_document",
"description": "上传文档到检索系统。支持 PDF、Word、Markdown。",
"inputSchema": {
"type": "object",
"properties": {
"file_path": {"type": "string"},
"access_level": {
"type": "string",
"enum": ["public", "internal", "confidential"],
"default": "internal"
}
},
"required": ["file_path"]
}
},
{
"name": "search_documents",
"description": "语义搜索文档内容。",
"inputSchema": {
"type": "object",
"properties": {
"query": {"type": "string"},
"top_k": {"type": "number", "default": 5},
"access_level": {
"type": "string",
"enum": ["public", "internal", "confidential"]
}
},
"required": ["query"]
}
}
]
}
@self.server.call_tool()
async def call_tool(name: str, arguments: dict):
user_id = self._get_user_id()
if name == "upload_document":
return await self._upload_document(user_id, arguments)
elif name == "search_documents":
return await self._search_documents(user_id, arguments)
else:
raise ValueError(f"Unknown tool: {name}")
async def _upload_document(self, user_id: str, args: dict) -> CallToolResult:
"""上传文档"""
file_path = args["file_path"]
access_level = args.get("access_level", "internal")
# 记录审计日志
self._audit_log(user_id, "upload_document", {"file_path": file_path})
# 读取并分块
content = await self._read_document(file_path)
chunks = self._chunk_text(content, chunk_size=500)
# 生成嵌入并存储
doc_id = hashlib.md5(file_path.encode()).hexdigest()
for i, chunk in enumerate(chunks):
embedding = await self._get_embedding(chunk)
self.vector_store.add(
id=f"{doc_id}:{i}",
vector=embedding,
metadata={
"doc_id": doc_id,
"file_path": file_path,
"chunk_index": i,
"access_level": access_level,
"content": chunk
}
)
self.documents[doc_id] = {
"file_path": file_path,
"access_level": access_level,
"chunk_count": len(chunks),
"uploaded_by": user_id,
"uploaded_at": datetime.now().isoformat()
}
return CallToolResult(
content=[TextContent(
type="text",
text=f"✅ 文档已上传:{file_path}({len(chunks)} 个片段)"
)]
)
async def _search_documents(self, user_id: str, args: dict) -> CallToolResult:
"""搜索文档"""
query = args["query"]
top_k = args.get("top_k", 5)
min_access_level = args.get("access_level", "public")
# 记录审计日志
self._audit_log(user_id, "search_documents", {"query": query})
# 生成查询向量
query_embedding = await self._get_embedding(query)
# 向量搜索
results = self.vector_store.search(
query_vector=query_embedding,
top_k=top_k * 2 # 多取一些,后面过滤
)
# 权限过滤
filtered_results = []
for result in results:
doc_access_level = result["metadata"]["access_level"]
if self._check_permission(user_id, doc_access_level, min_access_level):
filtered_results.append(result)
if len(filtered_results) >= top_k:
break
if not filtered_results:
return CallToolResult(
content=[TextContent(type="text", text="未找到相关文档(或无权限访问)")]
)
# 格式化结果
formatted = "\n\n---\n\n".join([
f"**{r['metadata']['file_path']}**(相关度:{r['score']:.2f})\n\n{r['metadata']['content'][:300]}..."
for r in filtered_results
])
return CallToolResult(
content=[TextContent(type="text", text=formatted)]
)
def _check_permission(self, user_id: str, doc_level: str, min_level: str) -> bool:
"""检查用户是否有权限访问文档"""
# 简化版权限检查
level_rank = {"public": 0, "internal": 1, "confidential": 2}
user_role = self._get_user_role(user_id)
if user_role == "admin":
return True
return level_rank.get(doc_level, 0) <= level_rank.get(min_level, 0)
def _audit_log(self, user_id: str, action: str, details: Dict):
"""记录审计日志"""
log_entry = {
"timestamp": datetime.now().isoformat(),
"user_id": user_id,
"action": action,
"details": details
}
self.audit_log.append(log_entry)
logger.info(f"Audit: {json.dumps(log_entry, ensure_ascii=False)}")
async def _get_embedding(self, text: str) -> List[float]:
"""获取文本嵌入向量(简化版)"""
# 生产环境应调用 OpenAI / Cohere / 本地嵌入模型
# 这里返回随机向量作为示例
import random
return [random.random() for _ in range(768)]
async def _read_document(self, file_path: str) -> str:
"""读取文档内容"""
# 简化实现
with open(file_path, "r", encoding="utf-8") as f:
return f.read()
def _chunk_text(self, text: str, chunk_size: int) -> List[str]:
"""文本分块"""
chunks = []
for i in range(0, len(text), chunk_size):
chunks.append(text[i:i+chunk_size])
return chunks
def _get_user_id(self) -> str:
"""获取当前用户 ID(简化版)"""
return "user_123" # 实际应从请求上下文获取
def _get_user_role(self, user_id: str) -> str:
"""获取用户角色"""
return "admin" # 实际应从数据库查询
class SimpleVectorStore:
"""简化版向量存储(生产环境应使用 Pinecone / Weaviate / Qdrant)"""
def __init__(self):
self.vectors = []
def add(self, id: str, vector: List[float], metadata: Dict):
self.vectors.append({"id": id, "vector": vector, "metadata": metadata})
def search(self, query_vector: List[float], top_k: int) -> List[Dict]:
"""余弦相似度搜索(简化版)"""
results = []
for item in self.vectors:
score = self._cosine_similarity(query_vector, item["vector"])
results.append({**item, "score": score})
results.sort(key=lambda x: x["score"], reverse=True)
return results[:top_k]
def _cosine_similarity(self, v1: List[float], v2: List[float]) -> float:
"""计算余弦相似度"""
dot = sum(a * b for a, b in zip(v1, v2))
norm1 = sum(a * a for a in v1) ** 0.5
norm2 = sum(b * b for b in v2) ** 0.5
return dot / (norm1 * norm2)
async def main():
doc_mcp = EnterpriseDocumentMCP()
from mcp.server.stdio import stdio_server
async with stdio_server() as (read_stream, write_stream):
await doc_mcp.server.run(
read_stream,
write_stream,
doc_mcp.server.create_initialization_options()
)
if __name__ == "__main__":
asyncio.run(main())
部署与运维:容器化、监控与可观测性
Docker 容器化
# Dockerfile
FROM python:3.12-slim
WORKDIR /app
# 安装依赖
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# 复制代码
COPY . .
# 非 root 用户运行(安全最佳实践)
RUN useradd -m mcpuser && chown -R mcpuser:mcpuser /app
USER mcpuser
# 健康检查
HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \
CMD python -c "import mcp; print('OK')" || exit 1
CMD ["python", "server.py"]
# docker-compose.yml
version: '3.8'
services:
mcp-server:
build: .
ports:
- "8080:8080"
environment:
- LOG_LEVEL=INFO
- DATABASE_URL=postgresql://user:pass@db:5432/mcp
depends_on:
- db
restart: unless-stopped
db:
image: postgres:16-alpine
environment:
- POSTGRES_PASSWORD=secret
- POSTGRES_DB=mcp
volumes:
- postgres_data:/var/lib/postgresql/data
restart: unless-stopped
volumes:
postgres_data:
监控与可观测性
"""
集成 Prometheus 监控
"""
from prometheus_client import Counter, Histogram, start_http_server
import time
# 定义指标
TOOL_CALL_COUNTER = Counter(
"mcp_tool_calls_total",
"Total number of tool calls",
["tool_name", "status"]
)
TOOL_CALL_DURATION = Histogram(
"mcp_tool_call_duration_seconds",
"Tool call duration in seconds",
["tool_name"]
)
@server.call_tool()
async def call_tool(name: str, arguments: dict):
start_time = time.time()
status = "success"
try:
result = await actual_tool_logic(name, arguments)
return result
except Exception as e:
status = "error"
raise
finally:
duration = time.time() - start_time
TOOL_CALL_COUNTER.labels(tool_name=name, status=status).inc()
TOOL_CALL_DURATION.labels(tool_name=name).observe(duration)
# 启动 Prometheus HTTP 服务器
start_http_server(8000)
总结与展望
MCP 的核心价值
- 标准化:统一的工具描述格式,AI 能自动理解工具能力
- 解耦:AI 应用和工具实现完全解耦
- 安全:统一的认证、授权、审计机制
- 可扩展:新增工具只需要新增 MCP Server
最佳实践总结
| 维度 | 建议 |
|---|---|
| 传输层选择 | 本地用 stdio,远程用 SSE |
| 错误处理 | 使用标准 JSON-RPC 错误码 |
| 性能优化 | 连接池 + 缓存 + 并发 |
| 安全 | 认证 + 授权 + 沙箱 + 审计 |
| 监控 | Prometheus + 结构化日志 |
未来展望
- MCP Registry:类似 npm 的 MCP Server 注册中心
- MCP Marketplace:MCP Server 的应用商店
- 多模态支持:支持图片、音频、视频作为工具输入输出
- 联邦学习集成:隐私保护的分布式工具调用
参考资源
- 官方文档:https://modelcontextprotocol.io/
- 官方 SDK(TypeScript):https://github.com/modelcontextprotocol/typescript-sdk
- 官方 SDK(Python):https://github.com/modelcontextprotocol/python-sdk
- MCP Inspector:https://github.com/modelcontextprotocol/inspector
- Awesome MCP Servers:https://github.com/punkpeye/awesome-mcp-servers
本文撰写于 2026 年 5 月,基于 MCP 协议版本 2024-11-05。
字数统计:约 18,000 字
文章特点:
- 从零开始讲解 MCP 的原理和架构
- 提供完整的 Python 和 TypeScript 实战代码
- 覆盖生产级开发的各个维度(安全、性能、监控)
- 包含企业级实战案例
- 深度剖析 Transport 层和协议细节