OmniRoute深度实践:构建高可用AI智能路由网关——整合多AI提供商的最佳实践
当你需要在236家AI提供商之间做智能路由、负载均衡、故障切换和成本优化时,OmniRoute给出了一个开源的解法。本文从架构设计到代码实战,深入剖析如何构建一个生产级的AI智能路由网关。
目录
- 背景与痛点:为什么需要AI智能路由网关
- 核心概念:AI路由网关的架构设计
- OmniRoute深度解析:架构与实现
- 代码实战:从零构建AI智能路由网关
- 高级特性:负载均衡、故障切换与成本优化
- 性能优化:缓存、并发与流式响应
- 生产实践:监控、告警与安全
- 总结与展望
背景与痛点:为什么需要AI智能路由网关
1.1 AI提供商碎片化的现实困境
2026年的AI开发生态呈现出一个显著特征:AI提供商爆炸式增长。从OpenAI、Anthropic、Google到国内的DeepSeek、智谱、MiniMax,再到各种垂直领域的小模型提供商,开发者面临着前所未有的"选择困境"。
核心痛点:
- 接口不统一:每家AI提供商都有自己的一套API规范,请求格式、响应结构、错误码各不相同
- 价格差异巨大:同样质量的推理,不同提供商的价格可能相差10倍以上
- 可用性参差不齐:某些提供商API稳定性差,需要备用方案
- 速率限制不同:每个提供商都有不同的TPM(Tokens Per Minute)限制
- 模型能力差异:不同模型在不同任务上表现各异,需要智能路由
1.2 现有解决方案的局限性
在OmniRoute出现之前,开发者通常有以下几种选择:
方案A:单一提供商绑定
# 简单但脆弱
import openai
response = openai.ChatCompletion.create(
model="gpt-5.5",
messages=[{"role": "user", "content": "Hello"}]
)
问题:单点故障,无法利用价格优势,无法根据任务选择最优模型。
方案B:手动封装多提供商
# 繁琐且难以维护
class AIClient:
def __init__(self):
self.providers = {
'openai': OpenAIClient(),
'anthropic': AnthropicClient(),
'deepseek': DeepSeekClient()
}
def chat(self, provider, model, messages):
# 每个提供商都要单独处理
if provider == 'openai':
return self.providers['openai'].chat(model, messages)
elif provider == 'anthropic':
return self.providers['anthropic'].chat(model, messages)
# ... 无穷无尽的if-else
问题:代码臃肿,新增提供商需要修改核心代码,无法动态路由。
方案C:使用LangChain等框架
from langchain.chat_models import ChatOpenAI, ChatAnthropic
问题:抽象层级过高,难以精细控制路由逻辑,性能开销大。
1.3 OmniRoute的解决思路
OmniRoute提出了一个优雅的解决方案:
- 统一抽象层:所有AI提供商都实现相同的接口
- 智能路由:根据任务类型、成本、延迟、可用性自动选择最优提供商
- 故障自动切换:当主提供商失败时,自动切换到备用提供商
- 负载均衡:在多个提供商之间分散请求,提高吞吐量
- 成本优化:优先考虑性价比高的提供商
核心概念:AI路由网关的架构设计
2.1 网关模式(Gateway Pattern)
AI路由网关本质上是一个**API网关(API Gateway)**的特化版本。其核心职责是:
客户端 → AI路由网关 → 多个AI提供商
↓
路由决策引擎
- 任务分析
- 成本计算
- 延迟预测
- 可用性检查
2.2 核心组件设计
一个完整的AI路由网关应包含以下核心组件:
2.2.1 Provider适配器层
// 统一接口定义
interface AIProvider {
// 聊天补全接口
chatCompletion(request: ChatRequest): Promise<ChatResponse>;
// 文本嵌入接口
embedding(request: EmbedRequest): Promise<EmbedResponse>;
// 获取模型列表
listModels(): Promise<Model[]>;
// 获取提供商状态
getStatus(): Promise<ProviderStatus>;
// 获取定价信息
getPricing(): Promise<PricingInfo>;
}
// OpenAI适配器实现
class OpenAIProvider implements AIProvider {
private apiKey: string;
private baseURL: string;
constructor(config: OpenAIConfig) {
this.apiKey = config.apiKey;
this.baseURL = config.baseURL || 'https://api.openai.com/v1';
}
async chatCompletion(request: ChatRequest): Promise<ChatResponse> {
const response = await fetch(`${this.baseURL}/chat/completions`, {
method: 'POST',
headers: {
'Authorization': `Bearer ${this.apiKey}`,
'Content-Type': 'application/json'
},
body: JSON.stringify({
model: request.model,
messages: request.messages,
temperature: request.temperature,
max_tokens: request.maxTokens,
stream: request.stream
})
});
if (!response.ok) {
throw new AIProviderError({
provider: 'openai',
status: response.status,
message: await response.text()
});
}
return await response.json();
}
async embedding(request: EmbedRequest): Promise<EmbedResponse> {
// 类似实现...
}
async listModels(): Promise<Model[]> {
const response = await fetch(`${this.baseURL}/models`, {
headers: { 'Authorization': `Bearer ${this.apiKey}` }
});
const data = await response.json();
return data.data.map(model => ({
id: model.id,
contextWindow: model.context_window,
pricing: this.extractPricing(model.id)
}));
}
async getStatus(): Promise<ProviderStatus> {
// 通过健康检查端点或最近错误率判断
try {
await this.chatCompletion({
model: 'gpt-4o-mini',
messages: [{ role: 'user', content: 'ping' }],
maxTokens: 10
});
return { status: 'healthy', latency: 0 };
} catch (error) {
return { status: 'unhealthy', error: error.message };
}
}
async getPricing(): Promise<PricingInfo> {
// 返回缓存的定价信息或从API获取
return {
'gpt-4o': { input: 5.00, output: 15.00 }, // 每百万token
'gpt-4o-mini': { input: 0.15, output: 0.60 },
'gpt-5.5': { input: 10.00, output: 30.00 }
};
}
private extractPricing(modelId: string): PricingInfo {
// 从模型ID提取定价信息
// 实际实现可能需要查询定价API或配置文件
}
}
2.2.2 路由决策引擎
路由决策引擎是网关的大脑,负责根据多种策略选择最优提供商。
// 路由策略接口
interface RoutingStrategy {
selectProvider(
request: ChatRequest,
providers: AIProvider[]
): Promise<AIProvider>;
}
// 成本优先策略
class CostOptimizedStrategy implements RoutingStrategy {
async selectProvider(
request: ChatRequest,
providers: AIProvider[]
): Promise<AIProvider> {
const estimations = await Promise.all(
providers.map(async (provider) => {
const pricing = await provider.getPricing();
const estimatedCost = this.estimateCost(request, pricing);
const status = await provider.getStatus();
return {
provider,
estimatedCost,
isHealthy: status.status === 'healthy'
};
})
);
// 过滤掉不健康的提供商
const healthyProviders = estimations.filter(e => e.isHealthy);
if (healthyProviders.length === 0) {
throw new Error('No healthy providers available');
}
// 选择成本最低的
healthyProviders.sort((a, b) => a.estimatedCost - b.estimatedCost);
return healthyProviders[0].provider;
}
private estimateCost(request: ChatRequest, pricing: PricingInfo): number {
// 简单估算:假设输入500tokens,输出150tokens
const inputTokens = 500;
const outputTokens = 150;
const modelPricing = pricing[request.model];
if (!modelPricing) {
return Infinity; // 该提供商不支持此模型
}
return (inputTokens / 1_000_000) * modelPricing.input +
(outputTokens / 1_000_000) * modelPricing.output;
}
}
// 延迟优先策略
class LatencyOptimizedStrategy implements RoutingStrategy {
async selectProvider(
request: ChatRequest,
providers: AIProvider[]
): Promise<AIProvider> {
const latencies = await Promise.all(
providers.map(async (provider) => {
const start = Date.now();
try {
await provider.getStatus();
const latency = Date.now() - start;
return { provider, latency, isHealthy: true };
} catch (error) {
return { provider, latency: Infinity, isHealthy: false };
}
})
);
const healthyProviders = latencies.filter(l => l.isHealthy);
if (healthyProviders.length === 0) {
throw new Error('No healthy providers available');
}
healthyProviders.sort((a, b) => a.latency - b.latency);
return healthyProviders[0].provider;
}
}
// 混合策略:成本+延迟+质量的加权评分
class HybridRoutingStrategy implements RoutingStrategy {
private weights: { cost: number; latency: number; quality: number };
constructor(weights: { cost: number; latency: number; quality: number }) {
this.weights = weights;
}
async selectProvider(
request: ChatRequest,
providers: AIProvider[]
): Promise<AIProvider> {
const scores = await Promise.all(
providers.map(async (provider) => {
const [cost, latency, quality, status] = await Promise.all([
this.getCostScore(provider, request),
this.getLatencyScore(provider),
this.getQualityScore(provider, request.model),
provider.getStatus()
]);
if (status.status !== 'healthy') {
return { provider, score: -Infinity };
}
const totalScore =
this.weights.cost * cost +
this.weights.latency * latency +
this.weights.quality * quality;
return { provider, score: totalScore };
})
);
const validScores = scores.filter(s => s.score > -Infinity);
if (validScores.length === 0) {
throw new Error('No healthy providers available');
}
validScores.sort((a, b) => b.score - a.score); // 降序,高分优先
return validScores[0].provider;
}
private async getCostScore(provider: AIProvider, request: ChatRequest): Promise<number> {
const pricing = await provider.getPricing();
const estimatedCost = this.estimateCost(request, pricing);
// 成本越低,分数越高(归一化到0-1)
return 1 / (1 + estimatedCost);
}
private async getLatencyScore(provider: AIProvider): Promise<number> {
const start = Date.now();
try {
await provider.getStatus();
const latency = Date.now() - start;
// 延迟越低,分数越高
return 1 / (1 + latency / 1000); // 转换为秒
} catch (error) {
return 0;
}
}
private async getQualityScore(provider: AIProvider, model: string): Promise<number> {
// 根据基准测试或用户反馈给模型质量打分
const qualityScores = {
'gpt-5.5': 0.98,
'claude-4-opus': 0.97,
'gpt-4o': 0.95,
'claude-4-sonnet': 0.93,
'deepseek-v4': 0.92,
'gpt-4o-mini': 0.85
};
return qualityScores[model] || 0.5;
}
private estimateCost(request: ChatRequest, pricing: PricingInfo): number {
// 同 CostOptimizedStrategy
const inputTokens = 500;
const outputTokens = 150;
const modelPricing = pricing[request.model];
if (!modelPricing) {
return Infinity;
}
return (inputTokens / 1_000_000) * modelPricing.input +
(outputTokens / 1_000_000) * modelPricing.output;
}
}
2.2.3 故障切换(Failover)机制
class FailoverManager {
private primaryProvider: AIProvider;
private backupProviders: AIProvider[];
private healthCheckInterval: number = 30000; // 30秒
constructor(
primary: AIProvider,
backups: AIProvider[],
options?: { healthCheckInterval?: number }
) {
this.primaryProvider = primary;
this.backupProviders = backups;
if (options?.healthCheckInterval) {
this.healthCheckInterval = options.healthCheckInterval;
}
this.startHealthCheck();
}
async executeWithFailover<T>(
operation: (provider: AIProvider) => Promise<T>
): Promise<T> {
const allProviders = [this.primaryProvider, ...this.backupProviders];
for (const provider of allProviders) {
try {
const status = await provider.getStatus();
if (status.status !== 'healthy') {
console.warn(`Provider ${provider.constructor.name} is unhealthy, skipping`);
continue;
}
const result = await operation(provider);
return result;
} catch (error) {
console.error(`Provider ${provider.constructor.name} failed:`, error);
// 标记提供商为不健康
this.markUnhealthy(provider);
// 继续尝试下一个提供商
continue;
}
}
throw new Error('All providers failed');
}
private markUnhealthy(provider: AIProvider): void {
// 在实际实现中,这会将提供商标记为不健康,
// 并在下一次健康检查之前避免使用它
console.error(`Marking provider ${provider.constructor.name} as unhealthy`);
}
private startHealthCheck(): void {
setInterval(async () => {
for (const provider of [this.primaryProvider, ...this.backupProviders]) {
try {
await provider.getStatus();
this.markHealthy(provider);
} catch (error) {
this.markUnhealthy(provider);
}
}
}, this.healthCheckInterval);
}
private markHealthy(provider: AIProvider): void {
console.log(`Provider ${provider.constructor.name} is healthy`);
}
}
2.3 请求/响应转换层
不同AI提供商的API格式差异很大,需要统一的转换层:
// 统一请求格式
interface UnifiedChatRequest {
model: string;
messages: Message[];
temperature?: number;
maxTokens?: number;
stream?: boolean;
[key: string]: any; // 额外参数
}
// 统一响应格式
interface UnifiedChatResponse {
id: string;
model: string;
choices: Choice[];
usage: {
promptTokens: number;
completionTokens: number;
totalTokens: number;
};
provider: string; // 实际使用的提供商
}
// 请求转换器
class RequestTransformer {
static toOpenAIFormat(request: UnifiedChatRequest): any {
return {
model: request.model,
messages: request.messages,
temperature: request.temperature,
max_tokens: request.maxTokens,
stream: request.stream
};
}
static toAnthropicFormat(request: UnifiedChatRequest): any {
// Anthropic的格式不同
return {
model: request.model,
messages: request.messages.map(msg => ({
role: msg.role === 'assistant' ? 'assistant' : 'user',
content: msg.content
})),
max_tokens: request.maxTokens,
temperature: request.temperature
};
}
static toDeepSeekFormat(request: UnifiedChatRequest): any {
// DeepSeek兼容OpenAI格式,但可能有细微差异
return this.toOpenAIFormat(request);
}
}
// 响应转换器
class ResponseTransformer {
static fromOpenAIFormat(response: any, provider: string): UnifiedChatResponse {
return {
id: response.id,
model: response.model,
choices: response.choices.map(choice => ({
index: choice.index,
message: choice.message,
finishReason: choice.finish_reason
})),
usage: {
promptTokens: response.usage.prompt_tokens,
completionTokens: response.usage.completion_tokens,
totalTokens: response.usage.total_tokens
},
provider
};
}
static fromAnthropicFormat(response: any, provider: string): UnifiedChatResponse {
return {
id: response.id,
model: response.model,
choices: [{
index: 0,
message: {
role: 'assistant',
content: response.content[0].text
},
finishReason: response.stop_reason
}],
usage: {
promptTokens: response.usage.input_tokens,
completionTokens: response.usage.output_tokens,
totalTokens: response.usage.input_tokens + response.usage.output_tokens
},
provider
};
}
}
OmniRoute深度解析:架构与实现
3.1 OmniRoute的核心架构
OmniRoute采用了插件化和可组合的架构设计,其核心模块包括:
OmniRoute
├── Provider Registry(提供商注册中心)
├── Router Core(路由核心)
│ ├── Strategy Engine(策略引擎)
│ ├── Load Balancer(负载均衡器)
│ └── Failover Manager(故障切换管理器)
├── Middleware Pipeline(中间件管道)
│ ├── Authentication(认证)
│ ├── Rate Limiting(限流)
│ ├── Caching(缓存)
│ ├── Logging(日志)
│ └── Metrics(指标收集)
├── Admin API(管理API)
└── Dashboard(监控面板)
3.2 Provider Registry实现
class ProviderRegistry {
private providers: Map<string, AIProvider> = new Map();
private providerConfigs: Map<string, ProviderConfig> = new Map();
register(name: string, provider: AIProvider, config?: ProviderConfig): void {
this.providers.set(name, provider);
if (config) {
this.providerConfigs.set(name, config);
}
}
get(name: string): AIProvider | undefined {
return this.providers.get(name);
}
list(): string[] {
return Array.from(this.providers.keys());
}
async getHealthyProviders(): Promise<string[]> {
const healthy: string[] = [];
for (const [name, provider] of this.providers) {
try {
const status = await provider.getStatus();
if (status.status === 'healthy') {
healthy.push(name);
}
} catch (error) {
// 跳过不健康的提供商
}
}
return healthy;
}
async getProviderPricing(name: string): Promise<PricingInfo | undefined> {
const provider = this.providers.get(name);
if (!provider) {
return undefined;
}
return await provider.getPricing();
}
}
// 使用示例
const registry = new ProviderRegistry();
// 注册OpenAI
registry.register('openai', new OpenAIProvider({
apiKey: process.env.OPENAI_API_KEY!
}));
// 注册Anthropic
registry.register('anthropic', new AnthropicProvider({
apiKey: process.env.ANTHROPIC_API_KEY!
}));
// 注册DeepSeek
registry.register('deepseek', new DeepSeekProvider({
apiKey: process.env.DEEPSEEK_API_KEY!
}));
3.3 中间件管道(Middleware Pipeline)
中间件管道允许在请求处理的前后插入自定义逻辑:
interface Middleware {
process(
request: UnifiedChatRequest,
next: (req: UnifiedChatRequest) => Promise<UnifiedChatResponse>
): Promise<UnifiedChatResponse>;
}
// 认证中间件
class AuthenticationMiddleware implements Middleware {
async process(
request: UnifiedChatRequest,
next: (req: UnifiedChatRequest) => Promise<UnifiedChatResponse>
): Promise<UnifiedChatResponse> {
// 检查API密钥
if (!request.apiKey && !request.headers?.['Authorization']) {
throw new Error('Authentication required');
}
// 验证API密钥有效性
const apiKey = request.apiKey || request.headers!['Authorization'].replace('Bearer ', '');
const isValid = await this.validateApiKey(apiKey);
if (!isValid) {
throw new Error('Invalid API key');
}
// 继续处理
return await next(request);
}
private async validateApiKey(apiKey: string): Promise<boolean> {
// 在实际实现中,这会查询数据库或缓存
return true; // 简化
}
}
// 限流中间件
class RateLimitingMiddleware implements Middleware {
private rateLimiter: RateLimiter;
constructor(rateLimiter: RateLimiter) {
this.rateLimiter = rateLimiter;
}
async process(
request: UnifiedChatRequest,
next: (req: UnifiedChatRequest) => Promise<UnifiedChatResponse>
): Promise<UnifiedChatResponse> {
const clientId = this.getClientId(request);
if (!await this.rateLimiter.allow(clientId)) {
throw new Error('Rate limit exceeded');
}
return await next(request);
}
private getClientId(request: UnifiedChatRequest): string {
// 根据API密钥或IP地址识别客户端
return request.apiKey || request.ipAddress || 'anonymous';
}
}
// 缓存中间件
class CachingMiddleware implements Middleware {
private cache: Cache;
constructor(cache: Cache) {
this.cache = cache;
}
async process(
request: UnifiedChatRequest,
next: (req: UnifiedChatRequest) => Promise<UnifiedChatResponse>
): Promise<UnifiedChatResponse> {
// 只有非流式请求才缓存
if (request.stream) {
return await next(request);
}
const cacheKey = this.generateCacheKey(request);
// 检查缓存
const cached = await this.cache.get(cacheKey);
if (cached) {
return cached;
}
// 执行请求
const response = await next(request);
// 缓存响应
await this.cache.set(cacheKey, response, 3600); // 1小时
return response;
}
private generateCacheKey(request: UnifiedChatRequest): string {
// 生成基于请求内容的缓存键
const content = JSON.stringify({
model: request.model,
messages: request.messages,
temperature: request.temperature,
maxTokens: request.maxTokens
});
return `chat:${Buffer.from(content).toString('sha256')}`;
}
}
// 日志中间件
class LoggingMiddleware implements Middleware {
async process(
request: UnifiedChatRequest,
next: (req: UnifiedChatRequest) => Promise<UnifiedChatResponse>
): Promise<UnifiedChatResponse> {
const start = Date.now();
try {
const response = await next(request);
const latency = Date.now() - start;
console.log({
type: 'request',
model: request.model,
provider: response.provider,
latency,
promptTokens: response.usage.promptTokens,
completionTokens: response.usage.completionTokens,
status: 'success'
});
return response;
} catch (error) {
const latency = Date.now() - start;
console.error({
type: 'request',
model: request.model,
latency,
error: error.message,
status: 'error'
});
throw error;
}
}
}
3.4 Router Core实现
class RouterCore {
private registry: ProviderRegistry;
private strategy: RoutingStrategy;
private middlewares: Middleware[] = [];
private failoverManager: FailoverManager;
constructor(
registry: ProviderRegistry,
strategy: RoutingStrategy,
options?: { failover?: FailoverManager }
) {
this.registry = registry;
this.strategy = strategy;
this.failoverManager = options?.failover || new FailoverManager(
registry.get('openai')!,
[registry.get('anthropic')!, registry.get('deepseek')!]
);
}
use(middleware: Middleware): void {
this.middlewares.push(middleware);
}
async route(request: UnifiedChatRequest): Promise<UnifiedChatResponse> {
// 构建中间件管道
const pipeline = this.buildPipeline();
// 执行管道
return await pipeline(request);
}
private buildPipeline(): (req: UnifiedChatRequest) => Promise<UnifiedChatResponse> {
// 从内到外构建中间件链
let pipeline = async (req: UnifiedChatRequest) => {
// 核心路由逻辑
return await this.executeRouting(req);
};
// 包装中间件(注意顺序:最后一个中间件最外层)
for (let i = this.middlewares.length - 1; i >= 0; i--) {
const middleware = this.middlewares[i];
const next = pipeline;
pipeline = async (req: UnifiedChatRequest) => {
return await middleware.process(req, next);
};
}
return pipeline;
}
private async executeRouting(request: UnifiedChatRequest): Promise<UnifiedChatResponse> {
// 获取健康的提供商列表
const healthyProviders = await this.registry.getHealthyProviders();
if (healthyProviders.length === 0) {
throw new Error('No healthy providers available');
}
// 将提供商名称转换为AIProvider实例
const providerInstances = healthyProviders
.map(name => this.registry.get(name)!)
.filter(p => p !== undefined);
// 使用路由策略选择最优提供商
const selectedProvider = await this.strategy.selectProvider(request, providerInstances);
// 使用故障切换执行请求
return await this.failoverManager.executeWithFailover(async (provider) => {
// 根据提供商类型转换请求格式
const providerName = this.getProviderName(provider);
const transformedRequest = this.transformRequest(request, providerName);
// 执行请求
const rawResponse = await provider.chatCompletion(transformedRequest);
// 转换响应格式
const unifiedResponse = this.transformResponse(rawResponse, providerName);
return unifiedResponse;
});
}
private getProviderName(provider: AIProvider): string {
// 在实际实现中,可能需要维护一个反向映射
// 这里简化为检查构造函数名称
return provider.constructor.name.replace('Provider', '').toLowerCase();
}
private transformRequest(request: UnifiedChatRequest, providerName: string): any {
switch (providerName) {
case 'openai':
return RequestTransformer.toOpenAIFormat(request);
case 'anthropic':
return RequestTransformer.toAnthropicFormat(request);
case 'deepseek':
return RequestTransformer.toDeepSeekFormat(request);
default:
throw new Error(`Unknown provider: ${providerName}`);
}
}
private transformResponse(response: any, providerName: string): UnifiedChatResponse {
switch (providerName) {
case 'openai':
case 'deepseek':
return ResponseTransformer.fromOpenAIFormat(response, providerName);
case 'anthropic':
return ResponseTransformer.fromAnthropicFormat(response, providerName);
default:
throw new Error(`Unknown provider: ${providerName}`);
}
}
}
代码实战:从零构建AI智能路由网关
4.1 项目初始化
# 创建项目
mkdir ai-router-gateway
cd ai-router-gateway
# 初始化npm项目
npm init -y
# 安装依赖
npm install typescript @types/node ts-node
npm install express @types/express
npm install axios @types/axios
npm install redis ioredis @types/ioredis
npm install dotenv
npm install winston @types/winston # 日志
npm install prom-client # 指标收集
# 安装开发依赖
npm install -D jest @types/jest ts-jest
4.2 项目结构
ai-router-gateway/
├── src/
│ ├── providers/ # 提供商适配器
│ │ ├── base.ts
│ │ ├── openai.ts
│ │ ├── anthropic.ts
│ │ ├── deepseek.ts
│ │ └── index.ts
│ ├── routing/ # 路由策略
│ │ ├── strategy.ts
│ │ ├── cost-optimized.ts
│ │ ├── latency-optimized.ts
│ │ ├── hybrid.ts
│ │ └── index.ts
│ ├── middleware/ # 中间件
│ │ ├── auth.ts
│ │ ├── rate-limit.ts
│ │ ├── cache.ts
│ │ ├── logging.ts
│ │ └── index.ts
│ ├── core/ # 核心模块
│ │ ├── registry.ts
│ │ ├── router.ts
│ │ ├── failover.ts
│ │ └── index.ts
│ ├── utils/ # 工具函数
│ │ ├── transformer.ts
│ │ ├── cache.ts
│ │ └── logger.ts
│ ├── config/ # 配置
│ │ ├── index.ts
│ │ └── providers.ts
│ ├── app.ts # Express应用
│ └── server.ts # 入口文件
├── tests/ # 测试
├── package.json
├── tsconfig.json
├── .env.example
└── README.md
4.3 配置文件
// src/config/providers.ts
export interface ProviderConfig {
name: string;
apiKey: string;
baseURL?: string;
timeout?: number;
weight?: number; // 负载均衡权重
enabled?: boolean;
}
export const providerConfigs: ProviderConfig[] = [
{
name: 'openai',
apiKey: process.env.OPENAI_API_KEY || '',
baseURL: process.env.OPENAI_BASE_URL,
timeout: 60000,
weight: 3,
enabled: true
},
{
name: 'anthropic',
apiKey: process.env.ANTHROPIC_API_KEY || '',
timeout: 60000,
weight: 2,
enabled: true
},
{
name: 'deepseek',
apiKey: process.env.DEEPSEEK_API_KEY || '',
baseURL: 'https://api.deepseek.com/v1',
timeout: 60000,
weight: 4,
enabled: true
}
];
// src/config/index.ts
export const config = {
server: {
port: parseInt(process.env.PORT || '3000'),
host: process.env.HOST || '0.0.0.0'
},
redis: {
host: process.env.REDIS_HOST || 'localhost',
port: parseInt(process.env.REDIS_PORT || '6379'),
password: process.env.REDIS_PASSWORD
},
routing: {
strategy: process.env.ROUTING_STRATEGY || 'hybrid', // cost|latency|hybrid
weights: {
cost: parseFloat(process.env.WEIGHT_COST || '0.4'),
latency: parseFloat(process.env.WEIGHT_LATENCY || '0.3'),
quality: parseFloat(process.env.WEIGHT_QUALITY || '0.3')
}
},
cache: {
enabled: process.env.CACHE_ENABLED !== 'false',
ttl: parseInt(process.env.CACHE_TTL || '3600')
},
rateLimit: {
enabled: process.env.RATE_LIMIT_ENABLED !== 'false',
windowMs: parseInt(process.env.RATE_LIMIT_WINDOW_MS || '60000'),
maxRequests: parseInt(process.env.RATE_LIMIT_MAX || '100')
}
};
4.4 Express应用实现
// src/app.ts
import express, { Express, Request, Response, NextFunction } from 'express';
import { RouterCore } from './core/router';
import { ProviderRegistry } from './core/registry';
import { config } from './config';
export function createApp(): Express {
const app = express();
// 解析JSON请求体
app.use(express.json());
// 初始化提供商注册中心
const registry = new ProviderRegistry();
initializeProviders(registry);
// 初始化路由核心
const routingStrategy = createRoutingStrategy(config.routing.strategy);
const router = new RouterCore(registry, routingStrategy);
// 注册中间件
if (config.rateLimit.enabled) {
router.use(new RateLimitingMiddleware(/* ... */));
}
if (config.cache.enabled) {
router.use(new CachingMiddleware(/* ... */));
}
router.use(new LoggingMiddleware());
// 聊天补全端点
app.post('/v1/chat/completions', async (req: Request, res: Response) => {
try {
const request: UnifiedChatRequest = {
model: req.body.model,
messages: req.body.messages,
temperature: req.body.temperature,
maxTokens: req.body.max_tokens,
stream: req.body.stream || false,
apiKey: req.headers['authorization']?.replace('Bearer ', '')
};
const response = await router.route(request);
res.json(response);
} catch (error: any) {
console.error('Chat completion error:', error);
if (error.message === 'Authentication required') {
res.status(401).json({ error: 'Unauthorized' });
} else if (error.message === 'Rate limit exceeded') {
res.status(429).json({ error: 'Too many requests' });
} else if (error.message === 'All providers failed') {
res.status(503).json({ error: 'Service unavailable' });
} else {
res.status(500).json({ error: 'Internal server error' });
}
}
});
// 模型列表端点
app.get('/v1/models', async (req: Request, res: Response) => {
try {
const models = await registry.listModels();
res.json({ data: models });
} catch (error) {
res.status(500).json({ error: 'Failed to fetch models' });
}
});
// 健康检查端点
app.get('/health', async (req: Request, res: Response) => {
const healthyProviders = await registry.getHealthyProviders();
res.json({
status: healthyProviders.length > 0 ? 'healthy' : 'degraded',
healthyProviders,
totalProviders: registry.list().length
});
});
// 指标端点(Prometheus格式)
app.get('/metrics', async (req: Request, res: Response) => {
const metrics = await collectMetrics();
res.set('Content-Type', 'text/plain');
res.send(metrics);
});
return app;
}
function initializeProviders(registry: ProviderRegistry): void {
// 从配置文件初始化所有提供商
for (const providerConfig of providerConfigs) {
if (!providerConfig.enabled) {
continue;
}
let provider: AIProvider;
switch (providerConfig.name) {
case 'openai':
provider = new OpenAIProvider(providerConfig);
break;
case 'anthropic':
provider = new AnthropicProvider(providerConfig);
break;
case 'deepseek':
provider = new DeepSeekProvider(providerConfig);
break;
default:
console.warn(`Unknown provider: ${providerConfig.name}`);
continue;
}
registry.register(providerConfig.name, provider, providerConfig);
}
}
function createRoutingStrategy(strategyName: string): RoutingStrategy {
switch (strategyName) {
case 'cost':
return new CostOptimizedStrategy();
case 'latency':
return new LatencyOptimizedStrategy();
case 'hybrid':
return new HybridRoutingStrategy(config.routing.weights);
default:
throw new Error(`Unknown routing strategy: ${strategyName}`);
}
}
4.5 入口文件
// src/server.ts
import { createApp } from './app';
import { config } from './config';
const app = createApp();
app.listen(config.server.port, config.server.host, () => {
console.log(`AI Router Gateway listening on ${config.server.host}:${config.server.port}`);
console.log(`Routing strategy: ${config.routing.strategy}`);
console.log(`Cache: ${config.cache.enabled ? 'enabled' : 'disabled'}`);
console.log(`Rate limiting: ${config.rateLimit.enabled ? 'enabled' : 'disabled'}`);
});
高级特性:负载均衡、故障切换与成本优化
5.1 负载均衡算法
除了简单的轮询和随机选择,生产环境需要更复杂的负载均衡算法:
// 加权轮询(Weighted Round Robin)
class WeightedRoundRobin {
private weights: Map<string, number>;
private currentIndex: number = 0;
private gcdWeight: number;
constructor(providers: { name: string; weight: number }[]) {
this.weights = new Map(providers.map(p => [p.name, p.weight]));
this.gcdWeight = this.calculateGCD([...this.weights.values()]);
}
select(): string {
const names = [...this.weights.keys()];
const weights = [...this.weights.values()];
while (true) {
this.currentIndex = (this.currentIndex + 1) % names.length;
if (this.currentIndex === 0) {
this.currentWeight -= this.gcdWeight;
if (this.currentWeight <= 0) {
this.currentWeight = Math.max(...weights);
}
}
if (weights[this.currentIndex] >= this.currentWeight) {
return names[this.currentIndex];
}
}
}
private calculateGCD(numbers: number[]): number {
let result = numbers[0];
for (let i = 1; i < numbers.length; i++) {
result = this.gcd(result, numbers[i]);
}
return result;
}
private gcd(a: number, b: number): number {
if (b === 0) return a;
return this.gcd(b, a % b);
}
}
// 最少连接(Least Connections)
class LeastConnections {
private connections: Map<string, number> = new Map();
constructor(providers: string[]) {
for (const name of providers) {
this.connections.set(name, 0);
}
}
select(): string {
let minConnections = Infinity;
let selected = '';
for (const [name, count] of this.connections) {
if (count < minConnections) {
minConnections = count;
selected = name;
}
}
// 增加连接计数
this.connections.set(selected, this.connections.get(selected)! + 1);
return selected;
}
release(name: string): void {
const count = this.connections.get(name);
if (count && count > 0) {
this.connections.set(name, count - 1);
}
}
}
// 一致性哈希(Consistent Hashing)
class ConsistentHashing {
private ring: Map<number, string> = new Map();
private virtualNodes: number = 150; // 每个物理节点的虚拟节点数
constructor(providers: string[]) {
for (const provider of providers) {
this.addNode(provider);
}
}
select(key: string): string {
const hash = this.hash(key);
// 找到第一个哈希值大于等于key哈希的节点
const hashes = [...this.ring.keys()].sort((a, b) => a - b);
for (const nodeHash of hashes) {
if (nodeHash >= hash) {
return this.ring.get(nodeHash)!;
}
}
// 如果没找到,返回第一个节点(环形)
return this.ring.get(hashes[0])!;
}
private addNode(provider: string): void {
for (let i = 0; i < this.virtualNodes; i++) {
const virtualKey = `${provider}:${i}`;
const hash = this.hash(virtualKey);
this.ring.set(hash, provider);
}
}
private hash(key: string): number {
// 使用简单的哈希函数(生产环境应使用更健壮的哈希)
let hash = 0;
for (let i = 0; i < key.length; i++) {
const char = key.charCodeAt(i);
hash = ((hash << 5) - hash) + char;
hash = hash & hash; // 转换为32位整数
}
return Math.abs(hash);
}
}
5.2 智能故障切换
class IntelligentFailoverManager {
private providers: AIProvider[];
private healthStatus: Map<AIProvider, HealthStatus> = new Map();
private circuitBreakers: Map<AIProvider, CircuitBreaker> = new Map();
constructor(providers: AIProvider[]) {
this.providers = providers;
// 初始化每个提供商的断路器
for (const provider of providers) {
this.circuitBreakers.set(provider, new CircuitBreaker({
failureThreshold: 5,
resetTimeout: 60000, // 1分钟后尝试恢复
halfOpenMaxRequests: 3
}));
this.healthStatus.set(provider, {
status: 'healthy',
lastCheck: Date.now(),
consecutiveFailures: 0
});
}
// 启动健康检查
this.startHealthCheck();
}
async execute<T>(operation: (provider: AIProvider) => Promise<T>): Promise<T> {
// 获取健康的提供商(断路器半开或关闭的)
const availableProviders = this.providers.filter(provider => {
const breaker = this.circuitBreakers.get(provider)!;
return breaker.state !== 'open';
});
if (availableProviders.length === 0) {
throw new Error('All providers are unavailable (circuit breaker open)');
}
// 按照优先级排序(健康度、延迟等)
const sortedProviders = this.sortByHealthAndLatency(availableProviders);
for (const provider of sortedProviders) {
const breaker = this.circuitBreakers.get(provider)!;
try {
// 通过断路器执行
const result = await breaker.execute(() => operation(provider));
// 成功,重置健康状态
this.recordSuccess(provider);
return result;
} catch (error) {
// 失败,记录
this.recordFailure(provider, error);
// 继续尝试下一个提供商
continue;
}
}
throw new Error('All providers failed');
}
private sortByHealthAndLatency(providers: AIProvider[]): AIProvider[] {
return providers.sort((a, b) => {
const statusA = this.healthStatus.get(a)!;
const statusB = this.healthStatus.get(b)!;
// 首先按健康状态排序
if (statusA.status !== statusB.status) {
return statusA.status === 'healthy' ? -1 : 1;
}
// 然后按延迟排序
return statusA.lastLatency - statusB.lastLatency;
});
}
private recordSuccess(provider: AIProvider): void {
const status = this.healthStatus.get(provider)!;
status.status = 'healthy';
status.consecutiveFailures = 0;
status.lastCheck = Date.now();
}
private recordFailure(provider: AIProvider, error: any): void {
const status = this.healthStatus.get(provider)!;
status.consecutiveFailures++;
status.lastError = error.message;
status.lastCheck = Date.now();
// 如果连续失败超过阈值,标记为不健康
if (status.consecutiveFailures >= 3) {
status.status = 'unhealthy';
}
}
private startHealthCheck(): void {
setInterval(async () => {
for (const provider of this.providers) {
try {
const start = Date.now();
await provider.getStatus();
const latency = Date.now() - start;
const status = this.healthStatus.get(provider)!;
status.status = 'healthy';
status.lastLatency = latency;
status.lastCheck = Date.now();
} catch (error) {
const status = this.healthStatus.get(provider)!;
status.status = 'unhealthy';
status.lastCheck = Date.now();
}
}
}, 30000); // 每30秒检查一次
}
}
// 断路器实现
class CircuitBreaker {
private state: 'closed' | 'open' | 'half-open' = 'closed';
private failureCount: number = 0;
private successCount: number = 0;
private lastFailureTime?: number;
constructor(
private options: {
failureThreshold: number;
resetTimeout: number;
halfOpenMaxRequests: number;
}
) {}
async execute<T>(operation: () => Promise<T>): Promise<T> {
if (this.state === 'open') {
if (this.shouldAttemptReset()) {
this.state = 'half-open';
} else {
throw new Error('Circuit breaker is open');
}
}
try {
const result = await operation();
this.recordSuccess();
return result;
} catch (error) {
this.recordFailure();
throw error;
}
}
private shouldAttemptReset(): boolean {
return Date.now() - this.lastFailureTime! >= this.options.resetTimeout;
}
private recordSuccess(): void {
if (this.state === 'half-open') {
this.successCount++;
if (this.successCount >= this.options.halfOpenMaxRequests) {
this.state = 'closed';
this.failureCount = 0;
this.successCount = 0;
}
} else {
this.failureCount = 0;
}
}
private recordFailure(): void {
this.failureCount++;
this.lastFailureTime = Date.now();
if (this.failureCount >= this.options.failureThreshold) {
this.state = 'open';
}
}
}
5.3 成本优化策略
class CostOptimizer {
private pricingCache: Map<string, PricingInfo> = new Map();
private usageStats: Map<string, UsageStats> = new Map();
constructor(private registry: ProviderRegistry) {
// 定期更新定价信息
this.updatePricingCache();
setInterval(() => this.updatePricingCache(), 3600000); // 每小时
}
async optimizeRouting(request: ChatRequest): Promise<OptimizationResult> {
const providers = await this.registry.getHealthyProviders();
const options = await Promise.all(
providers.map(async (providerName) => {
const provider = this.registry.get(providerName)!;
const pricing = await this.getPricing(providerName);
const estimatedCost = this.estimateCost(request, pricing);
const estimatedQuality = this.estimateQuality(providerName, request.model);
const estimatedLatency = await this.estimateLatency(providerName);
return {
provider: providerName,
cost: estimatedCost,
quality: estimatedQuality,
latency: estimatedLatency,
valueScore: this.calculateValueScore(estimatedCost, estimatedQuality, estimatedLatency)
};
})
);
// 按价值分数排序
options.sort((a, b) => b.valueScore - a.valueScore);
return {
recommended: options[0],
alternatives: options.slice(1)
};
}
async trackUsage(provider: string, request: ChatRequest, response: ChatResponse): Promise<void> {
const stats = this.usageStats.get(provider) || {
totalRequests: 0,
totalTokens: 0,
totalCost: 0,
averageLatency: 0
};
const cost = this.calculateActualCost(provider, response.usage);
stats.totalRequests++;
stats.totalTokens += response.usage.totalTokens;
stats.totalCost += cost;
stats.averageLatency = (stats.averageLatency * (stats.totalRequests - 1) + response.latency) / stats.totalRequests;
this.usageStats.set(provider, stats);
// 在实际实现中,这可能会写入数据库
console.log(`Usage: provider=${provider}, tokens=${response.usage.totalTokens}, cost=$${cost.toFixed(4)}`);
}
private async updatePricingCache(): Promise<void> {
const providers = this.registry.list();
for (const providerName of providers) {
const provider = this.registry.get(providerName)!;
try {
const pricing = await provider.getPricing();
this.pricingCache.set(providerName, pricing);
} catch (error) {
console.error(`Failed to update pricing for ${providerName}:`, error);
}
}
}
private async getPricing(providerName: string): Promise<PricingInfo> {
let pricing = this.pricingCache.get(providerName);
if (!pricing) {
const provider = this.registry.get(providerName)!;
pricing = await provider.getPricing();
this.pricingCache.set(providerName, pricing);
}
return pricing;
}
private estimateCost(request: ChatRequest, pricing: PricingInfo): number {
// 估算输入和输出token数
const estimatedInputTokens = this.estimateInputTokens(request.messages);
const estimatedOutputTokens = request.maxTokens || 150;
const modelPricing = pricing[request.model];
if (!modelPricing) {
return Infinity;
}
return (estimatedInputTokens / 1_000_000) * modelPricing.input +
(estimatedOutputTokens / 1_000_000) * modelPricing.output;
}
private estimateInputTokens(messages: Message[]): number {
// 简单估算:每个字符约0.3个token(对于英文)
// 生产环境应使用真正的tokenizer
const totalChars = messages.reduce((sum, msg) => sum + msg.content.length, 0);
return Math.ceil(totalChars * 0.3);
}
private estimateQuality(providerName: string, model: string): number {
// 基于基准测试的质量分数
const qualityMatrix: Record<string, Record<string, number>> = {
'openai': {
'gpt-5.5': 0.98,
'gpt-4o': 0.95,
'gpt-4o-mini': 0.85
},
'anthropic': {
'claude-4-opus': 0.97,
'claude-4-sonnet': 0.93
},
'deepseek': {
'deepseek-v4': 0.92,
'deepseek-coder': 0.88
}
};
return qualityMatrix[providerName]?.[model] || 0.5;
}
private async estimateLatency(providerName: string): Promise<number> {
const stats = this.usageStats.get(providerName);
return stats?.averageLatency || 2000; // 默认2秒
}
private calculateValueScore(cost: number, quality: number, latency: number): number {
// 价值分数 = 质量 / (成本 * 延迟因子)
const latencyFactor = 1 + (latency / 10000); // 延迟归一化
return quality / (cost * latencyFactor + 0.001); // 避免除以0
}
private calculateActualCost(provider: string, usage: TokenUsage): number {
const pricing = this.pricingCache.get(provider);
if (!pricing) {
return 0;
}
// 找到使用的模型定价
// 在实际实现中,需要从请求中获取模型信息
const model = 'gpt-4o'; // 示例
const modelPricing = pricing[model];
if (!modelPricing) {
return 0;
}
return (usage.promptTokens / 1_000_000) * modelPricing.input +
(usage.completionTokens / 1_000_000) * modelPricing.output;
}
}
性能优化:缓存、并发与流式响应
6.1 多层缓存策略
// L1缓存:内存缓存(最快)
class MemoryCache implements Cache {
private cache: Map<string, CacheEntry> = new Map();
private maxSize: number = 1000;
async get(key: string): Promise<any> {
const entry = this.cache.get(key);
if (!entry) {
return null;
}
// 检查是否过期
if (Date.now() > entry.expiresAt) {
this.cache.delete(key);
return null;
}
// LRU:移动到末尾
this.cache.delete(key);
this.cache.set(key, entry);
return entry.value;
}
async set(key: string, value: any, ttl: number): Promise<void> {
// 如果缓存已满,删除最旧的条目
if (this.cache.size >= this.maxSize) {
const oldestKey = this.cache.keys().next().value;
this.cache.delete(oldestKey);
}
this.cache.set(key, {
value,
expiresAt: Date.now() + ttl * 1000
});
}
async delete(key: string): Promise<void> {
this.cache.delete(key);
}
}
// L2缓存:Redis缓存(共享)
class RedisCache implements Cache {
private client: Redis;
constructor(redisUrl: string) {
this.client = new Redis(redisUrl);
}
async get(key: string): Promise<any> {
const value = await this.client.get(key);
return value ? JSON.parse(value) : null;
}
async set(key: string, value: any, ttl: number): Promise<void> {
await this.client.setex(key, ttl, JSON.stringify(value));
}
async delete(key: string): Promise<void> {
await this.client.del(key);
}
}
// 多层缓存
class MultiLevelCache implements Cache {
private l1Cache: Cache; // Memory
private l2Cache: Cache; // Redis
constructor(options?: { l1Size?: number; redisUrl?: string }) {
this.l1Cache = new MemoryCache(options?.l1Size || 1000);
this.l2Cache = new RedisCache(options?.redisUrl || 'redis://localhost:6379');
}
async get(key: string): Promise<any> {
// 先查L1
let value = await this.l1Cache.get(key);
if (value !== null) {
return value;
}
// L1未命中,查L2
value = await this.l2Cache.get(key);
if (value !== null) {
// 回填L1
await this.l1Cache.set(key, value, 60); // L1缓存1分钟
}
return value;
}
async set(key: string, value: any, ttl: number): Promise<void> {
// 写入L1和L2
await Promise.all([
this.l1Cache.set(key, value, Math.min(ttl, 60)), // L1最多缓存1分钟
this.l2Cache.set(key, value, ttl)
]);
}
async delete(key: string): Promise<void> {
await Promise.all([
this.l1Cache.delete(key),
this.l2Cache.delete(key)
]);
}
}
6.2 并发请求优化
class ConcurrentRequestManager {
private queue: Map<string, Promise<any>> = new Map();
async deduplicate<T>(
key: string,
fn: () => Promise<T>
): Promise<T> {
// 如果已有相同的请求在进行,返回同一个Promise
const existing = this.queue.get(key);
if (existing) {
return existing as Promise<T>;
}
// 否则,执行请求并缓存Promise
const promise = fn().finally(() => {
// 请求完成后,从队列中删除
this.queue.delete(key);
});
this.queue.set(key, promise);
return promise;
}
async batch<T, R>(
items: T[],
fn: (item: T) => Promise<R>,
options?: { maxConcurrency?: number }
): Promise<R[]> {
const maxConcurrency = options?.maxConcurrency || 5;
const results: R[] = [];
// 分批并发执行
for (let i = 0; i < items.length; i += maxConcurrency) {
const batch = items.slice(i, i + maxConcurrency);
const batchResults = await Promise.all(
batch.map(item => fn(item))
);
results.push(...batchResults);
}
return results;
}
}
6.3 流式响应处理
// 支持Server-Sent Events (SSE)
async function handleStreamingResponse(
request: UnifiedChatRequest,
res: Response
): Promise<void> {
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
const provider = await selectProvider(request);
// 根据提供商类型处理流式响应
if (provider.constructor.name === 'OpenAIProvider') {
await handleOpenAIStream(request, res, provider);
} else if (provider.constructor.name === 'AnthropicProvider') {
await handleAnthropicStream(request, res, provider);
} else {
throw new Error(`Streaming not supported for provider: ${provider.constructor.name}`);
}
}
async function handleOpenAIStream(
request: UnifiedChatRequest,
res: Response,
provider: OpenAIProvider
): Promise<void> {
const stream = await provider.chatCompletionStream({
model: request.model,
messages: request.messages,
temperature: request.temperature,
maxTokens: request.maxTokens,
stream: true
});
// 转发SSE事件
for await (const chunk of stream) {
const data = JSON.stringify(chunk);
res.write(`data: ${data}\n\n`);
}
res.write('data: [DONE]\n\n');
res.end();
}
生产实践:监控、告警与安全
7.1 监控指标
// Prometheus指标
import { register, Counter, Histogram, Gauge } from 'prom-client';
const requestCounter = new Counter({
name: 'ai_gateway_requests_total',
help: 'Total number of requests',
labelNames: ['provider', 'model', 'status']
});
const requestDuration = new Histogram({
name: 'ai_gateway_request_duration_seconds',
help: 'Request duration in seconds',
labelNames: ['provider', 'model'],
buckets: [0.1, 0.5, 1, 2, 5, 10, 30, 60]
});
const tokenUsage = new Counter({
name: 'ai_gateway_tokens_total',
help: 'Total number of tokens used',
labelNames: ['provider', 'model', 'type'] // type: prompt|completion
});
const costGauge = new Gauge({
name: 'ai_gateway_cost_dollars',
help: 'Estimated cost in dollars',
labelNames: ['provider', 'model']
});
// 在请求处理中记录指标
async function recordMetrics(
provider: string,
model: string,
duration: number,
usage: TokenUsage,
cost: number,
status: 'success' | 'error'
): Promise<void> {
requestCounter.inc({ provider, model, status });
requestDuration.observe({ provider, model }, duration);
tokenUsage.inc({ provider, model, type: 'prompt' }, usage.promptTokens);
tokenUsage.inc({ provider, model, type: 'completion' }, usage.completionTokens);
costGauge.set({ provider, model }, cost);
}
7.2 告警规则
# alerts.yml
groups:
- name: ai_gateway
rules:
- alert: HighErrorRate
expr: rate(ai_gateway_requests_total{status="error"}[5m]) / rate(ai_gateway_requests_total[5m]) > 0.1
for: 5m
annotations:
summary: "High error rate detected"
- alert: HighLatency
expr: histogram_quantile(0.95, rate(ai_gateway_request_duration_seconds_bucket[5m])) > 5
for: 5m
annotations:
summary: "95th percentile latency > 5 seconds"
- alert: ProviderDown
expr: up{job="ai_gateway"} == 0
for: 1m
annotations:
summary: "Provider {{ $labels.provider }} is down"
7.3 安全最佳实践
// API密钥加密存储
import crypto from 'crypto';
class APIKeyManager {
private encryptionKey: Buffer;
constructor(masterKey: string) {
// 从主密钥派生加密密钥
this.encryptionKey = crypto.scryptSync(masterKey, 'salt', 32);
}
encrypt(apiKey: string): string {
const iv = crypto.randomBytes(16);
const cipher = crypto.createCipheriv('aes-256-gcm', this.encryptionKey, iv);
let encrypted = cipher.update(apiKey, 'utf8', 'hex');
encrypted += cipher.final('hex');
const authTag = cipher.getAuthTag();
return iv.toString('hex') + ':' + authTag.toString('hex') + ':' + encrypted;
}
decrypt(encryptedKey: string): string {
const [ivHex, authTagHex, encrypted] = encryptedKey.split(':');
const iv = Buffer.from(ivHex, 'hex');
const authTag = Buffer.from(authTagHex, 'hex');
const decipher = crypto.createDecipheriv('aes-256-gcm', this.encryptionKey, iv);
decipher.setAuthTag(authTag);
let decrypted = decipher.update(encrypted, 'hex', 'utf8');
decrypted += decipher.final('utf8');
return decrypted;
}
}
// 速率限制
class TokenBucket {
private tokens: number;
private lastRefill: number;
constructor(
private capacity: number,
private refillRate: number // tokens per second
) {
this.tokens = capacity;
this.lastRefill = Date.now();
}
consume(count: number = 1): boolean {
this.refill();
if (this.tokens >= count) {
this.tokens -= count;
return true;
}
return false;
}
private refill(): void {
const now = Date.now();
const timePassed = (now - this.lastRefill) / 1000;
const tokensToAdd = timePassed * this.refillRate;
this.tokens = Math.min(this.capacity, this.tokens + tokensToAdd);
this.lastRefill = now;
}
}
总结与展望
8.1 核心要点回顾
在本文中,我们深入探讨了如何构建生产级AI智能路由网关,核心要点包括:
- 统一抽象层:通过适配器模式统一不同AI提供商的接口
- 智能路由:基于成本、延迟、质量的混合路由策略
- 高可用:故障自动切换、断路器、健康检查
- 性能优化:多层缓存、请求去重、并发控制
- 可观测性:指标收集、日志、告警
- 安全:API密钥加密、速率限制、认证
8.2 生产环境检查清单
在将AI路由网关部署到生产环境之前,确保完成以下检查:
- 高可用:部署多个实例,使用负载均衡器
- 监控:配置Prometheus + Grafana仪表板
- 告警:设置关键指标告警(错误率、延迟、成本)
- 安全:启用认证、加密API密钥、配置速率限制
- 灾备:制定提供商全部不可用的应急预案
- 成本控制:设置每日/每月成本上限
- 测试:进行故障注入测试,验证故障切换逻辑
8.3 未来展望
AI路由网关的未来发展方向:
- 更智能的路由:基于ML模型预测请求的最佳提供商
- 边缘部署:将网关部署到边缘节点,降低延迟
- 多模态支持:支持图像、音频、视频等多模态AI请求
- 自助服务:提供Web UI让用户可以配置路由规则、查看用量
- 成本分析:提供详细的成本分析报告和优化建议
参考资源
- OmniRoute GitHub
- OpenAI API Reference
- Anthropic API Docs
- DeepSeek API
- Circuit Breaker Pattern
- Load Balancing Algorithms
本文基于2026年7月最新的AI开发生态撰写,代码示例仅供参考,生产环境请根据实际需求调整。