背景:分布式系统的可靠性困境
分布式系统开发者每天都在和同一类问题搏斗:网络抖动导致 API 调用失败、部分成功的事务如何回滚、长流程执行到一半进程崩溃怎么办、跨服务协调时代码状态如何保持一致。
这些问题在传统的微服务架构下已经足够棘手。当 AI Agent 出现后,情况变得更糟——一个 Agent 可能需要调用数十次 LLM API、操作多个外部工具、执行数小时的推理链路。如果中途有任何一步失败,是重试整个流程?还是从断点恢复?抑或干脆让用户重新发起?
Temporal 给了我们一个答案:把「失败」从开发者的心智负担中彻底移除。代码执行到哪一步,状态就持久化到哪一步;任何故障恢复后,自动从断点继续,不需要任何补偿逻辑。
2026 年 6 月,Temporal 在 Replay 2026 大会上宣布了下一代平台能力——Serverless Workers、Standalone Activities、Workflow Streams,以及对 Google ADK 和 OpenAI Agents SDK 的深度集成。本文将深入解析这些能力的底层原理、适用场景,以及对 AI 工程化未来的影响。
一、持久执行:重新理解「代码执行」
1.1 传统编程模型的隐含假设
传统编程模型有一个隐含的假设:代码执行是连续的、有状态的,且进程存续期间状态有效。一旦进程重启,所有内存状态消失,你需要从数据库或缓存中重建。
# 传统 Web 服务:状态在内存中,进程重启 = 状态丢失
class OrderProcessor:
def __init__(self):
self.processed_items = [] # 内存状态
async def process_order(self, order_id: str):
items = await fetch_order_items(order_id)
for item in items:
result = await charge_payment(item)
self.processed_items.append(result) # 内存状态更新
这个模型在单机、单进程场景下完全正常。但一旦你需要:
- 跨服务协调
- 处理网络故障
- 支持暂停/恢复
- 应对进程崩溃
你就要写大量胶水代码:幂等设计、补偿事务、状态持久化、重试逻辑……这些代码往往比业务逻辑本身还复杂。
1.2 Event Sourcing 的思路
一种思路是 Event Sourcing——把状态变更全部记录为不可变事件,恢复状态时从事件序列重放:
# Event Sourcing 思路:所有操作都是事件
events = [
OrderPlaced(order_id="ORD-001"),
PaymentCharged(item_id="ITEM-A", amount=100),
PaymentCharged(item_id="ITEM-B", amount=200),
InventoryReserved(item_id="ITEM-A"),
# ... 进程崩溃了
]
# 恢复时,从头重放所有事件重建状态
state = OrderState()
for event in events:
state = apply_event(state, event)
这解决了部分问题,但你仍然需要自己管理事件存储、处理并发、处理部分失败。Temporal 做的,是把这个模式平台化——平台负责事件持久化、状态重建、故障恢复,你只需要写业务代码。
1.3 Temporal 的核心理念:代码即状态机
Temporal 的核心抽象是 Workflow——一个用你熟悉的编程语言编写的、有确定性的状态机:
# Temporal Workflow:确定性 + 持久化状态
@workflow.defn
class OrderWorkflow:
@workflow.run
async def run(self, order_id: str) -> OrderResult:
# Temporal 自动捕获每一步的状态
order = await activities.fetch_order(order_id)
total = 0
for item in order.items:
receipt = await activities.charge_payment(item)
await activities.reserve_inventory(item)
total += receipt.amount
# 即使 Worker 崩溃,replay 时会从这里继续
invoice = await activities.generate_invoice(order_id, total)
return OrderResult(order_id=order_id, total=total, invoice_id=invoice.id)
关键点:Workflow 代码必须具有确定性——给定相同的输入,必须产生相同的结果。这意味着你不能直接调用随机数、当前时间、或外部不可控状态,这些都必须通过 Temporal 的 Activity 机制来访问。
二、Temporal 架构深度解析
2.1 核心组件
┌─────────────────────────────────────────────────────┐
│ Temporal Service │
│ ┌──────────┐ ┌──────────┐ ┌──────────────────┐ │
│ │ Frontend │ │ Matching │ │ History Host │ │
│ │ Gateway │──│ Service │──│ (状态机引擎) │ │
│ └──────────┘ └──────────┘ └──────────────────┘ │
│ │ │
│ ┌──────────────────────────────────────────────┐ │
│ │ Persistence Layer │ │
│ │ (Cassandra / PostgreSQL / MySQL) │ │
│ └──────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────┘
│ │
▼ ▼
┌──────────────────┐ ┌──────────────────────┐
│ Worker A │ │ Worker B │
│ [Workflow Code] │ │ [Activity Code] │
│ [Activity Code] │ │ │
└──────────────────┘ └──────────────────────┘
- History Host:Temporal 的核心引擎,负责管理每个 Workflow 实例的状态历史(所有执行事件)
- Matching Service:将 Workflow/Activity 任务分发给可用的 Worker
- Persistence Layer:使用数据库存储事件历史(默认 Cassandra,也支持 PostgreSQL/MySQL)
- Worker:运行你的 Workflow 代码和 Activity 代码的进程
2.2 事件历史:持久执行的秘密
Temporal 对每个 Workflow 维护一个不可变的事件历史:
WorkflowExecutionStarted
↓
ActivityTaskScheduled (fetch_order)
↓
ActivityTaskCompleted (fetch_order → {items: [...]})
↓
ActivityTaskScheduled (charge_payment ITEM-A)
↓
ActivityTaskCompleted (charge_payment ITEM-A → {receipt_id: R1})
↓
ActivityTaskScheduled (charge_payment ITEM-B)
← Worker 崩溃!重新连接到 Matching Service
↓
WorkflowTaskStarted ← Temporal 自动调度 Workflow 继续执行
↓
(replay 历史) → 跳过已完成的 Activity,从 charge_payment ITEM-B 继续
Workflow 代码的每次执行(await activities.xxx())都产生一个事件。故障恢复后,Worker 重连,从断点继续执行——Temporal 在幕后处理了状态重建和任务调度。
2.3 Workflow 代码的确定性约束
由于 Temporal 需要 replay 历史来确定性地重建状态,Workflow 代码必须遵守以下约束:
# ✅ 允许:确定性操作
@workflow.defn
class GoodWorkflow:
@workflow.run
async def run(self, order_id: str) -> str:
# 数学运算、字符串操作、workflow.signal() 接收的数据
result = compute_price(100, 0.15) # 确定性
status = workflow.wait_until(lambda: condition_met()) # Temporal 原语
# ❌ 禁止:非确定性操作
class BadWorkflow:
@workflow.run
async def run(self) -> None:
import random
value = random.random() # 禁止:每次结果不同,replay 会不一致
import time
now = time.time() # 禁止:时间在流逝,replay 时值会变
import os
pid = os.getpid() # 禁止:Worker 可能不同
# 应该通过 Activity 访问外部状态
current_time = await activities.get_current_time() # ✅
random_value = await activities.get_random_id() # ✅
这个约束是 Temporal 能工作的基础,但也要求开发者有意识地分离「纯业务逻辑(Workflow)」和「有副作用的操作(Activity)」。
三、Activity:与外部世界交互的桥梁
3.1 Activity 的定义与特点
Activity 是 Workflow 中所有「可能失败、需要重试、与外部系统交互」的逻辑单元:
# Activity 定义:可以包含重试逻辑、超时控制、心跳
@activities.defn
class OrderActivities:
@activity.defn
async def charge_payment(self, item: Item) -> PaymentReceipt:
"""Activity 默认带重试,失败自动重试直到超时"""
response = await http.post(
f"https://payment.example.com/charge",
json={"item_id": item.id, "amount": item.price}
)
return PaymentReceipt.parse_obj(response.json())
@activity.defn
@activity.retry(maximum_attempts=1) # 只重试一次
@activity.start_to_close_timeout(timedelta(seconds=5))
async def send_notification(self, user_id: str, message: str) -> None:
"""指定重试策略和超时"""
await email_service.send(user_id, message)
3.2 Activity 的执行语义
Activity 与 Workflow 的关键区别:
| 特性 | Workflow | Activity |
|---|---|---|
| 执行保证 | 确定性,可 replay | 最多一次/至少一次/精确一次 |
| 重试 | 无(代码是纯函数式) | 有(通过注解配置) |
| 超时控制 | 由平台管理 | 可自定义 start_to_close、schedule_to_start |
| 并发 | 单线程(事件循环驱动) | 可并行 |
| 状态持久化 | 每次 await 持久化 | 仅开始/结束事件 |
3.3 心跳机制:长时 Activity 的优雅处理
对于可能运行几分钟甚至几小时的 Activity(如 AI 推理、批量数据处理),Temporal 提供了心跳机制:
@activity.defn
async def train_model(self, dataset_id: str, config: ModelConfig) -> str:
"""长时 Activity:训练机器学习模型,可能需要数小时"""
dataset = await self.load_dataset(dataset_id)
model = Model(config)
for epoch in range(config.total_epochs):
# 定期心跳:报告进度,供取消/查询使用
activity.heartbeat(f"Epoch {epoch}/{config.total_epochs} completed")
# 检查是否被取消
if activity.is_cancelled():
raise ActivityError("Training cancelled by user")
await model.train_epoch(dataset)
model_path = await self.save_model(model)
return model_path
心跳让 Workflow 可以查询 Activity 的中间进度,也允许用户在长时 Activity 运行时取消执行。Temporal 的取消是协作式的——不是强制 kill,而是设置取消标志,Activity 自行检查并优雅退出。
四、Replay 2026 新特性深度解析
4.1 Serverless Workers:零运维的弹性扩展
传统 Temporal 部署要求开发者自己管理 Worker 进程——需要配置服务器、设置扩缩容策略、处理冷启动问题。
Serverless Workers 让 Temporal 本身成为执行运行时:你不需要自己维护任何 Worker 进程。Temporal Cloud 自动为你分配计算资源,按实际执行时间计费。
# Serverless Workers 使用方式:对开发者代码零修改
# 原有代码无需改变,Temporal 自动处理弹性扩缩容
# 只需要在启动时指定 runtime
from temporal.serverless import start_runtime
start_runtime(
workflows=[OrderWorkflow, ShipmentWorkflow],
activities=[charge_payment, reserve_inventory],
# Temporal 会在需要时自动扩缩 Worker
)
技术原理:Temporal 在 Serverless 模式下将 Workflow 执行调度到其托管的容器化运行时,而非用户自建的 Worker 池。冷启动时使用预热的容器池来降低延迟。
适用场景:
- 峰值流量不可预测的业务(如电商大促)
- 不希望维护基础设施的开发团队
- 临时性/事件驱动的工作流
4.2 Standalone Activities:独立执行与跨租户复用
传统的 Activity 必须与特定 Workflow 定义绑定,通过 Task Queue 路由到特定 Worker 池。Standalone Activities 允许 Activity 独立注册和调用,跨 Workflow 复用。
# Standalone Activity:不再绑定到特定 Workflow
from temporal import activity
@activity.defn
@activity.standalone # 新特性:声明为 Standalone Activity
async def check_credit_score(self, customer_id: str) -> CreditScore:
"""一个可被任何 Workflow 调用的共享 Activity"""
score = await external_api.get_credit_score(customer_id)
return CreditScore(value=score, provider="equifax")
# 在任何 Workflow 中使用
@workflow.defn
class LoanApprovalWorkflow:
@workflow.run
async def run(self, application_id: str) -> ApprovalResult:
# 独立 Activity 通过动态路由找到合适的处理者
score = await activities.execute("check_credit_score", customer_id="C-123")
if score.value < 600:
return ApprovalResult(approved=False, reason="低信用分")
return ApprovalResult(approved=True)
技术优势:
- Activity 实现可以在多个 Workflow 间共享,减少代码重复
- Activity 的版本管理更独立,不受 Workflow 版本影响
- 支持 Activity 的跨 Namespace 调用(不同业务域的 Activity 互相调用)
4.3 Workflow Streams:流式执行与实时反馈
传统 Temporal Workflow 的执行是全量完成后返回结果——即使 Workflow 内部产生大量中间数据,也必须等到整个 Workflow 结束才能看到。
Workflow Streams 引入了流式输出:Workflow 可以在执行过程中实时推送数据,调用方可以逐步消费:
# Workflow Streams:实时流式输出
@workflow.defn
class DataPipelineWorkflow:
@workflow.run
async def run(self, dataset_id: str) -> StreamHandle:
"""返回一个流式句柄,调用方可逐步接收结果"""
dataset = await activities.load_dataset(dataset_id)
stream = workflow.new_output_stream() # 新 API:创建输出流
for batch in dataset.batches(100):
result = await activities.process_batch(batch)
# 实时推送处理结果
await stream.send({
"batch_id": batch.id,
"processed": len(result.items),
"errors": result.error_count,
"timestamp": workflow.now() # Workflow 内部时间(确定性)
})
await stream.close()
return stream.finalize()
调用方(无论是 gRPC 客户端还是 HTTP 流)可以实时获取每个批次的处理结果,而不需要等待整个 Pipeline 完成。
适用场景:
- 长时数据处理 Pipeline,需要实时查看进度
- AI 推理链路,需要流式输出中间结果(如 token 生成过程)
- 监控告警系统,需要实时推送事件
4.4 Google ADK 集成:Temporal 驱动 Agent 执行
Google Agent Development Kit(ADK)是 Google 推出的 AI Agent 开发框架,支持构建多步骤推理、多工具调用的 Agent 系统。
Temporal 与 Google ADK 的集成将持久执行带入了 Agent 开发:
# Google ADK + Temporal:让 Agent 执行具有持久性
from google.adk import Agent, Tool
from temporal import workflow
# 定义 Agent 的工具集
tools = [
Tool(name="search", description="搜索网络", handler=search_handler),
Tool(name="code_exec", description="执行代码", handler=code_exec_handler),
Tool(name="file_write", description="写文件", handler=file_write_handler),
]
# ADK Agent 定义
agent = Agent(
model="gemini-2.5-pro",
tools=tools,
instructions="你是一个全栈工程师,负责完成用户的编程任务"
)
# Temporal 包装 Agent 执行:支持暂停、恢复、步骤历史追踪
@workflow.defn
class AgentExecutionWorkflow:
@workflow.run
async def run(self, task: str, max_steps: int = 20) -> AgentResult:
agent_runner = AgentRunner(agent) # ADK 的 Agent 运行器
step = 0
while step < max_steps:
# Temporal 自动持久化每个步骤的 Agent 状态
result = await agent_runner.step(task)
workflow.wait_for_signal("user_approval") # 支持人工审批
if result.is_final:
return AgentResult(final_answer=result.message)
await workflow.sleep(timedelta(seconds=result.cooldown))
step += 1
raise WorkflowError("Agent execution exceeded max steps")
这解决了 AI Agent 开发中的核心痛点:Agent 执行到一半崩溃后怎么办?有了 Temporal 的持久化,Agent 的每一步推理状态都被完整记录,即使 Worker 重启,也能从断点继续。
4.5 OpenAI Agents SDK 集成
类似地,Temporal 也与 OpenAI 的 Agents SDK 集成,提供了相同的持久执行保障:
# OpenAI Agents SDK + Temporal
from agents import Agent, function_tool
from temporal import workflow
@function_tool
def search_web(query: str) -> str:
"""OpenAI Agents SDK 风格的工具定义"""
return search_engine.query(query)
@function_tool
def run_python(code: str) -> str:
"""在沙箱中执行 Python 代码"""
return sandbox.execute(code)
research_agent = Agent(
name="Research Agent",
instructions="你是一个专业的研究助手",
tools=[search_web, run_python]
)
@workflow.defn
class ResearchWorkflow:
@workflow.run
async def run(self, topic: str) -> ResearchReport:
"""研究工作流:Agent 自动执行搜索、分析、总结"""
# Temporal 记录每个 Agent 步骤,可回溯、可重现
findings = await research_agent.run(
f"深入研究 {topic} 的最新进展,输出结构化报告",
context={"max_searches": 10}
)
# 每个 LLM 调用、每次工具使用都有完整的执行历史
execution_trace = workflow.get_history() # 获取完整执行轨迹
return ResearchReport(
topic=topic,
findings=findings.summary,
sources=findings.citations,
execution_steps=len(execution_trace.events) # 可审计的执行步数
)
这意味着什么:Agent 的每次 LLM 调用、每次工具使用都被完整记录。即使 Agent 跑了 200 步花了 2 小时后崩溃,你也能精确恢复状态,而不是从头开始。
五、AI Agent 可靠性的行业共识
5.1 为什么 AI Agent 特别需要持久执行
AI Agent 与传统软件最本质的区别在于执行路径的不确定性:
- 传统软件:给定输入,执行路径基本固定(虽然有 if-else 分支,但分支数量有限)
- AI Agent:给定任务,Agent 可能选择完全不同的工具组合、调用顺序、推理路径
这种不确定性让传统的「事务 + 回滚」机制几乎不可用。你不能简单地说「如果第 15 步失败就回滚到第 10 步」——因为 Agent 的第 10 步和第 15 步可能完全不相关,简单的回滚可能导致状态不一致。
Temporal 的方法更优雅:每个步骤(Activity)的执行结果都被持久化,无论 Agent 选择怎样的执行路径,重启后都能精确恢复到任意历史状态。
5.2 持久执行 vs. 检查点:技术路径对比
| 维度 | 检查点(Checkpointing) | 持久执行(Durable Execution) |
|---|---|---|
| 状态保存粒度 | 手动指定保存点 | 每一步自动持久化 |
| 恢复精度 | 到最近的检查点 | 精确到每一步之后 |
| 开发者负担 | 高(需手动管理保存/恢复) | 低(平台自动处理) |
| 并发控制 | 复杂 | 由事件历史保证 |
| 适用场景 | 科学计算、批量处理 | 业务流程、AI Agent |
5.3 Temporal 在 AI 生态中的定位
┌─────────────────────────────────────────────┐
│ AI Agent Layer │
│ (Google ADK / OpenAI Agents SDK / LangChain)│
└──────────────────────┬──────────────────────┘
│ 执行 + 状态持久化
▼
┌─────────────────────────────────────────────┐
│ Durable Execution Layer │
│ (Temporal / Restate) │
│ ┌─────────────┬──────────────┬───────────┐ │
│ │ Workflow │ Activity │ Streams │ │
│ │ (状态机) │ (可重试操作) │ (流式输出) │ │
│ └─────────────┴──────────────┴───────────┘ │
└──────────────────────┬──────────────────────┘
│ 事件历史 + 任务调度
▼
┌─────────────────────────────────────────────┐
│ Persistence Layer │
│ (PostgreSQL / Cassandra / MySQL) │
└─────────────────────────────────────────────┘
Temporal 在 AI Agent 栈中处于执行层——不负责 Agent 的推理逻辑,而是负责确保 Agent 的执行具有可靠性、可观测性和可追溯性。
5.4 OpenAI 和 NVIDIA 怎么用 Temporal
从 Temporal 官网披露的案例:
NVIDIA:用 Temporal 管理跨云 GPU 调度。训练任务横跨多个云服务商的 GPU 集群,网络中断、节点故障极为常见。Temporal 的持久执行确保训练任务从断点恢复,而不是从头重跑(一个大规模 GPU 训练任务从头重跑的成本可能是数十万美元)。
OpenAI:Temporal 作为 AI Pipeline 的编排层。大模型的推理请求、Prompt 版本管理、模型版本切换,都通过 Workflow 来协调,确保每次请求的可追溯性。
Descript:音视频处理需要长时间运行的 AI 推理(语音识别、视频转码)。Temporal 保证即使处理到 90% 时服务器崩溃,也能从 90% 处继续。
六、生产环境实战:构建 AI Agent 工作流
6.1 场景:多步骤 AI 代码审查 Agent
让我们构建一个完整的例子:一个代码审查 Agent,需要搜索代码库、分析代码、生成审查意见,并支持人工复核。
import asyncio
from datetime import timedelta
from enum import Enum
from dataclasses import dataclass
from temporalio import workflow, activity
from temporalio.common import RetryPolicy
@dataclass
class ReviewTask:
repo_url: str
pr_number: int
requester: str
priority: str = "normal"
@dataclass
class ReviewResult:
pr_number: int
issues: list[dict]
suggestions: list[str]
overall_score: float
approved: bool
# ─── Activities ───────────────────────────────────────────
@activity.defn
@activity.retry(retry_policy=RetryPolicy(maximum_attempts=3))
async def clone_repository(repo_url: str) -> str:
"""克隆代码仓库到工作目录"""
import subprocess
local_path = f"/tmp/repos/{hash(repo_url)}"
subprocess.run(["git", "clone", "--depth", "1", repo_url, local_path], check=True)
return local_path
@activity.defn
@activity.retry(retry_policy=RetryPolicy(maximum_attempts=2))
async def analyze_code_changes(repo_path: str, pr_number: int) -> dict:
"""分析 PR 的代码变更,使用 AI"""
import subprocess
result = subprocess.run(
["git", "diff", f"origin/main...refs/pull/{pr_number}/head"],
cwd=repo_path,
capture_output=True,
text=True
)
diff = result.stdout
# 调用 AI 模型分析代码变更
analysis = await llm_analyze(diff)
return analysis
@activity.defn
async def generate_review_report(analysis: dict) -> str:
"""生成结构化的审查报告"""
report = format_review_markdown(analysis)
return report
@activity.defn
async def post_review_comment(repo_url: str, pr_number: int, report: str) -> str:
"""将审查结果发布为 PR 评论"""
# GitHub API 调用
return await github.create_pr_comment(repo_url, pr_number, report)
# ─── Workflow ─────────────────────────────────────────────
@workflow.defn
class CodeReviewWorkflow:
@workflow.run
async def run(self, task: ReviewTask) -> ReviewResult:
workflow_id = workflow.info().workflow_id
logger.info(f"开始代码审查 Workflow: {workflow_id}")
# Step 1: 克隆仓库
repo_path = await activities.clone_repository(task.repo_url)
logger.info(f"仓库克隆到: {repo_path}")
# Step 2: 获取 PR 变更
changes = await activities.get_pr_diff(task.repo_url, task.pr_number)
# Step 3: AI 分析(可心跳取消的长时 Activity)
try:
analysis = await activities.analyze_code_with_ai(
changes,
context={"priority": task.priority}
)
except ActivityError as e:
logger.error(f"AI 分析失败: {e}")
analysis = {"issues": [], "score": 0.0, "error": str(e)}
# Step 4: 生成报告
report = await activities.generate_review_report(analysis)
# Step 5: 人工复核(Signal 机制)
await workflow.wait_condition(
lambda: review_decision[workflow_id] is not None,
timeout=timedelta(hours=24) # 24小时超时
)
decision = review_decision[workflow_id]
# Step 6: 发布结果
comment_url = await activities.post_review_comment(
task.repo_url, task.pr_number, report
)
return ReviewResult(
pr_number=task.pr_number,
issues=analysis.get("issues", []),
suggestions=analysis.get("suggestions", []),
overall_score=analysis.get("score", 0.0),
approved=decision == "approve"
)
@workflow.query
async def get_status(self) -> str:
"""查询当前审查状态"""
return current_status
@workflow.signal
async def receive_review_decision(self, decision: str) -> None:
"""接收人工复核决定"""
workflow_id = workflow.info().workflow_id
review_decision[workflow_id] = decision
6.2 故障恢复演示
假设在 post_review_comment Activity 执行时,网络中断导致 Activity 超时:
Timeline:
T0: Workflow 开始
T1: clone_repository() ✓ (持久化)
T2: get_pr_diff() ✓ (持久化)
T3: analyze_code_with_ai() ✓ (持久化,耗时45分钟)
T4: generate_review_report() ✓ (持久化)
T5: receive_review_decision("approve") ✓ (Signal,持久化)
T6: post_review_comment() ← 网络超时!Activity 失败
Worker 崩溃,重新连接...
T7: Temporal 自动重试 post_review_comment() (Activity 有重试策略)
T8: Activity 成功 ✓
T9: Workflow 正常结束 ✓
整个过程中,没有一行代码需要处理重试逻辑或补偿事务——Temporal 自动处理了一切。
6.3 监控与可观测性
Temporal 自带完整的工作流可观测性:
# 使用 Temporal CLI 查看 Workflow 状态
$ tctl workflow show --workflow_id wrk-12345
# 输出:
# Workflow Id: wrk-12345
# Status: Running (Activity in progress)
#
# History:
# 1. WorkflowExecutionStarted
# 2. ActivityTaskScheduled: clone_repository
# 3. ActivityTaskCompleted: repo_path=/tmp/repos/abc123
# 4. ActivityTaskScheduled: analyze_code_with_ai
# 5. ActivityTaskCompleted: analysis={score: 0.82, issues: [...]}
# 6. SignalReceived: receive_review_decision = "approve"
# 7. ActivityTaskScheduled: post_review_comment (RETRY 1/3)
每个 Workflow 的完整执行历史都可以查看,包括所有 Activity 的输入输出、信号事件、超时信息。这对于审计和调试来说是巨大的价值。
七、Workflow 设计模式与最佳实践
7.1 Saga 模式:分布式事务的简洁实现
传统的 Saga 模式需要手动编写补偿逻辑(每个正向操作对应一个反向撤销操作)。在 Temporal 中,Saga 变成了简单的 try-catch:
@workflow.defn
class OrderSagaWorkflow:
@workflow.run
async def run(self, order: Order) -> OrderResult:
try:
# Temporal 自动记录每个步骤,无需手动补偿
inventory = await activities.reserve_inventory(order.items)
payment = await activities.charge_payment(order.payment_info)
shipment = await activities.create_shipment(order.shipping_address)
return OrderResult(status="completed", shipment_id=shipment.id)
except ActivityError as e:
# Temporal 自动 replay 到异常点,然后执行补偿
# 你只需要描述补偿逻辑,Temporal 负责调度
await activities.release_inventory(inventory.reservation_id)
await activities.refund_payment(payment.transaction_id)
raise WorkflowError(f"Order failed: {e}")
Temporal 的 replay 机制天然支持 Saga——当 Activity 抛出异常时,Workflow 的 replay 会跳过失败的 Activity,直接执行补偿逻辑。
7.2 人类参与回路(Human-in-the-Loop)
许多业务场景需要人工审批:订单金额超过阈值、AI 输出需要人工确认、异常情况需要人工处理。Temporal 提供了优雅的 Signal 机制:
@workflow.defn
class ApprovalWorkflow:
@workflow.run
async def run(self, task: ApprovalTask) -> ApprovalResult:
# 提交审批请求
await activities.submit_for_review(task)
# 等待人工审批(可设置超时)
approval = await workflow.wait_condition(
predicate=lambda sig: sig.get("approved") is not None,
timeout=timedelta(hours=48)
)
if not approval.get("approved"):
return ApprovalResult(status="rejected", reason=approval.get("reason"))
# 审批通过,继续执行
return await self.execute_approved_task(task)
7.3 并发执行与 Join
一个 Workflow 中可能有多个独立的 Activity 可以并行执行:
@workflow.run
async def run(self, order: Order) -> OrderResult:
# 三个独立的查询可以并行执行
user_task = asyncio.create_task(
activities.get_user_info(order.user_id)
)
inventory_task = asyncio.create_task(
activities.check_inventory(order.items)
)
pricing_task = asyncio.create_task(
activities.calculate_pricing(order.items)
)
# 等待所有结果
user, inventory, pricing = await asyncio.gather(
user_task, inventory_task, pricing_task
)
# 全部就绪后才继续
return OrderResult(user=user, inventory=inventory, pricing=pricing)
Temporal 的事件历史会记录每个 create_task 和 gather,故障恢复后能精确 replay 到正确的并行状态。
八、性能优化与生产级调参
8.1 Workflow 执行的性能开销
Temporal 的持久化保证是有代价的:每个 Activity 调用的结果都需要写入持久化存储。这意味着:
- 高频调用(毫秒级)的场景不适合用 Activity
- 纯内存计算不适合用 Activity
优化策略:
- 批量处理:把多个操作合并为一个 Activity,减少持久化次数
- 本地缓冲:在 Worker 内存中缓冲数据,定期批量提交
- 选择合适的超时:避免过短的超时导致不必要的重试
# ❌ 低效:每个订单项单独 Activity(高频持久化)
for item in order.items:
receipt = await activities.charge_item(item) # 100个商品 = 100次持久化
# ✅ 优化:批量处理(单次持久化)
receipts = await activities.charge_items_batch(order.items) # 1次持久化
8.2 Worker 资源规划
┌──────────────────────────────────────────────┐
│ Task Queue: order-processing │
│ Workflow Workers: 5(并发执行 5 个 Workflow)│
│ Activity Workers: 20(支持 20 并发 Activity)│
│ │
│ 每个 Worker 约 0.5 CPU core │
│ Activity Worker 根据 IO 类型可达 10x 扩缩 │
└──────────────────────────────────────────────┘
Activity 通常是 IO 密集型(网络调用、数据库查询),可以配置较高的并发度。Workflow Worker 是事件循环驱动,单线程,每个 Workflow 实例不会阻塞其他实例。
8.3 数据库选型建议
| 数据库 | 适用规模 | 特点 |
|---|---|---|
| PostgreSQL | 中小型(< 1000 WFS) | 运维简单,主流云原生支持 |
| MySQL | 中小型(< 1000 WFS) | 生态广泛,部分 Temporal 功能受限 |
| Cassandra | 大型(> 5000 WFS) | 横向扩展强,运维复杂 |
| ScyllaDB | 大型(> 5000 WFS) | Cassandra 兼容,性能更优 |
对于大多数团队,PostgreSQL 是最佳起点——运维简单,与现有基础设施兼容,Temporal Cloud 默认使用 CockroachDB(PostgreSQL 兼容)。
九、与其他方案的对比
9.1 Temporal vs. AWS Step Functions
| 维度 | Temporal | AWS Step Functions |
|---|---|---|
| 编程模型 | 代码优先(Python/Go/TS) | 声明式 JSON DSL |
| 语言支持 | 多语言 SDK | Amazon States Language |
| 执行模型 | 持久执行(replay) | 事件驱动(无 replay) |
| 状态管理 | 内置 | 需要配合 DynamoDB |
| 成本 | 自建 or Cloud | 按执行步数计费 |
| AI Agent 集成 | 官方 ADK/Agents SDK | 无官方集成 |
Step Functions 的局限:状态需要开发者自己管理,异常恢复需要额外的补偿逻辑。如果业务逻辑简单(少量步骤),Step Functions 足够;如果需要处理 AI Agent 的复杂执行链路,Temporal 更合适。
9.2 Temporal vs. Restate(竞品对比)
Restate 是另一个持久执行框架,使用更轻量的增量日志记录(ILR)而非完整事件历史,声称有更低的存储开销。
主要区别:Restate 的 Workflow 是用 RSocket 协议通信的 gRPC 服务,适合微服务场景;Temporal 的 Workflow 是用 SDK 编程,适合复杂业务逻辑。
十、总结与展望
10.1 Temporal 解决了什么问题
Temporal 的核心价值主张是:让开发者用普通的代码写法,获得分布式系统级别的可靠性。
你不需要学习事件溯源的复杂理论,不需要写 Saga 补偿逻辑,不需要管理幂等性,不需要处理重试风暴。你只需要写普通的 async 函数,Temporal 负责把每一步都变成可恢复的。
10.2 Replay 2026 的意义
Replay 2026 的四个宣布——Serverless Workers、Standalone Activities、Workflow Streams、Google ADK/OpenAI Agents SDK 集成——标志着 Temporal 从「通用工作流引擎」向「AI Agent 基础设施」的战略性转型。
Serverless Workers 降低了使用门槛,Standalone Activities 让 Activity 生态更开放,Workflow Streams 支持了 AI 应用对流式输出的需求,而与 ADK/Agents SDK 的深度集成则直接将 Temporal 定位为 AI Agent 可靠性的标准答案。
10.3 未来展望
AI Agent 持久化成为行业标准:随着 AI Agent 在生产环境中的普及,「Agent 执行必须可持久」会成为共识。Temporal 在这个趋势中处于领先位置。
多 Agent 协作编排:未来的 AI 系统不会是单个 Agent,而是多个 Agent 协作的复杂系统。Temporal 的 Workflow 作为协调层的价值将进一步凸显。
与 MCP 的深度整合:MCP(Model Context Protocol)定义了 Agent 与工具的通信协议,Temporal 提供了执行保障。两者结合可能是 AI Agent 工程化的最终形态。
边缘计算与 Serverless 融合:Serverless Workers 的成熟将让 Temporal 的执行层扩展到边缘节点,AI Agent 可以在离用户最近的地方执行,同时享受持久化的可靠性保障。
附录:快速上手指南
环境准备
# 安装 Temporal CLI
brew install temporalio/tap/temporal
# 启动本地开发集群(Docker Compose)
git clone https://github.com/temporalio/docker-compose.git
cd docker-compose
docker-compose up
# 验证连接
temporal workflow list
第一个 Workflow
# 创建新项目
python3 -m venv temporal_env && source temporal_env/bin/activate
pip install temporalio
# 创建 workflow.py(完整代码见上文)
# 运行 Worker
python worker.py
# 触发 Workflow
python starter.py
相关资源
- 官方文档:https://docs.temporal.io
- GitHub:https://github.com/temporalio/temporal
- Replay 2026 回顾:https://temporal.io/replay-2026
- Google ADK 集成示例:https://github.com/temporalio/samples-google-adk
- OpenAI Agents SDK 集成:https://github.com/temporalio/samples-openai-agents