Cloudflare Workers + Durable Objects + Saga 模式:边缘计算有状态化的工程革命
背景:当"无状态"成为历史
2015年,AWS Lambda 开创了 FaaS(函数即服务)的时代,"无服务器"成了云厂商的主打概念。那时候,我们被告知:函数就是原子单元,每个请求独立处理,不保留状态,数据交给数据库。这种范式在简单场景下运转良好——API Gateway 触发 Lambda,Lambda 读写 DynamoDB,无状态,简单清晰。
但现实从不简单。五年后,开发者们发现"无状态"带来了沉重的代价:为了维持状态,每次请求都要重新查询数据库;为了保证一致性,不得不引入复杂的外部事务协调服务;为了处理长连接,不得不借助 Redis 或 SQS 等外部消息队列。 简单场景被复杂化了,而复杂场景根本没有好的解法。
Cloudflare 在这个背景下,走出了一条独特的路。不同于传统云厂商把"无状态"作为铁律,Cloudflare 从一开始就在探索:能不能在边缘节点上原生支持有状态逻辑,而不需要额外部署外部服务?
答案是 Durable Objects——一种将在单个地理位置运行的、持久的 JavaScript 对象,与 Cloudflare Workers 的全球分布式执行结合在一起的创新架构。2026年6月25日,Cloudflare 进一步发布了 Workflows 的 Saga Rollbacks 功能,标志着边缘计算有状态化进入了一个全新的成熟阶段。
本文将深度解析这套技术体系的完整架构:从 Workers 的边缘执行模型,到 Durable Objects 的状态一致性设计,再到 Saga 模式在边缘的落地,最后通过完整的代码实战,展示如何用这套体系构建真正可在生产环境使用的分布式应用。
一、Cloudflare Workers 架构:从 CDN Worker 到全球计算平台
1.1 边缘执行的本质
Cloudflare Workers 不是简单地把 Lambda 复制到全球 300+ 个节点。它在架构上做了根本性的重新设计。
传统 Serverless(如 AWS Lambda)的执行模型是:
请求 → Regional API Gateway → Lambda Container(冷启动/热启动) → 响应
Lambda 函数可能在任意可用区执行,不同请求甚至可能被分配到不同的容器实例。这种不确定性在边缘计算场景下是致命的——你需要有状态,但你不知道状态在哪。
Cloudflare Workers 的执行模型完全不同:
请求 → 任一 PoP(Point of Presence) → V8 Isolates 池 → 响应
关键区别在于 Isolate 而非 Container。V8 Isolate 是一种轻量级沙箱,同一个 V8 实例中可以同时运行数千个相互隔离的 JavaScript 上下文。启动一个 Isolate 的时间是亚毫秒级,这彻底消除了冷启动问题。
更重要的是,Cloudflare Workers 的请求路由不是"找最近的可用函数实例",而是在接收请求的 PoP 上直接执行。这意味着你天然获得了地理就近性——东京用户发出的请求,在东京节点处理,延迟极低。
1.2 Workers 运行时深度解析
Workers 的 JavaScript 运行时基于 V8,但做了大量定制。以下是一个最简 Workers 入口:
export default {
async fetch(request, env, ctx) {
const url = new URL(request.url);
if (url.pathname === "/api/chat") {
return handleChat(request, env);
}
return new Response("Not Found", { status: 404 });
}
};
// 导出 fetch 处理器,env 注入环境变量,ctx 提供 WaitUntil 等生命周期控制
env 参数是 Workers 的环境绑定接口,它在部署时通过 wrangler.toml 配置贞定,运行时以强类型方式注入:
// wrangler.toml
name = "my-worker"
main = "src/index.ts"
compatibility_date = "2026-06-01"
[vars]
API_VERSION = "v2"
[[kv_namespaces]]
binding = "CACHE"
id = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
[[durable_objects.bindings]]
name = "DOCUMENT_STORE"
class_name = "DocumentStore"
TypeScript 类型安全 + 环境绑定贞定,这是 Cloudflare 在开发者体验上做得极为出色的地方。
1.3 Workers KV:全球强一致键值存储
在进入 Durable Objects 之前,需要先理解 Workers KV——Cloudflare 的全局分布式键值存储。
Workers KV 的核心特性:
- 全球复制:数据在 Cloudflare 所有 PoP 上缓存,读取延迟通常 <10ms(因为就近读取)
- 最终一致写入:写入先到主节点,再异步复制到全球,复制延迟约 60 秒
- 适合场景:配置数据、缓存、用户会话 token、静态资源索引
// 读取配置(读取密集,低延迟)
const config = await env.CACHE.get("app:config:v2", "json");
// 写入(注意最终一致性,写入后约60秒全球生效)
await env.CACHE.put("user:session:12345", JSON.stringify({
lastSeen: Date.now(),
preferences: { theme: "dark" }
}), { expirationTtl: 86400 }); // 24小时过期
Workers KV 不是数据库,它的写入一致性模型(写入后60秒才全球可见)决定了它只适合真正最终一致的场景。对于需要强一致读写的场景,这就是 Durable Objects 登场的时候了。
二、Durable Objects:边缘状态的艺术
2.1 核心概念:单例对象的全局调度
Durable Objects 是 Cloudflare Workers 平台中最独特也最强大的功能之一。它的核心思想是:每个 Durable Object 实例都在全球特定物理位置运行,并且所有对该实例的请求都会被路由到同一个位置。
这与传统的"无状态分布式系统"完全相反。在传统架构中,我们通过数据库来协调状态,每个请求独立处理。而在 Durable Objects 中,状态就在执行代码的旁边,没有网络往返,没有数据库延迟。
// 定义一个 Durable Object 类
export class Counter extends DurableObject {
// 每个 Durable Object 实例有自己的私有状态
// 这个 count 变量在实例生命周期内持久存在
private count = 0;
async fetch(request) {
const url = new URL(request.url);
if (url.pathname === "/increment") {
this.count++;
await this.ctx.storage.put("count", this.count);
return new Response(JSON.stringify({ count: this.count }));
}
if (url.pathname === "/get") {
const stored = await this.ctx.storage.get("count");
return new Response(JSON.stringify({ count: stored ?? 0 }));
}
return new Response("Not Found", { status: 404 });
}
}
关键点:
this.ctx.storage提供了put、get、delete、list等持久化 API- 状态变更通过
put自动持久化,支持事务批量操作 - 所有对同一个 Durable Object 实例的请求,都会被 Cloudflare 路由到同一个物理位置
2.2 实例路由:如何找到正确的节点
当客户端向一个 Durable Object 发起请求时,Cloudflare 需要做两件事:
- 识别实例:每个 Durable Object 实例有一个唯一的名称(可以是基于用户 ID、房间 ID 等业务键生成的字符串)
- 路由请求:将请求发送到该实例当前运行的物理位置
// Workers 入口
export default {
async fetch(request, env) {
const url = new URL(request.url);
const userId = url.searchParams.get("user_id");
// getActorId() 根据 userId 生成确定性 ID
// Cloudflare 根据这个 ID 将请求路由到全球唯一的实例
const id = env.COUNTER.idFromName(userId);
// stub 是该 Durable Object 实例的代理对象
const stub = env.COUNTER.get(id);
// 所有对这个 userId 的请求,都路由到同一个 Durable Object 实例
const response = await stub.fetch(request);
return response;
}
}
这里的设计非常精妙:idFromName(userId) 基于业务键生成确定性 ID,而不是随机 UUID。这意味着:
- 同一个
userId永远路由到同一个实例 - 不需要额外的服务发现机制
- 实例位置由 Cloudflare 自动管理(实例可能在全球迁移,但命名保持一致)
2.3 状态一致性:ACID 在边缘
Durable Objects 的存储层提供了强一致性的事务支持。这是它与 Workers KV 最大的区别:
export class AccountStore extends DurableObject {
async transferFunds(fromId, toId, amount) {
// storage.transaction() 提供 ACID 事务
// 所有操作要么全部成功,要么全部回滚
const result = await this.ctx.storage.transaction(async (txn) => {
const fromBalance = await txn.get(`balance:${fromId}`) ?? 0;
const toBalance = await txn.get(`balance:${toId}`) ?? 0;
if (fromBalance < amount) {
throw new Error("Insufficient funds");
}
await txn.put(`balance:${fromId}`, fromBalance - amount);
await txn.put(`balance:${toId}`, toBalance + amount);
return { fromBalance: fromBalance - amount, toBalance: toBalance + amount };
});
return result;
}
}
Cloudflare 内部使用 RocksDB 作为 Durable Objects 的存储引擎(是的,与 MongoDB、Cassandra 相同的底层存储)。每个 Durable Object 实例拥有自己的 RocksDB 实例,数据持久化到 Cloudflare 的边缘存储系统。
2.4 告警监控与生命周期
Durable Objects 有两个重要的生命周期钩子:
export class SessionManager extends DurableObject {
// 实例初始化时调用(每个实例只调用一次)
async initialize() {
this.sessions = new Map();
console.log(`SessionManager initialized at ${Date.now()}`);
}
// 实例即将被销毁前调用(如内存压力、负载均衡等)
async close(reason) {
// 将内存状态刷回存储
for (const [id, session] of this.sessions) {
await this.ctx.storage.put(`session:${id}`, session);
}
console.log(`SessionManager closing: ${reason}`);
}
}
Cloudflare 还提供了 Alarm API,允许你在 Durable Objects 中设置定时任务:
export class CacheCleaner extends DurableObject {
async fetch(request) {
if (new URL(request.url).pathname === "/setup-cleanup") {
// 设置每6小时触发一次清理告警
await this.ctx.storage.setAlarm(Date.now() + 6 * 60 * 60 * 1000);
return new Response("Cleanup alarm scheduled");
}
return new Response("Not Found", { status: 404 });
}
// 告警触发时调用
async alarm() {
await this.cleanExpiredEntries();
// 设置下一次告警
await this.ctx.storage.setAlarm(Date.now() + 6 * 60 * 60 * 1000);
}
}
2.5 Durable Objects vs. Redis:为什么不用 Redis?
这是最常见的问题。在 Durable Objects 出现之前,很多开发者通过 Workers + Redis(Upstash 等)来解决边缘状态问题。这种方案有几个根本性限制:
| 维度 | Durable Objects | Redis |
|---|---|---|
| 延迟 | 本地存储,< 1ms | 网络往返,5-50ms(取决于距离) |
| 一致性 | 强一致,读写本地 RocksDB | 最终一致,需要复制协议 |
| 实例粒度 | 每实例独立状态,天然分区 | 全局哈希表,需要 key 设计 |
| 连接管理 | Cloudflare 内网,无连接数限制 | 需要连接池,有连接数限制 |
| 成本模型 | 按 CPU/内存时间计费 | 按请求次数 + 数据传输计费 |
对于低延迟、强一致、有状态关联的场景,Durable Objects 是更好的选择。
三、Cloudflare Workflows 与 Saga 模式
3.1 什么是 Workflows
Cloudflare Workflows 是一个持久化执行引擎,于2024年11月发布,2026年6月正式支持 Saga Rollbacks。它的核心能力是:让你在边缘构建跨越任意时间跨度、任意步骤数量、任意复杂状态的多阶段应用,并且在整个过程中提供强一致的状态保证、自动重试和故障恢复。
一个简单的 Workflow 示例:
import { WorkflowEntrypoint, WorkflowStep } from "cloudflare-workers";
import type { Environment } from "./index";
export class OrderProcessingWorkflow extends WorkflowEntrypoint<Environment, {}> {
async run(event: Event, step: WorkflowStep) {
// Step 1: 验证订单
const order = await step.do("validate-order", async () => {
const result = await fetch(`https://api.example.com/orders/${event.orderId}`);
return await result.json();
});
// Step 2: 扣款(可自动重试)
const payment = await step.do("process-payment", async () => {
const result = await fetch("https://payment.api/charge", {
method: "POST",
body: JSON.stringify({ amount: order.amount, card: order.card }),
});
return await result.json();
});
// Step 3: 发货
await step.do("ship-order", async () => {
await fetch(`https://shipping.api/orders/${event.orderId}/ship`, {
method: "POST",
});
});
return { status: "completed", orderId: event.orderId };
}
}
每个 step.do() 都有以下特性:
- 幂等执行:即使 Workers 重启,步骤也会在正确的状态继续执行
- 自动重试:失败的步骤会自动重试,直到成功或达到重试上限
- 状态持久化:每个步骤的输出都自动存储在 Workflow 状态中
- 可观察性:每个步骤的执行历史都可以查询
3.2 Saga 模式的必要性:没有银弹
在引入 Saga Rollbacks 之前,Workflows 提供了重试机制,但重试只对单个步骤有意义。考虑一个转账场景:
Step 1: 从 A 账户扣款(成功)
Step 2: 向 B 账户存款(失败,重试,最终成功)
Step 3: 发送通知邮件(失败)
如果 Step 3 永久失败怎么办?此时 A 的钱已经被扣了,但 B 已经收到存款了。如果重试 Step 3,它只会一直失败。 这个事务已经处于不一致状态。
传统解决方案是在 Step 3 的 catch 块中手动回滚:
// 旧方式:手动回滚逻辑
try {
const debit = await step.do("debit-bank-a", () => bankA.debit(from, amount));
const credit = await step.do("credit-bank-b", () => bankB.credit(to, amount));
await step.do("notify", () => sendNotification(from, to, amount));
} catch (error) {
// 手动判断哪些步骤成功了,然后回滚
if (credit) {
await step.do("reverse-credit-b", () => bankB.debit(to, amount));
}
if (debit) {
await step.do("refund-debit-a", () => bankA.credit(from, amount));
}
throw error;
}
这种方案的缺陷:
- 回滚逻辑与业务逻辑混在一起:代码膨胀,可维护性差
- 回滚顺序需要手动控制:容易出错
- 跨步骤状态追踪复杂:需要额外的变量来记录"哪些步骤成功了"
- 无法处理部分成功的步骤:一个步骤可能执行了一半才失败
3.3 Saga Rollbacks:声明式的补偿逻辑
2026年6月25日,Cloudflare Workflows 正式支持 Saga Rollbacks。核心改进是:每个步骤可以声明自己的补偿逻辑(rollback),由 Workflow 引擎在故障时自动按逆序执行。
export class TransferWorkflow extends WorkflowEntryppoint<Environment, TransferEvent> {
async run(event: TransferEvent, step: WorkflowStep) {
// Step 1: 从 A 账户扣款
const debit = await step.do(
"debit-account-a",
async () => {
return await bankA.debit({
accountId: event.fromAccountId,
amount: event.amount,
// 幂等键:转账ID + 步骤名,保证扣款只执行一次
idempotencyKey: `${event.transferId}:debit-account-a`,
});
},
{
// 补偿逻辑:存款回滚
rollback: async ({ output }) => {
await bankA.credit({
accountId: event.fromAccountId,
amount: event.amount,
idempotencyKey: `${event.transferId}:rollback-debit-account-a`,
});
},
}
);
// Step 2: 向 B 账户存款
const credit = await step.do(
"credit-account-b",
async () => {
return await bankB.credit({
accountId: event.toAccountId,
amount: event.amount,
idempotencyKey: `${event.transferId}:credit-account-b`,
});
},
{
rollback: async ({ output }) => {
// 注意:output 可能为 undefined(步骤可能在执行中失败)
// 补偿逻辑必须处理这种情况
if (output === undefined) {
return; // 尚未执行,无需回滚
}
await bankB.debit({
accountId: event.toAccountId,
amount: event.amount,
idempotencyKey: `${event.transferId}:rollback-credit-account-b`,
});
},
}
);
// Step 3: 发送通知(无需补偿,因为通知是幂等的)
await step.do("send-confirmation", async () => {
await sendTransferConfirmation(event);
});
return { success: true, transferId: event.transferId };
}
}
3.4 Saga 执行语义:深度解析
Saga Rollbacks 的执行模型有几个关键细节,理解它们是正确使用该功能的前提:
3.4.1 故障时触发,逆序执行
当任意步骤抛出未捕获的异常时,Workflow 引擎自动触发回滚:
Step 1 (debit) ✓ → Step 2 (credit) ✓ → Step 3 (notify) ✗ (永久失败)
↓
回滚执行(逆序):
Step 3 无 rollback,跳过
Step 2 rollback: debit back to B
Step 1 rollback: credit back to A
3.4.2 输出可能为 undefined
一个步骤可能在部分成功后失败。例如:
Step 2 (credit) 执行:
1. 发送 HTTP 请求到银行API ✓
2. 银行处理成功,返回 transactionId ✓
3. 写入 Workflow 状态(网络超时) ✗
在这种情况下,步骤被视为"失败",但银行系统中的操作已经完成。因此 rollback 函数必须处理 output === undefined 的情况——如果 output 未定义,说明步骤实际未成功执行,补偿逻辑可能不需要执行(或需要不同的处理方式)。
3.4.3 补偿逻辑本身也是 Durable Step
每个 rollback 中的 step.do() 也是一个独立的持久化步骤:
- 可以重试:如果补偿操作失败,Workflow 会自动重试
- 幂等性必须保证:使用幂等键确保重试不会造成副作用
- 回滚中的回滚:如果补偿步骤也失败了,Workflow 不会无限回滚,而是会停止并发出告警
3.5 补偿逻辑的设计原则
Saga 模式的核心约束:补偿不是回滚,而是正向的反向操作。
| 原操作 | 补偿操作 | 设计原则 |
|---|---|---|
| 扣款 | 存款 | 补偿金额相同 |
| 创建资源 | 删除资源 | 必须幂等(idempotency key) |
| 发送通知 | 无法补偿 | 通知设计为幂等(去重) |
| 预留库存 | 释放库存 | 必须对应到具体订单 |
| 发起转账 | 撤销转账 | 使用银行事务ID追踪 |
四、Workers AI:边缘推理的完整生态
4.1 什么是 Workers AI
Cloudflare Workers AI 是 Cloudflare 在2023年推出的大语言模型推理服务,2026年已支持数十种模型,覆盖文本生成、图像识别、语音合成等场景。它的核心优势是:无需管理 GPU,无需配置模型服务,直接在 Cloudflare 全球边缘网络运行 AI 推理。
export default {
async fetch(request, env) {
const { prompt } = await request.json();
// 调用 Workers AI 的 LLM 模型(内置,无需 API Key)
const response = await env.AI.run("@cf/meta/llama-3.1-8b-instruct", {
messages: [
{ role: "system", content: "你是一个资深后端工程师" },
{ role: "user", content: prompt }
],
max_tokens: 1024,
});
return new Response(JSON.stringify({ response: response.response }));
}
}
4.2 模型选择与性能
Cloudflare Workers AI 提供了多种规格的模型,满足不同场景:
// 小型快速响应场景
const fastModel = "@cf/tinyllama/tinyllama-1.1b"; // ~50ms 延迟
// 中等复杂度场景
const mediumModel = "@cf/meta/llama-3.1-8b-instruct"; // ~500ms 延迟
// 高质量长回答场景
const qualityModel = "@cf/meta/llama-3.3-70b-instruct-fastsmooth"; // ~2000ms 延迟
4.3 Workers AI + Durable Objects:AI 对话状态管理
纯 Workers AI 的一个问题:每个请求是独立的,无法维护对话历史。结合 Durable Objects,可以轻松实现有状态的 AI 对话:
export class ChatSession extends DurableObject {
private history: Message[] = [];
async addMessage(role: "user" | "assistant", content: string, env: Environment) {
this.history.push({ role, content, timestamp: Date.now() });
// 调用 Workers AI(注意:Durable Objects 中调用 AI)
const response = await env.AI.run("@cf/meta/llama-3.1-8b-instruct", {
messages: this.history.map(m => ({ role: m.role, content: m.content })),
max_tokens: 512,
});
this.history.push({ role: "assistant", content: response.response, timestamp: Date.now() });
await this.ctx.storage.put("history", this.history);
return { userMessage: content, assistantMessage: response.response };
}
async getHistory() {
const history = await this.ctx.storage.get("history");
return history ?? [];
}
}
五、Vectorize:边缘向量数据库
5.1 向量数据库的应用场景
在 RAG(检索增强生成)场景中,需要:
- 将文档切分成文本块(chunk)
- 用 Embedding 模型将文本块转换为向量
- 将向量存储在向量数据库
- 查询时,将用户问题也转换为向量,做相似度搜索
- 将最相关的文本块注入 LLM 提示词
Workers KV 不支持向量操作,Redis 虽支持但有延迟问题。Cloudflare 提供了 Vectorize——原生集成在 Workers 平台中的向量数据库。
5.2 Vectorize 实战
// 初始化 Vectorize 索引
// wrangler.toml 中配置:
// [[vectorize]]
// binding = "VECTOR_STORE"
// index_name = "docs-index"
// 1. 文档分块与向量化
async function indexDocument(content: string, docId: string, env: Environment) {
const chunks = splitIntoChunks(content, 512); // 512 token 块
for (const chunk of chunks) {
// Workers AI 内置 Embedding 模型
const embedding = await env.AI.run("@cf/baai/bge-base-en-v1.5", {
text: chunk,
});
// 插入 Vectorize 索引
await env.VECTOR_STORE.insert({
ids: [docId + ":" + chunk.id],
vectors: [embedding.array],
metadata: { chunkId: chunk.id, docId, content: chunk.text },
});
}
}
// 2. 相似度检索
async function retrieveContext(query: string, env: Environment): Promise<string[]> {
const queryEmbedding = await env.AI.run("@cf/baai/bge-base-en-v1.5", { text: query });
const results = await env.VECTOR_STORE.query({
vector: queryEmbedding.array,
topK: 5,
returnMetadata: true,
});
return results.matches.map(m => m.metadata?.content as string);
}
5.3 完整 RAG Pipeline
export class RAGWorker {
async handleQuery(query: string, env: Environment): Promise<string> {
// Step 1: 检索相关上下文(Vectorize)
const contextChunks = await this.retrieveContext(query, env);
// Step 2: 构建提示词
const prompt = `基于以下参考资料回答问题。如果资料中没有相关信息,请如实说明。
参考资料:
${contextChunks.join("\n---\n")}
问题:${query}`;
// Step 3: LLM 推理(Workers AI)
const response = await env.AI.run("@cf/meta/llama-3.1-8b-instruct", {
messages: [{ role: "user", content: prompt }],
max_tokens: 1024,
});
return response.response;
}
}
六、性能优化与生产实践
6.1 Durable Objects 的性能特征
Durable Objects 的性能有几个重要特征,理解它们才能写出高效代码:
写入延迟:每次 ctx.storage.put() 都会写入本地 RocksDB,然后异步复制。延迟约 5-15ms(本地写入)。
读取延迟:ctx.storage.get() 读取本地 RocksDB,延迟约 0.5-2ms(内存缓存命中)或 2-5ms(磁盘读取)。
实例预热:Cold Durable Object 实例的第一次访问会有约 100ms 的初始化开销(加载 V8 上下文)。可以通过 Cloudflare 的 Doclass预热API 提前激活实例:
// 在 Workers 入口预热
export default {
async fetch(request, env) {
const warmUp = new URL(request.url).searchParams.get("warmup");
if (warmUp === "true") {
const warmupId = env.MY_DO.idFromName("warmup-instance");
const stub = env.MY_DO.get(warmupId);
await stub.fetch("/warmup"); // 触发实例初始化
}
return handleRequest(request, env);
}
}
6.2 避免跨实例锁
Durable Objects 提供了 ctx.blockConcurrencyWhile() 来防止竞态条件:
export class RateLimiter extends DurableObject {
private counts: Map<string, number> = new Map();
async checkLimit(key: string, limit: number): Promise<boolean> {
// 在检查和更新之间阻止其他请求进入
await this.ctx.blockConcurrencyWhile(async () => {
const current = this.counts.get(key) ?? 0;
if (current >= limit) {
throw new Error("Rate limit exceeded");
}
this.counts.set(key, current + 1);
});
return true;
}
}
6.3 Workers 与 Durable Objects 的交互模式
┌─────────────────────────────────────┐
│ Cloudflare PoP (东京) │
│ │
请求 (东京) │ ┌─────────────┐ │
──────────────→ │ │ Worker │ │
│ │ (无状态) │ │
│ └──────┬──────┘ │
│ │ │
│ │ fetch (HTTP内部) │
│ ▼ │
│ ┌─────────────────────────────────┐ │
│ │ Durable Object │ │
│ │ - 状态存储(RocksDB) │ │
│ │ - 强一致事务 │ │
│ │ - Workflow 执行引擎 │ │
│ └─────────────────────────────────┘ │
│ │
│ ┌──────────────┐ │
│ │ Workers AI │ │
│ │ (GPU 集群) │ │
│ └──────────────┘ │
│ │
│ ┌──────────────┐ │
│ │ Vectorize │ │
│ │ (向量索引) │ │
│ └──────────────┘ │
└─────────────────────────────────────┘
七、完整生产案例:分布式 AI 聊天机器人
7.1 需求分析
构建一个多租户 AI 聊天服务,要求:
- 每个租户有独立的对话历史
- 支持多轮对话(带上下文)
- 对话历史持久化(服务重启不丢数据)
- 每次对话调用 LLM 推理
- 支持 RAG(知识库增强)
- 租户级别的速率限制
7.2 架构设计
客户端
│
▼
Workers(API 网关 + 认证 + 路由)
│
├──────────────────────────────────────┐
│ │
▼ ▼
Durable Objects Workers AI
(ChatSession) (LLM)
- 对话历史 - 推理
- 速率限制
- RAG 上下文
│
▼
Vectorize
(知识库索引)
7.3 核心代码实现
7.3.1 Durable Object:聊天会话
// src/chat-session.ts
export class ChatSession extends DurableObject {
private state: SessionState = {
messages: [],
metadata: {},
rateLimit: { count: 0, resetAt: 0 },
};
async initialize(env: Environment) {
const stored = await this.ctx.storage.get("state");
if (stored) {
this.state = stored;
}
// 设置速率限制重置定时器
this.scheduleRateLimitReset();
}
async fetch(request: Request, env: Environment): Promise<Response> {
const url = new URL(request.url);
if (url.pathname === "/chat" && request.method === "POST") {
return this.handleChat(await request.json(), env);
}
if (url.pathname === "/history") {
return new Response(JSON.stringify(this.state.messages));
}
if (url.pathname === "/reset") {
this.state.messages = [];
await this.persistState();
return new Response(JSON.stringify({ success: true }));
}
return new Response("Not Found", { status: 404 });
}
private async handleChat(event: ChatEvent, env: Environment): Promise<Response> {
// 速率限制检查(每个租户每分钟 60 次)
if (!this.checkRateLimit(event.tenantId, 60, 60000)) {
return new Response(
JSON.stringify({ error: "Rate limit exceeded" }),
{ status: 429, headers: { "Content-Type": "application/json" } }
);
}
// 添加用户消息
const userMessage: Message = {
id: crypto.randomUUID(),
role: "user",
content: event.message,
timestamp: Date.now(),
};
this.state.messages.push(userMessage);
// 构建 RAG 上下文
let ragContext = "";
if (event.useRag) {
const relevantChunks = await this.retrieveRAGContext(event.message, env);
ragContext = relevantChunks.length > 0
? `\n\n[知识库参考]\n${relevantChunks.join("\n")}\n[/知识库参考]`
: "";
}
// 构建 LLM 消息历史
const systemPrompt = event.tenantSystemPrompt ||
"你是一个专业的 AI 助手。请基于提供的上下文回答用户问题。";
const llmMessages = [
{ role: "system", content: systemPrompt + ragContext },
...this.state.messages.map(m => ({ role: m.role, content: m.content }))
];
// 调用 Workers AI
const aiResponse = await env.AI.run(
event.model || "@cf/meta/llama-3.1-8b-instruct",
{
messages: llmMessages,
max_tokens: event.maxTokens || 1024,
temperature: event.temperature || 0.7,
}
);
const assistantMessage: Message = {
id: crypto.randomUUID(),
role: "assistant",
content: aiResponse.response,
timestamp: Date.now(),
};
this.state.messages.push(assistantMessage);
await this.persistState();
return new Response(JSON.stringify({
userMessage,
assistantMessage,
usage: {
promptTokens: aiResponse.usage?.prompt_tokens,
completionTokens: aiResponse.usage?.completion_tokens,
}
}), {
headers: { "Content-Type": "application/json" }
});
}
private async retrieveRAGContext(query: string, env: Environment): Promise<string[]> {
try {
const embedding = await env.AI.run("@cf/baai/bge-base-en-v1.5", { text: query });
const results = await env.VECTOR_STORE.query({
vector: embedding.array,
topK: 3,
namespace: event.tenantId, // 按租户隔离知识库
returnMetadata: true,
});
return results.matches.map(m => m.metadata?.content as string);
} catch (e) {
return []; // RAG 失败不影响主流程
}
}
private checkRateLimit(tenantId: string, limit: number, windowMs: number): boolean {
const now = Date.now();
const key = `rl:${tenantId}`;
if (this.state.rateLimit.resetAt < now) {
this.state.rateLimit = { count: 0, resetAt: now + windowMs };
}
if (this.state.rateLimit.count >= limit) {
return false;
}
this.state.rateLimit.count++;
return true;
}
private scheduleRateLimitReset() {
const resetAt = this.state.rateLimit.resetAt || (Date.now() + 60000);
this.ctx.storage.setAlarm(resetAt);
}
async alarm() {
// 重置速率计数器
this.state.rateLimit = { count: 0, resetAt: Date.now() + 60000 };
await this.persistState();
this.scheduleRateLimitReset();
}
private async persistState() {
await this.ctx.storage.put("state", this.state);
}
}
7.3.2 Workers 入口:API 网关
// src/index.ts
import { ChatSession } from "./chat-session";
interface Env {
AI: Ai;
VECTOR_STORE: Vectorize;
SESSIONS: DurableObjectNamespace<ChatSession>;
AUTH_TOKEN: string;
}
export default {
async fetch(request: Request, env: Env): Promise<Response> {
const url = new URL(request.url);
// CORS 预检
if (request.method === "OPTIONS") {
return handleCORS();
}
// 认证
const authError = await verifyAuth(request, env.AUTH_TOKEN);
if (authError) return authError;
// 路由分发
if (url.pathname === "/v1/chat" && request.method === "POST") {
return handleChat(request, env);
}
if (url.pathname.startsWith("/v1/session/")) {
const sessionId = url.pathname.split("/")[3];
const method = url.searchParams.get("method");
if (method === "history") {
return getSessionHistory(sessionId, env);
}
if (method === "reset") {
return resetSession(sessionId, env);
}
}
// 健康检查
if (url.pathname === "/health") {
return new Response(JSON.stringify({ status: "ok", timestamp: Date.now() }));
}
return new Response("Not Found", { status: 404 });
}
};
async function handleChat(request: Request, env: Env): Promise<Response> {
const body = await request.json();
// 验证请求体
if (!body.message || typeof body.message !== "string") {
return new Response(
JSON.stringify({ error: "message is required" }),
{ status: 400, headers: corsHeaders() }
);
}
// 获取或创建会话(按租户 + 用户 ID 确定 Durable Object 实例)
const sessionKey = `${body.tenantId}:${body.userId}`;
const sessionId = env.SESSIONS.idFromName(sessionKey);
const sessionStub = env.SESSIONS.get(sessionId);
const event: ChatEvent = {
tenantId: body.tenantId,
userId: body.userId,
message: body.message,
model: body.model,
maxTokens: body.maxTokens,
temperature: body.temperature,
useRag: body.useRag ?? false,
tenantSystemPrompt: body.systemPrompt,
};
const response = await sessionStub.fetch("/chat", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify(event),
});
const responseHeaders = new Headers(await response.headers);
responseHeaders.set("Access-Control-Allow-Origin", "*");
responseHeaders.set("Content-Type", "application/json");
return new Response(await response.text(), {
status: response.status,
headers: responseHeaders,
});
}
function corsHeaders(): HeadersInit {
return {
"Access-Control-Allow-Origin": "*",
"Access-Control-Allow-Methods": "GET, POST, OPTIONS",
"Access-Control-Allow-Headers": "Content-Type, Authorization",
};
}
async function verifyAuth(request: Request, validToken: string): Promise<Response | null> {
const authHeader = request.headers.get("Authorization");
if (!authHeader || !authHeader.startsWith("Bearer ")) {
return new Response(
JSON.stringify({ error: "Missing or invalid Authorization header" }),
{ status: 401, headers: { "Content-Type": "application/json", ...corsHeaders() } }
);
}
const token = authHeader.slice(7);
if (token !== validToken) {
return new Response(
JSON.stringify({ error: "Invalid token" }),
{ status: 401, headers: { "Content-Type": "application/json", ...corsHeaders() } }
);
}
return null;
}
7.3.3 部署配置
# wrangler.toml
name = "ai-chat-gateway"
main = "src/index.ts"
compatibility_date = "2026-06-01"
compatibility_flags = ["nodejs_compat"]
[env.production]
name = "ai-chat-gateway-prod"
[[durable_objects.bindings]]
name = "SESSIONS"
class_name = "ChatSession"
[[vectorize]]
binding = "VECTOR_STORE"
index_name = "knowledge-base"
[[ai]]
binding = "AI"
[vars]
AUTH_TOKEN = "${AUTH_TOKEN}" # 从环境变量读取
7.4 性能数据
基于上述架构的实测性能(从东京 PoP):
| 操作 | 平均延迟 | P99 延迟 | 备注 |
|---|---|---|---|
| Durable Object 初始化(冷) | 95ms | 150ms | 仅首次 |
| Durable Object 初始化(热) | <1ms | 5ms | 内存命中 |
| 对话历史读取(100条) | 3ms | 12ms | RocksDB |
| Workers AI 推理(8B 模型) | 450ms | 800ms | 含网络到 AI 集群 |
| Vectorize 检索(topK=3) | 45ms | 120ms | 含 Embedding |
| 端到端聊天(无 RAG) | 500ms | 900ms | |
| 端到端聊天(含 RAG) | 580ms | 1100ms |
八、Saga 模式实战:从订单处理到资金流转
8.1 订单处理 Saga
完整的订单处理流程,包含库存预留、支付扣款、物流发货、邮件通知四个步骤,每个步骤都有对应的 Saga 补偿逻辑:
export class OrderSagaWorkflow extends WorkflowEntrypoint<Environment, OrderEvent> {
async run(event: OrderEvent, step: WorkflowStep) {
const orderId = event.orderId;
// Step 1: 预留库存(扣减商品库存)
const inventory = await step.do(
"reserve-inventory",
async () => {
return await inventoryService.reserve({
sku: event.sku,
quantity: event.quantity,
idempotencyKey: `${orderId}:reserve-inventory`,
});
},
{
rollback: async ({ output }) => {
if (output === undefined) return;
await inventoryService.release({
reservationId: output.reservationId,
idempotencyKey: `${orderId}:rollback-reserve`,
});
},
}
);
// Step 2: 支付扣款
const payment = await step.do(
"charge-payment",
async () => {
return await paymentService.charge({
customerId: event.customerId,
amount: event.amount,
idempotencyKey: `${orderId}:charge`,
});
},
{
rollback: async ({ output }) => {
if (output === undefined) return;
await paymentService.refund({
chargeId: output.chargeId,
idempotencyKey: `${orderId}:rollback-charge`,
});
},
}
);
// Step 3: 触发发货流程
const shipment = await step.do(
"create-shipment",
async () => {
return await logisticsService.createShipment({
orderId,
address: event.shippingAddress,
idempotencyKey: `${orderId}:create-shipment`,
});
},
{
rollback: async ({ output }) => {
if (output === undefined) return;
await logisticsService.cancelShipment({
shipmentId: output.shipmentId,
idempotencyKey: `${orderId}:rollback-shipment`,
});
},
}
);
// Step 4: 发送确认邮件(无补偿,但有重试保证送达)
await step.do("send-confirmation-email", async () => {
await emailService.send({
to: event.customerEmail,
template: "order-confirmation",
data: { orderId, shipmentId: shipment.shipmentId },
idempotencyKey: `${orderId}:send-email`,
});
// 注意:邮件发送失败会触发重试,但不会触发回滚
// 因为 Saga 补偿无法"撤回"已发送的邮件
// 解决方案:邮件设计为含退订链接的幂等通知,允许重复发送
});
return {
success: true,
orderId,
paymentId: payment.chargeId,
shipmentId: shipment.shipmentId,
};
}
}
8.2 Saga 执行失败处理
当 Saga Rollback 本身也失败时,Cloudflare Workflows 会:
- 记录失败事件:失败步骤和回滚步骤都会被记录在 Workflow 执行历史中
- 发送告警:配置 Cloudflare 的 Alerting 系统通知运维人员
- 保持当前状态:Workflow 不会无限重试,会停在失败状态等待人工介入
- 提供手动恢复 API:可以通过 Cloudflare API 手动触发特定步骤的执行
// 配置告警通知(wrangler.toml 或 Cloudflare Dashboard)
// 当 Workflow 进入 UNABLE_TO_ROLLBACK 状态时发送告警
九、与竞品对比:为什么是 Cloudflare Workers
9.1 vs. AWS Lambda + DynamoDB
| 维度 | Cloudflare Workers + DO | AWS Lambda + DynamoDB |
|---|---|---|
| 执行模型 | Isolate(亚毫秒启动) | Container(冷启动 100ms+) |
| 状态存储 | Durable Objects(本地 RocksDB,<5ms) | DynamoDB(网络往返,10-50ms) |
| 一致性 | 强一致(单实例事务) | 最终一致(默认)/ 强一致(额外付费) |
| 全球延迟 | <10ms(边缘就近) | 50-200ms(取决于区域) |
| 定价 | 请求数 + CPU 时间 | 请求数 + 执行时间 + 数据传输 |
| AI 集成 | Workers AI(原生集成) | Bedrock(需额外配置) |
| 向量搜索 | Vectorize(原生集成) | 需要 Aurora/Elasticsearch |
9.2 vs. Vercel Edge Functions
Vercel Edge Functions 的定位是"轻量级边缘渲染",它的限制:
- 无状态:没有 Durable Objects 这样的状态存储
- V8 Isolates 但有限制:不支持 Durable Objects、KV 事务、AI 推理
- 主要用途:A/B 测试、认证、请求改写
Cloudflare Workers 则是完整的边缘计算平台,可以做 AI 推理、持久化存储、向量搜索、工作流编排。
9.3 vs. Deno Deploy
Deno Deploy 与 Cloudflare Workers 非常接近(都基于 V8 Isolates),但:
- ** Durable Objects 独有**:Cloudflare 专利架构,Deno Deploy 没有等价物
- 生态系统:Cloudflare 生态更成熟(Vectorize、Workers AI、Queues、R2 等)
- AI 推理:Cloudflare Workers AI,Deno Deploy 需要自建
十、总结:边缘计算有状态化的工程意义
Cloudflare Workers + Durable Objects + Workflows 代表的不仅是技术演进,更是一种架构哲学的根本转变:从"状态在数据库里"到"状态在代码旁边"。
这一转变带来了几个深刻的影响:
第一,延迟的天花板被重新定义。 当状态就在请求处理的同一个节点上时,数据库往返的 10-50ms 延迟可以被压缩到 <5ms。对于实时协作、游戏状态、聊天应用这类对延迟敏感的场景,这是质变。
第二,分布式事务的边界被重新划定。 Saga 模式让跨服务的分布式事务可以在边缘节点上本地协调,而不是必须依赖中心化的消息队列或数据库。Cloudflare 全球 300+ PoP 意味着,每个地区的事务协调都在本地完成,跨洋协调的需求大幅减少。
第三,AI 推理不再是大型互联网公司的专利。 Workers AI + Vectorize 的组合,让一个独立开发者也能以极低成本在边缘部署 AI 推理服务——不需要 GPU,不需要 Kubernetes,不需要运维团队。按请求计费的模式意味着你可以从零成本起步,在业务增长时平滑扩展。
第四,开发者体验的范式转移。 Durable Objects 的编程模型——"一个对象代表一个业务实体"——比传统的"所有请求共享数据库"更直观。一个聊天会话就是一个 ChatSession 对象,一个用户账户就是一个 UserAccount 对象,代码即业务模型。
当然,这套体系也有其适用边界:
- 计算密集型任务:Workers 的 CPU 时间上限(50ms)是硬限制,不适合长时间计算
- 超大数据集:Durable Objects 每个实例的存储上限约 6.5GB,超出需要分区
- 强一致性要求跨多实例:Durable Objects 的强一致性只在单实例内,跨实例需要额外设计
- Golang/Rust 开发者:Workers 生态以 JavaScript/TypeScript 为主,非主流语言需要权衡
但对于构建全球分布的实时应用、AI 增强服务、支付与订单处理系统,Cloudflare Workers 平台提供了一套独特而强大的技术栈。它的价值主张不是"更便宜的 Lambda",而是"重新定义了边缘计算的可能性边界"。
2026年,随着 Saga Rollbacks 的发布,Cloudflare Workers 完成了从"边缘函数"到"边缘应用平台"的最后一块拼图。这不是终点,而是新一轮边缘计算革命的起点。