MCP 协议深度实战:AI 世界的 USB-C 标准——从协议原理到生产级 Server 开发的完全指南(2026)
一、为什么 AI Agent 需要一个"USB-C 标准"
2024 年之前,AI 工具生态的碎片化程度令人窒息。OpenAI 用 Function Calling,Anthropic 用 Tool Use,LangChain 有自己的 Tool 接口,CrewAI 有自己的工具注册机制——每个框架、每个模型提供商都在用完全不同的方式定义"AI 怎么调用工具"。
这种碎片化带来的直接后果是:开发者为每个 AI 平台写一遍工具适配代码,工具提供商为每个框架写一遍 SDK,跨平台迁移成本极高,整个生态被割裂成孤岛。
2024 年 11 月,Anthropic 开源了 MCP(Model Context Protocol,模型上下文协议),目标明确:给 AI Agent 的工具调用建立统一通信标准。Anthropic 把它比作"AI 世界的 USB-C 接口"。
到 2026 年中,MCP 生态已从"Anthropic 的实验性项目"发展为全行业的事实标准。GitHub 上数千个社区 MCP Server,LangChain/AutoGen/CrewAI/LlamaIndex 原生支持,AWS Bedrock 和 Azure AI Studio 集成,规范已提交 Linux Foundation 的 Agentic AI Foundation。天气通、Slack、Notion、GitHub、AWS 等大厂均推出官方 MCP Server。
本文从协议底层原理讲起,手把手开发生产级 MCP Server,涵盖 TypeScript 和 Python 双语言实现、Streamable HTTP 传输、OAuth 2.0 远程认证等最新特性。
二、核心架构:Host-Client-Server 三层模型
MCP 采用优雅的三层架构:
┌─────────────────────────────────────────────────┐
│ MCP Host │
│ (Claude Desktop / Cursor / OpenClaw / TRAE) │
│ │
│ ┌───────────┐ ┌───────────┐ ┌───────────┐ │
│ │ Client A │ │ Client B │ │ Client C │ │
│ │ (GitHub) │ │ (Database)│ │ (Slack) │ │
│ └─────┬─────┘ └─────┬─────┘ └─────┬─────┘ │
└────────┼──────────────┼──────────────┼──────────┘
│ │ │
┌────▼────┐ ┌────▼────┐ ┌────▼────┐
│ Server A│ │ Server B│ │ Server C│
│ GitHub │ │ PG/My │ │ Slack │
│ MCP API │ │ MCP API │ │ MCP API │
└─────────┘ └─────────┘ └─────────┘
Host:用户直接交互的 AI 应用,负责协调多个 Client、管理会话生命周期。
Client:运行在 Host 内部,每个 Client 对应一个 Server 连接,处理协议握手、认证、序列化。
Server:暴露标准化能力的端点。关键价值在于"即插即用"——任何符合 MCP 的 Server 都能被任何 Host 直接使用。
三大核心能力原语
| 能力 | 类比 | 方向 | 典型场景 |
|---|---|---|---|
| Resources | 读文件 | AI 读取外部数据 | 读取表结构、获取文件内容、查询 API |
| Tools | 执行命令 | AI 触发外部动作 | 发送邮件、修改数据库、部署服务 |
| Prompts | 加载模板 | 预定义提示词 | 代码审查模板、数据分析模板 |
Resources 和 Tools 的关键区别:Resources 是只读数据暴露,Tools 是可执行的副作用操作。
MCP vs Function Calling
| 维度 | Function Calling | MCP |
|---|---|---|
| 定位 | 应用级集成 | 系统级生态协议 |
| 连接模式 | 请求-响应无状态 | Client-Server 持久连接 |
| 发现机制 | 手动注册 | 标准化能力发现 |
| 认证 | 无标准 | 内置 OAuth 2.0 |
| 可组合性 | 低(绑定模型) | 高(任何 Host + 任何 Server) |
一句话:Function Calling 是"一个厂商的私有接口",MCP 是"全行业的公共标准"。
三、底层通信机制
3.1 JSON-RPC 2.0 消息协议
MCP 所有通信基于 JSON-RPC 2.0:
// 请求
{
"jsonrpc": "2.0",
"id": 1,
"method": "tools/call",
"params": {
"name": "search_code",
"arguments": { "query": "auth middleware", "pattern": "*.ts" }
}
}
// 响应
{
"jsonrpc": "2.0",
"id": 1,
"result": {
"content": [{ "type": "text", "text": "Found 3 matching files" }]
}
}
核心消息类型:请求(有 id,需要响应)、通知(无 id,单向)、响应(带 result 或 error)。
3.2 三种传输模式
stdio 模式(本地进程,最常用):Host 启动 Server 为子进程,通过 stdin/stdout 交换消息。零网络开销,启动即用。
{
"mcpServers": {
"my-db-tools": {
"command": "node",
"args": ["/path/to/server/index.js"],
"env": { "DB_URL": "postgresql://..." }
}
}
}
SSE 模式(旧方案):HTTP 长连接 + Server-Sent Events。已逐步被 Streamable HTTP 替代。
Streamable HTTP 模式(2026 推荐方案):基于 HTTP/1.1 chunked transfer encoding,支持单连接多请求、双向流式传输、Session 恢复、HTTP/2 多路复用。
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamable-http.js";
import { createServer } from "http";
const server = new McpServer({ name: "prod-server", version: "1.0.0" });
server.tool("query", "执行 SQL", { sql: z.string() }, async ({ sql }) => {
const rows = await pool.query(sql);
return { content: [{ type: "text", text: JSON.stringify(rows) }] };
});
const transport = new StreamableHTTPServerTransport({
sessionIdGenerator: crypto.randomUUID,
});
await server.connect(transport);
const httpServer = createServer();
httpServer.on("request", (req, res) => {
if (req.url === "/mcp") transport.handleRequest(req, res);
});
httpServer.listen(3000);
3.3 协议握手与能力协商
Client Server
│─── initialize (能力声明) ───→│
│←── capabilities (能力响应) ──│
│─── initialized (确认) ──────→│
│ ← 协商完成 → │
双方通过 initialize 消息声明各自能力,按最小公共集工作。这种设计保证了向后兼容性——新版本的能力在旧客户端上优雅降级,而不是直接报错。
四、实战:TypeScript 生产级 MCP Server
构建一个数据库管理助手 MCP Server,提供 Schema 查询、SQL 执行、性能分析能力。
4.1 项目搭建
mkdir db-mcp-server && cd db-mcp-server
npm init -y
npm install @modelcontextprotocol/sdk zod
npm install -D typescript @types/node tsx
4.2 完整 Server 实现
import { McpServer, ResourceTemplate } from "@modelcontextprotocol/sdk/server/mcp.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import { z } from "zod";
const server = new McpServer({
name: "db-assistant",
version: "1.0.0",
description: "生产级数据库管理 MCP Server",
});
// ===== Resources:只读数据库元数据 =====
server.resource(
"databases", "db://list", "可连接的数据库列表",
async (uri) => ({
contents: [{
uri: uri.href,
mimeType: "application/json",
text: JSON.stringify([
{ name: "app_production", host: "pg-primary.internal", status: "healthy" },
{ name: "app_analytics", host: "pg-analytics.internal", status: "healthy" },
]),
}],
})
);
// 动态 ResourceTemplate:按需暴露每张表的 Schema
server.resource(
"table-schema",
new ResourceTemplate("db://schema/{database}/{table}", {
list: async () => {
const tables = await getTableList();
return {
resources: tables.map(t => ({
uri: `db://schema/${t.database}/${t.table}`,
name: `${t.database}.${t.table} 表结构`,
mimeType: "application/json",
})),
};
}
}),
async (uri, { database, table }) => ({
contents: [{
uri: uri.href,
mimeType: "application/json",
text: JSON.stringify(
await getTableSchema(database as string, table as string), null, 2
),
}],
})
);
// ===== Tools:可执行的数据库操作 =====
// 列出所有表(按大小排序)
server.tool(
"list_tables", "列出指定数据库的所有表及大小信息",
{ database: z.string(), schema_pattern: z.string().optional() },
async ({ database, schema_pattern }) => {
const result = await executeQuery(database, `
SELECT table_schema, table_name,
pg_size_pretty(pg_total_relation_size(
quote_ident(table_schema)||'.'||quote_ident(table_name)
)) as size,
pg_stat_get_live_tuples(
(quote_ident(table_schema)||'.'||quote_ident(table_name))::regclass
) as row_count
FROM information_schema.tables
WHERE table_schema LIKE $1 AND table_type = 'BASE TABLE'
ORDER BY pg_total_relation_size(
quote_ident(table_schema)||'.'||quote_ident(table_name)
) DESC LIMIT 50
`, [schema_pattern || "public"]);
return {
content: [{
type: "text",
text: `数据库 ${database} 表列表(按大小降序):\n\n` +
result.rows.map(r =>
` • ${r.table_schema}.${r.table_name} — ${r.size},` +
`约 ${Number(r.row_count).toLocaleString()} 行`
).join("\n"),
}],
};
}
);
// 只读查询(自动 LIMIT 保护)
server.tool(
"execute_query", "执行只读 SQL 查询(自动限制 100 行)",
{
database: z.string(),
sql: z.string(),
params: z.array(z.any()).optional(),
limit: z.number().optional(),
},
async ({ database, sql, params, limit }) => {
const normalized = sql.trim().toUpperCase();
if (!normalized.startsWith("SELECT") && !normalized.startsWith("WITH")) {
return {
content: [{ type: "text",
text: "❌ 安全限制:只允许 SELECT。写操作请用 execute_mutation。" }],
isError: true,
};
}
const safeLimit = Math.min(limit || 100, 1000);
const safeSql = `${sql.replace(/;\s*$/, "")} LIMIT ${safeLimit}`;
try {
const start = Date.now();
const result = await executeQuery(database, safeSql, params);
const elapsed = Date.now() - start;
const headers = result.fields.map(f => f.name);
let output = `查询结果(${result.rows.length} 行,${elapsed}ms):\n\n`;
output += "| " + headers.join(" | ") + " |\n";
output += "| " + headers.map(() => "---").join(" | ") + " |\n";
output += result.rows.map(row =>
"| " + headers.map(h => String(row[h] ?? "NULL")).join(" | ") + " |"
).join("\n");
if (result.rows.length >= safeLimit) {
output += `\n\n⚠️ 截断至 ${safeLimit} 行,调整 limit 参数可查看更多。`;
}
return { content: [{ type: "text", text: output }] };
} catch (err: any) {
return {
content: [{ type: "text", text: `❌ 查询失败:${err.message}` }],
isError: true,
};
}
}
);
// 写操作(支持 dry_run 试运行)
server.tool(
"execute_mutation", "执行 INSERT/UPDATE/DELETE(支持 dry_run 试运行)",
{
database: z.string(),
sql: z.string(),
params: z.array(z.any()).optional(),
dry_run: z.boolean().optional(),
},
async ({ database, sql, params, dry_run }) => {
const ops = ["INSERT", "UPDATE", "DELETE"];
const normalized = sql.trim().toUpperCase();
if (!ops.some(op => normalized.startsWith(op))) {
return {
content: [{ type: "text", text: "❌ 只允许 INSERT / UPDATE / DELETE" }],
isError: true,
};
}
if (dry_run) {
const plan = await executeQuery(database, `EXPLAIN (FORMAT JSON) ${sql}`, params);
return {
content: [{ type: "text",
text: `🔧 试运行模式:\n${JSON.stringify(plan.rows[0], null, 2)}` }],
};
}
try {
const result = await executeQuery(database, sql, params);
return {
content: [{ type: "text", text: `✅ 成功,影响 ${result.rowCount} 行` }],
};
} catch (err: any) {
return {
content: [{ type: "text", text: `❌ 失败:${err.message}` }],
isError: true,
};
}
}
);
// 性能分析
server.tool(
"explain_query", "执行 EXPLAIN ANALYZE 分析查询性能",
{ database: z.string(), sql: z.string() },
async ({ database, sql }) => {
const result = await executeQuery(
database, `EXPLAIN (ANALYZE, BUFFERS, FORMAT TEXT) ${sql}`
);
const plan = result.rows.map(r => r["QUERY PLAN"]).join("\n");
return {
content: [{
type: "text",
text: `📊 执行计划:\n\n${plan}\n\n💡 关注点:\n` +
` • Seq Scan on 大表 → 需要索引\n` +
` • Nested Loop + 高 rows → 考虑改写 JOIN\n` +
` • actual time >> planned time → 统计信息过期,执行 ANALYZE`,
}],
};
}
);
// ===== Prompts:预定义分析模板 =====
server.prompt(
"performance-audit", "数据库性能审计模板",
{ database: z.string() },
async ({ database }) => ({
messages: [{
role: "user",
content: { type: "text", text: `对 ${database} 做性能审计:\n` +
`1. list_tables 获取大表\n` +
`2. 检查未使用索引(idx_scan=0)\n` +
`3. 检查缺失索引候选(seq_scan > idx_scan)\n` +
`4. 给出优化建议`
},
}],
})
);
// ===== 启动 =====
async function main() {
const transport = new StdioServerTransport();
await server.connect(transport);
console.error("DB Assistant MCP Server running on stdio");
}
main().catch(console.error);
// ===== 模拟辅助函数(生产中替换为真实连接池)=====
async function getTableList() {
return [
{ database: "app_production", table: "users" },
{ database: "app_production", table: "orders" },
];
}
async function getTableSchema(db: string, table: string) {
return {
database: db, table,
columns: [
{ name: "id", type: "uuid", nullable: false, default: "gen_random_uuid()" },
{ name: "created_at", type: "timestamptz", nullable: false, default: "now()" },
{ name: "email", type: "varchar(255)", nullable: false },
],
indexes: [{ name: `${table}_pkey`, columns: ["id"], unique: true }],
};
}
async function executeQuery(database: string, sql: string, params?: any[]) {
console.error(`[${database}] ${sql}`);
return { rows: [], fields: [], rowCount: 0 };
}
4.3 关键设计决策
1. ResourceTemplate 动态资源:不用硬编码所有资源 URI,用模板动态生成。数据库有几百张表时不可能预先注册,模板模式解决了动态发现的问题。
2. 只读/读写分离的安全设计:execute_query 强制只允许 SELECT,execute_mutation 负责 DML。AI 调用工具时天然需要最小权限原则,读操作不应该有写权限。
3. dry_run 试运行模式:写操作支持 dry_run: true,先用 EXPLAIN 查看影响范围再决定是否实际执行。这是生产环境的关键安全措施——让 AI 在"先看后做"的模式下工作。
4. 自动 LIMIT 保护:即使 AI 生成了没有 LIMIT 的查询,Server 端也会自动加上限制,防止大表全量扫描拖垮数据库。这是 MCP Server 作为"安全边界"的核心价值。
五、Python 版 MCP Server 实现
对于 Python 生态(LangChain、数据处理团队),MCP SDK 同样完善:
# pip install mcp[cli]
from mcp.server import Server
from mcp.types import Tool, Resource, TextContent
import mcp.server.stdio
import asyncio
import json
app = Server("project-manager")
# 内存项目数据
PROJECTS = {
"proj-001": {
"name": "新产品功能设计",
"status": "in_progress",
"priority": "high",
"assignee": "dev-team-a",
"due_date": "2026-07-15",
"subtasks": [
{"id": "t1", "title": "需求分析", "status": "completed"},
{"id": "t2", "title": "API 设计", "status": "in_progress"},
{"id": "t3", "title": "前端实现", "status": "todo"},
],
"github_repo": "https://github.com/company/new-product",
"slack_channel": "#team-new-product",
},
"proj-002": {
"name": "数据库迁移 PG→TiDB",
"status": "planning",
"priority": "critical",
"assignee": "infra-team",
"due_date": "2026-08-01",
"subtasks": [
{"id": "t1", "title": "Schema 兼容性分析", "status": "completed"},
{"id": "t2", "title": "数据验证方案", "status": "in_progress"},
{"id": "t3", "title": "灰度迁移测试", "status": "todo"},
],
},
}
# ===== Resources =====
@app.list_resources()
async def list_resources() -> list[Resource]:
resources = []
for proj_id, proj in PROJECTS.items():
resources.append(Resource(
uri=f"project:///{proj_id}",
name=proj["name"],
description=f"状态: {proj['status']} | 优先级: {proj['priority']}",
mimeType="application/json",
))
for subtask in proj.get("subtasks", []):
resources.append(Resource(
uri=f"project:///{proj_id}/task/{subtask['id']}",
name=f"{proj['name']} - {subtask['title']}",
description=f"子任务状态: {subtask['status']}",
mimeType="application/json",
))
return resources
@app.read_resource()
async def read_resource(uri: str) -> str:
parts = uri.replace("project:///", "").split("/")
proj_id = parts[0]
if proj_id in PROJECTS:
if len(parts) == 1:
return json.dumps(PROJECTS[proj_id], ensure_ascii=False, indent=2)
elif len(parts) == 3 and parts[1] == "task":
for task in PROJECTS[proj_id]["subtasks"]:
if task["id"] == parts[2]:
return json.dumps(task, ensure_ascii=False, indent=2)
raise ValueError(f"Resource not found: {uri}")
# ===== Tools =====
@app.list_tools()
async def list_tools() -> list[Tool]:
return [
Tool(
name="get_all_projects",
description="获取所有项目概览",
inputSchema={"type": "object", "properties": {}, "required": []},
),
Tool(
name="get_project_detail",
description="获取项目详情及子任务",
inputSchema={
"type": "object",
"properties": {"project_id": {"type": "string"}},
"required": ["project_id"],
},
),
Tool(
name="update_task_status",
description="更新子任务状态",
inputSchema={
"type": "object",
"properties": {
"project_id": {"type": "string"},
"task_id": {"type": "string"},
"new_status": {"type": "string",
"enum": ["todo", "in_progress", "blocked", "completed"]},
"comment": {"type": "string"},
},
"required": ["project_id", "task_id", "new_status"],
},
),
]
@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
if name == "get_all_projects":
lines = []
for pid, proj in PROJECTS.items():
total = len(proj.get("subtasks", []))
done = sum(1 for t in proj.get("subtasks", []) if t["status"] == "completed")
lines.append(
f"[{pid}] {proj['name']}\n"
f" 状态: {proj['status']} | 优先级: {proj['priority']}\n"
f" 负责人: {proj['assignee']} | 截止: {proj['due_date']}\n"
f" 进度: {done}/{total} 子任务完成"
)
return [TextContent(type="text", text="\n".join(lines))]
elif name == "get_project_detail":
pid = arguments["project_id"]
if pid not in PROJECTS:
return [TextContent(type="text", text=f"❌ 项目不存在: {pid}")]
proj = PROJECTS[pid]
output = (
f"📋 {proj['name']}({pid})\n"
f" 状态: {proj['status']} | 优先级: {proj['priority']}\n"
f" 负责人: {proj['assignee']} | 截止: {proj['due_date']}\n"
f" 仓库: {proj.get('github_repo', 'N/A')}\n\n📝 子任务:\n"
)
for t in proj.get("subtasks", []):
icon = {"completed": "✅", "in_progress": "🔄",
"blocked": "🚫"}.get(t["status"], "⬜")
output += f" {icon} [{t['id']}] {t['title']} — {t['status']}\n"
return [TextContent(type="text", text=output)]
elif name == "update_task_status":
pid, tid = arguments["project_id"], arguments["task_id"]
new_status = arguments["new_status"]
if pid not in PROJECTS:
return [TextContent(type="text", text=f"❌ 项目不存在: {pid}")]
for task in PROJECTS[pid]["subtasks"]:
if task["id"] == tid:
old = task["status"]
task["status"] = new_status
return [TextContent(type="text", text=(
f"✅ 任务状态已更新:\n"
f" {pid}/{tid}: {old} → {new_status}\n"
f" 备注:{arguments.get('comment', '无')}"
))]
return [TextContent(type="text", text=f"❌ 子任务不存在: {tid}")]
return [TextContent(type="text", text=f"❌ 未知工具: {name}")]
# 启动
async def main():
async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
await app.run(read_stream, write_stream, app.create_initialization_options())
asyncio.run(main())
六、Streamable HTTP 部署与 OAuth 2.0 认证
6.1 从 stdio 升级到 Streamable HTTP
当 MCP Server 需要暴露给远程 Host 使用时,stdio 模式不够用。Streamable HTTP 是 2026 年推荐的生产部署方案——支持单连接多请求、双向流式、Session 恢复。
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamable-http.js";
import { createServer } from "http";
const mcpServer = new McpServer({ name: "remote-db-server", version: "2.0.0" });
// ... 注册 tools/resources 同上 ...
const transport = new StreamableHTTPServerTransport({
sessionIdGenerator: () => crypto.randomUUID(),
});
await mcpServer.connect(transport);
const httpServer = createServer();
httpServer.on("request", (req, res) => {
if (req.url === "/mcp") transport.handleRequest(req, res);
});
httpServer.listen(3000);
6.2 OAuth 2.0 远程认证
2026 年 6 月,AWS Bedrock AgentCore 网关支持 MCP 的 OAuth 2.0 代表令牌交换。远程 MCP Server 可以安全地代表 AI Agent 访问企业后端。
// 认证中间件
app.use("/mcp", async (req, res, next) => {
const authHeader = req.headers["authorization"];
if (!authHeader?.startsWith("Bearer ")) {
return res.status(401).json({ error: "Missing Bearer token" });
}
const payload = await verifyToken(authHeader.slice(7));
if (!payload.scopes?.includes("mcp:tools:read")) {
return res.status(403).json({ error: "Insufficient scopes" });
}
req.user = payload;
next();
});
七、MCP 生态现状(2026 年 6 月)
主流平台支持
| 平台 | 支持状态 | 特点 |
|---|---|---|
| Claude Desktop | ✅ 原生 | 最早支持者,配置最简单 |
| Cursor IDE | ✅ 原生 | 编辑器内直接调用 MCP 工具 |
| VS Code + Copilot | ✅ 插件 | 通过扩展支持 |
| OpenClaw | ✅ 核心 | Agent 优先架构,原生 MCP 技能系统 |
| TRAE | ✅ 原生 | 阿里出品,已集成 MCP Server 支持 |
| LangChain | ✅ SDK | MCPClientAdapter + load_mcp_tools |
| AutoGen | ✅ SDK | MCPStdioServer 集成 |
| CrewAI | ✅ SDK | 原生 MCP 工具桥接 |
| LlamaIndex | ✅ SDK | MCP 数据源连接器 |
大厂 MCP Server 案例
2026 年 6 月,天气通正式推出 MCP Server,覆盖实时天气、预报、灾害预警、空气质量等 40 余项能力,1 分钱调用 100 万次。这标志着 MCP 从开发者工具走向了商业服务。
典型大厂 MCP Server:
| 厂商 | MCP Server | 能力 |
|---|---|---|
| GitHub | GitHub MCP | PR 管理、Issue 追踪、代码搜索、仓库操作 |
| Slack | Slack MCP | 消息收发、频道管理、文件共享 |
| Notion | Notion MCP | 页面读写、数据库查询、内容搜索 |
| AWS | AWS Bedrock MCP | 云服务管理、模型调用 |
| 天气通 | 天气通 MCP | 40+ 项气象能力 |
7.2 MCP 在实际场景中的价值
场景 1:AI 编程助手的数据库访问
没有 MCP 时:Cursor 需要为每种数据库(MySQL、PostgreSQL、MongoDB、Redis)分别编写工具适配代码,或者依赖用户手动提供数据。
有 MCP 后:Cursor 只需要支持 MCP 协议,就能自动使用社区提供的数百个数据库 MCP Server。
场景 2:企业 AI 工作流的工具编排
没有 MCP 时:每个 AI Agent 平台需要独立集成 Slack、Jira、GitHub、数据库等工具,集成代码无法复用。
有 MCP 后:企业的工具团队只需开发一次 MCP Server,所有 AI Agent 平台(Claude、Cursor、自建系统)都能直接使用。
八、性能优化与生产部署最佳实践
8.1 连接池与并发控制
生产环境的 MCP Server 需要处理并发请求。以下是关键优化:
// 数据库连接池
import { Pool } from "pg";
const pool = new Pool({
max: 20, // 最大连接数
idleTimeoutMillis: 30000, // 空闲超时
connectionTimeoutMillis: 5000,
});
// 请求限流(防止 AI 生成过多工具调用)
const rateLimiter = new Map<string, { count: number; resetAt: number }>();
function checkRateLimit(clientId: string, maxPerMinute: number): boolean {
const now = Date.now();
const entry = rateLimiter.get(clientId);
if (!entry || now > entry.resetAt) {
rateLimiter.set(clientId, { count: 1, resetAt: now + 60000 });
return true;
}
if (entry.count >= maxPerMinute) return false;
entry.count++;
return true;
}
8.2 错误处理与可观测性
// 统一错误处理
server.tool("safe_query", "带完整错误处理的查询", { sql: z.string() }, async ({ sql }) => {
try {
const result = await pool.query(sql);
// 结构化日志
logger.info("query_executed", {
sql: sql.substring(0, 200),
rowCount: result.rowCount,
duration_ms: Date.now() - start,
client_id: req.user?.sub,
});
return { content: [{ type: "text", text: JSON.stringify(result.rows) }] };
} catch (err) {
logger.error("query_failed", { sql, error: err.message, stack: err.stack });
// 区分客户端错误和服务端错误
if (err.code === "42P01") { // 关系不存在
return {
content: [{ type: "text", text: `表不存在。可用表:${await listTables()}` }],
isError: true,
};
}
return {
content: [{ type: "text", text: `服务器错误,请联系管理员。错误ID: ${errId}` }],
isError: true,
};
}
});
8.3 安全最佳实践
- 最小权限原则:每个 Tool 只暴露必要的操作,读/写分离
- 输入验证:用 Zod Schema 严格校验所有输入参数
- SQL 注入防护:永远使用参数化查询,禁止字符串拼接 SQL
- 审计日志:记录所有工具调用的操作者、参数、结果
- 敏感数据脱敏:返回结果中自动脱敏密码、密钥等字段
- 超时控制:每个工具调用设置合理的超时时间,防止无限等待
九、总结与展望
MCP 正在成为 AI 应用领域的"HTTP 协议"——一个所有人都在用、所有人都在贡献的基础设施。从 2024 年底的实验性项目,到 2026 年中全行业的事实标准,MCP 的发展速度超出了大多数人的预期。
核心要点回顾:
- 三层架构(Host-Client-Server)实现了 AI 工具的完全解耦,任何 Host + 任何 Server 即插即用
- 三大能力原语(Resources/Tools/Prompts)覆盖了 AI 与外部世界交互的所有模式
- 三种传输模式(stdio/SSE/Streamable HTTP)适应从本地开发到远程部署的全部场景
- OAuth 2.0 认证让企业级 MCP Server 的安全部署成为现实
- SDK 完整(TypeScript + Python),开发者可以快速上手
对开发者的建议:
- 如果你在构建 AI Agent,现在就开始用 MCP 标准来设计和实现工具接口。这不是选做题,而是必答题。
- 如果你在开发企业内部工具,把它们封装成 MCP Server。一次开发,所有 AI 平台通用。
- 如果你在评估 AI 技术栈,优先选择原生支持 MCP 的平台。这决定了你的工具生态能否持续扩展。
MCP 的最终目标不是"又一个工具调用框架",而是让 AI Agent 的工具生态像 Web 一样开放和互联。当任何 AI 都能调用任何工具,任何工具都能服务任何 AI 时,我们才真正进入了 Agent 时代。
参考资源
- MCP 官方规范:https://modelcontextprotocol.io
- MCP GitHub 仓库:https://github.com/modelcontextprotocol
- TypeScript SDK:
@modelcontextprotocol/sdk - Python SDK:
mcp[cli] - MCP Server 列表:https://github.com/modelcontextprotocol/servers