OmniRoute in Practice: Building a Highly Available AI Routing Gateway and Best Practices for Integrating Multiple AI Providers

2026-07-05 03:13:30 +0800 CST


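When you need intelligent routing, load balancing, failover, and cost optimization across 236 AI providers, OmniRoute offers an open-source answer. This article goes from architecture design to working code and shows how to build a production-grade AI routing gateway.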

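Contents

  1. Background and Pain Points: Why You Need an AI Routing Gateway
  2. Core Concepts: Architecture of an AI Routing Gateway
  3. OmniRoute in Depth: Architecture and Implementation
  4. Hands-On: Building an AI Routing Gateway from Scratch
  5. Advanced Features: Load Balancing, Failover, and Cost Optimization
  6. Performance Optimization: Caching, Concurrency, and Streaming Responses
  7. Production Practice: Monitoring, Alerting, and Security
  8. Summary and Outlook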

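Background and Pain Points: Why You Need an AI Routing Gateway

1.1 The Reality of a Fragmented AI Provider Landscape

The AI development ecosystem in 2026 has one striking feature: an explosion in the number of AI providers. From OpenAI, Anthropic, and Google to Chinese providers such as DeepSeek, Zhipu, and MiniMax, plus a long tail of small domain-specific model vendors, developers face an unprecedented "paradox of choice".

Core pain points:

  • Inconsistent interfaces: every provider has its own API conventions, with different request formats, response structures, and error codes
  • Large price gaps: for inference of comparable quality, prices across providers can differ by 10x or more
  • Uneven availability: some providers' APIs are unstable, so a fallback is needed
  • Different rate limits: each provider enforces its own TPM (Tokens Per Minute) limits
  • Different model strengths: models perform differently across tasks, which calls for intelligent routing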

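1.2 Limitations of Existing Approaches

Before OmniRoute, developers typically had the following options:

Option A: Lock in to a single provider

# Simple but fragile (OpenAI Python SDK v1+ client style)
from openai import OpenAI

client = OpenAI()  # reads OPENAI_API_KEY from the environment
response = client.chat.completions.create(
    model="gpt-5.5",
    messages=[{"role": "user", "content": "Hello"}]
)

Problems: a single point of failure, no way to take advantage of price differences, and no way to pick the best model for each task.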

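Option B: Hand-roll a multi-provider wrapper

# Tedious and hard to maintain (OpenAIClient etc. stand for hand-written wrappers)
class AIClient:
    def __init__(self):
        self.providers = {
            'openai': OpenAIClient(),
            'anthropic': AnthropicClient(),
            'deepseek': DeepSeekClient()
        }
    
    def chat(self, provider, model, messages):
        # Every provider needs its own branch
        if provider == 'openai':
            return self.providers['openai'].chat(model, messages)
        elif provider == 'anthropic':
            return self.providers['anthropic'].chat(model, messages)
        # ... an endless chain of if/elif

Problems: bloated code, adding a provider means modifying core code, and requests cannot be routed dynamically.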

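Option C: Use a framework such as LangChain

from langchain_openai import ChatOpenAI
from langchain_anthropic import ChatAnthropic

Problems: the abstraction sits too high to give fine-grained control over routing logic, and the extra layers add runtime overhead.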

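1.3 How OmniRoute Approaches the Problem

OmniRoute proposes an elegant solution:

  1. Unified abstraction layer: every AI provider implements the same interface
  2. Intelligent routing: automatically pick the best provider based on task type, cost, latency, and availability
  3. Automatic failover: when the primary provider fails, switch to a backup automatically
  4. Load balancing: spread requests across multiple providers to increase throughput
  5. Cost optimization: prefer providers with the best price/performance ratio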

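Core Concepts: Architecture of an AI Routing Gateway

2.1 The Gateway Pattern

An AI routing gateway is essentially a specialized API gateway. Its core responsibilities are:

Client → AI routing gateway → multiple AI providers
                ↓
          Routing decision engine
          - Task analysis
          - Cost calculation
          - Latency prediction
          - Availability checks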

2.2 Core Component Design

A complete AI routing gateway should include the following core components:

2.2.1 Provider Adapter Layer

// Unified interface definition
interface AIProvider {
  // Chat completion
  chatCompletion(request: ChatRequest): Promise<ChatResponse>;
  
  // Text embeddings
  embedding(request: EmbedRequest): Promise<EmbedResponse>;
  
  // List available models
  listModels(): Promise<Model[]>;
  
  // Report provider health
  getStatus(): Promise<ProviderStatus>;
  
  // Report pricing
  getPricing(): Promise<PricingInfo>;
}

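// OpenAI adapter implementation
class OpenAIProvider implements AIProvider {
  // USD per million tokens. Illustrative values: prices change, so in production load
  // them from configuration and check the provider's official pricing page
  private static readonly PRICING: Record<string, { input: number; output: number }> = {
    'gpt-4o': { input: 2.50, output: 10.00 },
    'gpt-4o-mini': { input: 0.15, output: 0.60 },
    'gpt-5.5': { input: 10.00, output: 30.00 }
  };
  
  private apiKey: string;
  private baseURL: string;
  
  constructor(config: OpenAIConfig) {
    this.apiKey = config.apiKey;
    this.baseURL = config.baseURL || 'https://api.openai.com/v1';
  }
  
  async chatCompletion(request: ChatRequest): Promise<ChatResponse> {
    // Non-streaming only: with stream: true the body is a server-sent event
    // stream, and response.json() below would fail
    const response = await fetch(`${this.baseURL}/chat/completions`, {
      method: 'POST',
      headers: {
        'Authorization': `Bearer ${this.apiKey}`,
        'Content-Type': 'application/json'
      },
      body: JSON.stringify({
        model: request.model,
        messages: request.messages,
        temperature: request.temperature,
        max_tokens: request.maxTokens,
        stream: request.stream
      })
    });
    
    if (!response.ok) {
      throw new AIProviderError({
        provider: 'openai',
        status: response.status,
        message: await response.text()
      });
    }
    
    return await response.json();
  }
  
  async embedding(request: EmbedRequest): Promise<EmbedResponse> {
    // Same pattern against POST /embeddings; omitted for brevity
    throw new Error('embedding() is not implemented in this example');
  }
  
  async listModels(): Promise<Model[]> {
    const response = await fetch(`${this.baseURL}/models`, {
      headers: { 'Authorization': `Bearer ${this.apiKey}` }
    });
    if (!response.ok) {
      throw new AIProviderError({
        provider: 'openai',
        status: response.status,
        message: await response.text()
      });
    }
    const data = await response.json();
    return data.data.map((model: { id: string }) => ({
      id: model.id,
      // The /models endpoint does not report context windows; fill this in from local config
      contextWindow: undefined,
      pricing: this.extractPricing(model.id)
    }));
  }
  
  async getStatus(): Promise<ProviderStatus> {
    // Active probe with a tiny real request. Every probe costs tokens and time, so
    // production code should cache the result or judge health from the recent error rate
    const start = Date.now();
    try {
      await this.chatCompletion({
        model: 'gpt-4o-mini',
        messages: [{ role: 'user', content: 'ping' }],
        maxTokens: 10
      });
      return { status: 'healthy', latency: Date.now() - start };
    } catch (error) {
      return { status: 'unhealthy', error: (error as Error).message };
    }
  }
  
  async getPricing(): Promise<PricingInfo> {
    // Return the cached/configured price table
    return OpenAIProvider.PRICING;
  }
  
  private extractPricing(modelId: string): { input: number; output: number } | undefined {
    // Look the model up in the price table; undefined means the price is unknown
    return OpenAIProvider.PRICING[modelId];
  }
}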
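The getStatus() probe above sends a real completion every time it runs, which costs tokens and adds latency to every routing decision. Its own comment mentions the alternative of judging health from the recent error rate. Below is a minimal sketch of that passive approach; the class name, the window size of 20, the 50% error threshold, and the minimum sample count are illustrative assumptions, not OmniRoute's API.

```typescript
// Hypothetical passive health tracker: judges a provider by the outcome of its
// last N real requests instead of sending dedicated probe requests.
class PassiveHealthTracker {
  private results: boolean[] = []; // true = success, false = failure
  private readonly windowSize: number;
  private readonly maxErrorRate: number;
  private readonly minSamples: number;

  constructor(windowSize = 20, maxErrorRate = 0.5, minSamples = 5) {
    this.windowSize = windowSize;     // how many recent requests to remember
    this.maxErrorRate = maxErrorRate; // unhealthy above this failure ratio
    this.minSamples = minSamples;     // don't judge on too little data
  }

  record(success: boolean): void {
    this.results.push(success);
    if (this.results.length > this.windowSize) {
      this.results.shift(); // drop the oldest result
    }
  }

  errorRate(): number {
    if (this.results.length === 0) return 0;
    const failures = this.results.filter(ok => !ok).length;
    return failures / this.results.length;
  }

  isHealthy(): boolean {
    // With too few samples, assume healthy so new providers still receive traffic
    if (this.results.length < this.minSamples) return true;
    return this.errorRate() <= this.maxErrorRate;
  }
}

// Usage: record the outcome of every real call made through the adapter
const tracker = new PassiveHealthTracker();
for (const ok of [true, true, false, false, false, false]) {
  tracker.record(ok);
}
console.log(tracker.errorRate(), tracker.isHealthy());
```

Because health now comes from traffic the gateway is already sending, routing decisions no longer pay for a probe; the trade-off is that a provider with no recent traffic is reported healthy until evidence says otherwise.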

2.2.2 Routing Decision Engine

The routing decision engine is the gateway's brain: it picks the best provider according to a configurable strategy.

// Routing strategy interface
interface RoutingStrategy {
  selectProvider(
    request: ChatRequest,
    providers: AIProvider[]
  ): Promise<AIProvider>;
}

// Cost-first strategy
class CostOptimizedStrategy implements RoutingStrategy {
  async selectProvider(
    request: ChatRequest,
    providers: AIProvider[]
  ): Promise<AIProvider> {
    const estimations = await Promise.all(
      providers.map(async (provider) => {
        const pricing = await provider.getPricing();
        const estimatedCost = this.estimateCost(request, pricing);
        const status = await provider.getStatus();
        
        return {
          provider,
          estimatedCost,
          isHealthy: status.status === 'healthy'
        };
      })
    );
    
    // Keep only healthy providers that actually offer the requested model
    const candidates = estimations.filter(e => e.isHealthy && Number.isFinite(e.estimatedCost));
    
    if (candidates.length === 0) {
      throw new Error('No healthy providers available');
    }
    
    // Pick the cheapest
    candidates.sort((a, b) => a.estimatedCost - b.estimatedCost);
    return candidates[0].provider;
  }
  
  private estimateCost(request: ChatRequest, pricing: PricingInfo): number {
    // Rough estimate: assume 500 input tokens and 150 output tokens for every request.
    // This ignores the actual prompt length, so it is only useful for comparing providers
    const inputTokens = 500;
    const outputTokens = 150;
    
    const modelPricing = pricing[request.model];
    if (!modelPricing) {
      return Infinity; // this provider does not offer the model
    }
    
    return (inputTokens / 1_000_000) * modelPricing.input +
           (outputTokens / 1_000_000) * modelPricing.output;
  }
}
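estimateCost() above assumes a fixed 500 input and 150 output tokens, so a one-line prompt and a ten-page prompt look equally expensive. A slightly better sketch estimates input tokens from the message text and treats maxTokens as the worst-case output length. It assumes the common rough rule of thumb of about 4 characters per token for English text; that heuristic is much less accurate for Chinese, where the provider's own tokenizer should be used when exact numbers matter. The function names are hypothetical.

```typescript
// Per-million-token prices in USD, matching the PricingInfo entries used above
type ModelPrice = { input: number; output: number };

interface ChatMessage {
  role: string;
  content: string;
}

// Rough heuristic: ~4 characters per token (English-centric assumption)
function estimateTokens(text: string): number {
  return Math.ceil(text.length / 4);
}

// Worst-case cost of one request in USD. Output is bounded by maxTokens;
// when maxTokens is unset, fall back to an assumed default output length.
function estimateRequestCost(
  messages: ChatMessage[],
  maxTokens: number | undefined,
  price: ModelPrice | undefined,
  defaultOutputTokens = 150
): number {
  if (!price) return Infinity; // model not offered by this provider
  const inputTokens = messages.reduce((sum, m) => sum + estimateTokens(m.content), 0);
  const outputTokens = maxTokens ?? defaultOutputTokens;
  return (inputTokens / 1_000_000) * price.input +
         (outputTokens / 1_000_000) * price.output;
}

// 400 characters ≈ 100 input tokens, at most 200 output tokens, gpt-4o-mini-style prices
const cost = estimateRequestCost(
  [{ role: 'user', content: 'x'.repeat(400) }],
  200,
  { input: 0.15, output: 0.60 }
);
console.log(cost.toFixed(6));
```

Here 100 input tokens at $0.15/M plus 200 output tokens at $0.60/M come to about $0.000135, and output dominates, which is why using the caller's maxTokens matters more than refining the input estimate.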

// Latency-first strategy
class LatencyOptimizedStrategy implements RoutingStrategy {
  async selectProvider(
    request: ChatRequest,
    providers: AIProvider[]
  ): Promise<AIProvider> {
    const latencies = await Promise.all(
      providers.map(async (provider) => {
        const start = Date.now();
        try {
          // getStatus() reports failures in its return value rather than throwing,
          // so check status.status instead of relying on the catch block alone
          const status = await provider.getStatus();
          const latency = Date.now() - start;
          return { provider, latency, isHealthy: status.status === 'healthy' };
        } catch (error) {
          return { provider, latency: Infinity, isHealthy: false };
        }
      })
    );
    
    const healthyProviders = latencies.filter(l => l.isHealthy);
    
    if (healthyProviders.length === 0) {
      throw new Error('No healthy providers available');
    }
    
    healthyProviders.sort((a, b) => a.latency - b.latency);
    return healthyProviders[0].provider;
  }
}

// Hybrid strategy: weighted score over cost, latency, and quality
class HybridRoutingStrategy implements RoutingStrategy {
  private weights: { cost: number; latency: number; quality: number };
  
  constructor(weights: { cost: number; latency: number; quality: number }) {
    this.weights = weights;
  }
  
  async selectProvider(
    request: ChatRequest,
    providers: AIProvider[]
  ): Promise<AIProvider> {
    const scores = await Promise.all(
      providers.map(async (provider) => {
        const [cost, latency, quality, status] = await Promise.all([
          this.getCostScore(provider, request),
          this.getLatencyScore(provider),
          this.getQualityScore(provider, request.model),
          provider.getStatus()
        ]);
        
        // Exclude unhealthy providers and providers that don't offer the model
        // (an unknown price gives an infinite cost, i.e. a cost score of 0)
        if (status.status !== 'healthy' || cost === 0) {
          return { provider, score: -Infinity };
        }
        
        const totalScore = 
          this.weights.cost * cost +
          this.weights.latency * latency +
          this.weights.quality * quality;
        
        return { provider, score: totalScore };
      })
    );
    
    const validScores = scores.filter(s => s.score > -Infinity);
    
    if (validScores.length === 0) {
      throw new Error('No healthy providers available');
    }
    
    validScores.sort((a, b) => b.score - a.score); // descending: highest score first
    return validScores[0].provider;
  }
  
  private async getCostScore(provider: AIProvider, request: ChatRequest): Promise<number> {
    const pricing = await provider.getPricing();
    const estimatedCost = this.estimateCost(request, pricing);
    
    // Lower cost → higher score, mapped into (0, 1].
    // Caveat: per-request costs are fractions of a cent, so 1 / (1 + cost) is ≈ 1 for
    // every provider and barely separates them; normalizing costs across the candidate
    // set (e.g. min-max) discriminates much better
    return 1 / (1 + estimatedCost);
  }
  
  private async getLatencyScore(provider: AIProvider): Promise<number> {
    // Note: this is a second getStatus() probe per provider, on top of the health check above
    const start = Date.now();
    try {
      await provider.getStatus();
      const latency = Date.now() - start;
      
      // Lower latency → higher score (latency converted to seconds)
      return 1 / (1 + latency / 1000);
    } catch (error) {
      return 0;
    }
  }
  
  private async getQualityScore(provider: AIProvider, model: string): Promise<number> {
    // Model quality scores from benchmarks or user feedback (illustrative values).
    // The score depends only on the requested model, so it is identical for every
    // provider serving that model and does not by itself distinguish providers
    const qualityScores: Record<string, number> = {
      'gpt-5.5': 0.98,
      'claude-4-opus': 0.97,
      'gpt-4o': 0.95,
      'claude-4-sonnet': 0.93,
      'deepseek-v4': 0.92,
      'gpt-4o-mini': 0.85
    };
    
    return qualityScores[model] ?? 0.5;
  }
  
  private estimateCost(request: ChatRequest, pricing: PricingInfo): number {
    // Same as CostOptimizedStrategy
    const inputTokens = 500;
    const outputTokens = 150;
    
    const modelPricing = pricing[request.model];
    if (!modelPricing) {
      return Infinity;
    }
    
    return (inputTokens / 1_000_000) * modelPricing.input +
           (outputTokens / 1_000_000) * modelPricing.output;
  }
}
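In HybridRoutingStrategy, the cost score 1 / (1 + cost) stays close to 1 whenever per-request costs are fractions of a cent, so the cost weight has almost no effect. One common alternative, sketched here under the assumption that lower raw values are better (true for cost and latency), is min-max normalization across the current candidate set: the best candidate scores 1 and the worst scores 0. The helper name is hypothetical, and the example weights are the config defaults (cost 0.4, latency 0.3, quality 0.3).

```typescript
// Min-max normalize "lower is better" metrics (cost, latency) into [0, 1]:
// the best candidate gets 1, the worst gets 0.
function normalizeLowerIsBetter(values: number[]): number[] {
  const finite = values.filter(Number.isFinite);
  if (finite.length === 0) return values.map(() => 0);
  const min = Math.min(...finite);
  const max = Math.max(...finite);
  return values.map(v => {
    if (!Number.isFinite(v)) return 0; // e.g. the model is not supported
    if (max === min) return 1;         // all candidates tie
    return (max - v) / (max - min);
  });
}

// Three candidate providers for the same request
const costs = [0.0001, 0.0005, 0.0003];  // estimated USD per request
const latencies = [800, 300, 500];       // milliseconds
const quality = [0.92, 0.95, 0.85];

const costScores = normalizeLowerIsBetter(costs);
const latencyScores = normalizeLowerIsBetter(latencies);

// Weighted total with the default weights from the routing config
const totals = costScores.map(
  (c, i) => 0.4 * c + 0.3 * latencyScores[i] + 0.3 * quality[i]
);
console.log(totals);
```

Normalizing relative to the candidates means scores only make sense within one routing decision; that is fine here, because the strategy only needs to rank the providers it is currently choosing between.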

2.2.3 Failover Mechanism

class FailoverManager {
  private primaryProvider: AIProvider;
  private backupProviders: AIProvider[];
  private healthCheckInterval: number = 30000; // 30 seconds
  // Providers that failed recently; skipped until a health check clears them
  private unhealthy = new Set<AIProvider>();
  
  constructor(
    primary: AIProvider,
    backups: AIProvider[],
    options?: { healthCheckInterval?: number }
  ) {
    this.primaryProvider = primary;
    this.backupProviders = backups;
    
    if (options?.healthCheckInterval) {
      this.healthCheckInterval = options.healthCheckInterval;
    }
    
    this.startHealthCheck();
  }
  
  async executeWithFailover<T>(
    operation: (provider: AIProvider) => Promise<T>
  ): Promise<T> {
    const allProviders = [this.primaryProvider, ...this.backupProviders];
    
    for (const provider of allProviders) {
      if (this.unhealthy.has(provider)) {
        console.warn(`Provider ${provider.constructor.name} is marked unhealthy, skipping`);
        continue;
      }
      
      try {
        return await operation(provider);
      } catch (error) {
        console.error(`Provider ${provider.constructor.name} failed:`, error);
        
        // Mark the provider unhealthy, then try the next one
        this.markUnhealthy(provider);
      }
    }
    
    throw new Error('All providers failed');
  }
  
  private markUnhealthy(provider: AIProvider): void {
    // Avoid this provider until the next health check reports it healthy again
    if (!this.unhealthy.has(provider)) {
      console.error(`Marking provider ${provider.constructor.name} as unhealthy`);
    }
    this.unhealthy.add(provider);
  }
  
  private startHealthCheck(): void {
    const timer = setInterval(async () => {
      for (const provider of [this.primaryProvider, ...this.backupProviders]) {
        try {
          // getStatus() returns { status: 'unhealthy' } instead of throwing, so check the field
          const status = await provider.getStatus();
          if (status.status === 'healthy') {
            this.markHealthy(provider);
          } else {
            this.markUnhealthy(provider);
          }
        } catch (error) {
          this.markUnhealthy(provider);
        }
      }
    }, this.healthCheckInterval);
    // Don't let the health-check timer alone keep the Node.js process alive
    timer.unref();
  }
  
  private markHealthy(provider: AIProvider): void {
    if (this.unhealthy.delete(provider)) {
      console.log(`Provider ${provider.constructor.name} is healthy again`);
    }
  }
}
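The FailoverManager above only knows "healthy" and "unhealthy". A circuit breaker is a common refinement: after a run of consecutive failures it opens and rejects calls immediately, and after a cooldown it lets a single trial request through (half-open) to decide whether to close again. The sketch below is a minimal version; the thresholds, the class name, and the injectable clock are assumptions for illustration, not part of OmniRoute.

```typescript
type BreakerState = 'closed' | 'open' | 'half-open';

// Minimal per-provider circuit breaker
class CircuitBreaker {
  private state: BreakerState = 'closed';
  private consecutiveFailures = 0;
  private openedAt = 0;
  private readonly failureThreshold: number;
  private readonly cooldownMs: number;
  private readonly now: () => number;

  constructor(failureThreshold = 5, cooldownMs = 30_000, now: () => number = Date.now) {
    this.failureThreshold = failureThreshold; // failures in a row before opening
    this.cooldownMs = cooldownMs;             // how long to stay open
    this.now = now;                           // injectable clock for tests
  }

  // Whether a request may be sent to this provider right now
  canRequest(): boolean {
    if (this.state === 'open' && this.now() - this.openedAt >= this.cooldownMs) {
      this.state = 'half-open'; // cooldown over: allow exactly one trial request
      return true;
    }
    return this.state === 'closed';
  }

  recordSuccess(): void {
    this.consecutiveFailures = 0;
    this.state = 'closed';
  }

  recordFailure(): void {
    this.consecutiveFailures++;
    // A failed trial request, or too many failures in a row, (re)opens the breaker
    if (this.state === 'half-open' || this.consecutiveFailures >= this.failureThreshold) {
      this.state = 'open';
      this.openedAt = this.now();
    }
  }

  getState(): BreakerState {
    return this.state;
  }
}

// Usage with a fake clock so the example runs instantly
let fakeTime = 0;
const breaker = new CircuitBreaker(3, 1000, () => fakeTime);
for (let i = 0; i < 3; i++) breaker.recordFailure();
console.log(breaker.getState(), breaker.canRequest()); // open false
fakeTime = 1000;
console.log(breaker.canRequest(), breaker.getState()); // true half-open
breaker.recordSuccess();
console.log(breaker.getState()); // closed
```

In a failover loop, a provider whose breaker refuses the request is skipped just like one marked unhealthy, but it gets retried automatically after the cooldown instead of waiting for the next 30-second health check.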

2.3 Request/Response Transformation Layer

API formats differ substantially across AI providers, so a unified transformation layer is needed:

// Unified request format
interface UnifiedChatRequest {
  model: string;
  messages: Message[];
  temperature?: number;
  maxTokens?: number;
  stream?: boolean;
  [key: string]: any; // extra, provider-specific parameters
}

// Unified response format
interface UnifiedChatResponse {
  id: string;
  model: string;
  choices: Choice[];
  usage: {
    promptTokens: number;
    completionTokens: number;
    totalTokens: number;
  };
  provider: string; // the provider that actually served the request
}

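// Request transformer
class RequestTransformer {
  static toOpenAIFormat(request: UnifiedChatRequest): any {
    return {
      model: request.model,
      messages: request.messages,
      temperature: request.temperature,
      max_tokens: request.maxTokens,
      stream: request.stream
    };
  }
  
  static toAnthropicFormat(request: UnifiedChatRequest): any {
    // Anthropic's Messages API differs: system prompts go in a top-level `system`
    // field rather than a message role, and max_tokens is required
    const system = request.messages
      .filter(msg => msg.role === 'system')
      .map(msg => msg.content)
      .join('\n');
    return {
      model: request.model,
      ...(system ? { system } : {}),
      messages: request.messages
        .filter(msg => msg.role !== 'system')
        .map(msg => ({
          role: msg.role === 'assistant' ? 'assistant' : 'user',
          content: msg.content
        })),
      max_tokens: request.maxTokens ?? 1024, // default chosen for this example
      temperature: request.temperature
    };
  }
  
  static toDeepSeekFormat(request: UnifiedChatRequest): any {
    // DeepSeek's API is OpenAI-compatible, though minor differences are possible
    return this.toOpenAIFormat(request);
  }
}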

// Response transformer
class ResponseTransformer {
  static fromOpenAIFormat(response: any, provider: string): UnifiedChatResponse {
    return {
      id: response.id,
      model: response.model,
      choices: response.choices.map(choice => ({
        index: choice.index,
        message: choice.message,
        finishReason: choice.finish_reason
      })),
      usage: {
        promptTokens: response.usage.prompt_tokens,
        completionTokens: response.usage.completion_tokens,
        totalTokens: response.usage.total_tokens
      },
      provider
    };
  }
  
  static fromAnthropicFormat(response: any, provider: string): UnifiedChatResponse {
    // Anthropic returns `content` as an array of blocks; concatenate the text blocks
    const text = response.content
      .filter((block: any) => block.type === 'text')
      .map((block: any) => block.text)
      .join('');
    return {
      id: response.id,
      model: response.model,
      choices: [{
        index: 0,
        message: {
          role: 'assistant',
          content: text
        },
        // stop_reason uses Anthropic's vocabulary (e.g. 'end_turn', 'max_tokens')
        finishReason: response.stop_reason
      }],
      usage: {
        promptTokens: response.usage.input_tokens,
        completionTokens: response.usage.output_tokens,
        totalTokens: response.usage.input_tokens + response.usage.output_tokens
      },
      provider
    };
  }
}
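fromAnthropicFormat() passes Anthropic's stop_reason through unchanged, so clients expecting OpenAI-style finish_reason values see a different vocabulary depending on which provider served them. A small mapping helper can normalize this. The cases below cover the stop_reason values 'end_turn', 'max_tokens', 'stop_sequence', and 'tool_use'; the function name and the pass-through for any other value are our own choices.

```typescript
// Map Anthropic's stop_reason onto OpenAI-style finish_reason values
function toOpenAIFinishReason(stopReason: string | null): string | null {
  switch (stopReason) {
    case 'end_turn':
    case 'stop_sequence':
      return 'stop';       // natural end of turn, or a stop sequence was hit
    case 'max_tokens':
      return 'length';     // truncated by the max_tokens limit
    case 'tool_use':
      return 'tool_calls'; // the model wants to call a tool
    default:
      return stopReason;   // null or an unmapped value: pass through unchanged
  }
}

console.log(toOpenAIFinishReason('max_tokens')); // length
```

Inside fromAnthropicFormat(), `finishReason: response.stop_reason` would become `finishReason: toOpenAIFinishReason(response.stop_reason)`.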

OmniRoute in Depth: Architecture and Implementation

3.1 OmniRoute's Core Architecture

OmniRoute uses a pluggable, composable architecture. Its core modules are:

OmniRoute
├── Provider Registry
├── Router Core
│   ├── Strategy Engine
│   ├── Load Balancer
│   └── Failover Manager
├── Middleware Pipeline
│   ├── Authentication
│   ├── Rate Limiting
│   ├── Caching
│   ├── Logging
│   └── Metrics (collection)
├── Admin API
└── Dashboard (monitoring)

3.2 Provider Registry Implementation

class ProviderRegistry {
  private providers: Map<string, AIProvider> = new Map();
  private providerConfigs: Map<string, ProviderConfig> = new Map();
  
  register(name: string, provider: AIProvider, config?: ProviderConfig): void {
    this.providers.set(name, provider);
    if (config) {
      this.providerConfigs.set(name, config);
    }
  }
  
  get(name: string): AIProvider | undefined {
    return this.providers.get(name);
  }
  
  list(): string[] {
    return Array.from(this.providers.keys());
  }
  
  async getHealthyProviders(): Promise<string[]> {
    // Probe every provider in parallel; probing sequentially would add up their latencies
    const results = await Promise.all(
      Array.from(this.providers, async ([name, provider]) => {
        try {
          const status = await provider.getStatus();
          return status.status === 'healthy' ? name : null;
        } catch (error) {
          return null; // a failed probe counts as unhealthy
        }
      })
    );
    
    return results.filter((name): name is string => name !== null);
  }
  
  async getProviderPricing(name: string): Promise<PricingInfo | undefined> {
    const provider = this.providers.get(name);
    if (!provider) {
      return undefined;
    }
    
    return await provider.getPricing();
  }
}

// Usage example
const registry = new ProviderRegistry();

// Register OpenAI
registry.register('openai', new OpenAIProvider({
  apiKey: process.env.OPENAI_API_KEY!
}));

// Register Anthropic
registry.register('anthropic', new AnthropicProvider({
  apiKey: process.env.ANTHROPIC_API_KEY!
}));

// Register DeepSeek
registry.register('deepseek', new DeepSeekProvider({
  apiKey: process.env.DEEPSEEK_API_KEY!
}));

3.3 Middleware Pipeline

The middleware pipeline lets you insert custom logic before and after a request is processed:

interface Middleware {
  process(
    request: UnifiedChatRequest,
    next: (req: UnifiedChatRequest) => Promise<UnifiedChatResponse>
  ): Promise<UnifiedChatResponse>;
}

// Authentication middleware
class AuthenticationMiddleware implements Middleware {
  async process(
    request: UnifiedChatRequest,
    next: (req: UnifiedChatRequest) => Promise<UnifiedChatResponse>
  ): Promise<UnifiedChatResponse> {
    // Require an API key (Node.js lowercases incoming header names)
    if (!request.apiKey && !request.headers?.['authorization']) {
      throw new Error('Authentication required');
    }
    
    // Check that the API key is valid
    const apiKey = request.apiKey || request.headers!['authorization'].replace('Bearer ', '');
    const isValid = await this.validateApiKey(apiKey);
    
    if (!isValid) {
      throw new Error('Invalid API key');
    }
    
    // Continue down the pipeline
    return await next(request);
  }
  
  private async validateApiKey(apiKey: string): Promise<boolean> {
    // A real implementation would look the key up in a database or cache.
    // Simplified: every key is accepted, so do not deploy this as-is
    return true;
  }
}

// Rate limiting middleware
class RateLimitingMiddleware implements Middleware {
  private rateLimiter: RateLimiter;
  
  constructor(rateLimiter: RateLimiter) {
    this.rateLimiter = rateLimiter;
  }
  
  async process(
    request: UnifiedChatRequest,
    next: (req: UnifiedChatRequest) => Promise<UnifiedChatResponse>
  ): Promise<UnifiedChatResponse> {
    const clientId = this.getClientId(request);
    
    if (!await this.rateLimiter.allow(clientId)) {
      throw new Error('Rate limit exceeded');
    }
    
    return await next(request);
  }
  
  private getClientId(request: UnifiedChatRequest): string {
    // Identify the client by API key, falling back to IP address
    return request.apiKey || request.ipAddress || 'anonymous';
  }
}
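RateLimitingMiddleware depends on a RateLimiter whose implementation isn't shown; only the `allow(clientId)` call is visible. Here is a minimal in-memory fixed-window sketch matching that call and the windowMs/maxRequests settings from the configuration in 4.3. The interface is inferred from the call site, and the class name and injectable clock are assumptions. It only works within a single process: a multi-instance deployment would keep the counters in a shared store such as Redis.

```typescript
// Interface assumed from the call `await this.rateLimiter.allow(clientId)`
interface RateLimiter {
  allow(clientId: string): Promise<boolean>;
}

// Fixed-window counter: each client gets maxRequests calls per windowMs-long window.
// Single-process only; entries for idle clients are never evicted in this sketch.
class FixedWindowRateLimiter implements RateLimiter {
  private windows = new Map<string, { start: number; count: number }>();
  private readonly windowMs: number;
  private readonly maxRequests: number;
  private readonly now: () => number;

  constructor(windowMs: number, maxRequests: number, now: () => number = Date.now) {
    this.windowMs = windowMs;
    this.maxRequests = maxRequests;
    this.now = now; // injectable clock, handy for tests
  }

  // Synchronous core: counts the request and returns true if it is within the limit
  tryAcquire(clientId: string): boolean {
    const current = this.now();
    const entry = this.windows.get(clientId);
    // Unseen client, or the previous window has expired: start a fresh window
    if (!entry || current - entry.start >= this.windowMs) {
      this.windows.set(clientId, { start: current, count: 1 });
      return true;
    }
    if (entry.count < this.maxRequests) {
      entry.count++;
      return true;
    }
    return false; // limit reached until the window resets
  }

  async allow(clientId: string): Promise<boolean> {
    return this.tryAcquire(clientId);
  }
}

// Matches the defaults in config.rateLimit: 100 requests per 60-second window
const limiter = new FixedWindowRateLimiter(60_000, 100);
limiter.allow('client-1').then(ok => console.log('allowed:', ok));
```

Fixed windows are simple, but a client can fit up to twice maxRequests into a short span around a window boundary; a sliding window or token bucket smooths that out at the cost of a little more state.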

// Caching middleware
import { createHash } from 'crypto';

class CachingMiddleware implements Middleware {
  private cache: Cache;
  
  constructor(cache: Cache) {
    this.cache = cache;
  }
  
  async process(
    request: UnifiedChatRequest,
    next: (req: UnifiedChatRequest) => Promise<UnifiedChatResponse>
  ): Promise<UnifiedChatResponse> {
    // Only non-streaming requests are cached
    if (request.stream) {
      return await next(request);
    }
    
    const cacheKey = this.generateCacheKey(request);
    
    // Check the cache
    const cached = await this.cache.get(cacheKey);
    if (cached) {
      return cached;
    }
    
    // Execute the request
    const response = await next(request);
    
    // Cache the response for 1 hour (3600 seconds)
    await this.cache.set(cacheKey, response, 3600);
    
    return response;
  }
  
  private generateCacheKey(request: UnifiedChatRequest): string {
    // Build the cache key from the request content
    const content = JSON.stringify({
      model: request.model,
      messages: request.messages,
      temperature: request.temperature,
      maxTokens: request.maxTokens
    });
    
    // SHA-256 digest of the serialized request, hex-encoded
    return `chat:${createHash('sha256').update(content).digest('hex')}`;
  }
}
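CachingMiddleware likewise assumes a Cache with `get(key)` and `set(key, value, ttlSeconds)`. For local development, an in-memory stand-in might look like the sketch below. The interface is inferred from the calls above and the class name is hypothetical; in production the same two methods would typically be backed by Redis (for example SET with an EX expiry). Keep in mind that caching only suits deterministic requests: with a non-zero temperature, a cache hit replays one sampled answer.

```typescript
// Interface assumed from this.cache.get(key) / this.cache.set(key, value, 3600)
interface Cache {
  get(key: string): Promise<unknown>;
  set(key: string, value: unknown, ttlSeconds: number): Promise<void>;
}

// In-memory TTL cache: each entry expires ttlSeconds after it was written
class InMemoryTTLCache implements Cache {
  private store = new Map<string, { value: unknown; expiresAt: number }>();
  private readonly now: () => number;

  constructor(now: () => number = Date.now) {
    this.now = now; // injectable clock, handy for tests
  }

  getSync(key: string): unknown {
    const entry = this.store.get(key);
    if (!entry) return undefined;
    if (this.now() >= entry.expiresAt) {
      this.store.delete(key); // lazily evict expired entries on read
      return undefined;
    }
    return entry.value;
  }

  setSync(key: string, value: unknown, ttlSeconds: number): void {
    this.store.set(key, { value, expiresAt: this.now() + ttlSeconds * 1000 });
  }

  async get(key: string): Promise<unknown> {
    return this.getSync(key);
  }

  async set(key: string, value: unknown, ttlSeconds: number): Promise<void> {
    this.setSync(key, value, ttlSeconds);
  }
}

const cache = new InMemoryTTLCache();
cache.set('chat:example', { answer: 'hi' }, 3600)
  .then(() => cache.get('chat:example'))
  .then(value => console.log(value));
```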

// Logging middleware
class LoggingMiddleware implements Middleware {
  async process(
    request: UnifiedChatRequest,
    next: (req: UnifiedChatRequest) => Promise<UnifiedChatResponse>
  ): Promise<UnifiedChatResponse> {
    const start = Date.now();
    
    try {
      const response = await next(request);
      const latency = Date.now() - start;
      
      console.log({
        type: 'request',
        model: request.model,
        provider: response.provider,
        latency,
        promptTokens: response.usage.promptTokens,
        completionTokens: response.usage.completionTokens,
        status: 'success'
      });
      
      return response;
    } catch (error) {
      const latency = Date.now() - start;
      
      console.error({
        type: 'request',
        model: request.model,
        latency,
        error: (error as Error).message,
        status: 'error'
      });
      
      throw error;
    }
  }
}

3.4 Router Core Implementation

class RouterCore {
  private registry: ProviderRegistry;
  private strategy: RoutingStrategy;
  private middlewares: Middleware[] = [];
  private failoverManager: FailoverManager;
  
  constructor(
    registry: ProviderRegistry,
    strategy: RoutingStrategy,
    options?: { failover?: FailoverManager }
  ) {
    this.registry = registry;
    this.strategy = strategy;
    // Default failover chain: every registered provider, in registration order
    // (avoids assuming 'openai', 'anthropic', and 'deepseek' are all registered)
    const registered = registry.list().map(name => registry.get(name)!);
    this.failoverManager = options?.failover || new FailoverManager(
      registered[0],
      registered.slice(1)
    );
  }
  
  use(middleware: Middleware): void {
    this.middlewares.push(middleware);
  }
  
  async route(request: UnifiedChatRequest): Promise<UnifiedChatResponse> {
    // Build the middleware pipeline
    const pipeline = this.buildPipeline();
    
    // Run it
    return await pipeline(request);
  }
  
  private buildPipeline(): (req: UnifiedChatRequest) => Promise<UnifiedChatResponse> {
    // Build the middleware chain from the inside out
    let pipeline = async (req: UnifiedChatRequest) => {
      // Core routing logic
      return await this.executeRouting(req);
    };
    
    // Wrap in reverse order, so the FIRST registered middleware ends up outermost
    for (let i = this.middlewares.length - 1; i >= 0; i--) {
      const middleware = this.middlewares[i];
      const next = pipeline;
      pipeline = async (req: UnifiedChatRequest) => {
        return await middleware.process(req, next);
      };
    }
    
    return pipeline;
  }
  
  private async executeRouting(request: UnifiedChatRequest): Promise<UnifiedChatResponse> {
    // Get the list of healthy providers
    const healthyProviders = await this.registry.getHealthyProviders();
    
    if (healthyProviders.length === 0) {
      throw new Error('No healthy providers available');
    }
    
    // Map provider names to AIProvider instances
    const providerInstances = healthyProviders
      .map(name => this.registry.get(name))
      .filter((p): p is AIProvider => p !== undefined);
    
    // Let the routing strategy pick the best provider
    const selectedProvider = await this.strategy.selectProvider(request, providerInstances);
    
    const callProvider = async (provider: AIProvider) => {
      // The adapter builds its own wire format (OpenAIProvider maps maxTokens to
      // max_tokens), so pass the unified request; an already-transformed body
      // would lose camelCase fields such as maxTokens
      const rawResponse = await provider.chatCompletion(request);
      
      // Normalize the response into the unified format
      return this.transformResponse(rawResponse, this.getProviderName(provider));
    };
    
    // Try the strategy's choice first; fall back to the failover chain only if it fails
    try {
      return await callProvider(selectedProvider);
    } catch (error) {
      console.warn(`Selected provider ${selectedProvider.constructor.name} failed, falling back:`, error);
      return await this.failoverManager.executeWithFailover(callProvider);
    }
  }
  
  private getProviderName(provider: AIProvider): string {
    // A real implementation should keep a reverse map from instance to registered name;
    // constructor names are fragile (for example, they change under minification)
    return provider.constructor.name.replace('Provider', '').toLowerCase();
  }
  
  private transformResponse(response: any, providerName: string): UnifiedChatResponse {
    switch (providerName) {
      case 'openai':
      case 'deepseek':
        return ResponseTransformer.fromOpenAIFormat(response, providerName);
      case 'anthropic':
        return ResponseTransformer.fromAnthropicFormat(response, providerName);
      default:
        throw new Error(`Unknown provider: ${providerName}`);
    }
  }
}

Hands-On: Building an AI Routing Gateway from Scratch

4.1 Project Setup

# Create the project
mkdir ai-router-gateway
cd ai-router-gateway

# Initialize the npm project
npm init -y

# Runtime dependencies (axios, ioredis, and winston ship their own TypeScript types)
npm install express axios ioredis dotenv
npm install winston      # logging
npm install prom-client  # metrics collection

# Development dependencies: TypeScript toolchain, type definitions, and tests
npm install -D typescript ts-node @types/node @types/express
npm install -D jest @types/jest ts-jest

4.2 Project Structure

ai-router-gateway/
├── src/
│   ├── providers/          # provider adapters
│   │   ├── base.ts
│   │   ├── openai.ts
│   │   ├── anthropic.ts
│   │   ├── deepseek.ts
│   │   └── index.ts
│   ├── routing/            # routing strategies
│   │   ├── strategy.ts
│   │   ├── cost-optimized.ts
│   │   ├── latency-optimized.ts
│   │   ├── hybrid.ts
│   │   └── index.ts
│   ├── middleware/         # middleware
│   │   ├── auth.ts
│   │   ├── rate-limit.ts
│   │   ├── cache.ts
│   │   ├── logging.ts
│   │   └── index.ts
│   ├── core/               # core modules
│   │   ├── registry.ts
│   │   ├── router.ts
│   │   ├── failover.ts
│   │   └── index.ts
│   ├── utils/              # utilities
│   │   ├── transformer.ts
│   │   ├── cache.ts
│   │   └── logger.ts
│   ├── config/             # configuration
│   │   ├── index.ts
│   │   └── providers.ts
│   ├── app.ts              # Express application
│   └── server.ts           # entry point
├── tests/                  # tests
├── package.json
├── tsconfig.json
├── .env.example
└── README.md

4.3 Configuration

// src/config/providers.ts
export interface ProviderConfig {
  name: string;
  apiKey: string;
  baseURL?: string;
  timeout?: number;
  weight?: number;  // load-balancing weight
  enabled?: boolean;
}

export const providerConfigs: ProviderConfig[] = [
  {
    name: 'openai',
    apiKey: process.env.OPENAI_API_KEY || '',
    baseURL: process.env.OPENAI_BASE_URL,
    timeout: 60000,
    weight: 3,
    enabled: true
  },
  {
    name: 'anthropic',
    apiKey: process.env.ANTHROPIC_API_KEY || '',
    timeout: 60000,
    weight: 2,
    enabled: true
  },
  {
    name: 'deepseek',
    apiKey: process.env.DEEPSEEK_API_KEY || '',
    baseURL: 'https://api.deepseek.com/v1',
    timeout: 60000,
    weight: 4,
    enabled: true
  }
];

// src/config/index.ts
export const config = {
  server: {
    port: parseInt(process.env.PORT || '3000'),
    host: process.env.HOST || '0.0.0.0'
  },
  redis: {
    host: process.env.REDIS_HOST || 'localhost',
    port: parseInt(process.env.REDIS_PORT || '6379'),
    password: process.env.REDIS_PASSWORD
  },
  routing: {
    strategy: process.env.ROUTING_STRATEGY || 'hybrid',  // cost|latency|hybrid
    weights: {
      cost: parseFloat(process.env.WEIGHT_COST || '0.4'),
      latency: parseFloat(process.env.WEIGHT_LATENCY || '0.3'),
      quality: parseFloat(process.env.WEIGHT_QUALITY || '0.3')
    }
  },
  cache: {
    enabled: process.env.CACHE_ENABLED !== 'false',
    ttl: parseInt(process.env.CACHE_TTL || '3600')
  },
  rateLimit: {
    enabled: process.env.RATE_LIMIT_ENABLED !== 'false',
    windowMs: parseInt(process.env.RATE_LIMIT_WINDOW_MS || '60000'),
    maxRequests: parseInt(process.env.RATE_LIMIT_MAX || '100')
  }
};
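Because the hybrid weights come from environment variables, a typo such as WEIGHT_COST=4 (or a non-numeric value that parseFloat turns into NaN) would silently skew routing. A small startup check is one way to fail fast. This helper is a hypothetical addition, and requiring the weights to sum to 1 is our own convention; the defaults 0.4/0.3/0.3 satisfy it.

```typescript
interface RoutingWeights {
  cost: number;
  latency: number;
  quality: number;
}

// Throws at startup if any weight is non-finite or negative, or if they don't sum to ~1
function validateWeights(weights: RoutingWeights): void {
  for (const [name, value] of Object.entries(weights)) {
    if (!Number.isFinite(value) || value < 0) {
      throw new Error(`Invalid routing weight "${name}": ${value}`);
    }
  }
  const sum = weights.cost + weights.latency + weights.quality;
  // Allow a little floating-point slack when comparing against 1
  if (Math.abs(sum - 1) > 1e-6) {
    throw new Error(`Routing weights must sum to 1, got ${sum}`);
  }
}

// Passes with the defaults from config.routing.weights
validateWeights({ cost: 0.4, latency: 0.3, quality: 0.3 });
```

Calling `validateWeights(config.routing.weights)` before creating the HybridRoutingStrategy turns a misconfiguration into an immediate startup error instead of a subtle routing bias.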

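4.4 Express Application

// src/app.ts
import express, { Express, Request, Response } from 'express';
import { register, collectDefaultMetrics } from 'prom-client';
import { RouterCore } from './core/router';
import { ProviderRegistry } from './core/registry';
import { config } from './config';
import { providerConfigs } from './config/providers';
// The modules below follow the layout from 4.2; the exported names are assumed
import { AIProvider, OpenAIProvider, AnthropicProvider, DeepSeekProvider } from './providers';
import {
  RoutingStrategy,
  CostOptimizedStrategy,
  LatencyOptimizedStrategy,
  HybridRoutingStrategy
} from './routing';
import {
  AuthenticationMiddleware,
  RateLimitingMiddleware,
  CachingMiddleware,
  LoggingMiddleware
} from './middleware';
import { UnifiedChatRequest } from './utils/transformer';

export function createApp(): Express {
  const app = express();
  
  // Parse JSON request bodies
  app.use(express.json());
  
  // Register prom-client's default process metrics (CPU, memory, event loop) for /metrics
  collectDefaultMetrics();
  
  // Initialize the provider registry
  const registry = new ProviderRegistry();
  initializeProviders(registry);
  
  // Initialize the router core
  const routingStrategy = createRoutingStrategy(config.routing.strategy);
  const router = new RouterCore(registry, routingStrategy);
  
  // Register middleware. The first one registered becomes the outermost layer,
  // so authentication runs before rate limiting and caching
  router.use(new AuthenticationMiddleware());
  
  if (config.rateLimit.enabled) {
    router.use(new RateLimitingMiddleware(/* ... */));
  }
  
  if (config.cache.enabled) {
    router.use(new CachingMiddleware(/* ... */));
  }
  
  router.use(new LoggingMiddleware());
  
  // Chat completions endpoint (this example handles non-streaming requests only)
  app.post('/v1/chat/completions', async (req: Request, res: Response) => {
    try {
      const request: UnifiedChatRequest = {
        model: req.body.model,
        messages: req.body.messages,
        temperature: req.body.temperature,
        maxTokens: req.body.max_tokens,
        stream: req.body.stream || false,
        apiKey: req.headers['authorization']?.replace('Bearer ', '')
      };
      
      const response = await router.route(request);
      
      res.json(response);
    } catch (error: any) {
      console.error('Chat completion error:', error);
      
      if (error.message === 'Authentication required' || error.message === 'Invalid API key') {
        res.status(401).json({ error: 'Unauthorized' });
      } else if (error.message === 'Rate limit exceeded') {
        res.status(429).json({ error: 'Too many requests' });
      } else if (
        error.message === 'All providers failed' ||
        error.message === 'No healthy providers available'
      ) {
        res.status(503).json({ error: 'Service unavailable' });
      } else {
        res.status(500).json({ error: 'Internal server error' });
      }
    }
  });
  
  // Model list endpoint: ProviderRegistry has no listModels(), so aggregate per provider
  app.get('/v1/models', async (req: Request, res: Response) => {
    try {
      const models: unknown[] = [];
      for (const name of registry.list()) {
        const providerModels = await registry.get(name)!.listModels();
        models.push(...providerModels.map(model => ({ ...model, provider: name })));
      }
      res.json({ data: models });
    } catch (error) {
      res.status(500).json({ error: 'Failed to fetch models' });
    }
  });
  
  // Health check endpoint
  app.get('/health', async (req: Request, res: Response) => {
    const healthyProviders = await registry.getHealthyProviders();
    
    res.json({
      status: healthyProviders.length > 0 ? 'healthy' : 'degraded',
      healthyProviders,
      totalProviders: registry.list().length
    });
  });
  
  // Metrics endpoint (Prometheus text format from prom-client's default registry)
  app.get('/metrics', async (req: Request, res: Response) => {
    res.set('Content-Type', register.contentType);
    res.send(await register.metrics());
  });
  
  return app;
}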

function initializeProviders(registry: ProviderRegistry): void {
  // Create every enabled provider from the configuration file
  for (const providerConfig of providerConfigs) {
    if (!providerConfig.enabled) {
      continue;
    }
    
    let provider: AIProvider;
    
    switch (providerConfig.name) {
      case 'openai':
        provider = new OpenAIProvider(providerConfig);
        break;
      case 'anthropic':
        provider = new AnthropicProvider(providerConfig);
        break;
      case 'deepseek':
        provider = new DeepSeekProvider(providerConfig);
        break;
      default:
        console.warn(`Unknown provider: ${providerConfig.name}`);
        continue;
    }
    
    registry.register(providerConfig.name, provider, providerConfig);
  }
}

function createRoutingStrategy(strategyName: string): RoutingStrategy {
  switch (strategyName) {
    case 'cost':
      return new CostOptimizedStrategy();
    case 'latency':
      return new LatencyOptimizedStrategy();
    case 'hybrid':
      return new HybridRoutingStrategy(config.routing.weights);
    default:
      throw new Error(`Unknown routing strategy: ${strategyName}`);
  }
}

4.5 Entry Point

// src/server.ts
import { createApp } from './app';
import { config } from './config';

const app = createApp();

app.listen(config.server.port, config.server.host, () => {
  console.log(`AI Router Gateway listening on ${config.server.host}:${config.server.port}`);
  console.log(`Routing strategy: ${config.routing.strategy}`);
  console.log(`Cache: ${config.cache.enabled ? 'enabled' : 'disabled'}`);
  console.log(`Rate limiting: ${config.rateLimit.enabled ? 'enabled' : 'disabled'}`);
});

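Advanced Features: Load Balancing, Failover, and Cost Optimization

5.1 Load Balancing Algorithms

Beyond simple round-robin and random selection, production environments need more sophisticated load-balancing algorithms: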

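// Weighted Round Robin (the interleaved, GCD-based variant; expects integer weights)
class WeightedRoundRobin {
  private names: string[];
  private weights: number[];
  private currentIndex: number = -1; // start just before the first provider
  private currentWeight: number = 0;
  private gcdWeight: number;
  private maxWeight: number;
  
  constructor(providers: { name: string; weight: number }[]) {
    // Zero or negative weights would make select() loop forever, so drop them
    const active = providers.filter(p => p.weight > 0);
    if (active.length === 0) {
      throw new Error('At least one provider needs a positive weight');
    }
    this.names = active.map(p => p.name);
    this.weights = active.map(p => p.weight);
    this.gcdWeight = this.calculateGCD(this.weights);
    this.maxWeight = Math.max(...this.weights);
  }
  
  select(): string {
    while (true) {
      this.currentIndex = (this.currentIndex + 1) % this.names.length;
      
      // After each full pass, lower the threshold by the GCD; reset it to the max weight at 0
      if (this.currentIndex === 0) {
        this.currentWeight -= this.gcdWeight;
        if (this.currentWeight <= 0) {
          this.currentWeight = this.maxWeight;
        }
      }
      
      // A provider is picked whenever its weight reaches the current threshold
      if (this.weights[this.currentIndex] >= this.currentWeight) {
        return this.names[this.currentIndex];
      }
    }
  }
  
  private calculateGCD(numbers: number[]): number {
    let result = numbers[0];
    for (let i = 1; i < numbers.length; i++) {
      result = this.gcd(result, numbers[i]);
    }
    return result;
  }
  
  private gcd(a: number, b: number): number {
    if (b === 0) return a;
    return this.gcd(b, a % b);
  }
}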

// Least Connections: route to the provider with the fewest in-flight requests
class LeastConnections {
  private connections: Map<string, number> = new Map();
  
  constructor(providers: string[]) {
    if (providers.length === 0) {
      throw new Error('LeastConnections requires at least one provider');
    }
    for (const name of providers) {
      this.connections.set(name, 0);
    }
  }
  
  // Callers must call release() when the request finishes (e.g. in a finally block)
  select(): string {
    let minConnections = Infinity;
    let selected = '';
    
    for (const [name, count] of this.connections) {
      if (count < minConnections) {
        minConnections = count;
        selected = name;
      }
    }
    
    // Count the new in-flight request against the selected provider
    this.connections.set(selected, minConnections + 1);
    
    return selected;
  }
  
  release(name: string): void {
    const count = this.connections.get(name);
    if (count !== undefined && count > 0) {
      this.connections.set(name, count - 1);
    }
  }
}

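// Consistent Hashing: the same key (e.g. a user or session ID) always maps to the same provider
class ConsistentHashing {
  private ring: Map<number, string> = new Map();
  private sortedHashes: number[] = [];  // ring positions, sorted once for lookups
  private virtualNodes: number = 150;  // virtual nodes per physical provider
  
  constructor(providers: string[]) {
    for (const provider of providers) {
      this.addNode(provider);
    }
    this.sortedHashes = [...this.ring.keys()].sort((a, b) => a - b);
  }
  
  select(key: string): string {
    if (this.sortedHashes.length === 0) {
      throw new Error('ConsistentHashing has no providers');
    }
    const hash = this.hash(key);
    
    // Binary search for the first ring position >= the key's hash
    let lo = 0;
    let hi = this.sortedHashes.length;
    while (lo < hi) {
      const mid = (lo + hi) >>> 1;
      if (this.sortedHashes[mid] < hash) {
        lo = mid + 1;
      } else {
        hi = mid;
      }
    }
    
    // Past the last position, wrap around to the start of the ring
    const nodeHash = this.sortedHashes[lo % this.sortedHashes.length];
    return this.ring.get(nodeHash)!;
  }
  
  private addNode(provider: string): void {
    for (let i = 0; i < this.virtualNodes; i++) {
      const virtualKey = `${provider}:${i}`;
      const hash = this.hash(virtualKey);
      this.ring.set(hash, provider);  // on a (rare) collision, the later provider wins
    }
  }
  
  private hash(key: string): number {
    // Simple 31-based string hash (same recurrence as Java's String.hashCode);
    // production code should use a better-distributed hash such as MurmurHash or xxHash
    let hash = 0;
    for (let i = 0; i < key.length; i++) {
      const char = key.charCodeAt(i);
      hash = ((hash << 5) - hash) + char;
      hash |= 0;  // truncate to a 32-bit integer
    }
    return Math.abs(hash);
  }
}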
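With weights 5:1:1, the interleaved weighted round robin above returns the heaviest provider five times in a row before the other two get a turn, so traffic to that provider arrives in bursts. A common alternative is the "smooth" weighted round robin used by Nginx's upstream module, which interleaves the same ratio as a, a, b, a, c, a, a. The sketch below is our own illustration of that technique, not OmniRoute code; the class name SmoothWeightedRoundRobin is hypothetical.

```typescript
// Smooth weighted round robin (the technique used by Nginx upstreams).
// Hypothetical class for illustration; not part of OmniRoute.
interface WeightedPeer {
  name: string;
  weight: number;        // configured weight (> 0)
  currentWeight: number; // running score, adjusted on every selection
}

class SmoothWeightedRoundRobin {
  private peers: WeightedPeer[];
  private totalWeight: number;

  constructor(providers: { name: string; weight: number }[]) {
    const valid = providers.filter(p => p.weight > 0);
    if (valid.length === 0) {
      throw new Error('At least one provider with a positive weight is required');
    }
    this.peers = valid.map(p => ({ name: p.name, weight: p.weight, currentWeight: 0 }));
    this.totalWeight = valid.reduce((sum, p) => sum + p.weight, 0);
  }

  select(): string {
    let best: WeightedPeer | undefined;
    for (const peer of this.peers) {
      // 1. Every peer gains its own weight
      peer.currentWeight += peer.weight;
      // 2. Remember the peer with the highest running score (first one wins ties)
      if (best === undefined || peer.currentWeight > best.currentWeight) {
        best = peer;
      }
    }
    // 3. The winner pays back the total weight, which spreads its picks out
    best!.currentWeight -= this.totalWeight;
    return best!.name;
  }
}

// Usage: weights 5:1:1 yield "a, a, b, a, c, a, a" for each cycle of 7 picks
const balancer = new SmoothWeightedRoundRobin([
  { name: 'a', weight: 5 },
  { name: 'b', weight: 1 },
  { name: 'c', weight: 1 },
]);
console.log(Array.from({ length: 7 }, () => balancer.select()).join(', '));
```

After every cycle of (sum of weights) picks, all running scores return to zero, so each cycle honors the configured ratio exactly; the cost is one pass over all providers per pick.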

5.2 Intelligent Failover

class IntelligentFailoverManager {
  private providers: AIProvider[];
  private healthStatus: Map<AIProvider, HealthStatus> = new Map();
  private circuitBreakers: Map<AIProvider, CircuitBreaker> = new Map();
  
  constructor(providers: AIProvider[]) {
    this.providers = providers;
    
    // Create a circuit breaker and a health record for each provider
    for (const provider of providers) {
      this.circuitBreakers.set(provider, new CircuitBreaker({
        failureThreshold: 5,
        resetTimeout: 60000,  // after 1 minute, let trial requests through
        halfOpenMaxRequests: 3
      }));
      
      this.healthStatus.set(provider, {
        status: 'healthy',
        lastCheck: Date.now(),
        consecutiveFailures: 0
      });
    }
    
    // Start periodic health checks
    this.startHealthCheck();
  }
  
  async execute<T>(operation: (provider: AIProvider) => Promise<T>): Promise<T> {
    if (this.providers.length === 0) {
      throw new Error('No providers configured');
    }
    
    // Try providers in order of health, then latency
    const sortedProviders = this.sortByHealthAndLatency(this.providers);
    let lastError: unknown;
    
    for (const provider of sortedProviders) {
      const breaker = this.circuitBreakers.get(provider)!;
      
      try {
        // The breaker rejects immediately while open, and moves to half-open
        // (letting this call through) once its reset timeout has elapsed.
        // Its state is private, so we let execute() decide instead of filtering up front.
        const result = await breaker.execute(() => operation(provider));
        
        // Success: reset this provider's health record
        this.recordSuccess(provider);
        
        return result;
      } catch (error) {
        // Failure (or an open breaker): record it and fall through to the next provider
        this.recordFailure(provider, error);
        lastError = error;
      }
    }
    
    throw new Error(`All providers failed; last error: ${this.errorMessage(lastError)}`);
  }
  
  private sortByHealthAndLatency(providers: AIProvider[]): AIProvider[] {
    // Sort a copy so the configured provider order is left untouched
    return [...providers].sort((a, b) => {
      const statusA = this.healthStatus.get(a)!;
      const statusB = this.healthStatus.get(b)!;
      
      // Healthy providers first
      if (statusA.status !== statusB.status) {
        return statusA.status === 'healthy' ? -1 : 1;
      }
      
      // Then lower latency first; providers without a latency sample yet go last
      const latencyA = statusA.lastLatency ?? Number.MAX_SAFE_INTEGER;
      const latencyB = statusB.lastLatency ?? Number.MAX_SAFE_INTEGER;
      return latencyA - latencyB;
    });
  }
  
  private recordSuccess(provider: AIProvider): void {
    const status = this.healthStatus.get(provider)!;
    status.status = 'healthy';
    status.consecutiveFailures = 0;
    status.lastCheck = Date.now();
  }
  
  private recordFailure(provider: AIProvider, error: unknown): void {
    const status = this.healthStatus.get(provider)!;
    status.consecutiveFailures++;
    status.lastError = this.errorMessage(error);
    status.lastCheck = Date.now();
    
    // Mark the provider unhealthy after 3 consecutive failures
    if (status.consecutiveFailures >= 3) {
      status.status = 'unhealthy';
    }
  }
  
  private errorMessage(error: unknown): string {
    return error instanceof Error ? error.message : String(error);
  }
  
  private startHealthCheck(): void {
    setInterval(async () => {
      for (const provider of this.providers) {
        const status = this.healthStatus.get(provider)!;
        try {
          const start = Date.now();
          await provider.getStatus();
          status.status = 'healthy';
          status.lastLatency = Date.now() - start;
        } catch (error) {
          status.status = 'unhealthy';
          status.lastError = this.errorMessage(error);
        }
        status.lastCheck = Date.now();
      }
    }, 30000);  // every 30 seconds
  }
}

// Circuit breaker: closed, then open after repeated failures, then half-open after a cooldown, then closed again
class CircuitBreaker {
  private state: 'closed' | 'open' | 'half-open' = 'closed';
  private failureCount: number = 0;
  private successCount: number = 0;
  private lastFailureTime?: number;
  
  constructor(
    private options: {
      failureThreshold: number;     // consecutive failures that open the breaker
      resetTimeout: number;         // ms to wait before moving from open to half-open
      halfOpenMaxRequests: number;  // successes needed in half-open to close again
    }
  ) {}
  
  async execute<T>(operation: () => Promise<T>): Promise<T> {
    if (this.state === 'open') {
      if (this.shouldAttemptReset()) {
        this.state = 'half-open';
        this.successCount = 0;
      } else {
        throw new Error('Circuit breaker is open');
      }
    }
    
    try {
      const result = await operation();
      this.recordSuccess();
      return result;
    } catch (error) {
      this.recordFailure();
      throw error;
    }
  }
  
  private shouldAttemptReset(): boolean {
    return this.lastFailureTime !== undefined &&
      Date.now() - this.lastFailureTime >= this.options.resetTimeout;
  }
  
  private recordSuccess(): void {
    if (this.state === 'half-open') {
      this.successCount++;
      
      if (this.successCount >= this.options.halfOpenMaxRequests) {
        this.state = 'closed';
        this.failureCount = 0;
        this.successCount = 0;
      }
    } else {
      // While closed, only consecutive failures count
      this.failureCount = 0;
    }
  }
  
  private recordFailure(): void {
    this.failureCount++;
    this.lastFailureTime = Date.now();
    
    // Any failure while half-open re-opens the breaker immediately
    if (this.state === 'half-open' || this.failureCount >= this.options.failureThreshold) {
      this.state = 'open';
      this.successCount = 0;
    }
  }
}
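The breaker above reads Date.now() directly, which makes its state transitions awkward to test. The sketch below is a hypothetical variant, ClockedCircuitBreaker, that takes an injectable clock so the closed, open, half-open and back-to-closed transitions can be exercised without waiting in real time. It re-opens on any failure during the half-open trial and, like the version above, does not cap how many trial requests run concurrently; both are assumptions of this sketch rather than OmniRoute behavior.

```typescript
type BreakerState = 'closed' | 'open' | 'half-open';

// Hypothetical circuit breaker with an injectable clock, for illustration and testing
class ClockedCircuitBreaker {
  private state: BreakerState = 'closed';
  private failures = 0;          // consecutive failures while closed
  private halfOpenSuccesses = 0; // successful trial calls while half-open
  private openedAt = 0;          // clock time when the breaker last opened

  constructor(
    private readonly failureThreshold: number,
    private readonly resetTimeoutMs: number,
    private readonly successesToClose: number,
    private readonly now: () => number = Date.now,
  ) {}

  // Current state; an open breaker becomes half-open once the reset timeout has elapsed
  getState(): BreakerState {
    if (this.state === 'open' && this.now() - this.openedAt >= this.resetTimeoutMs) {
      this.state = 'half-open';
      this.halfOpenSuccesses = 0;
    }
    return this.state;
  }

  async execute<T>(operation: () => Promise<T>): Promise<T> {
    if (this.getState() === 'open') {
      throw new Error('Circuit breaker is open'); // fail fast without calling the provider
    }
    try {
      const result = await operation();
      this.onSuccess();
      return result;
    } catch (error) {
      this.onFailure();
      throw error;
    }
  }

  private onSuccess(): void {
    if (this.state === 'half-open') {
      this.halfOpenSuccesses++;
      if (this.halfOpenSuccesses >= this.successesToClose) {
        this.state = 'closed';
        this.failures = 0;
      }
    } else {
      this.failures = 0; // only consecutive failures count
    }
  }

  private onFailure(): void {
    // A half-open failure re-opens at once; while closed, open after the threshold
    if (this.state === 'half-open' || ++this.failures >= this.failureThreshold) {
      this.state = 'open';
      this.openedAt = this.now();
      this.failures = 0;
    }
  }
}

// Usage: new ClockedCircuitBreaker(5, 60_000, 3) mirrors the options used by the failover manager
```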

5.3 Cost Optimization Strategy

class CostOptimizer {
  private pricingCache: Map<string, PricingInfo> = new Map();
  private usageStats: Map<string, UsageStats> = new Map();
  
  constructor(private registry: ProviderRegistry) {
    // Load pricing now, then refresh it periodically
    void this.updatePricingCache();
    setInterval(() => this.updatePricingCache(), 3600000);  // every hour
  }
  
  async optimizeRouting(request: ChatRequest): Promise<OptimizationResult> {
    const providers = await this.registry.getHealthyProviders();
    if (providers.length === 0) {
      throw new Error('No healthy providers available');
    }
    
    const options = await Promise.all(
      providers.map(async (providerName) => {
        const pricing = await this.getPricing(providerName);
        
        const estimatedCost = this.estimateCost(request, pricing);
        const estimatedQuality = this.estimateQuality(providerName, request.model);
        const estimatedLatency = await this.estimateLatency(providerName);
        
        return {
          provider: providerName,
          cost: estimatedCost,
          quality: estimatedQuality,
          latency: estimatedLatency,
          valueScore: this.calculateValueScore(estimatedCost, estimatedQuality, estimatedLatency)
        };
      })
    );
    
    // Highest value score first
    options.sort((a, b) => b.valueScore - a.valueScore);
    
    return {
      recommended: options[0],
      alternatives: options.slice(1)
    };
  }
  
  async trackUsage(provider: string, request: ChatRequest, response: ChatResponse): Promise<void> {
    const stats = this.usageStats.get(provider) || {
      totalRequests: 0,
      totalTokens: 0,
      totalCost: 0,
      averageLatency: 0
    };
    
    // Price the request with the model it actually used
    const cost = this.calculateActualCost(provider, request.model, response.usage);
    
    stats.totalRequests++;
    stats.totalTokens += response.usage.totalTokens;
    stats.totalCost += cost;
    // Incremental running mean of latency
    stats.averageLatency = (stats.averageLatency * (stats.totalRequests - 1) + response.latency) / stats.totalRequests;
    
    this.usageStats.set(provider, stats);
    
    // A real implementation would likely persist this to a database
    console.log(`Usage: provider=${provider}, tokens=${response.usage.totalTokens}, cost=$${cost.toFixed(4)}`);
  }
  
  private async updatePricingCache(): Promise<void> {
    for (const providerName of this.registry.list()) {
      const provider = this.registry.get(providerName)!;
      try {
        const pricing = await provider.getPricing();
        this.pricingCache.set(providerName, pricing);
      } catch (error) {
        console.error(`Failed to update pricing for ${providerName}:`, error);
      }
    }
  }
  
  private async getPricing(providerName: string): Promise<PricingInfo> {
    let pricing = this.pricingCache.get(providerName);
    
    if (!pricing) {
      // Cache miss: fetch on demand
      const provider = this.registry.get(providerName)!;
      pricing = await provider.getPricing();
      this.pricingCache.set(providerName, pricing);
    }
    
    return pricing;
  }
  
  private estimateCost(request: ChatRequest, pricing: PricingInfo): number {
    // Estimate input and output token counts
    const estimatedInputTokens = this.estimateInputTokens(request.messages);
    const estimatedOutputTokens = request.maxTokens ?? 150;  // assume 150 when maxTokens is unset
    
    const modelPricing = pricing[request.model];
    if (!modelPricing) {
      return Infinity;  // this provider does not offer the model
    }
    
    // Prices are quoted per 1M tokens
    return (estimatedInputTokens / 1_000_000) * modelPricing.input +
           (estimatedOutputTokens / 1_000_000) * modelPricing.output;
  }
  
  private estimateInputTokens(messages: Message[]): number {
    // Rough estimate: about 0.3 tokens per character for English text;
    // production code should use the model's real tokenizer
    const totalChars = messages.reduce((sum, msg) => sum + msg.content.length, 0);
    return Math.ceil(totalChars * 0.3);
  }
  
  private estimateQuality(providerName: string, model: string): number {
    // Quality scores based on benchmark results
    const qualityMatrix: Record<string, Record<string, number>> = {
      'openai': {
        'gpt-5.5': 0.98,
        'gpt-4o': 0.95,
        'gpt-4o-mini': 0.85
      },
      'anthropic': {
        'claude-4-opus': 0.97,
        'claude-4-sonnet': 0.93
      },
      'deepseek': {
        'deepseek-v4': 0.92,
        'deepseek-coder': 0.88
      }
    };
    
    return qualityMatrix[providerName]?.[model] ?? 0.5;  // 0.5 for unknown models
  }
  
  private async estimateLatency(providerName: string): Promise<number> {
    const stats = this.usageStats.get(providerName);
    return stats?.averageLatency || 2000;  // default: 2 seconds
  }
  
  private calculateValueScore(cost: number, quality: number, latency: number): number {
    // Value score = quality / (cost * latency factor)
    const latencyFactor = 1 + (latency / 10000);  // +10% per second of latency
    return quality / (cost * latencyFactor + 0.001);  // +0.001 avoids division by zero
  }
  
  private calculateActualCost(provider: string, model: string, usage: TokenUsage): number {
    const modelPricing = this.pricingCache.get(provider)?.[model];
    
    if (!modelPricing) {
      return 0;  // pricing unknown: record zero rather than fail the request
    }
    
    return (usage.promptTokens / 1_000_000) * modelPricing.input +
           (usage.completionTokens / 1_000_000) * modelPricing.output;
  }
}
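Because prices are quoted per million tokens, estimateCost and calculateValueScore reduce to a few lines of arithmetic. The standalone sketch below reproduces that math with made-up prices ($2.50 input / $10.00 output per 1M tokens for a "premium" tier, a tenth of that for a "budget" tier) and made-up quality and latency figures, purely to show the magnitudes involved; none of these numbers are real price quotes.

```typescript
// Standalone sketch of the CostOptimizer arithmetic; all prices and scores are made up.

// Dollar prices per 1M tokens, the same convention estimateCost uses
interface ModelPrice {
  input: number;
  output: number;
}

// Cost of one request: tokens / 1M * price, summed over input and output
function requestCost(inputTokens: number, outputTokens: number, price: ModelPrice): number {
  return (inputTokens / 1_000_000) * price.input + (outputTokens / 1_000_000) * price.output;
}

// Same shape as calculateValueScore: quality / (cost * latencyFactor + 0.001)
function valueScore(cost: number, quality: number, latencyMs: number): number {
  const latencyFactor = 1 + latencyMs / 10_000; // 2,000 ms gives a factor of 1.2
  return quality / (cost * latencyFactor + 0.001);
}

// 1,000 prompt tokens and 500 completion tokens on two hypothetical price tiers
const premium = requestCost(1_000, 500, { input: 2.5, output: 10 }); // 0.0025 + 0.005
const budget = requestCost(1_000, 500, { input: 0.25, output: 1 });  // 0.00025 + 0.0005

console.log(premium.toFixed(5), budget.toFixed(5));       // 0.00750 0.00075
console.log(valueScore(premium, 0.95, 2_000).toFixed(1)); // 95.0
console.log(valueScore(budget, 0.85, 1_000).toFixed(1));  // 465.8
```

With these inputs the cheaper model scores roughly five times higher despite its lower quality score, which shows how strongly this formula favors cost; how much weight quality should get relative to cost is a tuning decision for each deployment.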

Performance Optimization: Caching, Concurrency, and Streaming Responses

6.1 Multi-Level Caching Strategy

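// L1 cache: in-process memory (fastest), with LRU eviction
class MemoryCache implements Cache {
  private cache: Map<string, CacheEntry> = new Map();
  
  constructor(private maxSize: number = 1000) {}
  
  async get(key: string): Promise<any> {
    const entry = this.cache.get(key);
    
    if (!entry) {
      return null;
    }
    
    // Drop expired entries
    if (Date.now() > entry.expiresAt) {
      this.cache.delete(key);
      return null;
    }
    
    // LRU: re-insert so the entry moves to the end (most recently used).
    // A Map iterates in insertion order, so its first key is the least recently used
    this.cache.delete(key);
    this.cache.set(key, entry);
    
    return entry.value;
  }
  
  async set(key: string, value: any, ttl: number): Promise<void> {
    // Overwriting a key should refresh its position, not evict some other entry
    this.cache.delete(key);
    
    // When full, evict the least recently used entry
    if (this.cache.size >= this.maxSize) {
      const oldestKey = this.cache.keys().next().value;
      if (oldestKey !== undefined) {
        this.cache.delete(oldestKey);
      }
    }
    
    this.cache.set(key, {
      value,
      expiresAt: Date.now() + ttl * 1000  // ttl is in seconds
    });
  }
  
  async delete(key: string): Promise<void> {
    this.cache.delete(key);
  }
}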

// L2 cache: Redis (shared across gateway instances)
import Redis from 'ioredis';

class RedisCache implements Cache {
  private client: Redis;
  
  constructor(redisUrl: string) {
    this.client = new Redis(redisUrl);
  }
  
  async get(key: string): Promise<any> {
    const value = await this.client.get(key);
    return value ? JSON.parse(value) : null;
  }
  
  async set(key: string, value: any, ttl: number): Promise<void> {
    // SETEX takes the TTL in whole seconds
    await this.client.setex(key, Math.ceil(ttl), JSON.stringify(value));
  }
  
  async delete(key: string): Promise<void> {
    await this.client.del(key);
  }
}

// Multi-level cache: in-memory L1 in front of a shared Redis L2
class MultiLevelCache implements Cache {
  private l1Cache: Cache;  // memory
  private l2Cache: Cache;  // Redis
  
  constructor(options?: { l1Size?: number; redisUrl?: string }) {
    this.l1Cache = new MemoryCache(options?.l1Size || 1000);
    this.l2Cache = new RedisCache(options?.redisUrl || 'redis://localhost:6379');
  }
  
  async get(key: string): Promise<any> {
    // Check L1 first
    let value = await this.l1Cache.get(key);
    
    if (value !== null) {
      return value;
    }
    
    // L1 miss: check L2
    value = await this.l2Cache.get(key);
    
    if (value !== null) {
      // Backfill L1 for 1 minute (this copy may outlive the L2 entry by up to that long)
      await this.l1Cache.set(key, value, 60);
    }
    
    return value;
  }
  
  async set(key: string, value: any, ttl: number): Promise<void> {
    // Write both levels; L1 keeps the entry for at most 1 minute
    await Promise.all([
      this.l1Cache.set(key, value, Math.min(ttl, 60)),
      this.l2Cache.set(key, value, ttl)
    ]);
  }
  
  async delete(key: string): Promise<void> {
    // Only this instance's L1 is cleared; other instances keep their L1 copy until it expires
    await Promise.all([
      this.l1Cache.delete(key),
      this.l2Cache.delete(key)
    ]);
  }
}
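Both cache layers are only as good as the key they are given, and this section does not show how keys are built. One possible sketch: serialize the request with object keys sorted, so that property order cannot change the key, then hash it with SHA-256 from Node's crypto module. The helper names (stableStringify, buildCacheKey), the ai-gw:chat: prefix, and the choice of fields (model, messages, temperature, maxTokens) are all assumptions; any field that can change the output belongs in the key, and caching is usually safest for deterministic settings such as temperature 0.

```typescript
import { createHash } from 'node:crypto';

// Serialize with object keys sorted at every level, so property order never changes the key
function stableStringify(value: unknown): string {
  if (Array.isArray(value)) {
    return `[${value.map(stableStringify).join(',')}]`;
  }
  if (value !== null && typeof value === 'object') {
    const obj = value as Record<string, unknown>;
    const fields = Object.keys(obj)
      .filter(key => obj[key] !== undefined) // omit unset optional fields, as JSON.stringify does
      .sort()
      .map(key => `${JSON.stringify(key)}:${stableStringify(obj[key])}`);
    return `{${fields.join(',')}}`;
  }
  return JSON.stringify(value);
}

// Hypothetical request shape: only fields that can change the model's output belong in the key
interface CacheableRequest {
  model: string;
  messages: { role: string; content: string }[];
  temperature?: number;
  maxTokens?: number;
}

// Hypothetical helper: namespaced SHA-256 digest of the output-relevant request fields
function buildCacheKey(request: CacheableRequest): string {
  const material = stableStringify({
    model: request.model,
    messages: request.messages,
    temperature: request.temperature,
    maxTokens: request.maxTokens,
  });
  return `ai-gw:chat:${createHash('sha256').update(material).digest('hex')}`;
}

const exampleKey = buildCacheKey({
  model: 'gpt-4o',
  messages: [{ role: 'user', content: 'Hello' }],
  temperature: 0,
});
console.log(exampleKey); // "ai-gw:chat:" followed by 64 hex characters
```

Hashing keeps Redis keys short regardless of prompt length, and a fixed prefix makes it easy to find or flush all gateway entries as a group.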

6.2 Concurrent Request Optimization

class ConcurrentRequestManager {
  // Requests currently in flight, keyed by request identity
  private inFlight: Map<string, Promise<unknown>> = new Map();
  
  async deduplicate<T>(
    key: string,
    fn: () => Promise<T>
  ): Promise<T> {
    // If an identical request is already in flight, share its Promise
    const existing = this.inFlight.get(key);
    if (existing) {
      return existing as Promise<T>;
    }
    
    // Otherwise start the request and remember its Promise
    const promise = fn().finally(() => {
      // Forget it once it settles, so later calls trigger a fresh request
      this.inFlight.delete(key);
    });
    
    this.inFlight.set(key, promise);
    
    return promise;
  }
  
  async batch<T, R>(
    items: T[],
    fn: (item: T) => Promise<R>,
    options?: { maxConcurrency?: number }
  ): Promise<R[]> {
    const maxConcurrency = options?.maxConcurrency || 5;
    const results: R[] = [];
    
    // Run items in chunks of maxConcurrency; each chunk waits for its slowest call
    for (let i = 0; i < items.length; i += maxConcurrency) {
      const batch = items.slice(i, i + maxConcurrency);
      const batchResults = await Promise.all(
        batch.map(item => fn(item))
      );
      results.push(...batchResults);
    }
    
    return results;
  }
}
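The batch method above runs items in fixed chunks, so a single slow call holds back the next chunk even when other slots are idle. A common alternative is a small worker pool that keeps up to maxConcurrency calls in flight at all times. The function below (mapWithConcurrency, a hypothetical name) is a minimal sketch of that idea; like batch, it returns results in input order and rejects as soon as any call fails (calls already started keep running).

```typescript
// Hypothetical helper: map over items with at most maxConcurrency calls in flight,
// starting the next call as soon as any slot frees up (results keep input order)
async function mapWithConcurrency<T, R>(
  items: T[],
  fn: (item: T, index: number) => Promise<R>,
  maxConcurrency = 5,
): Promise<R[]> {
  const results: R[] = new Array(items.length);
  let nextIndex = 0;

  // Each worker claims the next unprocessed index until none remain. No lock is needed:
  // the check-and-increment runs synchronously between awaits on the single JS thread.
  async function worker(): Promise<void> {
    while (nextIndex < items.length) {
      const index = nextIndex++;
      results[index] = await fn(items[index], index);
    }
  }

  const workerCount = Math.max(1, Math.min(maxConcurrency, items.length));
  await Promise.all(Array.from({ length: workerCount }, () => worker()));
  return results;
}

// Usage (callProvider is a hypothetical function):
// const replies = await mapWithConcurrency(prompts, prompt => callProvider(prompt), 3);
```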

6.3 Streaming Response Handling

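// Streaming responses via Server-Sent Events (SSE)
async function handleStreamingResponse(
  request: UnifiedChatRequest,
  res: Response
): Promise<void> {
  const provider = await selectProvider(request);
  
  // Check streaming support before sending headers, so an unsupported provider
  // can still be reported as an ordinary HTTP error.
  // instanceof (unlike constructor.name) survives minification and narrows the type
  if (!(provider instanceof OpenAIProvider) && !(provider instanceof AnthropicProvider)) {
    throw new Error(`Streaming not supported for provider: ${provider.constructor.name}`);
  }
  
  res.setHeader('Content-Type', 'text/event-stream');
  res.setHeader('Cache-Control', 'no-cache');
  res.setHeader('Connection', 'keep-alive');
  res.flushHeaders();  // open the stream for the client right away
  
  try {
    if (provider instanceof OpenAIProvider) {
      await handleOpenAIStream(request, res, provider);
    } else {
      await handleAnthropicStream(request, res, provider);
    }
  } catch (error) {
    // Headers are already sent, so the failure has to be reported inside the stream
    if (!res.writableEnded) {
      const message = error instanceof Error ? error.message : String(error);
      res.write(`event: error\ndata: ${JSON.stringify({ message })}\n\n`);
      res.end();
    }
  }
}

async function handleOpenAIStream(
  request: UnifiedChatRequest,
  res: Response,
  provider: OpenAIProvider
): Promise<void> {
  // A 'close' event before we call end() means the client went away
  let clientGone = false;
  res.on('close', () => { clientGone = true; });
  
  const stream = await provider.chatCompletionStream({
    model: request.model,
    messages: request.messages,
    temperature: request.temperature,
    maxTokens: request.maxTokens,
    stream: true
  });
  
  // Forward each upstream chunk as one SSE event
  // (JSON.stringify output contains no raw newlines, so one data: line per event suffices)
  for await (const chunk of stream) {
    if (clientGone) {
      break;  // stop reading the upstream stream once nobody is listening
    }
    res.write(`data: ${JSON.stringify(chunk)}\n\n`);
  }
  
  if (!clientGone) {
    res.write('data: [DONE]\n\n');  // OpenAI-style end-of-stream marker
    res.end();
  }
}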
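On the receiving side, SSE text arrives in network chunks that need not line up with event boundaries. In the SSE format, an event is a group of field lines ended by a blank line, and multiple data: lines within one event are joined with newlines. The incremental parser below is a simplified sketch of just that rule: SseDataParser is a hypothetical name, it ignores the event, id and retry fields and ':' comment lines, and it supports only \n and \r\n line endings.

```typescript
// Simplified incremental SSE parser that extracts only the data payload of each event
class SseDataParser {
  private buffer = '';              // text received but not yet split into complete lines
  private dataLines: string[] = []; // data lines of the event being assembled

  // Feed one raw chunk; returns the payloads of all events completed by it
  feed(chunk: string): string[] {
    this.buffer += chunk;
    const events: string[] = [];
    let newline: number;
    while ((newline = this.buffer.indexOf('\n')) !== -1) {
      // Take one complete line and drop a trailing '\r' from CRLF streams
      let line = this.buffer.slice(0, newline);
      this.buffer = this.buffer.slice(newline + 1);
      if (line.endsWith('\r')) {
        line = line.slice(0, -1);
      }

      if (line === '') {
        // A blank line ends the event; events without data lines are skipped
        if (this.dataLines.length > 0) {
          events.push(this.dataLines.join('\n'));
          this.dataLines = [];
        }
      } else if (line.startsWith('data:')) {
        // Strip the field name and at most one leading space
        const value = line.slice(5);
        this.dataLines.push(value.startsWith(' ') ? value.slice(1) : value);
      }
      // Any other line (event:, id:, retry:, ': comment') is ignored in this sketch
    }
    return events;
  }
}

// Usage: an event split across chunk boundaries, followed by the [DONE] marker
const parser = new SseDataParser();
const received: string[] = [];
for (const chunk of ['data: {"a":', '1}\n', '\ndata: [DONE]\n\n']) {
  received.push(...parser.feed(chunk));
}
console.log(received); // two payloads: '{"a":1}' and '[DONE]'
```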

Production Practice: Monitoring, Alerting, and Security

7.1 Monitoring Metrics

// Prometheus metrics (prom-client)
import { register, Counter, Histogram } from 'prom-client';

// The default `register` collects the metrics below; serve register.metrics() on a /metrics endpoint

const requestCounter = new Counter({
  name: 'ai_gateway_requests_total',
  help: 'Total number of requests',
  labelNames: ['provider', 'model', 'status']
});

const requestDuration = new Histogram({
  name: 'ai_gateway_request_duration_seconds',
  help: 'Request duration in seconds',
  labelNames: ['provider', 'model'],
  buckets: [0.1, 0.5, 1, 2, 5, 10, 30, 60]
});

const tokenUsage = new Counter({
  name: 'ai_gateway_tokens_total',
  help: 'Total number of tokens used',
  labelNames: ['provider', 'model', 'type']  // type: prompt|completion
});

const costCounter = new Counter({
  name: 'ai_gateway_cost_dollars_total',
  help: 'Estimated cumulative cost in dollars',
  labelNames: ['provider', 'model']
});

// Record metrics once per completed request
function recordMetrics(
  provider: string,
  model: string,
  durationSeconds: number,  // in seconds, matching the histogram's unit
  usage: TokenUsage,
  cost: number,
  status: 'success' | 'error'
): void {
  requestCounter.inc({ provider, model, status });
  requestDuration.observe({ provider, model }, durationSeconds);
  tokenUsage.inc({ provider, model, type: 'prompt' }, usage.promptTokens);
  tokenUsage.inc({ provider, model, type: 'completion' }, usage.completionTokens);
  // Accumulate spend; a Gauge set per request would only hold the latest request's cost
  costCounter.inc({ provider, model }, cost);
}

7.2 Alerting Rules

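# alerts.yml
groups:
  - name: ai_gateway
    rules:
      # Error ratio per provider; sum by (provider) drops the status label so numerator and denominator match
      - alert: HighErrorRate
        expr: sum by (provider) (rate(ai_gateway_requests_total{status="error"}[5m])) / sum by (provider) (rate(ai_gateway_requests_total[5m])) > 0.1
        for: 5m
        annotations:
          summary: "Error rate above 10% for provider {{ $labels.provider }}"

      - alert: HighLatency
        expr: histogram_quantile(0.95, sum by (le, provider) (rate(ai_gateway_request_duration_seconds_bucket[5m]))) > 5
        for: 5m
        annotations:
          summary: "95th percentile latency > 5 seconds for provider {{ $labels.provider }}"

      # `up` describes the gateway's own scrape target, not an upstream AI provider
      - alert: GatewayDown
        expr: up{job="ai_gateway"} == 0
        for: 1m
        annotations:
          summary: "AI gateway instance {{ $labels.instance }} is down"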
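The HighLatency rule relies on histogram_quantile, which only sees cumulative bucket counts, so the reported p95 is an estimate. As we understand the Prometheus implementation, it finds the first bucket whose cumulative count reaches the target rank and interpolates linearly inside it, assuming observations are spread evenly within the bucket; if the rank lands in the +Inf bucket, it returns the upper bound of the highest finite bucket. The sketch below mirrors that logic for a single histogram. estimateQuantile is a hypothetical name, the bucket counts are made up, and edge cases such as negative bounds or q outside [0, 1] are not handled.

```typescript
// One histogram bucket as Prometheus exposes it: cumulative count of observations <= le
interface Bucket {
  le: number;    // upper bound in seconds; the last bucket must be Infinity (+Inf)
  count: number; // cumulative count
}

// Simplified sketch of histogram_quantile's estimate for a single histogram
function estimateQuantile(q: number, buckets: Bucket[]): number {
  if (buckets.length < 2 || buckets[buckets.length - 1].le !== Infinity) {
    throw new Error('Need at least one finite bucket followed by a +Inf bucket');
  }
  const total = buckets[buckets.length - 1].count;
  if (total === 0) {
    return NaN; // no observations
  }

  let rank = q * total;
  // First finite bucket whose cumulative count reaches the target rank
  const b = buckets.findIndex((bucket, i) => i < buckets.length - 1 && bucket.count >= rank);
  if (b === -1) {
    // The rank falls in the +Inf bucket: report the highest finite bound
    return buckets[buckets.length - 2].le;
  }

  // Lower edge of the bucket (0 for the first bucket) and the count inside it
  let lower = 0;
  let inBucket = buckets[b].count;
  if (b > 0) {
    lower = buckets[b - 1].le;
    inBucket -= buckets[b - 1].count;
    rank -= buckets[b - 1].count;
  }
  // Assume an even spread inside the bucket and interpolate linearly
  return lower + (buckets[b].le - lower) * (rank / inBucket);
}

// Made-up cumulative counts over the gateway's buckets [0.1, 0.5, 1, 2, 5, 10, 30, 60]
const latencyBuckets: Bucket[] = [
  { le: 0.1, count: 5 },
  { le: 0.5, count: 20 },
  { le: 1, count: 40 },
  { le: 2, count: 70 },
  { le: 5, count: 90 },
  { le: 10, count: 98 },
  { le: 30, count: 100 },
  { le: 60, count: 100 },
  { le: Infinity, count: 100 },
];

console.log(estimateQuantile(0.95, latencyBuckets)); // 8.125
```

Here the 95th request falls in the (5, 10] bucket, so the estimate is 5 + 5 * (5/8) = 8.125 s regardless of the true latencies inside that bucket. Because 5 is itself a bucket boundary, the rule's "> 5" condition reduces to "more than 5% of requests took longer than 5 s" and does not depend on the interpolation; a threshold between two boundaries would.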

7.3 Security Best Practices

// Encrypted storage for API keys (AES-256-GCM)
import { createCipheriv, createDecipheriv, randomBytes, scryptSync } from 'node:crypto';

class APIKeyManager {
  private encryptionKey: Buffer;
  
  // salt: a random per-deployment value stored with the config, not a hard-coded string
  constructor(masterKey: string, salt: Buffer) {
    // Derive a 256-bit encryption key from the master key
    this.encryptionKey = scryptSync(masterKey, salt, 32);
  }
  
  encrypt(apiKey: string): string {
    const iv = randomBytes(12);  // 96-bit IV, the recommended size for GCM; never reuse one with the same key
    const cipher = createCipheriv('aes-256-gcm', this.encryptionKey, iv);
    
    let encrypted = cipher.update(apiKey, 'utf8', 'hex');
    encrypted += cipher.final('hex');
    
    const authTag = cipher.getAuthTag();
    
    // Stored format: iv:authTag:ciphertext (all hex)
    return `${iv.toString('hex')}:${authTag.toString('hex')}:${encrypted}`;
  }
  
  decrypt(encryptedKey: string): string {
    const parts = encryptedKey.split(':');
    if (parts.length !== 3) {
      throw new Error('Malformed encrypted key');
    }
    const [ivHex, authTagHex, encrypted] = parts;
    
    const decipher = createDecipheriv('aes-256-gcm', this.encryptionKey, Buffer.from(ivHex, 'hex'));
    decipher.setAuthTag(Buffer.from(authTagHex, 'hex'));
    
    let decrypted = decipher.update(encrypted, 'hex', 'utf8');
    decrypted += decipher.final('utf8');  // throws if the ciphertext or tag was tampered with
    
    return decrypted;
  }
}

// Rate limiting: token bucket (bursts up to `capacity`, sustained rate of `refillRate` per second)
class TokenBucket {
  private tokens: number;
  private lastRefill: number;
  
  constructor(
    private capacity: number,
    private refillRate: number  // tokens per second
  ) {
    this.tokens = capacity;
    this.lastRefill = Date.now();
  }
  
  consume(count: number = 1): boolean {
    this.refill();
    
    if (this.tokens >= count) {
      this.tokens -= count;
      return true;
    }
    
    return false;
  }
  
  private refill(): void {
    const now = Date.now();
    const timePassed = (now - this.lastRefill) / 1000;
    const tokensToAdd = timePassed * this.refillRate;
    
    this.tokens = Math.min(this.capacity, this.tokens + tokensToAdd);
    this.lastRefill = now;
  }
}
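In a gateway, a token bucket is usually kept per client (per API key or tenant) rather than globally. The sketch below is a hypothetical KeyedRateLimiter that lazily creates one bucket per key and takes an injectable clock, which also makes the refill arithmetic easy to check: with capacity 10 and 2 tokens per second, a drained bucket holds 1.5 s * 2 = 3 tokens one and a half seconds later. It re-implements the bucket inline so the block is self-contained, and it never evicts idle keys, which a production version would need to do.

```typescript
// Per-key token buckets with an injectable clock (hypothetical helper)
class KeyedRateLimiter {
  private buckets = new Map<string, { tokens: number; lastRefill: number }>();

  constructor(
    private readonly capacity: number,
    private readonly refillPerSecond: number,
    private readonly now: () => number = Date.now,
  ) {}

  // Try to take `count` tokens for `key`; false means reject the request (e.g. HTTP 429)
  tryConsume(key: string, count = 1): boolean {
    const bucket = this.refill(key);
    if (bucket.tokens >= count) {
      bucket.tokens -= count;
      return true;
    }
    return false;
  }

  // Tokens currently available for `key`, after refilling
  available(key: string): number {
    return this.refill(key).tokens;
  }

  private refill(key: string): { tokens: number; lastRefill: number } {
    const now = this.now();
    let bucket = this.buckets.get(key);
    if (!bucket) {
      // New clients start with a full bucket
      bucket = { tokens: this.capacity, lastRefill: now };
      this.buckets.set(key, bucket);
      return bucket;
    }
    const elapsedSeconds = (now - bucket.lastRefill) / 1000;
    bucket.tokens = Math.min(this.capacity, bucket.tokens + elapsedSeconds * this.refillPerSecond);
    bucket.lastRefill = now;
    return bucket;
  }
}

// Usage: drain one client's bucket, then advance a fake clock by 1.5 s
let fakeTime = 0;
const limiter = new KeyedRateLimiter(10, 2, () => fakeTime);
for (let i = 0; i < 10; i++) limiter.tryConsume('client-a');
console.log(limiter.tryConsume('client-a')); // false: the bucket is empty
console.log(limiter.tryConsume('client-b')); // true: each key has its own bucket
fakeTime = 1500;
console.log(limiter.available('client-a')); // 3
```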

Summary and Outlook

8.1 Key Takeaways

This article explored how to build a production-grade AI routing gateway. The key points:

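  1. Unified abstraction layer: the adapter pattern gives every AI provider the same interface
  2. Intelligent routing: hybrid strategies that weigh cost, latency, and quality
  3. High availability: automatic failover, circuit breakers, and health checks
  4. Performance: multi-level caching, request deduplication, and concurrency control
  5. Observability: metrics collection, logging, and alerting
  6. Security: API key encryption, rate limiting, and authentication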

8.2 Production Readiness Checklist

Before deploying the AI routing gateway to production, make sure the following are in place:

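  • High availability: run multiple instances behind a load balancer
  • Monitoring: set up Prometheus and Grafana dashboards
  • Alerting: alert on key metrics (error rate, latency, cost)
  • Security: enable authentication, encrypt API keys, and configure rate limits
  • Disaster recovery: have a contingency plan for when every provider is unavailable
  • Cost control: set daily and monthly spending caps
  • Testing: run fault-injection tests to verify the failover logic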

8.3 Outlook

Likely directions for AI routing gateways:

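  1. Smarter routing: ML models that predict the best provider for each request
  2. Edge deployment: running the gateway on edge nodes to reduce latency
  3. Multimodal support: handling image, audio, and video AI requests
  4. Self-service: a web UI where users configure routing rules and view their usage
  5. Cost analysis: detailed cost reports and optimization recommendations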

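This article reflects the AI development ecosystem as of July 2026. The code samples are for reference only; adapt them to your actual requirements before using them in production.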
