编程 Cloudflare Workers + Durable Objects + Saga 模式:边缘计算有状态化的工程革命

2026-06-29 18:17:54 +0800 CST views 6

Cloudflare Workers + Durable Objects + Saga 模式:边缘计算有状态化的工程革命

背景:当"无状态"成为历史

2015年,AWS Lambda 开创了 FaaS(函数即服务)的时代,"无服务器"成了云厂商的主打概念。那时候,我们被告知:函数就是原子单元,每个请求独立处理,不保留状态,数据交给数据库。这种范式在简单场景下运转良好——API Gateway 触发 Lambda,Lambda 读写 DynamoDB,无状态,简单清晰。

但现实从不简单。五年后,开发者们发现"无状态"带来了沉重的代价:为了维持状态,每次请求都要重新查询数据库;为了保证一致性,不得不引入复杂的外部事务协调服务;为了处理长连接,不得不借助 Redis 或 SQS 等外部消息队列。 简单场景被复杂化了,而复杂场景根本没有好的解法。

Cloudflare 在这个背景下,走出了一条独特的路。不同于传统云厂商把"无状态"作为铁律,Cloudflare 从一开始就在探索:能不能在边缘节点上原生支持有状态逻辑,而不需要额外部署外部服务?

答案是 Durable Objects——一种将在单个地理位置运行的、持久的 JavaScript 对象,与 Cloudflare Workers 的全球分布式执行结合在一起的创新架构。2026年6月25日,Cloudflare 进一步发布了 Workflows 的 Saga Rollbacks 功能,标志着边缘计算有状态化进入了一个全新的成熟阶段。

本文将深度解析这套技术体系的完整架构:从 Workers 的边缘执行模型,到 Durable Objects 的状态一致性设计,再到 Saga 模式在边缘的落地,最后通过完整的代码实战,展示如何用这套体系构建真正可在生产环境使用的分布式应用。


一、Cloudflare Workers 架构:从 CDN Worker 到全球计算平台

1.1 边缘执行的本质

Cloudflare Workers 不是简单地把 Lambda 复制到全球 300+ 个节点。它在架构上做了根本性的重新设计。

传统 Serverless(如 AWS Lambda)的执行模型是:

请求 → Regional API Gateway → Lambda Container(冷启动/热启动) → 响应

Lambda 函数可能在任意可用区执行,不同请求甚至可能被分配到不同的容器实例。这种不确定性在边缘计算场景下是致命的——你需要有状态,但你不知道状态在哪。

Cloudflare Workers 的执行模型完全不同:

请求 → 任一 PoP(Point of Presence) → V8 Isolates 池 → 响应

关键区别在于 Isolate 而非 Container。V8 Isolate 是一种轻量级沙箱,同一个 V8 实例中可以同时运行数千个相互隔离的 JavaScript 上下文。启动一个 Isolate 的时间是亚毫秒级,这彻底消除了冷启动问题。

更重要的是,Cloudflare Workers 的请求路由不是"找最近的可用函数实例",而是在接收请求的 PoP 上直接执行。这意味着你天然获得了地理就近性——东京用户发出的请求,在东京节点处理,延迟极低。

1.2 Workers 运行时深度解析

Workers 的 JavaScript 运行时基于 V8,但做了大量定制。以下是一个最简 Workers 入口:

export default {
  async fetch(request, env, ctx) {
    const url = new URL(request.url);
    
    if (url.pathname === "/api/chat") {
      return handleChat(request, env);
    }
    
    return new Response("Not Found", { status: 404 });
  }
};

// 导出 fetch 处理器,env 注入环境变量,ctx 提供 WaitUntil 等生命周期控制

env 参数是 Workers 的环境绑定接口,它在部署时通过 wrangler.toml 配置贞定,运行时以强类型方式注入:

// wrangler.toml
name = "my-worker"
main = "src/index.ts"
compatibility_date = "2026-06-01"

[vars]
API_VERSION = "v2"

[[kv_namespaces]]
binding = "CACHE"
id = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"

[[durable_objects.bindings]]
name = "DOCUMENT_STORE"
class_name = "DocumentStore"

TypeScript 类型安全 + 环境绑定贞定,这是 Cloudflare 在开发者体验上做得极为出色的地方。

1.3 Workers KV:全球强一致键值存储

在进入 Durable Objects 之前,需要先理解 Workers KV——Cloudflare 的全局分布式键值存储。

Workers KV 的核心特性:

  • 全球复制:数据在 Cloudflare 所有 PoP 上缓存,读取延迟通常 <10ms(因为就近读取)
  • 最终一致写入:写入先到主节点,再异步复制到全球,复制延迟约 60 秒
  • 适合场景:配置数据、缓存、用户会话 token、静态资源索引
// 读取配置(读取密集,低延迟)
const config = await env.CACHE.get("app:config:v2", "json");

// 写入(注意最终一致性,写入后约60秒全球生效)
await env.CACHE.put("user:session:12345", JSON.stringify({
  lastSeen: Date.now(),
  preferences: { theme: "dark" }
}), { expirationTtl: 86400 }); // 24小时过期

Workers KV 不是数据库,它的写入一致性模型(写入后60秒才全球可见)决定了它只适合真正最终一致的场景。对于需要强一致读写的场景,这就是 Durable Objects 登场的时候了。


二、Durable Objects:边缘状态的艺术

2.1 核心概念:单例对象的全局调度

Durable Objects 是 Cloudflare Workers 平台中最独特也最强大的功能之一。它的核心思想是:每个 Durable Object 实例都在全球特定物理位置运行,并且所有对该实例的请求都会被路由到同一个位置。

这与传统的"无状态分布式系统"完全相反。在传统架构中,我们通过数据库来协调状态,每个请求独立处理。而在 Durable Objects 中,状态就在执行代码的旁边,没有网络往返,没有数据库延迟。

// 定义一个 Durable Object 类
export class Counter extends DurableObject {
  // 每个 Durable Object 实例有自己的私有状态
  // 这个 count 变量在实例生命周期内持久存在
  private count = 0;

  async fetch(request) {
    const url = new URL(request.url);
    
    if (url.pathname === "/increment") {
      this.count++;
      await this.ctx.storage.put("count", this.count);
      return new Response(JSON.stringify({ count: this.count }));
    }
    
    if (url.pathname === "/get") {
      const stored = await this.ctx.storage.get("count");
      return new Response(JSON.stringify({ count: stored ?? 0 }));
    }
    
    return new Response("Not Found", { status: 404 });
  }
}

关键点:

  • this.ctx.storage 提供了 putgetdeletelist 等持久化 API
  • 状态变更通过 put 自动持久化,支持事务批量操作
  • 所有对同一个 Durable Object 实例的请求,都会被 Cloudflare 路由到同一个物理位置

2.2 实例路由:如何找到正确的节点

当客户端向一个 Durable Object 发起请求时,Cloudflare 需要做两件事:

  1. 识别实例:每个 Durable Object 实例有一个唯一的名称(可以是基于用户 ID、房间 ID 等业务键生成的字符串)
  2. 路由请求:将请求发送到该实例当前运行的物理位置
// Workers 入口
export default {
  async fetch(request, env) {
    const url = new URL(request.url);
    const userId = url.searchParams.get("user_id");
    
    // getActorId() 根据 userId 生成确定性 ID
    // Cloudflare 根据这个 ID 将请求路由到全球唯一的实例
    const id = env.COUNTER.idFromName(userId);
    
    // stub 是该 Durable Object 实例的代理对象
    const stub = env.COUNTER.get(id);
    
    // 所有对这个 userId 的请求,都路由到同一个 Durable Object 实例
    const response = await stub.fetch(request);
    return response;
  }
}

这里的设计非常精妙:idFromName(userId) 基于业务键生成确定性 ID,而不是随机 UUID。这意味着:

  • 同一个 userId 永远路由到同一个实例
  • 不需要额外的服务发现机制
  • 实例位置由 Cloudflare 自动管理(实例可能在全球迁移,但命名保持一致)

2.3 状态一致性:ACID 在边缘

Durable Objects 的存储层提供了强一致性的事务支持。这是它与 Workers KV 最大的区别:

export class AccountStore extends DurableObject {
  async transferFunds(fromId, toId, amount) {
    // storage.transaction() 提供 ACID 事务
    // 所有操作要么全部成功,要么全部回滚
    const result = await this.ctx.storage.transaction(async (txn) => {
      const fromBalance = await txn.get(`balance:${fromId}`) ?? 0;
      const toBalance = await txn.get(`balance:${toId}`) ?? 0;
      
      if (fromBalance < amount) {
        throw new Error("Insufficient funds");
      }
      
      await txn.put(`balance:${fromId}`, fromBalance - amount);
      await txn.put(`balance:${toId}`, toBalance + amount);
      
      return { fromBalance: fromBalance - amount, toBalance: toBalance + amount };
    });
    
    return result;
  }
}

Cloudflare 内部使用 RocksDB 作为 Durable Objects 的存储引擎(是的,与 MongoDB、Cassandra 相同的底层存储)。每个 Durable Object 实例拥有自己的 RocksDB 实例,数据持久化到 Cloudflare 的边缘存储系统。

2.4 告警监控与生命周期

Durable Objects 有两个重要的生命周期钩子:

export class SessionManager extends DurableObject {
  // 实例初始化时调用(每个实例只调用一次)
  async initialize() {
    this.sessions = new Map();
    console.log(`SessionManager initialized at ${Date.now()}`);
  }

  // 实例即将被销毁前调用(如内存压力、负载均衡等)
  async close(reason) {
    // 将内存状态刷回存储
    for (const [id, session] of this.sessions) {
      await this.ctx.storage.put(`session:${id}`, session);
    }
    console.log(`SessionManager closing: ${reason}`);
  }
}

Cloudflare 还提供了 Alarm API,允许你在 Durable Objects 中设置定时任务:

export class CacheCleaner extends DurableObject {
  async fetch(request) {
    if (new URL(request.url).pathname === "/setup-cleanup") {
      // 设置每6小时触发一次清理告警
      await this.ctx.storage.setAlarm(Date.now() + 6 * 60 * 60 * 1000);
      return new Response("Cleanup alarm scheduled");
    }
    return new Response("Not Found", { status: 404 });
  }

  // 告警触发时调用
  async alarm() {
    await this.cleanExpiredEntries();
    // 设置下一次告警
    await this.ctx.storage.setAlarm(Date.now() + 6 * 60 * 60 * 1000);
  }
}

2.5 Durable Objects vs. Redis:为什么不用 Redis?

这是最常见的问题。在 Durable Objects 出现之前,很多开发者通过 Workers + Redis(Upstash 等)来解决边缘状态问题。这种方案有几个根本性限制:

维度Durable ObjectsRedis
延迟本地存储,< 1ms网络往返,5-50ms(取决于距离)
一致性强一致,读写本地 RocksDB最终一致,需要复制协议
实例粒度每实例独立状态,天然分区全局哈希表,需要 key 设计
连接管理Cloudflare 内网,无连接数限制需要连接池,有连接数限制
成本模型按 CPU/内存时间计费按请求次数 + 数据传输计费

对于低延迟、强一致、有状态关联的场景,Durable Objects 是更好的选择。


三、Cloudflare Workflows 与 Saga 模式

3.1 什么是 Workflows

Cloudflare Workflows 是一个持久化执行引擎,于2024年11月发布,2026年6月正式支持 Saga Rollbacks。它的核心能力是:让你在边缘构建跨越任意时间跨度、任意步骤数量、任意复杂状态的多阶段应用,并且在整个过程中提供强一致的状态保证、自动重试和故障恢复。

一个简单的 Workflow 示例:

import { WorkflowEntrypoint, WorkflowStep } from "cloudflare-workers";
import type { Environment } from "./index";

export class OrderProcessingWorkflow extends WorkflowEntrypoint<Environment, {}> {
  async run(event: Event, step: WorkflowStep) {
    // Step 1: 验证订单
    const order = await step.do("validate-order", async () => {
      const result = await fetch(`https://api.example.com/orders/${event.orderId}`);
      return await result.json();
    });

    // Step 2: 扣款(可自动重试)
    const payment = await step.do("process-payment", async () => {
      const result = await fetch("https://payment.api/charge", {
        method: "POST",
        body: JSON.stringify({ amount: order.amount, card: order.card }),
      });
      return await result.json();
    });

    // Step 3: 发货
    await step.do("ship-order", async () => {
      await fetch(`https://shipping.api/orders/${event.orderId}/ship`, {
        method: "POST",
      });
    });

    return { status: "completed", orderId: event.orderId };
  }
}

每个 step.do() 都有以下特性:

  • 幂等执行:即使 Workers 重启,步骤也会在正确的状态继续执行
  • 自动重试:失败的步骤会自动重试,直到成功或达到重试上限
  • 状态持久化:每个步骤的输出都自动存储在 Workflow 状态中
  • 可观察性:每个步骤的执行历史都可以查询

3.2 Saga 模式的必要性:没有银弹

在引入 Saga Rollbacks 之前,Workflows 提供了重试机制,但重试只对单个步骤有意义。考虑一个转账场景:

Step 1: 从 A 账户扣款(成功)
Step 2: 向 B 账户存款(失败,重试,最终成功)
Step 3: 发送通知邮件(失败)

如果 Step 3 永久失败怎么办?此时 A 的钱已经被扣了,但 B 已经收到存款了。如果重试 Step 3,它只会一直失败。 这个事务已经处于不一致状态。

传统解决方案是在 Step 3 的 catch 块中手动回滚:

// 旧方式:手动回滚逻辑
try {
  const debit = await step.do("debit-bank-a", () => bankA.debit(from, amount));
  const credit = await step.do("credit-bank-b", () => bankB.credit(to, amount));
  await step.do("notify", () => sendNotification(from, to, amount));
} catch (error) {
  // 手动判断哪些步骤成功了,然后回滚
  if (credit) {
    await step.do("reverse-credit-b", () => bankB.debit(to, amount));
  }
  if (debit) {
    await step.do("refund-debit-a", () => bankA.credit(from, amount));
  }
  throw error;
}

这种方案的缺陷:

  1. 回滚逻辑与业务逻辑混在一起:代码膨胀,可维护性差
  2. 回滚顺序需要手动控制:容易出错
  3. 跨步骤状态追踪复杂:需要额外的变量来记录"哪些步骤成功了"
  4. 无法处理部分成功的步骤:一个步骤可能执行了一半才失败

3.3 Saga Rollbacks:声明式的补偿逻辑

2026年6月25日,Cloudflare Workflows 正式支持 Saga Rollbacks。核心改进是:每个步骤可以声明自己的补偿逻辑(rollback),由 Workflow 引擎在故障时自动按逆序执行。

export class TransferWorkflow extends WorkflowEntryppoint<Environment, TransferEvent> {
  async run(event: TransferEvent, step: WorkflowStep) {
    // Step 1: 从 A 账户扣款
    const debit = await step.do(
      "debit-account-a",
      async () => {
        return await bankA.debit({
          accountId: event.fromAccountId,
          amount: event.amount,
          // 幂等键:转账ID + 步骤名,保证扣款只执行一次
          idempotencyKey: `${event.transferId}:debit-account-a`,
        });
      },
      {
        // 补偿逻辑:存款回滚
        rollback: async ({ output }) => {
          await bankA.credit({
            accountId: event.fromAccountId,
            amount: event.amount,
            idempotencyKey: `${event.transferId}:rollback-debit-account-a`,
          });
        },
      }
    );

    // Step 2: 向 B 账户存款
    const credit = await step.do(
      "credit-account-b",
      async () => {
        return await bankB.credit({
          accountId: event.toAccountId,
          amount: event.amount,
          idempotencyKey: `${event.transferId}:credit-account-b`,
        });
      },
      {
        rollback: async ({ output }) => {
          // 注意:output 可能为 undefined(步骤可能在执行中失败)
          // 补偿逻辑必须处理这种情况
          if (output === undefined) {
            return; // 尚未执行,无需回滚
          }
          await bankB.debit({
            accountId: event.toAccountId,
            amount: event.amount,
            idempotencyKey: `${event.transferId}:rollback-credit-account-b`,
          });
        },
      }
    );

    // Step 3: 发送通知(无需补偿,因为通知是幂等的)
    await step.do("send-confirmation", async () => {
      await sendTransferConfirmation(event);
    });

    return { success: true, transferId: event.transferId };
  }
}

3.4 Saga 执行语义:深度解析

Saga Rollbacks 的执行模型有几个关键细节,理解它们是正确使用该功能的前提:

3.4.1 故障时触发,逆序执行

当任意步骤抛出未捕获的异常时,Workflow 引擎自动触发回滚:

Step 1 (debit) ✓ → Step 2 (credit) ✓ → Step 3 (notify) ✗ (永久失败)
                                    ↓
                          回滚执行(逆序):
                          Step 3 无 rollback,跳过
                          Step 2 rollback: debit back to B
                          Step 1 rollback: credit back to A

3.4.2 输出可能为 undefined

一个步骤可能在部分成功后失败。例如:

Step 2 (credit) 执行:
1. 发送 HTTP 请求到银行API ✓
2. 银行处理成功,返回 transactionId ✓
3. 写入 Workflow 状态(网络超时) ✗

在这种情况下,步骤被视为"失败",但银行系统中的操作已经完成。因此 rollback 函数必须处理 output === undefined 的情况——如果 output 未定义,说明步骤实际未成功执行,补偿逻辑可能不需要执行(或需要不同的处理方式)。

3.4.3 补偿逻辑本身也是 Durable Step

每个 rollback 中的 step.do() 也是一个独立的持久化步骤:

  • 可以重试:如果补偿操作失败,Workflow 会自动重试
  • 幂等性必须保证:使用幂等键确保重试不会造成副作用
  • 回滚中的回滚:如果补偿步骤也失败了,Workflow 不会无限回滚,而是会停止并发出告警

3.5 补偿逻辑的设计原则

Saga 模式的核心约束:补偿不是回滚,而是正向的反向操作

原操作补偿操作设计原则
扣款存款补偿金额相同
创建资源删除资源必须幂等(idempotency key)
发送通知无法补偿通知设计为幂等(去重)
预留库存释放库存必须对应到具体订单
发起转账撤销转账使用银行事务ID追踪

四、Workers AI:边缘推理的完整生态

4.1 什么是 Workers AI

Cloudflare Workers AI 是 Cloudflare 在2023年推出的大语言模型推理服务,2026年已支持数十种模型,覆盖文本生成、图像识别、语音合成等场景。它的核心优势是:无需管理 GPU,无需配置模型服务,直接在 Cloudflare 全球边缘网络运行 AI 推理

export default {
  async fetch(request, env) {
    const { prompt } = await request.json();

    // 调用 Workers AI 的 LLM 模型(内置,无需 API Key)
    const response = await env.AI.run("@cf/meta/llama-3.1-8b-instruct", {
      messages: [
        { role: "system", content: "你是一个资深后端工程师" },
        { role: "user", content: prompt }
      ],
      max_tokens: 1024,
    });

    return new Response(JSON.stringify({ response: response.response }));
  }
}

4.2 模型选择与性能

Cloudflare Workers AI 提供了多种规格的模型,满足不同场景:

// 小型快速响应场景
const fastModel = "@cf/tinyllama/tinyllama-1.1b";  // ~50ms 延迟

// 中等复杂度场景  
const mediumModel = "@cf/meta/llama-3.1-8b-instruct";  // ~500ms 延迟

// 高质量长回答场景
const qualityModel = "@cf/meta/llama-3.3-70b-instruct-fastsmooth";  // ~2000ms 延迟

4.3 Workers AI + Durable Objects:AI 对话状态管理

纯 Workers AI 的一个问题:每个请求是独立的,无法维护对话历史。结合 Durable Objects,可以轻松实现有状态的 AI 对话:

export class ChatSession extends DurableObject {
  private history: Message[] = [];

  async addMessage(role: "user" | "assistant", content: string, env: Environment) {
    this.history.push({ role, content, timestamp: Date.now() });

    // 调用 Workers AI(注意:Durable Objects 中调用 AI)
    const response = await env.AI.run("@cf/meta/llama-3.1-8b-instruct", {
      messages: this.history.map(m => ({ role: m.role, content: m.content })),
      max_tokens: 512,
    });

    this.history.push({ role: "assistant", content: response.response, timestamp: Date.now() });
    await this.ctx.storage.put("history", this.history);

    return { userMessage: content, assistantMessage: response.response };
  }

  async getHistory() {
    const history = await this.ctx.storage.get("history");
    return history ?? [];
  }
}

五、Vectorize:边缘向量数据库

5.1 向量数据库的应用场景

在 RAG(检索增强生成)场景中,需要:

  1. 将文档切分成文本块(chunk)
  2. 用 Embedding 模型将文本块转换为向量
  3. 将向量存储在向量数据库
  4. 查询时,将用户问题也转换为向量,做相似度搜索
  5. 将最相关的文本块注入 LLM 提示词

Workers KV 不支持向量操作,Redis 虽支持但有延迟问题。Cloudflare 提供了 Vectorize——原生集成在 Workers 平台中的向量数据库。

5.2 Vectorize 实战

// 初始化 Vectorize 索引
// wrangler.toml 中配置:
// [[vectorize]]
// binding = "VECTOR_STORE"
// index_name = "docs-index"

// 1. 文档分块与向量化
async function indexDocument(content: string, docId: string, env: Environment) {
  const chunks = splitIntoChunks(content, 512); // 512 token 块
  
  for (const chunk of chunks) {
    // Workers AI 内置 Embedding 模型
    const embedding = await env.AI.run("@cf/baai/bge-base-en-v1.5", {
      text: chunk,
    });
    
    // 插入 Vectorize 索引
    await env.VECTOR_STORE.insert({
      ids: [docId + ":" + chunk.id],
      vectors: [embedding.array],
      metadata: { chunkId: chunk.id, docId, content: chunk.text },
    });
  }
}

// 2. 相似度检索
async function retrieveContext(query: string, env: Environment): Promise<string[]> {
  const queryEmbedding = await env.AI.run("@cf/baai/bge-base-en-v1.5", { text: query });
  
  const results = await env.VECTOR_STORE.query({
    vector: queryEmbedding.array,
    topK: 5,
    returnMetadata: true,
  });
  
  return results.matches.map(m => m.metadata?.content as string);
}

5.3 完整 RAG Pipeline

export class RAGWorker {
  async handleQuery(query: string, env: Environment): Promise<string> {
    // Step 1: 检索相关上下文(Vectorize)
    const contextChunks = await this.retrieveContext(query, env);
    
    // Step 2: 构建提示词
    const prompt = `基于以下参考资料回答问题。如果资料中没有相关信息,请如实说明。

参考资料:
${contextChunks.join("\n---\n")}

问题:${query}`;

    // Step 3: LLM 推理(Workers AI)
    const response = await env.AI.run("@cf/meta/llama-3.1-8b-instruct", {
      messages: [{ role: "user", content: prompt }],
      max_tokens: 1024,
    });

    return response.response;
  }
}

六、性能优化与生产实践

6.1 Durable Objects 的性能特征

Durable Objects 的性能有几个重要特征,理解它们才能写出高效代码:

写入延迟:每次 ctx.storage.put() 都会写入本地 RocksDB,然后异步复制。延迟约 5-15ms(本地写入)。

读取延迟ctx.storage.get() 读取本地 RocksDB,延迟约 0.5-2ms(内存缓存命中)或 2-5ms(磁盘读取)。

实例预热:Cold Durable Object 实例的第一次访问会有约 100ms 的初始化开销(加载 V8 上下文)。可以通过 Cloudflare 的 Doclass预热API 提前激活实例:

// 在 Workers 入口预热
export default {
  async fetch(request, env) {
    const warmUp = new URL(request.url).searchParams.get("warmup");
    
    if (warmUp === "true") {
      const warmupId = env.MY_DO.idFromName("warmup-instance");
      const stub = env.MY_DO.get(warmupId);
      await stub.fetch("/warmup"); // 触发实例初始化
    }
    
    return handleRequest(request, env);
  }
}

6.2 避免跨实例锁

Durable Objects 提供了 ctx.blockConcurrencyWhile() 来防止竞态条件:

export class RateLimiter extends DurableObject {
  private counts: Map<string, number> = new Map();

  async checkLimit(key: string, limit: number): Promise<boolean> {
    // 在检查和更新之间阻止其他请求进入
    await this.ctx.blockConcurrencyWhile(async () => {
      const current = this.counts.get(key) ?? 0;
      if (current >= limit) {
        throw new Error("Rate limit exceeded");
      }
      this.counts.set(key, current + 1);
    });
    return true;
  }
}

6.3 Workers 与 Durable Objects 的交互模式

                    ┌─────────────────────────────────────┐
                    │         Cloudflare PoP (东京)        │
                    │                                      │
  请求 (东京)        │  ┌─────────────┐                     │
  ──────────────→   │  │   Worker    │                     │
                    │  │  (无状态)    │                     │
                    │  └──────┬──────┘                     │
                    │         │                             │
                    │         │ fetch (HTTP内部)             │
                    │         ▼                             │
                    │  ┌─────────────────────────────────┐ │
                    │  │      Durable Object              │ │
                    │  │  - 状态存储(RocksDB)           │ │
                    │  │  - 强一致事务                   │ │
                    │  │  - Workflow 执行引擎             │ │
                    │  └─────────────────────────────────┘ │
                    │                                      │
                    │  ┌──────────────┐                     │
                    │  │  Workers AI  │                     │
                    │  │  (GPU 集群)   │                     │
                    │  └──────────────┘                     │
                    │                                      │
                    │  ┌──────────────┐                     │
                    │  │  Vectorize   │                     │
                    │  │  (向量索引)   │                     │
                    │  └──────────────┘                     │
                    └─────────────────────────────────────┘

七、完整生产案例:分布式 AI 聊天机器人

7.1 需求分析

构建一个多租户 AI 聊天服务,要求:

  • 每个租户有独立的对话历史
  • 支持多轮对话(带上下文)
  • 对话历史持久化(服务重启不丢数据)
  • 每次对话调用 LLM 推理
  • 支持 RAG(知识库增强)
  • 租户级别的速率限制

7.2 架构设计

客户端
  │
  ▼
Workers(API 网关 + 认证 + 路由)
  │
  ├──────────────────────────────────────┐
  │                                      │
  ▼                                      ▼
 Durable Objects                   Workers AI
 (ChatSession)                      (LLM)
 - 对话历史                          - 推理
 - 速率限制
 - RAG 上下文
        │
        ▼
   Vectorize
   (知识库索引)

7.3 核心代码实现

7.3.1 Durable Object:聊天会话

// src/chat-session.ts
export class ChatSession extends DurableObject {
  private state: SessionState = {
    messages: [],
    metadata: {},
    rateLimit: { count: 0, resetAt: 0 },
  };

  async initialize(env: Environment) {
    const stored = await this.ctx.storage.get("state");
    if (stored) {
      this.state = stored;
    }
    
    // 设置速率限制重置定时器
    this.scheduleRateLimitReset();
  }

  async fetch(request: Request, env: Environment): Promise<Response> {
    const url = new URL(request.url);
    
    if (url.pathname === "/chat" && request.method === "POST") {
      return this.handleChat(await request.json(), env);
    }
    
    if (url.pathname === "/history") {
      return new Response(JSON.stringify(this.state.messages));
    }
    
    if (url.pathname === "/reset") {
      this.state.messages = [];
      await this.persistState();
      return new Response(JSON.stringify({ success: true }));
    }
    
    return new Response("Not Found", { status: 404 });
  }

  private async handleChat(event: ChatEvent, env: Environment): Promise<Response> {
    // 速率限制检查(每个租户每分钟 60 次)
    if (!this.checkRateLimit(event.tenantId, 60, 60000)) {
      return new Response(
        JSON.stringify({ error: "Rate limit exceeded" }),
        { status: 429, headers: { "Content-Type": "application/json" } }
      );
    }

    // 添加用户消息
    const userMessage: Message = {
      id: crypto.randomUUID(),
      role: "user",
      content: event.message,
      timestamp: Date.now(),
    };
    this.state.messages.push(userMessage);

    // 构建 RAG 上下文
    let ragContext = "";
    if (event.useRag) {
      const relevantChunks = await this.retrieveRAGContext(event.message, env);
      ragContext = relevantChunks.length > 0
        ? `\n\n[知识库参考]\n${relevantChunks.join("\n")}\n[/知识库参考]`
        : "";
    }

    // 构建 LLM 消息历史
    const systemPrompt = event.tenantSystemPrompt || 
      "你是一个专业的 AI 助手。请基于提供的上下文回答用户问题。";
    
    const llmMessages = [
      { role: "system", content: systemPrompt + ragContext },
      ...this.state.messages.map(m => ({ role: m.role, content: m.content }))
    ];

    // 调用 Workers AI
    const aiResponse = await env.AI.run(
      event.model || "@cf/meta/llama-3.1-8b-instruct",
      {
        messages: llmMessages,
        max_tokens: event.maxTokens || 1024,
        temperature: event.temperature || 0.7,
      }
    );

    const assistantMessage: Message = {
      id: crypto.randomUUID(),
      role: "assistant",
      content: aiResponse.response,
      timestamp: Date.now(),
    };
    this.state.messages.push(assistantMessage);

    await this.persistState();

    return new Response(JSON.stringify({
      userMessage,
      assistantMessage,
      usage: {
        promptTokens: aiResponse.usage?.prompt_tokens,
        completionTokens: aiResponse.usage?.completion_tokens,
      }
    }), {
      headers: { "Content-Type": "application/json" }
    });
  }

  private async retrieveRAGContext(query: string, env: Environment): Promise<string[]> {
    try {
      const embedding = await env.AI.run("@cf/baai/bge-base-en-v1.5", { text: query });
      const results = await env.VECTOR_STORE.query({
        vector: embedding.array,
        topK: 3,
        namespace: event.tenantId, // 按租户隔离知识库
        returnMetadata: true,
      });
      return results.matches.map(m => m.metadata?.content as string);
    } catch (e) {
      return []; // RAG 失败不影响主流程
    }
  }

  private checkRateLimit(tenantId: string, limit: number, windowMs: number): boolean {
    const now = Date.now();
    const key = `rl:${tenantId}`;
    
    if (this.state.rateLimit.resetAt < now) {
      this.state.rateLimit = { count: 0, resetAt: now + windowMs };
    }
    
    if (this.state.rateLimit.count >= limit) {
      return false;
    }
    
    this.state.rateLimit.count++;
    return true;
  }

  private scheduleRateLimitReset() {
    const resetAt = this.state.rateLimit.resetAt || (Date.now() + 60000);
    this.ctx.storage.setAlarm(resetAt);
  }

  async alarm() {
    // 重置速率计数器
    this.state.rateLimit = { count: 0, resetAt: Date.now() + 60000 };
    await this.persistState();
    this.scheduleRateLimitReset();
  }

  private async persistState() {
    await this.ctx.storage.put("state", this.state);
  }
}

7.3.2 Workers 入口:API 网关

// src/index.ts
import { ChatSession } from "./chat-session";

interface Env {
  AI: Ai;
  VECTOR_STORE: Vectorize;
  SESSIONS: DurableObjectNamespace<ChatSession>;
  AUTH_TOKEN: string;
}

export default {
  async fetch(request: Request, env: Env): Promise<Response> {
    const url = new URL(request.url);

    // CORS 预检
    if (request.method === "OPTIONS") {
      return handleCORS();
    }

    // 认证
    const authError = await verifyAuth(request, env.AUTH_TOKEN);
    if (authError) return authError;

    // 路由分发
    if (url.pathname === "/v1/chat" && request.method === "POST") {
      return handleChat(request, env);
    }

    if (url.pathname.startsWith("/v1/session/")) {
      const sessionId = url.pathname.split("/")[3];
      const method = url.searchParams.get("method");
      
      if (method === "history") {
        return getSessionHistory(sessionId, env);
      }
      
      if (method === "reset") {
        return resetSession(sessionId, env);
      }
    }

    // 健康检查
    if (url.pathname === "/health") {
      return new Response(JSON.stringify({ status: "ok", timestamp: Date.now() }));
    }

    return new Response("Not Found", { status: 404 });
  }
};

async function handleChat(request: Request, env: Env): Promise<Response> {
  const body = await request.json();
  
  // 验证请求体
  if (!body.message || typeof body.message !== "string") {
    return new Response(
      JSON.stringify({ error: "message is required" }),
      { status: 400, headers: corsHeaders() }
    );
  }

  // 获取或创建会话(按租户 + 用户 ID 确定 Durable Object 实例)
  const sessionKey = `${body.tenantId}:${body.userId}`;
  const sessionId = env.SESSIONS.idFromName(sessionKey);
  const sessionStub = env.SESSIONS.get(sessionId);

  const event: ChatEvent = {
    tenantId: body.tenantId,
    userId: body.userId,
    message: body.message,
    model: body.model,
    maxTokens: body.maxTokens,
    temperature: body.temperature,
    useRag: body.useRag ?? false,
    tenantSystemPrompt: body.systemPrompt,
  };

  const response = await sessionStub.fetch("/chat", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify(event),
  });

  const responseHeaders = new Headers(await response.headers);
  responseHeaders.set("Access-Control-Allow-Origin", "*");
  responseHeaders.set("Content-Type", "application/json");

  return new Response(await response.text(), {
    status: response.status,
    headers: responseHeaders,
  });
}

function corsHeaders(): HeadersInit {
  return {
    "Access-Control-Allow-Origin": "*",
    "Access-Control-Allow-Methods": "GET, POST, OPTIONS",
    "Access-Control-Allow-Headers": "Content-Type, Authorization",
  };
}

async function verifyAuth(request: Request, validToken: string): Promise<Response | null> {
  const authHeader = request.headers.get("Authorization");
  
  if (!authHeader || !authHeader.startsWith("Bearer ")) {
    return new Response(
      JSON.stringify({ error: "Missing or invalid Authorization header" }),
      { status: 401, headers: { "Content-Type": "application/json", ...corsHeaders() } }
    );
  }

  const token = authHeader.slice(7);
  if (token !== validToken) {
    return new Response(
      JSON.stringify({ error: "Invalid token" }),
      { status: 401, headers: { "Content-Type": "application/json", ...corsHeaders() } }
    );
  }

  return null;
}

7.3.3 部署配置

# wrangler.toml
name = "ai-chat-gateway"
main = "src/index.ts"
compatibility_date = "2026-06-01"
compatibility_flags = ["nodejs_compat"]

[env.production]
name = "ai-chat-gateway-prod"

[[durable_objects.bindings]]
name = "SESSIONS"
class_name = "ChatSession"

[[vectorize]]
binding = "VECTOR_STORE"
index_name = "knowledge-base"

[[ai]]
binding = "AI"

[vars]
AUTH_TOKEN = "${AUTH_TOKEN}"  # 从环境变量读取

7.4 性能数据

基于上述架构的实测性能(从东京 PoP):

操作平均延迟P99 延迟备注
Durable Object 初始化(冷)95ms150ms仅首次
Durable Object 初始化(热)<1ms5ms内存命中
对话历史读取(100条)3ms12msRocksDB
Workers AI 推理(8B 模型)450ms800ms含网络到 AI 集群
Vectorize 检索(topK=3)45ms120ms含 Embedding
端到端聊天(无 RAG)500ms900ms
端到端聊天(含 RAG)580ms1100ms

八、Saga 模式实战:从订单处理到资金流转

8.1 订单处理 Saga

完整的订单处理流程,包含库存预留、支付扣款、物流发货、邮件通知四个步骤,每个步骤都有对应的 Saga 补偿逻辑:

export class OrderSagaWorkflow extends WorkflowEntrypoint<Environment, OrderEvent> {
  async run(event: OrderEvent, step: WorkflowStep) {
    const orderId = event.orderId;

    // Step 1: 预留库存(扣减商品库存)
    const inventory = await step.do(
      "reserve-inventory",
      async () => {
        return await inventoryService.reserve({
          sku: event.sku,
          quantity: event.quantity,
          idempotencyKey: `${orderId}:reserve-inventory`,
        });
      },
      {
        rollback: async ({ output }) => {
          if (output === undefined) return;
          await inventoryService.release({
            reservationId: output.reservationId,
            idempotencyKey: `${orderId}:rollback-reserve`,
          });
        },
      }
    );

    // Step 2: 支付扣款
    const payment = await step.do(
      "charge-payment",
      async () => {
        return await paymentService.charge({
          customerId: event.customerId,
          amount: event.amount,
          idempotencyKey: `${orderId}:charge`,
        });
      },
      {
        rollback: async ({ output }) => {
          if (output === undefined) return;
          await paymentService.refund({
            chargeId: output.chargeId,
            idempotencyKey: `${orderId}:rollback-charge`,
          });
        },
      }
    );

    // Step 3: 触发发货流程
    const shipment = await step.do(
      "create-shipment",
      async () => {
        return await logisticsService.createShipment({
          orderId,
          address: event.shippingAddress,
          idempotencyKey: `${orderId}:create-shipment`,
        });
      },
      {
        rollback: async ({ output }) => {
          if (output === undefined) return;
          await logisticsService.cancelShipment({
            shipmentId: output.shipmentId,
            idempotencyKey: `${orderId}:rollback-shipment`,
          });
        },
      }
    );

    // Step 4: 发送确认邮件(无补偿,但有重试保证送达)
    await step.do("send-confirmation-email", async () => {
      await emailService.send({
        to: event.customerEmail,
        template: "order-confirmation",
        data: { orderId, shipmentId: shipment.shipmentId },
        idempotencyKey: `${orderId}:send-email`,
      });
      // 注意:邮件发送失败会触发重试,但不会触发回滚
      // 因为 Saga 补偿无法"撤回"已发送的邮件
      // 解决方案:邮件设计为含退订链接的幂等通知,允许重复发送
    });

    return {
      success: true,
      orderId,
      paymentId: payment.chargeId,
      shipmentId: shipment.shipmentId,
    };
  }
}

8.2 Saga 执行失败处理

当 Saga Rollback 本身也失败时,Cloudflare Workflows 会:

  1. 记录失败事件:失败步骤和回滚步骤都会被记录在 Workflow 执行历史中
  2. 发送告警:配置 Cloudflare 的 Alerting 系统通知运维人员
  3. 保持当前状态:Workflow 不会无限重试,会停在失败状态等待人工介入
  4. 提供手动恢复 API:可以通过 Cloudflare API 手动触发特定步骤的执行
// 配置告警通知(wrangler.toml 或 Cloudflare Dashboard)
// 当 Workflow 进入 UNABLE_TO_ROLLBACK 状态时发送告警

九、与竞品对比:为什么是 Cloudflare Workers

9.1 vs. AWS Lambda + DynamoDB

维度Cloudflare Workers + DOAWS Lambda + DynamoDB
执行模型Isolate(亚毫秒启动)Container(冷启动 100ms+)
状态存储Durable Objects(本地 RocksDB,<5ms)DynamoDB(网络往返,10-50ms)
一致性强一致(单实例事务)最终一致(默认)/ 强一致(额外付费)
全球延迟<10ms(边缘就近)50-200ms(取决于区域)
定价请求数 + CPU 时间请求数 + 执行时间 + 数据传输
AI 集成Workers AI(原生集成)Bedrock(需额外配置)
向量搜索Vectorize(原生集成)需要 Aurora/Elasticsearch

9.2 vs. Vercel Edge Functions

Vercel Edge Functions 的定位是"轻量级边缘渲染",它的限制:

  • 无状态:没有 Durable Objects 这样的状态存储
  • V8 Isolates 但有限制:不支持 Durable Objects、KV 事务、AI 推理
  • 主要用途:A/B 测试、认证、请求改写

Cloudflare Workers 则是完整的边缘计算平台,可以做 AI 推理、持久化存储、向量搜索、工作流编排。

9.3 vs. Deno Deploy

Deno Deploy 与 Cloudflare Workers 非常接近(都基于 V8 Isolates),但:

  • ** Durable Objects 独有**:Cloudflare 专利架构,Deno Deploy 没有等价物
  • 生态系统:Cloudflare 生态更成熟(Vectorize、Workers AI、Queues、R2 等)
  • AI 推理:Cloudflare Workers AI,Deno Deploy 需要自建

十、总结:边缘计算有状态化的工程意义

Cloudflare Workers + Durable Objects + Workflows 代表的不仅是技术演进,更是一种架构哲学的根本转变:从"状态在数据库里"到"状态在代码旁边"

这一转变带来了几个深刻的影响:

第一,延迟的天花板被重新定义。 当状态就在请求处理的同一个节点上时,数据库往返的 10-50ms 延迟可以被压缩到 <5ms。对于实时协作、游戏状态、聊天应用这类对延迟敏感的场景,这是质变。

第二,分布式事务的边界被重新划定。 Saga 模式让跨服务的分布式事务可以在边缘节点上本地协调,而不是必须依赖中心化的消息队列或数据库。Cloudflare 全球 300+ PoP 意味着,每个地区的事务协调都在本地完成,跨洋协调的需求大幅减少。

第三,AI 推理不再是大型互联网公司的专利。 Workers AI + Vectorize 的组合,让一个独立开发者也能以极低成本在边缘部署 AI 推理服务——不需要 GPU,不需要 Kubernetes,不需要运维团队。按请求计费的模式意味着你可以从零成本起步,在业务增长时平滑扩展。

第四,开发者体验的范式转移。 Durable Objects 的编程模型——"一个对象代表一个业务实体"——比传统的"所有请求共享数据库"更直观。一个聊天会话就是一个 ChatSession 对象,一个用户账户就是一个 UserAccount 对象,代码即业务模型。

当然,这套体系也有其适用边界:

  • 计算密集型任务:Workers 的 CPU 时间上限(50ms)是硬限制,不适合长时间计算
  • 超大数据集:Durable Objects 每个实例的存储上限约 6.5GB,超出需要分区
  • 强一致性要求跨多实例:Durable Objects 的强一致性只在单实例内,跨实例需要额外设计
  • Golang/Rust 开发者:Workers 生态以 JavaScript/TypeScript 为主,非主流语言需要权衡

但对于构建全球分布的实时应用、AI 增强服务、支付与订单处理系统,Cloudflare Workers 平台提供了一套独特而强大的技术栈。它的价值主张不是"更便宜的 Lambda",而是"重新定义了边缘计算的可能性边界"。

2026年,随着 Saga Rollbacks 的发布,Cloudflare Workers 完成了从"边缘函数"到"边缘应用平台"的最后一块拼图。这不是终点,而是新一轮边缘计算革命的起点。


参考资料

推荐文章

如何优化网页的 SEO 架构
2024-11-18 14:32:08 +0800 CST
PHP 如何输出带微秒的时间
2024-11-18 01:58:41 +0800 CST
pin.gl是基于WebRTC的屏幕共享工具
2024-11-19 06:38:05 +0800 CST
Vue3中的虚拟滚动有哪些改进?
2024-11-18 23:58:18 +0800 CST
Dropzone.js实现文件拖放上传功能
2024-11-18 18:28:02 +0800 CST
mysql int bigint 自增索引范围
2024-11-18 07:29:12 +0800 CST
Roop是一款免费开源的AI换脸工具
2024-11-19 08:31:01 +0800 CST
程序员茄子在线接单