Temporal Replay 2026 深度解析:Serverless Workers + 独立活动 + Workflow 流——持久执行范式重塑 AI Agent 基础设施
前言:为什么分布式系统的可靠性依然是个噩梦
写分布式系统的工程师都清楚一个残酷的现实:任何一步操作都可能在任何时刻失败。网络抖动、API 超时、服务宕机、数据库锁等待——这些不是小概率事件,而是你在生产环境中每天都在对抗的敌人。
为了解决这些问题,我们发明了重试机制、幂等设计、补偿事务、Saga 模式……每一套方案都解决了部分问题,却又带来了新的复杂性。结果是:业务代码被可靠性逻辑淹没,一个简单的"下单-扣款-发货"流程,背后可能要写几百行容错代码。
Temporal 就是来解决这个问题的。它的核心理念是:Write code as if failure doesn't exist——让你像写普通顺序代码一样写分布式系统,框架自动处理一切故障恢复。
2026 年 5 月,Temporal 在 Replay 2026 大会上发布了重磅更新:Serverless Workers、Standalone Activities、Workflow Streams,以及与 Google ADK 和 OpenAI Agents SDK 的深度集成。这些更新不只是功能堆砌,而是从计算模式、执行粒度、流式交互三个维度,对持久执行范式的一次系统性重构。
本文将从架构原理出发,深度解析每一项新能力的本质,并给出代码实战。
一、持久执行:Temporal 的核心抽象
1.1 什么是 Durable Execution
Durable Execution(持久执行)是 Temporal 的核心概念。传统编程中,函数调用是瞬时的——调用栈执行完毕,状态就消失了。但 Temporal 的 Workflow(工作流)不是普通函数,它的状态会在每一步执行后被持久化。
这意味着:
- 即使服务崩溃,Workflow 也能在任意节点恢复,继续执行
- 即使网络中断,Activity(活动)的执行结果会被记录,重试时有据可查
- 即使代码被修改,历史 Workflow 仍按原逻辑 replay,不受影响
# 传统代码(伪代码):崩溃即丢失
def process_order(order_id):
inventory = check_inventory(order_id) # 步骤1
payment = charge_customer(order_id) # 步骤2
ship_order(order_id) # 步骤3
# 如果步骤2和3之间服务崩溃,步骤1的结果丢失了
# Temporal Workflow:崩溃可恢复
@workflow.defn
class OrderWorkflow:
@workflow.run
async def run(self, order_id: str) -> str:
# 框架自动处理重试、状态恢复
inventory = await workflow.execute_activity(
check_inventory, order_id,
start_to_close_timeout=timedelta(seconds=30)
)
payment = await workflow.execute_activity(
charge_customer, order_id,
start_to_close_timeout=timedelta(minutes=5)
)
result = await workflow.execute_activity(
ship_order, order_id,
start_to_close_timeout=timedelta(minutes=10)
)
return f"Order {order_id} processed: {result}"
注意这里的关键:workflow.execute_activity 不是普通函数调用,而是一个持久化的活动调用。Temporal 会在数据库中记录这个调用的输入、输出、执行状态,并保证它最终被完成(即使重试 100 次)。
1.2 Temporal 的三层架构
理解 Temporal 的新功能,需要先搞清楚它的三层架构:
┌─────────────────────────────────────────┐
│ Temporal Service │ ← 持久化状态协调器
│ (Workflow Engine + Activity Workers) │ 管理 Workflow 历史
├─────────────────────────────────────────┤
│ Workers │ ← 执行业务逻辑的进程
│ (注册 Workflow / Activity 处理器) │
├─────────────────────────────────────────┤
│ SDK (Client) │ ← 开发者写的代码
│ (Workflow 定义 + Activity 实现) │
└─────────────────────────────────────────┘
- Temporal Service:核心协调引擎,负责持久化所有 Workflow 状态、管理任务队列、处理重试逻辑
- Workers:部署在你的基础设施上的进程,通过轮询任务队列来执行 Workflow 代码和 Activity 代码
- SDK:开发者使用的客户端库,提供了定义 Workflow 和 Activity 的 API
在 Replay 2026 之前,Workers 必须常驻运行才能接收任务。这是传统架构的限制——你需要管理服务器、维护扩缩容策略、配置健康检查。Serverless Workers 的出现,正是为了解决这个痛点。
二、Serverless Workers:把基础设施负担降为零
2.1 传统 Worker 的运维成本
在 Serverless Workers 出现之前,部署 Temporal Workers 意味着你需要:
- 管理服务器或容器:配置实例类型、自动扩缩容策略
- 处理冷启动:新实例启动需要加载代码、连接数据库,可能有延迟
- 为峰值负载付费:即使大部分时间负载很低,也需要保持一定数量的实例在线
- 配置健康检查和优雅关闭:防止在处理请求时被意外终止
对于初创公司和小型团队来说,这是一笔不小的运维负担。对于 AI Agent 场景尤其棘手——LLM 调用的延迟本身就不稳定,再加上 Worker 的冷启动延迟,用户体验雪上加霜。
2.2 Serverless Workers 的架构原理
Temporal Replay 2026 引入了 Serverless Workers,首个支持的是 AWS Lambda。
┌──────────────────────────────────────────────────┐
│ Temporal Cloud │
│ ┌────────────────────────────────────────────┐ │
│ │ Workflow Coordination Engine │ │
│ │ (状态持久化 + 任务队列 + 重试策略) │ │
│ └────────────────────────────────────────────┘ │
│ ↑ │
│ │ 按需触发 │
│ ↓ │
│ ┌────────────────────────────────────────────┐ │
│ │ AWS Lambda (Serverless Worker) │ │
│ │ │ │
│ │ 函数执行 = 一个 Workflow 或 Activity 的执行 │ │
│ │ 并发数 = Workflow 并行度(自动扩缩) │ │
│ │ 空闲时 = 缩容至零(按需启动) │ │
│ └────────────────────────────────────────────┘ │
└──────────────────────────────────────────────────┘
核心工作原理:
- 按需触发:Temporal Cloud 不再依赖 Worker 主动轮询任务队列,而是通过 AWS Event Source Mapping,在有任务时主动调用 Lambda 函数
- 自动扩缩:Lambda 的并发执行数自动匹配 Workflow 的并发度——100 个 Workflow 并行,就有 100 个 Lambda 实例同时运行;0 个任务就 0 个实例
- 自动优雅关闭:Lambda 有 15 分钟的超时限制,Temporal 会自动将长时间 Workflow 的状态 checkpoint 分割成多个短任务分批执行
- 零基础设施管理:你不需要配置 ASG(Auto Scaling Group)、健康检查、负载均衡器
2.3 代码实战:部署 Serverless Worker
# AWS Lambda 上部署 Temporal Worker
# temporal_serverless_worker.py
import temporalio
from temporalio import workflow
from temporalio.client import Client
from temporalio.worker import Worker
# 定义 Activity(可复用的业务步骤)
class OrderActivities:
async def check_inventory(self, order_id: str) -> dict:
# 检查库存
return {"sku": "PROD-001", "available": True, "quantity": 10}
async def charge_customer(self, order_id: str) -> dict:
# 扣款
return {"transaction_id": "TXN-12345", "amount": 99.99, "status": "success"}
async def ship_order(self, order_id: str) -> dict:
# 发货
return {"tracking_number": "SF1234567890", "carrier": "顺丰"}
# 定义 Workflow
@workflow.defn
class OrderWorkflow:
@workflow.run
async def run(self, order_id: str) -> str:
activities = OrderActivities()
inventory = await workflow.execute_activity(
activities.check_inventory,
order_id,
start_to_close_timeout=timedelta(seconds=30),
# Serverless 模式下推荐设置心跳
heartbeat_timeout=timedelta(seconds=10),
)
if not inventory["available"]:
return f"Order {order_id} failed: out of stock"
payment = await workflow.execute_activity(
activities.charge_customer,
order_id,
start_to_close_timeout=timedelta(minutes=5),
)
shipment = await workflow.execute_activity(
activities.ship_order,
order_id,
start_to_close_timeout=timedelta(minutes=10),
)
return f"Order {order_id} completed: {shipment['tracking_number']}"
# Lambda Handler(Serverless 入口)
def handler(event, context):
"""
AWS Lambda 入口函数
Temporal Cloud 会将任务队列的事件作为 event 传入
"""
import asyncio
async def run_worker():
client = await Client.connect("your-namespace.temporalCloud.cloud:7233")
worker = Worker(
client,
task_queue="serverless-order-queue", # 专用任务队列
workflows=[OrderWorkflow],
activities=[OrderActivities()],
# Serverless 场景下推荐配置:
max_concurrent_workflow_tasks=100, # Lambda 并发上限
max_cached_workflows=50, # 减少内存占用
)
await worker.run()
asyncio.run(run_worker())
# AWS SAM 模板 (template.yaml)
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31'
Resources:
TemporalServerlessFunction:
Type: AWS::Serverless::Function
Properties:
Handler: temporal_serverless_worker.handler
Runtime: python3.11
Timeout: 900 # Lambda 最大 15 分钟
MemorySize: 512
Environment:
Variables:
TEMPORAL_NAMESPACE: your-namespace
TEMPORAL_TLS_CERT: /tmp/ca.crt
TEMPORAL_API_KEY: !Ref TemporalApiKey
Policies:
- LambdaInvokeFunctionPolicy
- SecretsManagerReadWrite
Events:
TemporalTrigger:
Type: SQSEvent
Properties:
Queue: !GetAtt TemporalQueue.Arn
BatchSize: 1
TemporalQueue:
Type: AWS::SQS::Queue
Properties:
VisibilityTimeout: 3600
ReceiveMessageWaitTimeSeconds: 20
关键配置说明:
start_to_close_timeout:单个 Activity 的最大执行时间,在 Serverless 模式下应小于 Lambda 的 15 分钟超时heartbeat_timeout:Activity 执行期间的心跳间隔,用于检测长任务的状态max_cached_workflows:限制 Lambda 函数同时管理的 Workflow 缓存数量,降低内存压力- SQS 作为事件源,在有任务时触发 Lambda
2.4 成本分析:Serverless vs 自管理 Worker
| 维度 | 传统 Worker(EC2/ECS) | Serverless Workers(Lambda) |
|---|---|---|
| 基础设施成本 | 按实例小时数计费,即使空闲 | 按实际执行时间(GB-秒)计费,零任务时为零 |
| 扩缩容 | 需要配置 ASG,冷启动 30-60s | 自动,Lambda 冷启动通常 <1s |
| 最大并发 | 受实例数量限制 | 受 Lambda 并发配额限制(默认 1000) |
| 适用场景 | 高吞吐、持续负载 | 突发流量、事件驱动、AI Agent 任务 |
| 适合运行时长 | 无限制 | Activity ≤ 15 分钟(全流程可拆分) |
对于 AI Agent 场景,Serverless Workers 的意义尤为重大:LLM 调用本身是不可预测的突发流量,Serverless 的自动扩缩天然匹配这种特征。
三、Standalone Activities:打破 Workflow 的边界
3.1 Activity 的历史局限
在 Temporal 1.x 版本中,Activity 有一个根本性限制:Activity 只能作为 Workflow 的子步骤存在。
# 旧模式:Activity 必须被 Workflow 包裹
@workflow.defn
class MyWorkflow:
@workflow.run
async def run(self, job_id: str):
# Activity 只能在 Workflow 内执行
result = await workflow.execute_activity(
process_job,
job_id,
start_to_close_timeout=timedelta(minutes=5)
)
return result
这带来几个现实问题:
- 轻量任务过度工程化:只是想执行一个异步任务(如发送邮件),也要先定义 Workflow,写起来很重
- 外部系统集成困难:当你想让外部服务触发 Temporal Activity 时,没有简单的方式
- 任务队列 + 重试逻辑的重复发明:很多团队有自己的任务队列系统(如 Sidekiq、Celery),想要 Temporal 的重试保证,但 Activity 只能从 Workflow 内部调用
3.2 Standalone Activities 的架构革新
Replay 2026 引入的 Standalone Activity(独立活动),允许 Activity 独立于 Workflow 运行,同时保留 Temporal 的所有可靠性保证:
- 自动重试(可配置重试策略)
- 持久化执行状态
- 超时管理
- 心跳检测
旧模式:
Workflow → Activity → Activity → Activity
(必须在 Workflow 内调用)
新模式:
Workflow → Activity → Activity → Standalone Activity (可独立触发)
↓
直接从外部系统/API 调用
Standalone Activity 的使用场景:
- 外部系统 webhook 触发:支付平台回调后直接触发对账 Activity
- 定时任务:Cron 作业直接触发数据同步 Activity
- 任务队列替代:用 Standalone Activity 替代 Celery/Sidekiq,享受更好的可靠性
- 微服务间调用:服务 A 直接调用服务 B 的 Activity,无需共享 Workflow 定义
3.3 代码实战:Standalone Activity
# standalone_activity.py
import asyncio
from datetime import timedelta
from temporalio import activity
from temporalio.client import Client
from temporalio.worker import Worker
from temporalio.service_common import RetryPolicy
# 定义一个 Activity(既可以standalone运行,也可以在Workflow内运行)
class DataSyncActivities:
@activity.defn
async def sync_orders(self, date: str) -> dict:
"""
同步订单数据——这个 Activity 既可以从 Workflow 调用,
也可以独立触发(Standalone 模式)
"""
print(f"Syncing orders for date: {date}")
# 模拟从外部系统拉取数据
orders = await fetch_orders_from_external_system(date)
# 处理数据
processed = transform_orders(orders)
# 入库
await insert_into_database(processed)
return {
"date": date,
"total_orders": len(orders),
"processed": len(processed),
"status": "success"
}
@activity.defn
async def send_notification(self, user_id: str, message: str) -> dict:
"""发送通知"""
return await email_service.send(user_id, message)
async def main():
client = await Client.connect("localhost:7233")
# 方法1:在 Worker 中注册(用于 Workflow 内调用)
worker = Worker(
client,
task_queue="data-sync-queue",
activities=[DataSyncActivities()],
)
# 方法2:独立触发 Activity(无需 Worker,直接通过客户端调用)
# 这行代码可以在任何地方执行——Webhook handler、定时任务、主程序
result = await client.start_activity(
DataSyncActivities().sync_orders,
arg="2026-06-30",
task_queue="data-sync-queue",
# 标准 Temporal 可靠性配置
start_to_close_timeout=timedelta(minutes=30),
retry_policy=RetryPolicy(
maximum_attempts=5,
initial_interval=timedelta(seconds=10),
backoff_coefficient=2.0,
non_retryable_error_types=["ValidationError"],
),
# 心跳用于长时间任务
heartbeat_timeout=timedelta(seconds=30),
)
print(f"Activity result: {result}")
# 作为独立程序运行(定时任务或 Webhook)
if __name__ == "__main__":
asyncio.run(main())
# 用 Standalone Activity 替代 Celery 的任务队列
# app/tasks.py
from temporalio.client import Client
from app.activities import DataSyncActivities
import asyncio
async def trigger_order_sync():
"""定时触发订单同步任务——替代 Celery 的 periodic_task"""
client = await Client.connect("your-namespace.temporalCloud.cloud:7233")
# 异步触发,不需要等待结果(fire-and-forget)
handle = await client.start_activity(
DataSyncActivities().sync_orders,
arg="2026-06-30",
task_queue="data-sync-queue",
start_to_close_timeout=timedelta(hours=2),
# 失败重试由 Temporal 自动处理,无需 Celery 的 acks_late
retry_policy=RetryPolicy(maximum_attempts=10),
id="order-sync-2026-06-30", # 幂等 ID,防止重复执行
)
print(f"Triggered activity: {handle.id}")
return handle.id
# 定时触发(配合系统 cron 或 APScheduler)
# crontab: 0 2 * * * python -m app.tasks
Standalone Activity vs 传统任务队列的对比:
| 特性 | Celery/Sidekiq | Standalone Activity |
|---|---|---|
| 失败重试 | 需要 acks_late + 配置 | 自动,可配置重试策略 |
| 执行历史 | 需接入监控才能查看 | 内置完整执行历史 |
| 任务持久化 | Broker 需持久化配置 | Temporal Service 持久化 |
| 超时管理 | 需手动配置 | 原生支持 |
| 跨服务调用 | 需序列化 + 网络调用 | 统一的 Activity 接口 |
| 与 Workflow 集成 | 不支持 | 可以无缝移入 Workflow |
四、Workflow Streams:实时 LLM 交互的持久化保证
4.1 AI Agent 流式输出的可靠性困境
AI Agent 的一个核心特征是流式输出:LLM 的 token 是逐个生成的,用户需要实时看到响应。但流式场景引入了独特的可靠性挑战:
- Token 生成中断:用户在看到部分回答时刷新页面,已经生成的 token 丢失
- Agent 工具调用中间状态:Agent 调用工具时,前端无法实时感知"正在调用"状态
- 流式回滚困难:如果 LLM 在生成长回答的中间失败,已生成的 token 无法恢复
传统方案是放弃流式,用批量响应代替——但这严重损害用户体验。Temporal 的 Workflow Streams 解决了这个问题。
4.2 Workflow Streams 的工作原理
Workflow Streams 利用 Temporal 的 Signal(信号)和 Update(更新) 机制,实现了持久的流式输出:
LLM token 生成
↓
Temporal Workflow 捕获 token
↓
Signal 推送到 Temporal Service(持久化)
↓
前端 WebSocket/SSE 接收(实时)
↓
即使页面刷新 → 从 Temporal 重新拉取 → 恢复流状态
关键点:流数据被持久化了。即使前端崩溃、网络中断,Workflow 仍然在后台运行,重新连接后可以恢复到中断的位置继续接收 token。
4.3 代码实战:构建持久化 AI 对话流
# workflow_streams_demo.py
import asyncio
from datetime import timedelta
from temporalio import workflow
from temporalio.client import Client
from temporalio.worker import Worker
from temporalio.types import ClassProxy
# 定义流式 Agent Workflow
@workflow.defn
class StreamingAgentWorkflow:
@workflow.run
async def run(self, user_query: str, tools: list[dict]) -> str:
"""
持久化流式 AI Agent:
- 每次 LLM 生成 token 都通过 workflow.signal() 推送
- 即使连接中断,Workflow 继续运行,重新连接后继续接收
"""
messages = [{"role": "user", "content": user_query}]
full_response = ""
# 获取流式 LLM 响应
async for token in self.call_llm_streaming(messages):
full_response += token
# 关键:每生成一个 token 就持久化到 Workflow 状态
await workflow.execute_update(
self.on_token_update,
args=[token, full_response],
)
# 同时 Signal 给所有已连接的客户端
await workflow.signal(
self.on_new_token,
token, # Signal 参数
)
# 工具调用阶段
tool_calls = self.extract_tool_calls(full_response)
for tool_call in tool_calls:
# 工具调用也持久化
await workflow.signal(self.on_tool_call_start, tool_call)
result = await workflow.execute_activity(
run_tool,
tool_call,
start_to_close_timeout=timedelta(minutes=5),
)
messages.append({"role": "assistant", "content": full_response})
messages.append({"role": "tool", "tool_call_id": tool_call["id"], "content": result})
await workflow.signal(self.on_tool_call_complete, tool_call, result)
return full_response
async def call_llm_streaming(self, messages):
"""调用 LLM 流式 API,每次 yield 一个 token"""
# 实际实现中调用 OpenAI / Anthropic 流式 API
async for token in llm_client.chat.completions.create(
messages=messages,
model="gpt-4o",
stream=True,
):
yield token.choices[0].delta.content or ""
async def on_token_update(self, token: str, full_response: str):
"""Update handler:更新 Workflow 内部的流状态"""
pass # 状态已在 Workflow run 方法中管理
@workflow.signal
async def on_new_token(self, token: str):
"""Signal handler:推送给 WebSocket 客户端"""
# WebSocket 服务通过这个 Signal 获取每个 token
pass
@workflow.signal
async def on_tool_call_start(self, tool_call: dict):
pass
@workflow.signal
async def on_tool_call_complete(self, tool_call: dict, result: str):
pass
# WebSocket 服务端:消费 Workflow Streams
class WebSocketStreamConsumer:
async def handle_connection(self, websocket, workflow_id: str):
client = await Client.connect("your-namespace.temporalCloud.cloud:7233")
handle = client.get_workflow_handle(workflow_id)
# 方法1:Query 当前已生成的 token(页面刷新后恢复)
query_result = await handle.query(StreamingAgentWorkflow.get_tokens)
# 方法2:Signal 驱动——订阅新的 token
stream = handle.signal_stream(
StreamingAgentWorkflow.on_new_token,
)
async for token in stream:
await websocket.send_text(token)
@workflow.query
async def get_tokens(self) -> list[str]:
"""Query:恢复已生成的 token 列表"""
return self.tokens
# 前端代码(浏览器)
async def connect_to_workflow(workflow_id):
ws = await websockets.connect(f"wss://your-server.com/ws/{workflow_id}")
try:
# 先查询已生成的 token,恢复页面状态
existing_tokens = await query_workflow_state(workflow_id)
display_tokens(existing_tokens)
# 订阅新 token
async for token in ws:
append_token_to_display(token)
except websockets.ConnectionClosed:
# 连接中断,但 Workflow 仍在运行
# 重新连接时,页面从 query 恢复
reconnect_delay = 2
await asyncio.sleep(reconnect_delay)
await connect_to_workflow(workflow_id)
4.4 流式输出的持久化 vs 非持久化对比
| 维度 | 普通流式(Server-Sent Events) | Workflow Streams |
|---|---|---|
| 刷新页面 | 丢失所有已生成内容 | 从 Temporal 恢复完整状态 |
| 网络中断 | 流中断,需重新完整请求 | 从中断处继续接收 |
| 服务重启 | 已生成的 token 丢失 | Workflow 继续运行,持久化保证 |
| 工具调用状态 | 前端无感知 | Signal 推送工具调用开始/完成 |
| 多客户端同步 | 难以实现 | 每个 Signal 可广播给所有客户端 |
五、外部负载存储:大型 AI 输入输出的可靠管理
5.1 AI 场景的负载困境
AI Agent 的一个独特挑战是大 Payload:一次 LLM 调用可能输入/输出 MB 级别的文本(长文档、代码库、图像 base64),而 Temporal 默认将所有负载存储在其 PostgreSQL 或 Cassandra 后端中。
这带来了两个实际问题:
- 数据库压力:大量 AI Workflow 的输入输出会撑爆数据库存储
- 传输效率:大 Payload 在 Workflow 状态序列化和网络传输中开销巨大
5.2 External Payload Storage 的架构
Temporal Replay 2026 引入了 External Storage(外部负载存储),允许你将大型 Payload 存储在外部系统(首选 AWS S3),Temporal 只存储对这些外部资源的引用:
Workflow 执行大型 LLM 调用
↓
Temporal 识别 Payload > 阈值(如 64KB)
↓
Payload 上传至 AWS S3(加密、分桶)
↓
Temporal 数据库只存储 S3 URI 引用
↓
Activity 读取结果时,从 S3 按需拉取
5.3 代码实战:S3 外部存储配置
# external_storage.py
from temporalio.contrib.external_storage import (
S3StoragePlugin,
S3StorageConfig,
)
from temporalio.client import Client
from temporalio.worker import Worker
# 配置 S3 外部存储
s3_config = S3StorageConfig(
bucket="your-temporal-payloads",
region="us-east-1",
prefix="ai-workflows/",
# SSE-KMS 加密,保障数据安全
sse_kms_key_id="arn:aws:kms:us-east-1:123456789012:key/xxx",
# 自动清理:Workflow 完成后 7 天删除 S3 对象
auto_cleanup_after_days=7,
)
storage_plugin = S3StoragePlugin(s3_config)
async def main():
client = await Client.connect(
"your-namespace.temporalCloud.cloud:7233",
# 启用外部存储
data_converter=storage_plugin.to_temporal_data_converter(
# 超过 64KB 的 Payload 自动存 S3
large_payload_threshold=64 * 1024,
),
)
# 在 Workflow 中处理大 Payload
@workflow.defn
class DocumentProcessingWorkflow:
@workflow.run
async def run(self, document_url: str) -> dict:
# 下载文档(可能 10MB+)
document = await workflow.execute_activity(
download_document,
document_url,
start_to_close_timeout=timedelta(minutes=10),
)
# 这个 document 超过阈值,自动存 S3,Workflow 状态只存 URI
# 时空复杂度: O(1) 而不是 O(n),n=文档大小
# LLM 分析文档
analysis = await workflow.execute_activity(
analyze_with_llm,
document, # 实际传的是 S3 URI
start_to_close_timeout=timedelta(minutes=30),
)
return analysis
六、AI 生态集成:Google ADK 与 OpenAI Agents SDK
6.1 为什么 AI Agent 需要 Temporal
构建 AI Agent 时,开发者通常面临三个核心挑战:
- LLM 调用不可靠:GPT-4 API 有 1-2% 的失败率,没有重试会丢任务
- 工具执行无持久化:Agent 调用工具失败后,需要重新调用 LLM,已生成的上下文可能丢失
- 多步任务无状态:一个 Agent 执行 20 步任务,第 15 步崩溃,一切重来
Temporal 的答案是:让 AI Agent 的每一步都变成持久化的 Activity。这样,即使 LLM 调用失败、工具执行出错,Agent 都能从断点恢复,完整重放所有已完成的步骤。
6.2 Google ADK 集成
Google Agent Development Kit (ADK) 是 Google 提供的 AI Agent 开发框架。Temporal 与 Google ADK 的集成,使得 ADK Agent 的每个 LLM 调用和工具执行都在 Temporal 的持久化保障下运行:
# google_adk_integration.py
import asyncio
from google.adk.agents import Agent
from google.adk.tools import FunctionTool
from temporalio import workflow
from temporalio.worker import Worker
from temporalio.client import Client
# 将 ADK Agent 的工具封装为 Temporal Activity
class ADKToolsAsActivities:
def __init__(self, adk_agent: Agent):
self.adk_agent = adk_agent
@workflow.activity.defn
async def call_llm(self, messages: list, tools: list) -> dict:
"""ADK Agent 的 LLM 调用作为 Activity 执行"""
response = await self.adk_agent.run_async(
messages=messages,
tools=tools,
)
# Temporal 自动重试,失败不会丢失上下文
return {
"content": response.text,
"tool_calls": response.tool_calls or [],
}
@workflow.activity.defn
async def execute_tool(self, tool_name: str, args: dict) -> dict:
"""Agent 工具执行作为 Activity,持久化结果"""
result = await self.adk_agent.tools[tool_name].run_async(**args)
# 工具执行结果被 Temporal 持久化,
# Agent 重试时无需重新执行工具
return result
# ADK Agent Workflow:结合 ADK 灵活性和 Temporal 可靠性
@workflow.defn
class DurableADKAgent:
@workflow.run
async def run(self, user_goal: str) -> str:
tools = ADKToolsAsActivities(self.adk_agent)
messages = [{"role": "user", "content": user_goal}]
max_steps = 20
step = 0
while step < max_steps:
step += 1
# LLM 调用作为 Activity(有自动重试保证)
llm_response = await workflow.execute_activity(
tools.call_llm,
args=[messages, self.get_tools_description()],
start_to_close_timeout=timedelta(seconds=120),
retry_policy=RetryPolicy(maximum_attempts=3),
)
if not llm_response.tool_calls:
# Agent 决定完成
return llm_response.content
# 工具调用作为 Activity(结果持久化)
for tool_call in llm_response.tool_calls:
tool_result = await workflow.execute_activity(
tools.execute_tool,
args=[tool_call.name, tool_call.args],
start_to_close_timeout=timedelta(minutes=5),
retry_policy=RetryPolicy(maximum_attempts=5),
)
messages.append({
"role": "tool",
"tool_call_id": tool_call.id,
"content": tool_result,
})
# 工具结果被持久化,Agent 重试时不再重复执行
return "Max steps reached"
6.3 OpenAI Agents SDK 沙箱支持
OpenAI 的 Agents SDK 引入了沙箱(Sandbox) 概念,用于安全隔离 Agent 的代码执行环境。Temporal 与 OpenAI Agents SDK 的集成,使得沙箱的执行也具有持久化保证:
# openai_agents_integration.py
from temporalio.contrib.openai_agents import (
TemporalSandboxRunner,
)
# OpenAI Agents SDK 使用 Temporal 作为沙箱运行器
# 每次 Agent 在沙箱中执行命令时:
# 1. 命令作为 Activity 在 Temporal 中执行
# 2. 执行结果(包括文件系统变更)被持久化
# 3. 即使沙箱崩溃,Agent 可以无缝恢复并继续执行
sandbox_runner = TemporalSandboxRunner(
task_queue="openai-sandbox-queue",
sandbox_ttl=timedelta(minutes=30),
# 沙箱文件变更自动持久化
persist_files=["/tmp/agent_output/", "/tmp/agent_logs/"],
)
# OpenAI Agent 在 Temporal 管理的沙箱中运行
agent = Agent(
model="gpt-4o",
tools=[...],
sandbox_runner=sandbox_runner, # 沙箱执行有持久化保证
)
七、生产运营:规模化的可靠性保障
Replay 2026 还带来了一系列面向生产环境的运营能力升级,这些对大规模部署 Temporal 的团队至关重要:
7.1 Worker Versioning(Worker 版本控制)
当你要更新 Worker 的代码时,传统的做法是:
- 部署新版本 Workers
- 旧版本 Workers 优雅关闭
- 等待所有在途任务完成
问题在于:如果 Workflow A 正在用旧版代码执行步骤 1,你需要等它完整结束才能完全切换到新版本。
Worker Versioning 允许你在同一个 Task Queue 中运行多个版本的 Workers,Temporal 会智能路由:
- 新 Workflow → 自动路由到新版 Worker
- 旧 Workflow(历史版本代码) → 路由到对应版本的 Worker
- 逐步迁移:灰度发布,先 10% 流量走新版本,稳定后逐步扩大
# worker_versioning.py
from temporalio.worker import Worker, BuildIdBasedVersioning
async def main():
client = await Client.connect("your-namespace.temporalCloud.cloud:7233")
# v1 Worker(仍在运行,保护历史 Workflow)
v1_worker = Worker(
client,
task_queue="order-processing",
build_id="v1.0.0", # 版本标识
versioning_intent=BuildIdBasedVersioning.Intent.QUEUE,
workflows=[OrderWorkflow],
activities=[OrderActivities()],
)
# v2 Worker(处理新 Workflow)
v2_worker = Worker(
client,
task_queue="order-processing",
build_id="v2.0.0",
# Auto-upgrade:新 Workflow 自动用新版本
versioning_intent=BuildIdBasedVersioning.Intent.AUTO,
workflows=[OrderWorkflowV2], # 新版 Workflow
activities=[OrderActivitiesV2()],
)
7.2 Task Queue Priority & Fairness
在多租户或混合负载场景下,不同类型的任务需要不同的优先级:
# task_queue_priority.py
from temporalio.worker import TaskQueueSortHint
async def setup_priority_queues():
client = await Client.connect("your-namespace.temporalCloud.cloud:7233")
# 高优先级队列:支付、金融交易(延迟 < 100ms)
high_priority_worker = Worker(
client,
task_queue="payment-tasks",
activities=[PaymentActivities()],
task_queue_sort_hint=TaskQueueSortHint.HIGH_THROUGHPUT,
max_concurrent_activities=200, # 高并发
)
# 低优先级队列:报表生成、数据导出(可延迟)
low_priority_worker = Worker(
client,
task_queue="report-tasks",
activities=[ReportActivities()],
task_queue_sort_hint=TaskQueueFairness.SMOOTH, # 平滑分配
max_concurrent_activities=10, # 限制并发
)
7.3 Worker Status UI 与 OpenMetrics 集成
Replay 2026 引入的 Worker Status UI 和 OpenMetrics 支持,让 Temporal 的可观测性达到了生产级标准:
# prometheus.yml 配置 Temporal OpenMetrics
scrape_configs:
- job_name: 'temporal'
metrics_path: '/metrics'
params:
format: ['openmetrics']
static_configs:
- targets: ['temporal-frontend.namespace.svc:9090']
relabel_configs:
# 提取关键指标
- source_labels: [__name__]
regex: 'temporal_workflow_(started|completed|failed)'
# 关键可观测性指标
# temporal_workflow_started_total - Workflow 启动总数
# temporal_activity_execution_latency - Activity 执行延迟
# temporal_workflow_task_latency - Workflow 任务调度延迟
# temporal_retry_count - Activity 重试次数(反映系统稳定性)
# temporal_resource_usage - Worker 资源使用率
八、性能调优:生产环境的血泪经验
基于大量生产案例,以下是我总结的 Temporal 性能调优核心策略:
8.1 Activity 超时配置的艺术
超时配置太短 → 误判失败,触发不必要的重试
超时配置太长 → 故障检测延迟,影响恢复速度
# 黄金法则:根据 P99 延迟的 3 倍来设置超时
@workflow.activity.defn
async def fast_api_call(self, endpoint: str) -> dict:
# 正常 P99 = 200ms → 超时设为 1s
...
@workflow.activity.defn
async def database_query(self, query: str) -> list:
# 正常 P99 = 500ms → 超时设为 3s
# 添加重试率考虑:API 有 1% 超时率
...
@workflow.activity.defn
async def llm_completion(self, prompt: str) -> str:
# LLM P99 = 10s(考虑峰值)→ 超时设为 60s
# 但加上重试_policy: initial_interval=2s, max_attempts=3
# 实际最大等待 = 2s + 4s + 8s = 14s,仍在超时内
...
8.2 Heartbeat:长时间任务的生命线
对于可能运行超过 1 分钟的 Activity,必须配置心跳:
@workflow.activity.defn
async def train_ml_model(self, config: dict) -> dict:
for epoch in range(config["epochs"]):
# 执行训练步骤
loss = await training_step(epoch)
# 心跳:报告进度,延长超时
# Temporal 会自动延长 start_to_close_timeout
activity.heartbeat(
progress=f"epoch {epoch+1}/{config['epochs']}",
current_loss=loss,
)
8.3 Workflow 状态最小化原则
Temporal 通过事件溯源(Event Sourcing)来重建 Workflow 状态——每次你访问 Workflow 的变量,框架都会 replay 历史事件。
# 避免的做法:存储大量数据在 Workflow 状态中
@workflow.run
async def run(self):
large_data = await workflow.execute_activity(fetch_huge_dataset)
self.all_data = large_data # ❌ 大数据存在 Workflow 状态中,replay 极慢
return process(self.all_data)
# 推荐的写法:只存储引用,结果在 Activity 间传递
@workflow.run
async def run(self):
data_ref = await workflow.execute_activity(fetch_and_store_to_s3, config)
# 只存储 S3 URI,不存储数据本身
self.s3_ref = data_ref
result = await workflow.execute_activity(process_from_s3, data_ref)
return result
九、选型指南:什么时候用 Temporal
Temporal 不是万能药。以下是我的决策框架:
9.1 适合 Temporal 的场景
- 多步骤业务流程:订单处理、金融交易、数据 ETL(>3 个步骤,有状态依赖)
- AI Agent:LLM 调用 + 工具执行 + 长时推理(需要失败恢复和状态持久化)
- 跨系统协调:需要保证跨多个服务/数据库的操作最终一致性
- 长时间运行任务:批处理、报表生成、ML 训练(>5 分钟)
- 需要完整执行历史的场景:金融合规、审计要求
9.2 可能过度设计的场景
- 简单的一次性任务:发送一封邮件,不需要持久化
- 无状态微服务:每个请求独立处理,不需要跨请求状态
- 强一致性要求:Temporal 是最终一致性,不适合银行转账等强一致性场景
- 团队刚起步:Temporal 有学习曲线,MVP 阶段可以先用简单队列
总结:持久执行范式的成熟化
Temporal Replay 2026 的更新,标志着持久执行这个理念从技术极客圈走向了主流工程实践:
- Serverless Workers 让 Temporal 的基础设施成本降到零,使小团队也能用上企业级可靠性
- Standalone Activities 将 Temporal 的能力从"编排"扩展到"任务执行",成为一个通用的可靠任务运行时
- Workflow Streams 解决了 AI Agent 流式交互的可靠性问题,让持久化覆盖到每个 token 级别
- AI 生态集成(Google ADK、OpenAI Agents SDK)使得 Temporal 成为 AI Agent 基础设施的标准选择
- 生产运营能力(Worker Versioning、Task Queue Priority、OpenMetrics)让 Temporal 在大规模生产环境中可维护
正如 Temporal 官网的那句宣言:"Write code as if failure doesn't exist"——这不只是 slogan,而是一套经过工程化验证的可靠执行范式。2026 年的这波更新,让这个范式真正走向了成熟。
如果你正在构建需要可靠性的 AI 应用,或者厌倦了每天和分布式系统的失败模式搏斗,Temporal Replay 2026 的这些新能力,值得你认真评估。
参考链接:
- Temporal 官方文档:https://docs.temporal.io
- Replay 2026 公告:https://temporal.io/blog/replay-2026-product-announcements
- Serverless Workers 文档:https://docs.temporal.io/evaluate/serverless-workers
- Standalone Activities 文档:https://docs.temporal.io/standalone-activity
- Workflow Streams 文档:https://docs.temporal.io/develop/python/workflows/workflow-streams
- External Storage 文档:https://docs.temporal.io/external-storage