编程 Temporal Replay 2026 深度解析:Serverless Workers + 独立活动 + Workflow 流——持久执行范式重塑 AI Agent 基础设施

2026-06-30 01:46:49 +0800 CST views 9

Temporal Replay 2026 深度解析:Serverless Workers + 独立活动 + Workflow 流——持久执行范式重塑 AI Agent 基础设施

前言:为什么分布式系统的可靠性依然是个噩梦

写分布式系统的工程师都清楚一个残酷的现实:任何一步操作都可能在任何时刻失败。网络抖动、API 超时、服务宕机、数据库锁等待——这些不是小概率事件,而是你在生产环境中每天都在对抗的敌人。

为了解决这些问题,我们发明了重试机制、幂等设计、补偿事务、Saga 模式……每一套方案都解决了部分问题,却又带来了新的复杂性。结果是:业务代码被可靠性逻辑淹没,一个简单的"下单-扣款-发货"流程,背后可能要写几百行容错代码。

Temporal 就是来解决这个问题的。它的核心理念是:Write code as if failure doesn't exist——让你像写普通顺序代码一样写分布式系统,框架自动处理一切故障恢复。

2026 年 5 月,Temporal 在 Replay 2026 大会上发布了重磅更新:Serverless WorkersStandalone ActivitiesWorkflow Streams,以及与 Google ADK 和 OpenAI Agents SDK 的深度集成。这些更新不只是功能堆砌,而是从计算模式、执行粒度、流式交互三个维度,对持久执行范式的一次系统性重构。

本文将从架构原理出发,深度解析每一项新能力的本质,并给出代码实战。

一、持久执行:Temporal 的核心抽象

1.1 什么是 Durable Execution

Durable Execution(持久执行)是 Temporal 的核心概念。传统编程中,函数调用是瞬时的——调用栈执行完毕,状态就消失了。但 Temporal 的 Workflow(工作流)不是普通函数,它的状态会在每一步执行后被持久化

这意味着:

  • 即使服务崩溃,Workflow 也能在任意节点恢复,继续执行
  • 即使网络中断,Activity(活动)的执行结果会被记录,重试时有据可查
  • 即使代码被修改,历史 Workflow 仍按原逻辑 replay,不受影响
# 传统代码(伪代码):崩溃即丢失
def process_order(order_id):
    inventory = check_inventory(order_id)       # 步骤1
    payment = charge_customer(order_id)          # 步骤2
    ship_order(order_id)                         # 步骤3
    # 如果步骤2和3之间服务崩溃,步骤1的结果丢失了

# Temporal Workflow:崩溃可恢复
@workflow.defn
class OrderWorkflow:
    @workflow.run
    async def run(self, order_id: str) -> str:
        # 框架自动处理重试、状态恢复
        inventory = await workflow.execute_activity(
            check_inventory, order_id,
            start_to_close_timeout=timedelta(seconds=30)
        )
        payment = await workflow.execute_activity(
            charge_customer, order_id,
            start_to_close_timeout=timedelta(minutes=5)
        )
        result = await workflow.execute_activity(
            ship_order, order_id,
            start_to_close_timeout=timedelta(minutes=10)
        )
        return f"Order {order_id} processed: {result}"

注意这里的关键:workflow.execute_activity 不是普通函数调用,而是一个持久化的活动调用。Temporal 会在数据库中记录这个调用的输入、输出、执行状态,并保证它最终被完成(即使重试 100 次)。

1.2 Temporal 的三层架构

理解 Temporal 的新功能,需要先搞清楚它的三层架构:

┌─────────────────────────────────────────┐
│           Temporal Service               │  ← 持久化状态协调器
│  (Workflow Engine + Activity Workers)   │     管理 Workflow 历史
├─────────────────────────────────────────┤
│              Workers                    │  ← 执行业务逻辑的进程
│   (注册 Workflow / Activity 处理器)      │
├─────────────────────────────────────────┤
│           SDK (Client)                  │  ← 开发者写的代码
│  (Workflow 定义 + Activity 实现)         │
└─────────────────────────────────────────┘
  • Temporal Service:核心协调引擎,负责持久化所有 Workflow 状态、管理任务队列、处理重试逻辑
  • Workers:部署在你的基础设施上的进程,通过轮询任务队列来执行 Workflow 代码和 Activity 代码
  • SDK:开发者使用的客户端库,提供了定义 Workflow 和 Activity 的 API

在 Replay 2026 之前,Workers 必须常驻运行才能接收任务。这是传统架构的限制——你需要管理服务器、维护扩缩容策略、配置健康检查。Serverless Workers 的出现,正是为了解决这个痛点。

二、Serverless Workers:把基础设施负担降为零

2.1 传统 Worker 的运维成本

在 Serverless Workers 出现之前,部署 Temporal Workers 意味着你需要:

  • 管理服务器或容器:配置实例类型、自动扩缩容策略
  • 处理冷启动:新实例启动需要加载代码、连接数据库,可能有延迟
  • 为峰值负载付费:即使大部分时间负载很低,也需要保持一定数量的实例在线
  • 配置健康检查和优雅关闭:防止在处理请求时被意外终止

对于初创公司和小型团队来说,这是一笔不小的运维负担。对于 AI Agent 场景尤其棘手——LLM 调用的延迟本身就不稳定,再加上 Worker 的冷启动延迟,用户体验雪上加霜。

2.2 Serverless Workers 的架构原理

Temporal Replay 2026 引入了 Serverless Workers,首个支持的是 AWS Lambda

┌──────────────────────────────────────────────────┐
│              Temporal Cloud                       │
│  ┌────────────────────────────────────────────┐  │
│  │         Workflow Coordination Engine          │  │
│  │  (状态持久化 + 任务队列 + 重试策略)          │  │
│  └────────────────────────────────────────────┘  │
│                        ↑                         │
│                        │ 按需触发                │
│                        ↓                         │
│  ┌────────────────────────────────────────────┐  │
│  │         AWS Lambda (Serverless Worker)       │  │
│  │                                              │  │
│  │  函数执行 = 一个 Workflow 或 Activity 的执行  │  │
│  │  并发数 = Workflow 并行度(自动扩缩)         │  │
│  │  空闲时 = 缩容至零(按需启动)                │  │
│  └────────────────────────────────────────────┘  │
└──────────────────────────────────────────────────┘

核心工作原理:

  1. 按需触发:Temporal Cloud 不再依赖 Worker 主动轮询任务队列,而是通过 AWS Event Source Mapping,在有任务时主动调用 Lambda 函数
  2. 自动扩缩:Lambda 的并发执行数自动匹配 Workflow 的并发度——100 个 Workflow 并行,就有 100 个 Lambda 实例同时运行;0 个任务就 0 个实例
  3. 自动优雅关闭:Lambda 有 15 分钟的超时限制,Temporal 会自动将长时间 Workflow 的状态 checkpoint 分割成多个短任务分批执行
  4. 零基础设施管理:你不需要配置 ASG(Auto Scaling Group)、健康检查、负载均衡器

2.3 代码实战:部署 Serverless Worker

# AWS Lambda 上部署 Temporal Worker
# temporal_serverless_worker.py

import temporalio
from temporalio import workflow
from temporalio.client import Client
from temporalio.worker import Worker

# 定义 Activity(可复用的业务步骤)
class OrderActivities:
    async def check_inventory(self, order_id: str) -> dict:
        # 检查库存
        return {"sku": "PROD-001", "available": True, "quantity": 10}
    
    async def charge_customer(self, order_id: str) -> dict:
        # 扣款
        return {"transaction_id": "TXN-12345", "amount": 99.99, "status": "success"}
    
    async def ship_order(self, order_id: str) -> dict:
        # 发货
        return {"tracking_number": "SF1234567890", "carrier": "顺丰"}

# 定义 Workflow
@workflow.defn
class OrderWorkflow:
    @workflow.run
    async def run(self, order_id: str) -> str:
        activities = OrderActivities()
        
        inventory = await workflow.execute_activity(
            activities.check_inventory,
            order_id,
            start_to_close_timeout=timedelta(seconds=30),
            # Serverless 模式下推荐设置心跳
            heartbeat_timeout=timedelta(seconds=10),
        )
        
        if not inventory["available"]:
            return f"Order {order_id} failed: out of stock"
        
        payment = await workflow.execute_activity(
            activities.charge_customer,
            order_id,
            start_to_close_timeout=timedelta(minutes=5),
        )
        
        shipment = await workflow.execute_activity(
            activities.ship_order,
            order_id,
            start_to_close_timeout=timedelta(minutes=10),
        )
        
        return f"Order {order_id} completed: {shipment['tracking_number']}"


# Lambda Handler(Serverless 入口)
def handler(event, context):
    """
    AWS Lambda 入口函数
    Temporal Cloud 会将任务队列的事件作为 event 传入
    """
    import asyncio
    
    async def run_worker():
        client = await Client.connect("your-namespace.temporalCloud.cloud:7233")
        
        worker = Worker(
            client,
            task_queue="serverless-order-queue",  # 专用任务队列
            workflows=[OrderWorkflow],
            activities=[OrderActivities()],
            # Serverless 场景下推荐配置:
            max_concurrent_workflow_tasks=100,  # Lambda 并发上限
            max_cached_workflows=50,             # 减少内存占用
        )
        
        await worker.run()
    
    asyncio.run(run_worker())
# AWS SAM 模板 (template.yaml)
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31'

Resources:
  TemporalServerlessFunction:
    Type: AWS::Serverless::Function
    Properties:
      Handler: temporal_serverless_worker.handler
      Runtime: python3.11
      Timeout: 900  # Lambda 最大 15 分钟
      MemorySize: 512
      Environment:
        Variables:
          TEMPORAL_NAMESPACE: your-namespace
          TEMPORAL_TLS_CERT: /tmp/ca.crt
          TEMPORAL_API_KEY: !Ref TemporalApiKey
      Policies:
        - LambdaInvokeFunctionPolicy
        - SecretsManagerReadWrite
      Events:
        TemporalTrigger:
          Type: SQSEvent
          Properties:
            Queue: !GetAtt TemporalQueue.Arn
            BatchSize: 1

  TemporalQueue:
    Type: AWS::SQS::Queue
    Properties:
      VisibilityTimeout: 3600
      ReceiveMessageWaitTimeSeconds: 20

关键配置说明

  • start_to_close_timeout:单个 Activity 的最大执行时间,在 Serverless 模式下应小于 Lambda 的 15 分钟超时
  • heartbeat_timeout:Activity 执行期间的心跳间隔,用于检测长任务的状态
  • max_cached_workflows:限制 Lambda 函数同时管理的 Workflow 缓存数量,降低内存压力
  • SQS 作为事件源,在有任务时触发 Lambda

2.4 成本分析:Serverless vs 自管理 Worker

维度传统 Worker(EC2/ECS)Serverless Workers(Lambda)
基础设施成本按实例小时数计费,即使空闲按实际执行时间(GB-秒)计费,零任务时为零
扩缩容需要配置 ASG,冷启动 30-60s自动,Lambda 冷启动通常 <1s
最大并发受实例数量限制受 Lambda 并发配额限制(默认 1000)
适用场景高吞吐、持续负载突发流量、事件驱动、AI Agent 任务
适合运行时长无限制Activity ≤ 15 分钟(全流程可拆分)

对于 AI Agent 场景,Serverless Workers 的意义尤为重大:LLM 调用本身是不可预测的突发流量,Serverless 的自动扩缩天然匹配这种特征。

三、Standalone Activities:打破 Workflow 的边界

3.1 Activity 的历史局限

在 Temporal 1.x 版本中,Activity 有一个根本性限制:Activity 只能作为 Workflow 的子步骤存在

# 旧模式:Activity 必须被 Workflow 包裹
@workflow.defn
class MyWorkflow:
    @workflow.run
    async def run(self, job_id: str):
        # Activity 只能在 Workflow 内执行
        result = await workflow.execute_activity(
            process_job,
            job_id,
            start_to_close_timeout=timedelta(minutes=5)
        )
        return result

这带来几个现实问题:

  1. 轻量任务过度工程化:只是想执行一个异步任务(如发送邮件),也要先定义 Workflow,写起来很重
  2. 外部系统集成困难:当你想让外部服务触发 Temporal Activity 时,没有简单的方式
  3. 任务队列 + 重试逻辑的重复发明:很多团队有自己的任务队列系统(如 Sidekiq、Celery),想要 Temporal 的重试保证,但 Activity 只能从 Workflow 内部调用

3.2 Standalone Activities 的架构革新

Replay 2026 引入的 Standalone Activity(独立活动),允许 Activity 独立于 Workflow 运行,同时保留 Temporal 的所有可靠性保证:

  • 自动重试(可配置重试策略)
  • 持久化执行状态
  • 超时管理
  • 心跳检测
旧模式:
  Workflow → Activity → Activity → Activity
            (必须在 Workflow 内调用)

新模式:
  Workflow → Activity → Activity → Standalone Activity (可独立触发)
                              ↓
                    直接从外部系统/API 调用

Standalone Activity 的使用场景

  • 外部系统 webhook 触发:支付平台回调后直接触发对账 Activity
  • 定时任务:Cron 作业直接触发数据同步 Activity
  • 任务队列替代:用 Standalone Activity 替代 Celery/Sidekiq,享受更好的可靠性
  • 微服务间调用:服务 A 直接调用服务 B 的 Activity,无需共享 Workflow 定义

3.3 代码实战:Standalone Activity

# standalone_activity.py
import asyncio
from datetime import timedelta
from temporalio import activity
from temporalio.client import Client
from temporalio.worker import Worker
from temporalio.service_common import RetryPolicy

# 定义一个 Activity(既可以standalone运行,也可以在Workflow内运行)
class DataSyncActivities:
    @activity.defn
    async def sync_orders(self, date: str) -> dict:
        """
        同步订单数据——这个 Activity 既可以从 Workflow 调用,
        也可以独立触发(Standalone 模式)
        """
        print(f"Syncing orders for date: {date}")
        
        # 模拟从外部系统拉取数据
        orders = await fetch_orders_from_external_system(date)
        
        # 处理数据
        processed = transform_orders(orders)
        
        # 入库
        await insert_into_database(processed)
        
        return {
            "date": date,
            "total_orders": len(orders),
            "processed": len(processed),
            "status": "success"
        }

    @activity.defn
    async def send_notification(self, user_id: str, message: str) -> dict:
        """发送通知"""
        return await email_service.send(user_id, message)


async def main():
    client = await Client.connect("localhost:7233")
    
    # 方法1:在 Worker 中注册(用于 Workflow 内调用)
    worker = Worker(
        client,
        task_queue="data-sync-queue",
        activities=[DataSyncActivities()],
    )
    
    # 方法2:独立触发 Activity(无需 Worker,直接通过客户端调用)
    # 这行代码可以在任何地方执行——Webhook handler、定时任务、主程序
    result = await client.start_activity(
        DataSyncActivities().sync_orders,
        arg="2026-06-30",
        task_queue="data-sync-queue",
        # 标准 Temporal 可靠性配置
        start_to_close_timeout=timedelta(minutes=30),
        retry_policy=RetryPolicy(
            maximum_attempts=5,
            initial_interval=timedelta(seconds=10),
            backoff_coefficient=2.0,
            non_retryable_error_types=["ValidationError"],
        ),
        # 心跳用于长时间任务
        heartbeat_timeout=timedelta(seconds=30),
    )
    
    print(f"Activity result: {result}")


# 作为独立程序运行(定时任务或 Webhook)
if __name__ == "__main__":
    asyncio.run(main())
# 用 Standalone Activity 替代 Celery 的任务队列
# app/tasks.py

from temporalio.client import Client
from app.activities import DataSyncActivities
import asyncio

async def trigger_order_sync():
    """定时触发订单同步任务——替代 Celery 的 periodic_task"""
    client = await Client.connect("your-namespace.temporalCloud.cloud:7233")
    
    # 异步触发,不需要等待结果(fire-and-forget)
    handle = await client.start_activity(
        DataSyncActivities().sync_orders,
        arg="2026-06-30",
        task_queue="data-sync-queue",
        start_to_close_timeout=timedelta(hours=2),
        # 失败重试由 Temporal 自动处理,无需 Celery 的 acks_late
        retry_policy=RetryPolicy(maximum_attempts=10),
        id="order-sync-2026-06-30",  # 幂等 ID,防止重复执行
    )
    
    print(f"Triggered activity: {handle.id}")
    return handle.id


# 定时触发(配合系统 cron 或 APScheduler)
# crontab: 0 2 * * * python -m app.tasks

Standalone Activity vs 传统任务队列的对比

特性Celery/SidekiqStandalone Activity
失败重试需要 acks_late + 配置自动,可配置重试策略
执行历史需接入监控才能查看内置完整执行历史
任务持久化Broker 需持久化配置Temporal Service 持久化
超时管理需手动配置原生支持
跨服务调用需序列化 + 网络调用统一的 Activity 接口
与 Workflow 集成不支持可以无缝移入 Workflow

四、Workflow Streams:实时 LLM 交互的持久化保证

4.1 AI Agent 流式输出的可靠性困境

AI Agent 的一个核心特征是流式输出:LLM 的 token 是逐个生成的,用户需要实时看到响应。但流式场景引入了独特的可靠性挑战:

  1. Token 生成中断:用户在看到部分回答时刷新页面,已经生成的 token 丢失
  2. Agent 工具调用中间状态:Agent 调用工具时,前端无法实时感知"正在调用"状态
  3. 流式回滚困难:如果 LLM 在生成长回答的中间失败,已生成的 token 无法恢复

传统方案是放弃流式,用批量响应代替——但这严重损害用户体验。Temporal 的 Workflow Streams 解决了这个问题。

4.2 Workflow Streams 的工作原理

Workflow Streams 利用 Temporal 的 Signal(信号)和 Update(更新) 机制,实现了持久的流式输出

LLM token 生成
    ↓
Temporal Workflow 捕获 token
    ↓
Signal 推送到 Temporal Service(持久化)
    ↓
前端 WebSocket/SSE 接收(实时)
    ↓
即使页面刷新 → 从 Temporal 重新拉取 → 恢复流状态

关键点:流数据被持久化了。即使前端崩溃、网络中断,Workflow 仍然在后台运行,重新连接后可以恢复到中断的位置继续接收 token。

4.3 代码实战:构建持久化 AI 对话流

# workflow_streams_demo.py
import asyncio
from datetime import timedelta
from temporalio import workflow
from temporalio.client import Client
from temporalio.worker import Worker
from temporalio.types import ClassProxy

# 定义流式 Agent Workflow
@workflow.defn
class StreamingAgentWorkflow:
    @workflow.run
    async def run(self, user_query: str, tools: list[dict]) -> str:
        """
        持久化流式 AI Agent:
        - 每次 LLM 生成 token 都通过 workflow.signal() 推送
        - 即使连接中断,Workflow 继续运行,重新连接后继续接收
        """
        messages = [{"role": "user", "content": user_query}]
        full_response = ""
        
        # 获取流式 LLM 响应
        async for token in self.call_llm_streaming(messages):
            full_response += token
            # 关键:每生成一个 token 就持久化到 Workflow 状态
            await workflow.execute_update(
                self.on_token_update,
                args=[token, full_response],
            )
            # 同时 Signal 给所有已连接的客户端
            await workflow.signal(
                self.on_new_token,
                token,  # Signal 参数
            )
        
        # 工具调用阶段
        tool_calls = self.extract_tool_calls(full_response)
        for tool_call in tool_calls:
            # 工具调用也持久化
            await workflow.signal(self.on_tool_call_start, tool_call)
            
            result = await workflow.execute_activity(
                run_tool,
                tool_call,
                start_to_close_timeout=timedelta(minutes=5),
            )
            
            messages.append({"role": "assistant", "content": full_response})
            messages.append({"role": "tool", "tool_call_id": tool_call["id"], "content": result})
            
            await workflow.signal(self.on_tool_call_complete, tool_call, result)
        
        return full_response
    
    async def call_llm_streaming(self, messages):
        """调用 LLM 流式 API,每次 yield 一个 token"""
        # 实际实现中调用 OpenAI / Anthropic 流式 API
        async for token in llm_client.chat.completions.create(
            messages=messages,
            model="gpt-4o",
            stream=True,
        ):
            yield token.choices[0].delta.content or ""
    
    async def on_token_update(self, token: str, full_response: str):
        """Update handler:更新 Workflow 内部的流状态"""
        pass  # 状态已在 Workflow run 方法中管理
    
    @workflow.signal
    async def on_new_token(self, token: str):
        """Signal handler:推送给 WebSocket 客户端"""
        # WebSocket 服务通过这个 Signal 获取每个 token
        pass
    
    @workflow.signal
    async def on_tool_call_start(self, tool_call: dict):
        pass
    
    @workflow.signal
    async def on_tool_call_complete(self, tool_call: dict, result: str):
        pass


# WebSocket 服务端:消费 Workflow Streams
class WebSocketStreamConsumer:
    async def handle_connection(self, websocket, workflow_id: str):
        client = await Client.connect("your-namespace.temporalCloud.cloud:7233")
        handle = client.get_workflow_handle(workflow_id)
        
        # 方法1:Query 当前已生成的 token(页面刷新后恢复)
        query_result = await handle.query(StreamingAgentWorkflow.get_tokens)
        
        # 方法2:Signal 驱动——订阅新的 token
        stream = handle.signal_stream(
            StreamingAgentWorkflow.on_new_token,
        )
        
        async for token in stream:
            await websocket.send_text(token)
    
    @workflow.query
    async def get_tokens(self) -> list[str]:
        """Query:恢复已生成的 token 列表"""
        return self.tokens


# 前端代码(浏览器)
async def connect_to_workflow(workflow_id):
    ws = await websockets.connect(f"wss://your-server.com/ws/{workflow_id}")
    
    try:
        # 先查询已生成的 token,恢复页面状态
        existing_tokens = await query_workflow_state(workflow_id)
        display_tokens(existing_tokens)
        
        # 订阅新 token
        async for token in ws:
            append_token_to_display(token)
    except websockets.ConnectionClosed:
        # 连接中断,但 Workflow 仍在运行
        # 重新连接时,页面从 query 恢复
        reconnect_delay = 2
        await asyncio.sleep(reconnect_delay)
        await connect_to_workflow(workflow_id)

4.4 流式输出的持久化 vs 非持久化对比

维度普通流式(Server-Sent Events)Workflow Streams
刷新页面丢失所有已生成内容从 Temporal 恢复完整状态
网络中断流中断,需重新完整请求从中断处继续接收
服务重启已生成的 token 丢失Workflow 继续运行,持久化保证
工具调用状态前端无感知Signal 推送工具调用开始/完成
多客户端同步难以实现每个 Signal 可广播给所有客户端

五、外部负载存储:大型 AI 输入输出的可靠管理

5.1 AI 场景的负载困境

AI Agent 的一个独特挑战是大 Payload:一次 LLM 调用可能输入/输出 MB 级别的文本(长文档、代码库、图像 base64),而 Temporal 默认将所有负载存储在其 PostgreSQL 或 Cassandra 后端中。

这带来了两个实际问题:

  1. 数据库压力:大量 AI Workflow 的输入输出会撑爆数据库存储
  2. 传输效率:大 Payload 在 Workflow 状态序列化和网络传输中开销巨大

5.2 External Payload Storage 的架构

Temporal Replay 2026 引入了 External Storage(外部负载存储),允许你将大型 Payload 存储在外部系统(首选 AWS S3),Temporal 只存储对这些外部资源的引用:

Workflow 执行大型 LLM 调用
        ↓
Temporal 识别 Payload > 阈值(如 64KB)
        ↓
Payload 上传至 AWS S3(加密、分桶)
        ↓
Temporal 数据库只存储 S3 URI 引用
        ↓
Activity 读取结果时,从 S3 按需拉取

5.3 代码实战:S3 外部存储配置

# external_storage.py
from temporalio.contrib.external_storage import (
    S3StoragePlugin,
    S3StorageConfig,
)
from temporalio.client import Client
from temporalio.worker import Worker

# 配置 S3 外部存储
s3_config = S3StorageConfig(
    bucket="your-temporal-payloads",
    region="us-east-1",
    prefix="ai-workflows/",
    # SSE-KMS 加密,保障数据安全
    sse_kms_key_id="arn:aws:kms:us-east-1:123456789012:key/xxx",
    # 自动清理:Workflow 完成后 7 天删除 S3 对象
    auto_cleanup_after_days=7,
)

storage_plugin = S3StoragePlugin(s3_config)

async def main():
    client = await Client.connect(
        "your-namespace.temporalCloud.cloud:7233",
        # 启用外部存储
        data_converter=storage_plugin.to_temporal_data_converter(
            # 超过 64KB 的 Payload 自动存 S3
            large_payload_threshold=64 * 1024,
        ),
    )
    
    # 在 Workflow 中处理大 Payload
    @workflow.defn
    class DocumentProcessingWorkflow:
        @workflow.run
        async def run(self, document_url: str) -> dict:
            # 下载文档(可能 10MB+)
            document = await workflow.execute_activity(
                download_document,
                document_url,
                start_to_close_timeout=timedelta(minutes=10),
            )
            # 这个 document 超过阈值,自动存 S3,Workflow 状态只存 URI
            # 时空复杂度: O(1) 而不是 O(n),n=文档大小
            
            # LLM 分析文档
            analysis = await workflow.execute_activity(
                analyze_with_llm,
                document,  # 实际传的是 S3 URI
                start_to_close_timeout=timedelta(minutes=30),
            )
            
            return analysis

六、AI 生态集成:Google ADK 与 OpenAI Agents SDK

6.1 为什么 AI Agent 需要 Temporal

构建 AI Agent 时,开发者通常面临三个核心挑战:

  1. LLM 调用不可靠:GPT-4 API 有 1-2% 的失败率,没有重试会丢任务
  2. 工具执行无持久化:Agent 调用工具失败后,需要重新调用 LLM,已生成的上下文可能丢失
  3. 多步任务无状态:一个 Agent 执行 20 步任务,第 15 步崩溃,一切重来

Temporal 的答案是:让 AI Agent 的每一步都变成持久化的 Activity。这样,即使 LLM 调用失败、工具执行出错,Agent 都能从断点恢复,完整重放所有已完成的步骤。

6.2 Google ADK 集成

Google Agent Development Kit (ADK) 是 Google 提供的 AI Agent 开发框架。Temporal 与 Google ADK 的集成,使得 ADK Agent 的每个 LLM 调用和工具执行都在 Temporal 的持久化保障下运行:

# google_adk_integration.py
import asyncio
from google.adk.agents import Agent
from google.adk.tools import FunctionTool
from temporalio import workflow
from temporalio.worker import Worker
from temporalio.client import Client

# 将 ADK Agent 的工具封装为 Temporal Activity
class ADKToolsAsActivities:
    def __init__(self, adk_agent: Agent):
        self.adk_agent = adk_agent
    
    @workflow.activity.defn
    async def call_llm(self, messages: list, tools: list) -> dict:
        """ADK Agent 的 LLM 调用作为 Activity 执行"""
        response = await self.adk_agent.run_async(
            messages=messages,
            tools=tools,
        )
        # Temporal 自动重试,失败不会丢失上下文
        return {
            "content": response.text,
            "tool_calls": response.tool_calls or [],
        }
    
    @workflow.activity.defn
    async def execute_tool(self, tool_name: str, args: dict) -> dict:
        """Agent 工具执行作为 Activity,持久化结果"""
        result = await self.adk_agent.tools[tool_name].run_async(**args)
        # 工具执行结果被 Temporal 持久化,
        # Agent 重试时无需重新执行工具
        return result


# ADK Agent Workflow:结合 ADK 灵活性和 Temporal 可靠性
@workflow.defn
class DurableADKAgent:
    @workflow.run
    async def run(self, user_goal: str) -> str:
        tools = ADKToolsAsActivities(self.adk_agent)
        
        messages = [{"role": "user", "content": user_goal}]
        max_steps = 20
        step = 0
        
        while step < max_steps:
            step += 1
            
            # LLM 调用作为 Activity(有自动重试保证)
            llm_response = await workflow.execute_activity(
                tools.call_llm,
                args=[messages, self.get_tools_description()],
                start_to_close_timeout=timedelta(seconds=120),
                retry_policy=RetryPolicy(maximum_attempts=3),
            )
            
            if not llm_response.tool_calls:
                # Agent 决定完成
                return llm_response.content
            
            # 工具调用作为 Activity(结果持久化)
            for tool_call in llm_response.tool_calls:
                tool_result = await workflow.execute_activity(
                    tools.execute_tool,
                    args=[tool_call.name, tool_call.args],
                    start_to_close_timeout=timedelta(minutes=5),
                    retry_policy=RetryPolicy(maximum_attempts=5),
                )
                
                messages.append({
                    "role": "tool",
                    "tool_call_id": tool_call.id,
                    "content": tool_result,
                })
                # 工具结果被持久化,Agent 重试时不再重复执行
        
        return "Max steps reached"

6.3 OpenAI Agents SDK 沙箱支持

OpenAI 的 Agents SDK 引入了沙箱(Sandbox) 概念,用于安全隔离 Agent 的代码执行环境。Temporal 与 OpenAI Agents SDK 的集成,使得沙箱的执行也具有持久化保证:

# openai_agents_integration.py
from temporalio.contrib.openai_agents import (
    TemporalSandboxRunner,
)

# OpenAI Agents SDK 使用 Temporal 作为沙箱运行器
# 每次 Agent 在沙箱中执行命令时:
# 1. 命令作为 Activity 在 Temporal 中执行
# 2. 执行结果(包括文件系统变更)被持久化
# 3. 即使沙箱崩溃,Agent 可以无缝恢复并继续执行

sandbox_runner = TemporalSandboxRunner(
    task_queue="openai-sandbox-queue",
    sandbox_ttl=timedelta(minutes=30),
    # 沙箱文件变更自动持久化
    persist_files=["/tmp/agent_output/", "/tmp/agent_logs/"],
)

# OpenAI Agent 在 Temporal 管理的沙箱中运行
agent = Agent(
    model="gpt-4o",
    tools=[...],
    sandbox_runner=sandbox_runner,  # 沙箱执行有持久化保证
)

七、生产运营:规模化的可靠性保障

Replay 2026 还带来了一系列面向生产环境的运营能力升级,这些对大规模部署 Temporal 的团队至关重要:

7.1 Worker Versioning(Worker 版本控制)

当你要更新 Worker 的代码时,传统的做法是:

  1. 部署新版本 Workers
  2. 旧版本 Workers 优雅关闭
  3. 等待所有在途任务完成

问题在于:如果 Workflow A 正在用旧版代码执行步骤 1,你需要等它完整结束才能完全切换到新版本。

Worker Versioning 允许你在同一个 Task Queue 中运行多个版本的 Workers,Temporal 会智能路由:

  • 新 Workflow → 自动路由到新版 Worker
  • 旧 Workflow(历史版本代码) → 路由到对应版本的 Worker
  • 逐步迁移:灰度发布,先 10% 流量走新版本,稳定后逐步扩大
# worker_versioning.py
from temporalio.worker import Worker, BuildIdBasedVersioning

async def main():
    client = await Client.connect("your-namespace.temporalCloud.cloud:7233")
    
    # v1 Worker(仍在运行,保护历史 Workflow)
    v1_worker = Worker(
        client,
        task_queue="order-processing",
        build_id="v1.0.0",  # 版本标识
        versioning_intent=BuildIdBasedVersioning.Intent.QUEUE,
        workflows=[OrderWorkflow],
        activities=[OrderActivities()],
    )
    
    # v2 Worker(处理新 Workflow)
    v2_worker = Worker(
        client,
        task_queue="order-processing",
        build_id="v2.0.0",
        # Auto-upgrade:新 Workflow 自动用新版本
        versioning_intent=BuildIdBasedVersioning.Intent.AUTO,
        workflows=[OrderWorkflowV2],  # 新版 Workflow
        activities=[OrderActivitiesV2()],
    )

7.2 Task Queue Priority & Fairness

在多租户或混合负载场景下,不同类型的任务需要不同的优先级:

# task_queue_priority.py
from temporalio.worker import TaskQueueSortHint

async def setup_priority_queues():
    client = await Client.connect("your-namespace.temporalCloud.cloud:7233")
    
    # 高优先级队列:支付、金融交易(延迟 < 100ms)
    high_priority_worker = Worker(
        client,
        task_queue="payment-tasks",
        activities=[PaymentActivities()],
        task_queue_sort_hint=TaskQueueSortHint.HIGH_THROUGHPUT,
        max_concurrent_activities=200,  # 高并发
    )
    
    # 低优先级队列:报表生成、数据导出(可延迟)
    low_priority_worker = Worker(
        client,
        task_queue="report-tasks",
        activities=[ReportActivities()],
        task_queue_sort_hint=TaskQueueFairness.SMOOTH,  # 平滑分配
        max_concurrent_activities=10,  # 限制并发
    )

7.3 Worker Status UI 与 OpenMetrics 集成

Replay 2026 引入的 Worker Status UIOpenMetrics 支持,让 Temporal 的可观测性达到了生产级标准:

# prometheus.yml 配置 Temporal OpenMetrics
scrape_configs:
  - job_name: 'temporal'
    metrics_path: '/metrics'
    params:
      format: ['openmetrics']
    static_configs:
      - targets: ['temporal-frontend.namespace.svc:9090']
    relabel_configs:
      # 提取关键指标
      - source_labels: [__name__]
        regex: 'temporal_workflow_(started|completed|failed)'

# 关键可观测性指标
# temporal_workflow_started_total           - Workflow 启动总数
# temporal_activity_execution_latency      - Activity 执行延迟
# temporal_workflow_task_latency           - Workflow 任务调度延迟
# temporal_retry_count                      - Activity 重试次数(反映系统稳定性)
# temporal_resource_usage                   - Worker 资源使用率

八、性能调优:生产环境的血泪经验

基于大量生产案例,以下是我总结的 Temporal 性能调优核心策略:

8.1 Activity 超时配置的艺术

超时配置太短 → 误判失败,触发不必要的重试
超时配置太长 → 故障检测延迟,影响恢复速度

# 黄金法则:根据 P99 延迟的 3 倍来设置超时

@workflow.activity.defn
async def fast_api_call(self, endpoint: str) -> dict:
    # 正常 P99 = 200ms → 超时设为 1s
    ...

@workflow.activity.defn  
async def database_query(self, query: str) -> list:
    # 正常 P99 = 500ms → 超时设为 3s
    # 添加重试率考虑:API 有 1% 超时率
    ...

@workflow.activity.defn
async def llm_completion(self, prompt: str) -> str:
    # LLM P99 = 10s(考虑峰值)→ 超时设为 60s
    # 但加上重试_policy: initial_interval=2s, max_attempts=3
    # 实际最大等待 = 2s + 4s + 8s = 14s,仍在超时内
    ...

8.2 Heartbeat:长时间任务的生命线

对于可能运行超过 1 分钟的 Activity,必须配置心跳

@workflow.activity.defn
async def train_ml_model(self, config: dict) -> dict:
    for epoch in range(config["epochs"]):
        # 执行训练步骤
        loss = await training_step(epoch)
        
        # 心跳:报告进度,延长超时
        # Temporal 会自动延长 start_to_close_timeout
        activity.heartbeat(
            progress=f"epoch {epoch+1}/{config['epochs']}",
            current_loss=loss,
        )

8.3 Workflow 状态最小化原则

Temporal 通过事件溯源(Event Sourcing)来重建 Workflow 状态——每次你访问 Workflow 的变量,框架都会 replay 历史事件。

# 避免的做法:存储大量数据在 Workflow 状态中
@workflow.run
async def run(self):
    large_data = await workflow.execute_activity(fetch_huge_dataset)
    self.all_data = large_data  # ❌ 大数据存在 Workflow 状态中,replay 极慢
    return process(self.all_data)

# 推荐的写法:只存储引用,结果在 Activity 间传递
@workflow.run
async def run(self):
    data_ref = await workflow.execute_activity(fetch_and_store_to_s3, config)
    # 只存储 S3 URI,不存储数据本身
    self.s3_ref = data_ref
    result = await workflow.execute_activity(process_from_s3, data_ref)
    return result

九、选型指南:什么时候用 Temporal

Temporal 不是万能药。以下是我的决策框架:

9.1 适合 Temporal 的场景

  • 多步骤业务流程:订单处理、金融交易、数据 ETL(>3 个步骤,有状态依赖)
  • AI Agent:LLM 调用 + 工具执行 + 长时推理(需要失败恢复和状态持久化)
  • 跨系统协调:需要保证跨多个服务/数据库的操作最终一致性
  • 长时间运行任务:批处理、报表生成、ML 训练(>5 分钟)
  • 需要完整执行历史的场景:金融合规、审计要求

9.2 可能过度设计的场景

  • 简单的一次性任务:发送一封邮件,不需要持久化
  • 无状态微服务:每个请求独立处理,不需要跨请求状态
  • 强一致性要求:Temporal 是最终一致性,不适合银行转账等强一致性场景
  • 团队刚起步:Temporal 有学习曲线,MVP 阶段可以先用简单队列

总结:持久执行范式的成熟化

Temporal Replay 2026 的更新,标志着持久执行这个理念从技术极客圈走向了主流工程实践:

  1. Serverless Workers 让 Temporal 的基础设施成本降到零,使小团队也能用上企业级可靠性
  2. Standalone Activities 将 Temporal 的能力从"编排"扩展到"任务执行",成为一个通用的可靠任务运行时
  3. Workflow Streams 解决了 AI Agent 流式交互的可靠性问题,让持久化覆盖到每个 token 级别
  4. AI 生态集成(Google ADK、OpenAI Agents SDK)使得 Temporal 成为 AI Agent 基础设施的标准选择
  5. 生产运营能力(Worker Versioning、Task Queue Priority、OpenMetrics)让 Temporal 在大规模生产环境中可维护

正如 Temporal 官网的那句宣言:"Write code as if failure doesn't exist"——这不只是 slogan,而是一套经过工程化验证的可靠执行范式。2026 年的这波更新,让这个范式真正走向了成熟。

如果你正在构建需要可靠性的 AI 应用,或者厌倦了每天和分布式系统的失败模式搏斗,Temporal Replay 2026 的这些新能力,值得你认真评估。


参考链接

  • Temporal 官方文档:https://docs.temporal.io
  • Replay 2026 公告:https://temporal.io/blog/replay-2026-product-announcements
  • Serverless Workers 文档:https://docs.temporal.io/evaluate/serverless-workers
  • Standalone Activities 文档:https://docs.temporal.io/standalone-activity
  • Workflow Streams 文档:https://docs.temporal.io/develop/python/workflows/workflow-streams
  • External Storage 文档:https://docs.temporal.io/external-storage

推荐文章

Vue3中如何处理路由和导航?
2024-11-18 16:56:14 +0800 CST
HTML和CSS创建的弹性菜单
2024-11-19 10:09:04 +0800 CST
Gin 框架的中间件 代码压缩
2024-11-19 08:23:48 +0800 CST
浏览器自动播放策略
2024-11-19 08:54:41 +0800 CST
程序员茄子在线接单