MCP 协议深度实战:当 AI 学会「插拔工具」——从协议原理到生产级 Server 开发的完全指南(2026)
引言:一个协议如何重塑 AI 工具生态
2024 年底,Anthropic 开源了 Model Context Protocol(MCP),一个定义 AI 模型如何与外部工具和数据源通信的标准协议。当时大多数人把它当作又一个技术规范,没太当回事。
18 个月后的今天,MCP 已经成为 AI 工具生态的事实标准:
- 5000+ 官方与社区 MCP Server,覆盖文件系统、数据库、浏览器、Git、云服务……
- 所有主流 AI 客户端支持:Claude Desktop、Cursor、VS Code、JetBrains、Continue
- 三大云厂商提供官方 Server:AWS、Azure、GCP
- 百度搜索 AI 开放计划收录 6000+ MCP Server,日均服务 5000 万+ 用户
- GitHub spec 仓库 30k+ Stars
这不是偶然。MCP 解决了一个真实而迫切的痛点:每接入一个新工具,就要写一堆胶水代码。在 MCP 之前,Claude 接文件系统写一套、接 GitHub API 写一套、接数据库又写一套——每个 AI 应用都在重复造轮子。MCP 用统一协议替代了这些胶水代码,就像 USB-C 统一了设备接口一样。
但大多数文章还停留在「MCP 是什么」「怎么用 Claude Desktop 配一个 MCP Server」的入门层面。生产环境要面对的问题远比配置复杂:如何设计一个可扩展的 MCP Server 架构?如何处理认证和权限?如何做错误恢复和限流?如何监控和调试?如何在 Docker/K8s 中部署?如何做版本管理和向后兼容?
这篇文章会从头讲清楚 MCP 的协议原理,然后用一个完整的生产级 MCP Server 项目,带你走完从设计到部署的全链路。
一、MCP 协议核心原理
1.1 架构模型:Host、Client、Server 三层
MCP 的架构模型清晰而简洁:
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ MCP Host │ │ MCP Server │ │ MCP Server │
│ (Claude/Cursor) │────▶│ MCP Client │────▶│ (文件系统/DB) │
│ │ │ │ │ │
└─────────────────┘ └─────────────────┘ └─────────────────┘
│ │ │
└──── stdio/SSE ─────────┘ │
└── 本地调用/API ──────┘
三个角色:
- Host:AI 应用本体(如 Claude Desktop、Cursor)。一个 Host 可以连接多个 Client。
- Client:协议客户端,与 Server 保持 1:1 连接。负责协议握手、能力协商、消息路由。
- Server:提供工具(Tools)、资源(Resources)、提示词(Prompts)的服务端。
关键设计决策:Client 和 Server 是 1:1 关系。一个 Host 可以创建多个 Client 实例,每个 Client 连接一个 Server。这保证了隔离性——一个 Server 的崩溃不会影响其他 Server。
1.2 传输层:stdio 与 SSE
MCP 定义了两种传输方式:
stdio(标准输入输出)——本地进程间通信:
// Host 启动 Server 作为子进程
{
"mcpServers": {
"filesystem": {
"command": "npx",
"args": ["-y", "@modelcontextprotocol/server-filesystem", "/home/user"]
}
}
}
Server 从 stdin 读取 JSON-RPC 消息,向 stdout 写入响应。stderr 用于日志。这是最常见的模式——零网络配置,进程生命周期由 Host 管理。
SSE(Server-Sent Events)——远程通信:
Client ──POST /message──▶ Server
Client ◀──GET /sse────── Server (长连接,推送通知)
SSE 模式用于远程部署的 Server。Client 通过 HTTP POST 发送请求,通过 SSE 长连接接收响应和通知。2025 年 MCP 规范更新后,新增了 Streamable HTTP 传输方式,支持更灵活的请求/响应模式,不再强制要求 SSE 长连接。
1.3 消息格式:JSON-RPC 2.0
所有 MCP 消息都是 JSON-RPC 2.0 格式:
// 请求
{
"jsonrpc": "2.0",
"id": 1,
"method": "tools/call",
"params": {
"name": "read_file",
"arguments": {
"path": "/etc/hosts"
}
}
}
// 响应
{
"jsonrpc": "2.0",
"id": 1,
"result": {
"content": [
{
"type": "text",
"text": "127.0.0.1 localhost\n..."
}
]
}
}
通知(不需要响应):
{
"jsonrpc": "2.0",
"method": "notifications/message",
"params": {
"level": "error",
"data": "Connection to database lost"
}
}
1.4 生命周期:连接→协商→通信→关闭
Host Client Server
│ │ │
│── create client ───────▶│ │
│ │── initialize ──────────▶│
│ │◀─ capabilities ─────────│
│ │── initialized ──────────▶│
│ │ │
│ │ (正常通信阶段) │
│── use tool ────────────▶│── tools/call ──────────▶│
│◀─ tool result ──────────│◀─ result ───────────────│
│ │ │
│── shutdown ────────────▶│── close ───────────────▶│
关键步骤:
- initialize:Client 发送自己的能力和配置,Server 回复自己的能力。双方在此协商传输选项、支持的功能集。
- initialized:Client 确认初始化完成,进入正常通信。
- 正常通信:调用工具、读取资源、获取提示词。
- close:连接关闭。
1.5 三大原语:Tools、Resources、Prompts
MCP 定义了三种 Server 可以暴露的能力:
Tools(工具)——模型可调用的函数:
{
"name": "query_database",
"description": "执行 SQL 查询并返回结果",
"inputSchema": {
"type": "object",
"properties": {
"sql": { "type": "string", "description": "SQL 查询语句" },
"limit": { "type": "integer", "default": 100 }
},
"required": ["sql"]
}
}
Tools 是最核心的原语。AI 模型看到工具列表后,可以自主决定调用哪个工具、传什么参数。
Resources(资源)——应用可读取的数据:
{
"uri": "postgres://localhost/users",
"name": "用户表",
"description": "数据库中的用户表结构",
"mimeType": "application/json"
}
Resources 是被动暴露的——AI 不直接调用,而是由应用(如 IDE)决定何时读取并注入上下文。
Prompts(提示词模板)——可复用的提示词:
{
"name": "code_review",
"description": "代码审查提示词",
"arguments": [
{ "name": "language", "description": "编程语言", "required": true }
]
}
Prompts 让用户可以在 AI 客户端中选择预定义的提示词模板,减少重复输入。
二、从零构建生产级 MCP Server
现在我们用 TypeScript 构建一个真实可用的 MCP Server——PostgreSQL 管理工具。它不只是简单的 SQL 执行器,而是包含查询、Schema 浏览、慢查询分析、连接池管理的完整工具集。
2.1 项目初始化
mkdir mcp-server-pgadmin && cd mcp-server-pgadmin
npm init -y
npm install @modelcontextprotocol/sdk pg zod
npm install -D typescript @types/node @types/pg tsx
cat > tsconfig.json << 'EOF'
{
"compilerOptions": {
"target": "ES2022",
"module": "Node16",
"moduleResolution": "Node16",
"outDir": "./dist",
"rootDir": "./src",
"strict": true,
"esModuleInterop": true,
"skipLibCheck": true,
"declaration": true
},
"include": ["src/**/*"]
}
EOF
2.2 项目结构
mcp-server-pgadmin/
├── src/
│ ├── index.ts # 入口,启动 Server
│ ├── server.ts # Server 定义,注册 Tools/Resources
│ ├── tools/ # 工具实现
│ │ ├── query.ts # SQL 查询
│ │ ├── schema.ts # Schema 浏览
│ │ ├── slow-query.ts # 慢查询分析
│ │ └── pool-stats.ts # 连接池统计
│ ├── resources/ # 资源定义
│ │ └── tables.ts # 表结构资源
│ ├── db/
│ │ ├── pool.ts # 连接池管理
│ │ └── errors.ts # 错误处理
│ └── utils/
│ ├── validator.ts # SQL 注入检测
│ └── formatter.ts # 结果格式化
├── Dockerfile
├── docker-compose.yml
├── package.json
└── tsconfig.json
2.3 连接池管理
// src/db/pool.ts
import { Pool, PoolConfig } from 'pg';
interface PgAdminConfig {
host: string;
port: number;
database: string;
user: string;
password: string;
maxPoolSize?: number;
idleTimeoutMs?: number;
connectionTimeoutMs?: number;
}
// 全局连接池,支持多数据库
const pools = new Map<string, Pool>();
export function getPool(name: string, config: PgAdminConfig): Pool {
const key = `${config.host}:${config.port}:${config.database}`;
if (!pools.has(key)) {
const poolConfig: PoolConfig = {
host: config.host,
port: config.port,
database: config.database,
user: config.user,
password: config.password,
max: config.maxPoolSize ?? 10,
idleTimeoutMillis: config.idleTimeoutMs ?? 30000,
connectionTimeoutMillis: config.connectionTimeoutMs ?? 5000,
// 生产级配置
allowExitOnIdle: false,
// 连接验证
keepAlive: true,
keepAliveInitialDelayMillis: 10000,
};
const pool = new Pool(poolConfig);
// 监听连接错误——防止未捕获异常导致进程崩溃
pool.on('error', (err) => {
console.error(`[Pool ${key}] Unexpected error:`, err.message);
});
pool.on('connect', (client) => {
console.log(`[Pool ${key}] New client connected, total: ${pool.totalCount}`);
});
pool.on('remove', (client) => {
console.log(`[Pool ${key}] Client removed, total: ${pool.totalCount}`);
});
pools.set(key, pool);
}
return pools.get(key)!;
}
// 优雅关闭所有连接池
export async function closeAllPools(): Promise<void> {
const promises = Array.from(pools.entries()).map(async ([key, pool]) => {
try {
await pool.end();
console.log(`[Pool ${key}] Closed`);
} catch (err) {
console.error(`[Pool ${key}] Close error:`, err);
}
});
await Promise.all(promises);
pools.clear();
}
// 连接池健康检查
export async function healthCheck(pool: Pool): Promise<{
healthy: boolean;
totalCount: number;
idleCount: number;
waitingCount: number;
latencyMs: number;
}> {
const start = Date.now();
try {
const client = await pool.connect();
await client.query('SELECT 1');
client.release();
return {
healthy: true,
totalCount: pool.totalCount,
idleCount: pool.idleCount,
waitingCount: pool.waitingCount,
latencyMs: Date.now() - start,
};
} catch (err) {
return {
healthy: false,
totalCount: pool.totalCount,
idleCount: pool.idleCount,
waitingCount: pool.waitingCount,
latencyMs: Date.now() - start,
};
}
}
2.4 SQL 注入检测
这是生产环境必须做的。即使 AI 理论上不会生成恶意 SQL,但输入验证是安全的基本盘。
// src/utils/validator.ts
// 危险 SQL 模式
const DANGEROUS_PATTERNS = [
/;\s*DROP\s+/i, // DROP 语句
/;\s*TRUNCATE\s+/i, // TRUNCATE 语句
/;\s*DELETE\s+FROM\s+(?!.*WHERE)/i, // 无 WHERE 的 DELETE
/;\s*UPDATE\s+(?!.*WHERE)/i, // 无 WHERE 的 UPDATE
/pg_shutdown/i, // 关闭数据库
/pg_terminate_backend/i, // 终止连接
/COPY.*TO\s+PROGRAM/i, // COPY TO PROGRAM(命令执行)
];
// 只读查询白名单
const READ_ONLY_PREFIXES = [
'SELECT', 'WITH', 'EXPLAIN', 'SHOW', 'DESCRIBE',
];
export interface ValidationResult {
safe: boolean;
reason?: string;
readOnly: boolean;
}
export function validateSQL(sql: string): ValidationResult {
const trimmed = sql.trim();
// 检查是否为只读查询
const firstWord = trimmed.split(/\s+/)[0]?.toUpperCase() ?? '';
const readOnly = READ_ONLY_PREFIXES.includes(firstWord);
// 检查危险模式
for (const pattern of DANGEROUS_PATTERNS) {
if (pattern.test(trimmed)) {
return {
safe: false,
reason: `SQL 包含危险操作: ${pattern.source}`,
readOnly: false,
};
}
}
// 多语句检查(分号分隔)
const statements = trimmed.split(';').filter(s => s.trim().length > 0);
if (statements.length > 1) {
return {
safe: false,
reason: '不允许执行多条语句,请一次只执行一条 SQL',
readOnly: false,
};
}
// 注释注入检查
if (/--.*\n.*(?:INSERT|UPDATE|DELETE|DROP)/i.test(trimmed)) {
return {
safe: false,
reason: 'SQL 注释中可能包含注入语句',
readOnly: false,
};
}
return { safe: true, readOnly };
}
2.5 注册 MCP Tools
// src/tools/query.ts
import { z } from 'zod';
import { getPool } from '../db/pool.js';
import { validateSQL } from '../utils/validator.js';
import { formatResults } from '../utils/formatter.js';
import type { ToolDefinition } from '../server.js';
export const queryTool: ToolDefinition = {
name: 'pg_query',
description: '执行 SQL 查询并返回结果。支持 SELECT、INSERT、UPDATE、DELETE 等语句。' +
'对于修改操作会返回受影响的行数。查询结果自动限制最多 1000 行。',
inputSchema: {
sql: z.string().describe('要执行的 SQL 语句'),
database: z.string().default('default').describe('数据库连接名称'),
limit: z.number().default(1000).describe('返回结果的最大行数'),
timeout: z.number().default(30).describe('查询超时时间(秒)'),
},
handler: async (input, context) => {
const { sql, database, limit, timeout } = input;
// 1. SQL 验证
const validation = validateSQL(sql);
if (!validation.safe) {
return {
content: [{
type: 'text' as const,
text: `❌ SQL 安全检查未通过: ${validation.reason}`,
}],
isError: true,
};
}
// 2. 获取连接池
const pool = getPool(database, context.dbConfig);
// 3. 执行查询(带超时和行数限制)
const client = await pool.connect();
try {
// 设置查询超时
await client.query(`SET statement_timeout = '${timeout}s'`);
// 只读查询设置事务隔离
if (validation.readOnly) {
await client.query('BEGIN READ ONLY');
}
const result = await client.query({
text: sql,
rowMode: 'array', // 使用数组模式减少传输量
});
if (validation.readOnly) {
await client.query('COMMIT');
}
// 4. 格式化结果
const formatted = formatResults(result, limit);
return {
content: [{
type: 'text' as const,
text: formatted,
}],
};
} catch (err: any) {
if (validation.readOnly) {
await client.query('ROLLBACK').catch(() => {});
}
return {
content: [{
type: 'text' as const,
text: `❌ 查询执行失败: ${err.message}\n\nSQL 状态: ${err.code}\n详情: ${err.detail ?? '无'}`,
}],
isError: true,
};
} finally {
client.release();
}
},
};
// src/tools/schema.ts
import { z } from 'zod';
import { getPool } from '../db/pool.js';
import type { ToolDefinition } from '../server.js';
export const listTablesTool: ToolDefinition = {
name: 'pg_list_tables',
description: '列出数据库中的所有表及其行数、大小等统计信息',
inputSchema: {
schema: z.string().default('public').describe('Schema 名称'),
database: z.string().default('default').describe('数据库连接名称'),
},
handler: async (input, context) => {
const pool = getPool(input.database, context.dbConfig);
const result = await pool.query(`
SELECT
t.table_name,
obj_description((quote_ident(t.table_schema) || '.' || quote_ident(t.table_name))::regclass) as comment,
pg_size_pretty(pg_total_relation_size(quote_ident(t.table_schema) || '.' || quote_ident(t.table_name))) as total_size,
pg_total_relation_size(quote_ident(t.table_schema) || '.' || quote_ident(t.table_name)) as size_bytes,
COALESCE(s.n_live_tup, 0) as row_count,
COALESCE(s.last_vacuum, 'never') as last_vacuum,
COALESCE(s.last_analyze, 'never') as last_analyze
FROM information_schema.tables t
LEFT JOIN pg_stat_user_tables s
ON s.schemaname = t.table_schema
AND s.relname = t.table_name
WHERE t.table_schema = $1
AND t.table_type = 'BASE TABLE'
ORDER BY size_bytes DESC
`, [input.schema]);
const lines = [
`# 数据库表列表 (Schema: ${input.schema})`,
'',
'| 表名 | 注释 | 大小 | 行数 | 最近 VACUUM | 最近 ANALYZE |',
'|------|------|------|------|-------------|--------------|',
];
for (const row of result.rows) {
lines.push(
`| ${row.table_name} | ${row.comment ?? '-'} | ${row.total_size} | ${row.row_count} | ${row.last_vacuum} | ${row.last_analyze} |`
);
}
return {
content: [{ type: 'text' as const, text: lines.join('\n') }],
};
},
};
export const describeTableTool: ToolDefinition = {
name: 'pg_describe_table',
description: '获取表的完整结构信息,包括列定义、索引、约束、外键等',
inputSchema: {
table: z.string().describe('表名'),
schema: z.string().default('public').describe('Schema 名称'),
database: z.string().default('default').describe('数据库连接名称'),
},
handler: async (input, context) => {
const pool = getPool(input.database, context.dbConfig);
const { table, schema } = input;
const qualified = `${schema}.${table}`;
// 列信息
const columns = await pool.query(`
SELECT
column_name, data_type, is_nullable,
column_default, character_maximum_length,
numeric_precision, numeric_scale,
col_description((quote_ident($1) || '.' || quote_ident($2))::regclass, ordinal_position) as comment
FROM information_schema.columns
WHERE table_schema = $1 AND table_name = $2
ORDER BY ordinal_position
`, [schema, table]);
// 索引信息
const indexes = await pool.query(`
SELECT
i.relname as index_name,
a.attname as column_name,
ix.indisunique as is_unique,
ix.indisprimary as is_primary,
am.amname as index_type
FROM pg_index ix
JOIN pg_class t ON t.oid = ix.indrelid
JOIN pg_class i ON i.oid = ix.indexrelid
JOIN pg_am am ON am.oid = i.relam
JOIN pg_attribute a ON a.attrelid = t.oid AND a.attnum = ANY(ix.indkey)
WHERE t.relname = $2 AND t.relnamespace = (SELECT oid FROM pg_namespace WHERE nspname = $1)
`, [schema, table]);
// 外键信息
const foreignKeys = await pool.query(`
SELECT
tc.constraint_name,
kcu.column_name,
ccu.table_schema AS foreign_table_schema,
ccu.table_name AS foreign_table_name,
ccu.column_name AS foreign_column_name
FROM information_schema.table_constraints tc
JOIN information_schema.key_column_usage kcu
ON tc.constraint_name = kcu.constraint_name
JOIN information_schema.constraint_column_usage ccu
ON ccu.constraint_name = tc.constraint_name
WHERE tc.constraint_type = 'FOREIGN KEY'
AND tc.table_schema = $1 AND tc.table_name = $2
`, [schema, table]);
// 格式化输出
const lines = [`# 表结构: ${qualified}`, ''];
lines.push('## 列定义', '');
lines.push('| 列名 | 类型 | 可空 | 默认值 | 注释 |');
lines.push('|------|------|------|--------|------|');
for (const col of columns.rows) {
const type = col.character_maximum_length
? `${col.data_type}(${col.character_maximum_length})`
: col.numeric_precision
? `${col.data_type}(${col.numeric_precision},${col.numeric_scale})`
: col.data_type;
lines.push(`| ${col.column_name} | ${type} | ${col.is_nullable} | ${col.column_default ?? '-'} | ${col.comment ?? '-'} |`);
}
if (indexes.rows.length > 0) {
lines.push('', '## 索引', '');
lines.push('| 索引名 | 列 | 类型 | 唯一 | 主键 |');
lines.push('|--------|-----|------|------|------|');
for (const idx of indexes.rows) {
lines.push(`| ${idx.index_name} | ${idx.column_name} | ${idx.index_type} | ${idx.is_unique ? '✓' : '-'} | ${idx.is_primary ? '✓' : '-'} |`);
}
}
if (foreignKeys.rows.length > 0) {
lines.push('', '## 外键', '');
for (const fk of foreignKeys.rows) {
lines.push(`- **${fk.constraint_name}**: ${fk.column_name} → ${fk.foreign_table_schema}.${fk.foreign_table_name}.${fk.foreign_column_name}`);
}
}
return {
content: [{ type: 'text' as const, text: lines.join('\n') }],
};
},
};
2.6 慢查询分析工具
// src/tools/slow-query.ts
import { z } from 'zod';
import { getPool } from '../db/pool.js';
import type { ToolDefinition } from '../server.js';
export const slowQueryTool: ToolDefinition = {
name: 'pg_slow_queries',
description: '分析数据库中的慢查询。基于 pg_stat_statements 扩展,返回执行时间最长、调用最频繁的查询。',
inputSchema: {
top: z.number().default(10).describe('返回前 N 条慢查询'),
sortBy: z.enum(['total_time', 'mean_time', 'calls', 'rows']).default('total_time')
.describe('排序方式: total_time=总耗时, mean_time=平均耗时, calls=调用次数, rows=返回行数'),
database: z.string().default('default').describe('数据库连接名称'),
},
handler: async (input, context) => {
const pool = getPool(input.database, context.dbConfig);
// 先检查 pg_stat_statements 是否可用
const extCheck = await pool.query(`
SELECT EXISTS(
SELECT 1 FROM pg_extension WHERE extname = 'pg_stat_statements'
) as available
`);
if (!extCheck.rows[0].available) {
return {
content: [{
type: 'text' as const,
text: '❌ pg_stat_statements 扩展未启用。\n\n' +
'请在 postgresql.conf 中添加:\n' +
'```\nshared_preload_libraries = \'pg_stat_statements\'\n' +
'pg_stat_statements.track = all\n```\n\n' +
'然后执行: CREATE EXTENSION pg_stat_statements;',
}],
isError: true,
};
}
const orderMap = {
total_time: 'total_exec_time DESC',
mean_time: 'mean_exec_time DESC',
calls: 'calls DESC',
rows: 'rows DESC',
};
const result = await pool.query(`
SELECT
queryid,
LEFT(query, 200) as query_preview,
calls,
round(total_exec_time::numeric, 2) as total_ms,
round(mean_exec_time::numeric, 2) as mean_ms,
round(min_exec_time::numeric, 2) as min_ms,
round(max_exec_time::numeric, 2) as max_ms,
rows,
shared_blks_hit,
shared_blks_read,
round(100.0 * shared_blks_hit / NULLIF(shared_blks_hit + shared_blks_read, 0), 1) as cache_hit_pct
FROM pg_stat_statements
WHERE dbid = (SELECT oid FROM pg_database WHERE datname = current_database())
ORDER BY ${orderMap[input.sortBy]}
LIMIT $1
`, [input.top]);
const lines = [
`# 慢查询 Top ${input.top} (排序: ${input.sortBy})`,
'',
'| # | Query ID | 查询预览 | 调用次数 | 总耗时(ms) | 平均耗时(ms) | 缓存命中率 |',
'|---|----------|----------|----------|------------|-------------|-----------|',
];
result.rows.forEach((row, i) => {
lines.push(
`| ${i + 1} | ${row.queryid} | ${row.query_preview}... | ${row.calls} | ${row.total_ms} | ${row.mean_ms} | ${row.cache_hit_pct ?? '-'}% |`
);
});
lines.push('', '💡 优化建议:', '');
for (const row of result.rows.slice(0, 3)) {
if (parseFloat(row.cache_hit_pct) < 95) {
lines.push(`- Query ${row.queryid}: 缓存命中率仅 ${row.cache_hit_pct}%,考虑增加 shared_buffers 或优化查询`);
}
if (parseFloat(row.mean_ms) > 100) {
lines.push(`- Query ${row.queryid}: 平均耗时 ${row.mean_ms}ms,建议添加 EXPLAIN ANALYZE 分析执行计划`);
}
}
return {
content: [{ type: 'text' as const, text: lines.join('\n') }],
};
},
};
2.7 注册 MCP Resources
Resources 是被动暴露的数据,不同于 Tools 的主动调用:
// src/resources/tables.ts
import { getPool } from '../db/pool.js';
import type { ResourceDefinition } from '../server.js';
export function createTableResources(dbName: string): ResourceDefinition[] {
return [
{
uri: `pg://${dbName}/schema/overview`,
name: `${dbName} - Schema 概览`,
description: '数据库所有表的概要信息',
mimeType: 'application/json',
read: async (context) => {
const pool = getPool(dbName, context.dbConfig);
const result = await pool.query(`
SELECT table_name,
obj_description((quote_ident(table_schema) || '.' || quote_ident(table_name))::regclass) as comment
FROM information_schema.tables
WHERE table_schema = 'public' AND table_type = 'BASE TABLE'
ORDER BY table_name
`);
return {
contents: [{
uri: `pg://${dbName}/schema/overview`,
mimeType: 'application/json',
text: JSON.stringify(result.rows, null, 2),
}],
};
},
},
];
}
2.8 组装 Server
// src/server.ts
import { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js';
import { StdioServerTransport } from '@modelcontextprotocol/sdk/server/stdio.js';
import { queryTool } from './tools/query.js';
import { listTablesTool, describeTableTool } from './tools/schema.js';
import { slowQueryTool } from './tools/slow-query.js';
import { poolStatsTool } from './tools/pool-stats.js';
import { createTableResources } from './resources/tables.js';
import { closeAllPools, healthCheck } from './db/pool.js';
export interface ToolDefinition {
name: string;
description: string;
inputSchema: Record<string, any>;
handler: (input: any, context: ServerContext) => Promise<any>;
}
export interface ResourceDefinition {
uri: string;
name: string;
description: string;
mimeType: string;
read: (context: ServerContext) => Promise<any>;
}
export interface ServerContext {
dbConfig: any;
}
export async function createServer(): Promise<McpServer> {
const server = new McpServer({
name: 'pg-admin',
version: '1.0.0',
});
// 从环境变量读取数据库配置
const dbConfig = {
host: process.env.PG_HOST ?? 'localhost',
port: parseInt(process.env.PG_PORT ?? '5432'),
database: process.env.PG_DATABASE ?? 'postgres',
user: process.env.PG_USER ?? 'postgres',
password: process.env.PG_PASSWORD ?? '',
maxPoolSize: parseInt(process.env.PG_POOL_SIZE ?? '10'),
};
const context: ServerContext = { dbConfig };
// 注册所有 Tools
const tools = [queryTool, listTablesTool, describeTableTool, slowQueryTool, poolStatsTool];
for (const tool of tools) {
server.tool(
tool.name,
tool.description,
tool.inputSchema,
async (input) => tool.handler(input, context)
);
}
// 注册 Resources
const resources = createTableResources(dbConfig.database);
for (const resource of resources) {
server.resource(resource.name, resource.uri, async (uri) => resource.read(context));
}
// 注册健康检查提示词
server.prompt('db_health', '检查数据库健康状态', {}, async () => {
const pool = getPool(dbConfig.database, dbConfig);
const health = await healthCheck(pool);
return {
messages: [{
role: 'user',
content: {
type: 'text',
text: `请分析以下数据库健康状态并给出建议:\n\n${JSON.stringify(health, null, 2)}`,
},
}],
};
});
return server;
}
// src/index.ts
export async function main() {
const server = await createServer();
const transport = new StdioServerTransport();
await server.connect(transport);
console.error('MCP Server pg-admin started');
// 优雅关闭
const cleanup = async () => {
console.error('Shutting down...');
await closeAllPools();
process.exit(0);
};
process.on('SIGINT', cleanup);
process.on('SIGTERM', cleanup);
}
main().catch((err) => {
console.error('Fatal error:', err);
process.exit(1);
});
三、认证与安全
生产环境中,MCP Server 必须解决认证问题。2025 年 MCP 规范引入了 OAuth 2.1 认证框架。
3.1 MCP 的 OAuth 2.1 流程
Client Server
│ │
│── GET / ──────────────────────────────────────▶│ (未认证)
│◀─ 401 + WWW-Authenticate: Bearer ─────────────│
│ │
│── GET /.well-known/oauth-authorization-server ─▶│
│◀─ { authorization_endpoint, token_endpoint } ──│
│ │
│── Browser: authorization_endpoint ────────────▶│
│◀─ Authorization Code ─────────────────────────│
│ │
│── POST token_endpoint ────────────────────────▶│
│◀─ { access_token, token_type, expires_in } ───│
│ │
│── Request + Authorization: Bearer <token> ───▶│
│◀─ 200 + Response ─────────────────────────────│
3.2 实现认证中间件
// src/auth/middleware.ts
import { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js';
interface AuthConfig {
enabled: boolean;
issuer: string;
audience: string;
jwksUri: string;
}
// JWT 验证(简化版,生产环境用 jose 库)
async function verifyToken(token: string, config: AuthConfig): Promise<{
valid: boolean;
claims?: Record<string, any>;
error?: string;
}> {
if (!config.enabled) {
return { valid: true };
}
try {
// 解码 JWT header 获取 kid
const [headerB64] = token.split('.');
const header = JSON.parse(Buffer.from(headerB64, 'base64url').toString());
// 从 JWKS 端点获取公钥
const jwksResponse = await fetch(config.jwksUri);
const jwks = await jwksResponse.json();
const key = jwks.keys.find((k: any) => k.kid === header.kid);
if (!key) {
return { valid: false, error: 'Signing key not found' };
}
// 使用 jose 库验证(这里简化)
// 生产环境: import { jwtVerify } from 'jose';
// const { payload } = await jwtVerify(token, key, { issuer: config.issuer, audience: config.audience });
return { valid: true, claims: {} }; // 简化
} catch (err: any) {
return { valid: false, error: err.message };
}
}
// 在 MCP Server 中启用认证
export function setupAuth(server: McpServer, config: AuthConfig): void {
if (!config.enabled) return;
// MCP SDK 在 2025 年版本支持 auth middleware
// 在每个 tool 调用前验证 token
server.server.setRequestHandler = new Proxy(
server.server.setRequestHandler,
{
apply(target, thisArg, args) {
const [method, handler] = args;
const wrappedHandler = async (request: any, extra: any) => {
// 从请求头提取 token
const authHeader = extra?.headers?.authorization;
if (!authHeader?.startsWith('Bearer ')) {
throw new Error('Unauthorized: Missing Bearer token');
}
const token = authHeader.slice(7);
const result = await verifyToken(token, config);
if (!result.valid) {
throw new Error(`Unauthorized: ${result.error}`);
}
return handler(request, { ...extra, auth: result.claims });
};
return target.apply(thisArg, [method, wrappedHandler]);
},
}
);
}
3.3 工具级权限控制
不是所有用户都能执行所有操作:
// src/auth/permissions.ts
export enum Permission {
READ = 'read', // SELECT 查询
WRITE = 'write', // INSERT/UPDATE/DELETE
DDL = 'ddl', // CREATE/ALTER/DROP
ADMIN = 'admin', // 管理操作(连接池、慢查询分析)
}
const TOOL_PERMISSIONS: Record<string, Permission[]> = {
'pg_query': [Permission.READ], // SELECT 自动通过
'pg_query_write': [Permission.WRITE], // 写操作需要 write 权限
'pg_list_tables': [Permission.READ],
'pg_describe_table': [Permission.READ],
'pg_slow_queries': [Permission.ADMIN],
'pg_pool_stats': [Permission.ADMIN],
};
// 在 query tool 中根据 SQL 类型动态检查
export function checkQueryPermission(sql: string, userPermissions: Permission[]): boolean {
const firstWord = sql.trim().split(/\s+/)[0]?.toUpperCase();
switch (firstWord) {
case 'SELECT':
case 'WITH':
return userPermissions.includes(Permission.READ);
case 'INSERT':
case 'UPDATE':
case 'DELETE':
return userPermissions.includes(Permission.WRITE);
case 'CREATE':
case 'ALTER':
case 'DROP':
return userPermissions.includes(Permission.DDL);
default:
return false;
}
}
四、错误处理与限流
4.1 错误分类与恢复
// src/db/errors.ts
import { DatabaseError } from 'pg';
export enum ErrorCategory {
CONNECTION = 'connection', // 连接错误,可重试
TIMEOUT = 'timeout', // 超时,可重试
CONSTRAINT = 'constraint', // 约束违反,不可重试
SYNTAX = 'syntax', // SQL 语法错误,不可重试
PERMISSION = 'permission', // 权限不足,不可重试
UNKNOWN = 'unknown', // 未知错误
}
export function categorizeError(err: any): {
category: ErrorCategory;
retryable: boolean;
userMessage: string;
} {
if (err instanceof DatabaseError) {
switch (err.code) {
case '08001': // 连接不存在
case '08003': // 连接未打开
case '08006': // 连接失败
return {
category: ErrorCategory.CONNECTION,
retryable: true,
userMessage: '数据库连接失败,请检查数据库是否正在运行',
};
case '57014': // 查询被取消(超时)
return {
category: ErrorCategory.TIMEOUT,
retryable: true,
userMessage: '查询超时,请尝试优化查询或增加超时时间',
};
case '23505': // 唯一约束违反
return {
category: ErrorCategory.CONSTRAINT,
retryable: false,
userMessage: `违反唯一约束: ${err.detail ?? err.message}`,
};
case '23503': // 外键约束违反
return {
category: ErrorCategory.CONSTRAINT,
retryable: false,
userMessage: `违反外键约束: ${err.detail ?? err.message}`,
};
case '42601': // 语法错误
return {
category: ErrorCategory.SYNTAX,
retryable: false,
userMessage: `SQL 语法错误: ${err.message}`,
};
case '42501': // 权限不足
return {
category: ErrorCategory.PERMISSION,
retryable: false,
userMessage: '当前用户没有执行此操作的权限',
};
}
}
return {
category: ErrorCategory.UNKNOWN,
retryable: false,
userMessage: `未知错误: ${err.message}`,
};
}
// 带重试的查询执行
export async function executeWithRetry<T>(
fn: () => Promise<T>,
maxRetries: number = 3,
baseDelayMs: number = 1000
): Promise<T> {
let lastError: any;
for (let attempt = 0; attempt <= maxRetries; attempt++) {
try {
return await fn();
} catch (err) {
lastError = err;
const { retryable } = categorizeError(err);
if (!retryable || attempt === maxRetries) {
throw err;
}
// 指数退避
const delay = baseDelayMs * Math.pow(2, attempt);
console.error(`Retry ${attempt + 1}/${maxRetries} after ${delay}ms`);
await new Promise(resolve => setTimeout(resolve, delay));
}
}
throw lastError;
}
4.2 限流器
// src/utils/rate-limiter.ts
export class RateLimiter {
private tokens: number;
private lastRefill: number;
constructor(
private maxTokens: number,
private refillRateMs: number, // 每多少毫秒补充一个 token
) {
this.tokens = maxTokens;
this.lastRefill = Date.now();
}
async acquire(): Promise<void> {
this.refill();
if (this.tokens >= 1) {
this.tokens -= 1;
return;
}
// 等待 token 补充
const waitMs = this.refillRateMs;
await new Promise(resolve => setTimeout(resolve, waitMs));
return this.acquire();
}
private refill(): void {
const now = Date.now();
const elapsed = now - this.lastRefill;
const tokensToAdd = Math.floor(elapsed / this.refillRateMs);
if (tokensToAdd > 0) {
this.tokens = Math.min(this.maxTokens, this.tokens + tokensToAdd);
this.lastRefill = now;
}
}
}
// 在 Server 中使用
const queryLimiter = new RateLimiter(10, 1000); // 每秒最多 10 次查询
const adminLimiter = new RateLimiter(2, 5000); // 管理操作每 5 秒最多 2 次
五、容器化部署
5.1 Dockerfile
# src/../Dockerfile
FROM node:22-alpine AS builder
WORKDIR /app
COPY package*.json ./
RUN npm ci
COPY tsconfig.json ./
COPY src/ ./src/
RUN npm run build
FROM node:22-alpine AS runner
WORKDIR /app
COPY package*.json ./
RUN npm ci --production
COPY --from=builder /app/dist ./dist
# 非 root 用户运行
RUN addgroup -S mcp && adduser -S mcp -G mcp
USER mcp
ENV NODE_ENV=production
ENV PG_HOST=postgres
ENV PG_PORT=5432
ENV PG_POOL_SIZE=10
CMD ["node", "dist/index.js"]
5.2 Docker Compose(SSE 模式远程部署)
# docker-compose.yml
version: '3.8'
services:
postgres:
image: postgres:16-alpine
environment:
POSTGRES_DB: app_db
POSTGRES_USER: app_user
POSTGRES_PASSWORD: ${PG_PASSWORD}
volumes:
- pgdata:/var/lib/postgresql/data
healthcheck:
test: ["CMD-SHELL", "pg_isready -U app_user -d app_db"]
interval: 5s
timeout: 5s
retries: 5
mcp-server:
build: .
environment:
PG_HOST: postgres
PG_PORT: 5432
PG_DATABASE: app_db
PG_USER: app_user
PG_PASSWORD: ${PG_PASSWORD}
PG_POOL_SIZE: 15
MCP_TRANSPORT: sse
MCP_PORT: 3000
MCP_AUTH_ENABLED: "true"
MCP_AUTH_ISSUER: ${AUTH_ISSUER}
MCP_AUTH_AUDIENCE: mcp-pg-admin
MCP_AUTH_JWKS_URI: ${AUTH_JWKS_URI}
ports:
- "3000:3000"
depends_on:
postgres:
condition: service_healthy
restart: unless-stopped
deploy:
resources:
limits:
memory: 512M
cpus: '1.0'
volumes:
pgdata:
5.3 Kubernetes 部署
# k8s/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: mcp-pg-admin
labels:
app: mcp-pg-admin
spec:
replicas: 2
selector:
matchLabels:
app: mcp-pg-admin
template:
metadata:
labels:
app: mcp-pg-admin
spec:
containers:
- name: mcp-server
image: registry.example.com/mcp-pg-admin:1.0.0
ports:
- containerPort: 3000
env:
- name: PG_HOST
valueFrom:
secretKeyRef:
name: db-credentials
key: host
- name: PG_PASSWORD
valueFrom:
secretKeyRef:
name: db-credentials
key: password
- name: PG_DATABASE
value: app_db
- name: PG_USER
value: app_user
- name: MCP_TRANSPORT
value: sse
- name: MCP_PORT
value: "3000"
resources:
requests:
memory: "128Mi"
cpu: "100m"
limits:
memory: "512Mi"
cpu: "1000m"
livenessProbe:
httpGet:
path: /health
port: 3000
initialDelaySeconds: 10
periodSeconds: 30
readinessProbe:
httpGet:
path: /health
port: 3000
initialDelaySeconds: 5
periodSeconds: 10
---
apiVersion: v1
kind: Service
metadata:
name: mcp-pg-admin
spec:
selector:
app: mcp-pg-admin
ports:
- port: 80
targetPort: 3000
type: ClusterIP
六、监控与可观测性
6.1 结构化日志
// src/utils/logger.ts
interface LogEntry {
timestamp: string;
level: 'debug' | 'info' | 'warn' | 'error';
message: string;
tool?: string;
duration_ms?: number;
error?: string;
[key: string]: any;
}
export class Logger {
constructor(private context: Record<string, any> = {}) {}
private log(level: LogEntry['level'], message: string, data?: Record<string, any>) {
const entry: LogEntry = {
timestamp: new Date().toISOString(),
level,
message,
...this.context,
...data,
};
// 输出到 stderr(MCP 协议要求 stdout 只输出协议消息)
process.stderr.write(JSON.stringify(entry) + '\n');
}
info(message: string, data?: Record<string, any>) { this.log('info', message, data); }
warn(message: string, data?: Record<string, any>) { this.log('warn', message, data); }
error(message: string, data?: Record<string, any>) { this.log('error', message, data); }
debug(message: string, data?: Record<string, any>) {
if (process.env.LOG_LEVEL === 'debug') this.log('debug', message, data);
}
child(context: Record<string, any>): Logger {
return new Logger({ ...this.context, ...context });
}
}
6.2 工具调用指标
// src/utils/metrics.ts
interface ToolMetrics {
calls: number;
errors: number;
totalDurationMs: number;
lastCalled?: Date;
}
const metrics = new Map<string, ToolMetrics>();
export function recordToolCall(tool: string, durationMs: number, error?: boolean) {
const m = metrics.get(tool) ?? { calls: 0, errors: 0, totalDurationMs: 0 };
m.calls += 1;
m.totalDurationMs += durationMs;
m.lastCalled = new Date();
if (error) m.errors += 1;
metrics.set(tool, m);
}
export function getMetricsReport(): string {
const lines = ['# MCP Server Metrics', ''];
lines.push('| Tool | Calls | Errors | Avg Duration (ms) | Last Called |');
lines.push('|------|-------|--------|-------------------|-------------|');
for (const [tool, m] of metrics) {
const avgMs = (m.totalDurationMs / m.calls).toFixed(1);
lines.push(`| ${tool} | ${m.calls} | ${m.errors} | ${avgMs} | ${m.lastCalled?.toISOString() ?? '-'} |`);
}
return lines.join('\n');
}
七、MCP 规范进阶:Sampling 与 Roots
7.1 Sampling:让 Server 请求 LLM 推理
MCP 最被低估的能力之一。Server 可以反向请求 Host 的 LLM 进行推理:
// Server 请求 Host 进行推理
const result = await server.server.request({
method: 'sampling/createMessage',
params: {
messages: [{
role: 'user',
content: {
type: 'text',
text: `分析以下 SQL 执行计划并给出优化建议:\n${explainOutput}`,
},
}],
maxTokens: 1000,
systemPrompt: '你是一个 PostgreSQL 查询优化专家',
},
});
这打开了惊人的可能性:MCP Server 不再是被动执行工具,而是可以主动利用 AI 推理能力。例如:
- SQL 优化器:执行 EXPLAIN ANALYZE → 让 AI 分析执行计划 → 返回优化建议
- 代码审查器:读取 diff → 让 AI 审查 → 返回审查结果
- 数据分析师:查询数据 → 让 AI 发现异常 → 返回分析报告
7.2 Roots:定义 Server 的操作边界
Roots 让 Host 告诉 Server 它可以访问的根目录或资源边界:
// 在 Server 端查询 roots
const roots = await server.server.request({
method: 'roots/list',
params: {},
});
// roots 可能返回:
// {
// roots: [
// { uri: "file:///home/user/projects/app", name: "Main Project" },
// { uri: "file:///home/user/docs", name: "Documentation" }
// ]
// }
这在安全场景下非常重要——Server 不应该访问 roots 之外的路径。
八、MCP vs. Function Calling vs. OpenAPI:为什么 MCP 赢了
| 维度 | Function Calling | OpenAPI/Swagger | MCP |
|---|---|---|---|
| 标准化 | 每家模型格式不同 | HTTP API 标准 | AI 工具专用标准 |
| 工具发现 | 静态定义 | 静态文档 | 动态协商 |
| 双向通信 | ❌ | ❌ | ✅ (Sampling) |
| 流式响应 | ❌ | ❌ | ✅ |
| 认证框架 | 无 | API Key | OAuth 2.1 |
| 资源订阅 | ❌ | ❌ | ✅ (Resources + Notifications) |
| 跨模型复用 | ❌ | 部分 | ✅ (一次开发,Claude/GPT/Gemini 通用) |
MCP 的核心优势:
- 一次开发,多端复用:写一个 MCP Server,Claude Desktop、Cursor、VS Code、JetBrains 都能用。不需要为每个 AI 客户端写适配代码。
- 动态能力协商:Server 可以在运行时动态增减工具,Client 会自动发现变化。
- 双向通信:Server 可以主动通知 Client(如数据库状态变化),也可以请求 Client 的 AI 推理能力。
- 安全内建:OAuth 2.1 认证、权限控制、操作边界(Roots)都是协议的一部分。
九、实战:5 分钟接入一个 MCP Server
讲完了原理和架构,来看最实际的操作——如何在 Claude Desktop 中配置我们的 MCP Server。
9.1 本地 stdio 模式
编辑 ~/Library/Application Support/Claude/claude_desktop_config.json(macOS):
{
"mcpServers": {
"pg-admin": {
"command": "node",
"args": ["/path/to/mcp-server-pgadmin/dist/index.js"],
"env": {
"PG_HOST": "localhost",
"PG_PORT": "5432",
"PG_DATABASE": "myapp",
"PG_USER": "postgres",
"PG_PASSWORD": "your-password"
}
}
}
}
重启 Claude Desktop 后,AI 就可以直接查询你的数据库了。
9.2 Cursor 中配置
编辑 .cursor/mcp.json:
{
"mcpServers": {
"pg-admin": {
"command": "npx",
"args": ["-y", "mcp-server-pgadmin"],
"env": {
"PG_HOST": "localhost",
"PG_DATABASE": "myapp",
"PG_USER": "postgres",
"PG_PASSWORD": "your-password"
}
}
}
}
9.3 VS Code 中配置
在 .vscode/mcp.json 或全局设置中:
{
"servers": {
"pg-admin": {
"type": "sse",
"url": "http://localhost:3000/sse",
"headers": {
"Authorization": "Bearer your-token"
}
}
}
}
9.4 Python 版 MCP Server
如果你更熟悉 Python,MCP 官方也提供了 Python SDK:
# server.py
from mcp.server.fastmcp import FastMCP
import asyncpg
mcp = FastMCP("pg-admin-python")
@mcp.tool()
async def pg_query(sql: str, database: str = "default") -> str:
"""执行 SQL 查询并返回结果"""
conn = await asyncpg.connect(
host="localhost", database=database,
user="postgres", password="your-password"
)
try:
rows = await conn.fetch(sql)
return "\n".join(str(dict(row)) for row in rows)
finally:
await conn.close()
@mcp.tool()
async def pg_list_tables(schema: str = "public") -> str:
"""列出数据库中的所有表"""
conn = await asyncpg.connect(
host="localhost", database="myapp",
user="postgres", password="your-password"
)
try:
rows = await conn.fetch("""
SELECT table_name,
obj_description((quote_ident(table_schema) || '.' || quote_ident(table_name))::regclass) as comment
FROM information_schema.tables
WHERE table_schema = $1 AND table_type = 'BASE TABLE'
ORDER BY table_name
""", schema)
return "\n".join(f"- {r['table_name']}: {r['comment'] or '无注释'}" for r in rows)
finally:
await conn.close()
@mcp.resource("pg://tables")
async def get_tables_overview() -> str:
"""数据库表概览"""
conn = await asyncpg.connect(
host="localhost", database="myapp",
user="postgres", password="your-password"
)
try:
rows = await conn.fetch("""
SELECT table_name, pg_size_pretty(pg_total_relation_size(quote_ident(table_name))) as size
FROM information_schema.tables
WHERE table_schema = 'public' AND table_type = 'BASE TABLE'
""")
return "\n".join(f"{r['table_name']}: {r['size']}" for r in rows)
finally:
await conn.close()
if __name__ == "__main__":
mcp.run()
Python 版的优势在于 FastMCP 极大简化了代码——装饰器即定义,函数即处理。但 TypeScript 版在类型安全和 SDK 完整度上更胜一筹。
十、MCP 生态现状与未来方向
10.1 2026 年 MCP 生态关键数据
| 维度 | 数据 |
|---|---|
| MCP Server 总数 | 5000+ |
| 支持的 AI 客户端 | Claude Desktop, Cursor, VS Code, JetBrains, Continue, Windsurf |
| 云厂商官方 Server | AWS (S3, Lambda, DynamoDB), Azure (Blob, CosmosDB), GCP (BigQuery, Cloud Storage) |
| 百度 AI 开放计划收录 | 6000+ Server, 3000+ 应用, 12万+ 开发者 |
| GitHub spec Stars | 30k+ |
| 主流编程语言 SDK | TypeScript, Python, Go (社区), Rust (社区) |
10.2 热门 MCP Server 分类
开发工具:
@modelcontextprotocol/server-filesystem— 文件系统读写@modelcontextprotocol/server-github— GitHub API@modelcontextprotocol/server-gitlab— GitLab API@modelcontextprotocol/server-brave-search— Brave 搜索
数据库:
@modelcontextprotocol/server-postgres— PostgreSQL@modelcontextprotocol/server-sqlite— SQLitemcp-server-mysql— MySQL (社区)
云服务:
@aws/mcp-server-s3— AWS S3@aws/mcp-server-lambda— AWS Lambdamcp-server-gcp-bigquery— GCP BigQuery
效率工具:
- Paste MCP — 剪贴板管理
- Notion MCP — Notion 笔记
- Slack MCP — Slack 消息
10.3 MCP 规范演进方向
- Streamable HTTP 传输:替代 SSE 长连接,更灵活的请求/响应模式,适合 Serverless 部署。
- 结构化工具输出:工具不再只返回文本,可以返回结构化数据(表格、图表),AI 客户端直接渲染。
- 工具组合(Tool Composition):一个 Server 可以调用另一个 Server 的工具,实现工具链编排。
- 安全增强:细粒度权限模型、审计日志、工具调用的用户确认流程标准化。
- 多模态工具:支持图片、音频、视频作为工具输入/输出。
十一、踩过的坑与最佳实践
11.1 常见陷阱
坑 1:stdout 被污染
MCP 协议要求 stdout 只能输出 JSON-RPC 消息。所有日志必须输出到 stderr:
// ❌ 错误
console.log('Debug info'); // 这会破坏 MCP 协议
// ✅ 正确
console.error('Debug info'); // 输出到 stderr
坑 2:长时间阻塞
工具执行时间不能太长。如果操作可能超过 30 秒,应该:
- 使用分页返回部分结果
- 使用 notification 通知进度
- 或者异步执行 + 轮询结果
// 长时间操作,发送进度通知
server.server.notification({
method: 'notifications/progress',
params: {
progressToken: 'query-123',
progress: 50,
total: 100,
},
});
坑 3:Schema 定义不精确
工具的 inputSchema 越精确,AI 调用越准确。模糊的描述会导致 AI 传错参数:
// ❌ 模糊
z.string().describe('SQL')
// ✅ 精确
z.string().describe('要执行的 SQL SELECT 语句,不支持 DDL 和 DML 修改操作')
坑 4:错误信息不友好
AI 根据错误信息自我纠正。如果错误信息是 ECONNREFUSED,AI 不知道怎么办。如果是 无法连接到数据库 localhost:5432,请检查 PostgreSQL 是否正在运行,AI 就能建议用户检查数据库状态。
11.2 最佳实践清单
- 工具粒度:每个工具做一件事。
pg_query比pg_execute更好,因为名称明确表达了意图。 - 幂等设计:读操作天然幂等;写操作尽量设计为幂等(UPSERT > INSERT)。
- 输出格式:优先返回 Markdown 格式,AI 客户端可以更好地渲染。
- 连接池:始终使用连接池,不要每次查询都新建连接。
- 超时设置:所有外部调用必须有超时,避免永久阻塞。
- 优雅关闭:监听 SIGINT/SIGTERM,关闭连接池后再退出。
- 版本管理:工具名称和参数变更要考虑向后兼容。新增参数设默认值,旧参数不删除。
- 最小权限:数据库用户只授予必要权限,生产环境绝不使用 superuser。
总结
MCP 不只是一个协议,它正在成为 AI 工具生态的基础设施层。就像 HTTP 统一了 Web 通信、SQL 统一了数据查询、USB-C 统一了设备接口一样,MCP 正在统一 AI 与外部世界的交互方式。
对于开发者而言,现在学习 MCP 的投资回报率极高:
- 一次开发,多端复用:写一个 MCP Server,所有支持 MCP 的 AI 客户端都能用
- 生态红利:5000+ 现有 Server,直接组合使用,不需要从零开始
- 趋势明确:三大云厂商、百度、所有主流 AI IDE 都已经支持
从今天开始,把你的内部工具包装成 MCP Server 吧。这不是额外工作——这是让你的工具自动获得 AI 能力的最快路径。
本文代码已在 Node.js 22 + PostgreSQL 16 环境下验证通过。完整项目代码可在 GitHub 获取。