编程 万字深度解析 Temporal 工作流编排:当分布式系统学会「故障自愈」——从事件溯源到生产级持久化执行的完全指南(2026)

2026-07-01 06:45:30 +0800 CST views 8

万字深度解析 Temporal 工作流编排:当分布式系统学会「故障自愈」——从事件溯源到生产级持久化执行的完全指南(2026)

Temporal 是微服务架构中缺失的那块拼图——它不是队列,不是状态机,而是一个「让代码在任何故障下都能从断点继续执行」的持久化执行引擎。本文从事件溯源原理到生产级部署,彻底拆解 Temporal 的技术内核。

摘要

2026 年,分布式系统的复杂性已经达到一个临界点:微服务数量突破三位数、跨服务事务频繁、网络分区成为常态、部分故障几乎每秒都在发生。传统方案(消息队列 + 数据库状态机)在这些场景下越来越力不从心——代码必须时刻考虑「如果这一步失败了怎么办」。

Temporal 给出了一个根本性的答案:让普通的 Go/Java/Python 代码具备「故障自愈」能力。你写的 Workflow 代码,即使在执行过程中进程崩溃、网络中断、数据库故障,重启后也能从最后一个成功的步骤继续执行,就像什么都没发生过一样。

本文深度解析 Temporal 的核心原理(Event Sourcing、Command Pattern、Deterministic Replay)、架构设计(Worker、History Service、Matching Service)、Go SDK 实战(Workflow、Activity、Child Workflow、Saga Pattern)、生产级部署(Prometheus 监控、Grafana 仪表盘、灰度发布、版本ing)、性能优化(吞吐量与延迟调优),以及何时应该选择 Temporal、何时不应该用 Temporal。


目录

  1. 为什么需要 Temporal?——分布式系统的「失败地狱」
  2. Temporal 是什么?——重新定义「可靠执行」
  3. 核心概念速览
  4. 架构深度解析
  5. 事件溯源(Event Sourcing)原理
  6. 确定性重放(Deterministic Replay)
  7. 快速上手:第一个 Temporal Workflow
  8. Workflow 编写规范与陷阱
  9. Activity:分布式系统中的「副作用」
  10. 错误处理与重试策略
  11. Saga 模式:分布式事务的最终一致性
  12. Child Workflow 与 Parent-Child 关系
  13. Signal:向运行中的 Workflow 发送消息
  14. Query:查询 Workflow 状态
  15. 版本ing:不停机的代码升级
  16. 生产级部署
  17. 可观测性:Metrics、Tracing、Logging
  18. 性能优化实战
  19. Temporal vs 其他方案对比
  20. 真实案例:Uber、Netflix、Box 如何使用 Temporal
  21. 总结与展望

1. 为什么需要 Temporal?——分布式系统的「失败地狱」

1.1 传统分布式系统的痛点

假设你正在编写一个电商订单处理流程:

1. 创建订单(数据库写入)
2. 扣减库存(调用库存服务)
3. 扣减账户余额(调用支付服务)
4. 发送确认邮件(调用邮件服务)
5. 更新订单状态为「已完成」

在理想情况下,这段代码按顺序执行,一切顺利。但生产环境中:

  • 步骤 2 失败(库存服务超时):订单已创建,但库存未扣减。是重试?还是标记订单为失败?如果重试时网络又断了呢?
  • 步骤 3 失败(余额不足):订单和库存怎么办?已经扣减的库存如何回滚?
  • 步骤 4 失败(邮件服务宕机):订单已经创建、库存已扣、钱已扣,但用户没收到确认邮件。是重试邮件?还是整个流程回滚?
  • 服务进程在步骤 2 之后崩溃:重启后,服务不知道订单处理到哪一步了。是从头开始(可能导致重复扣减)?还是跳过步骤 1 和 2(但怎么知道它们已经成功了)?

传统解决方案是在每一步之后把「当前进度」写入数据库,并在服务重启后检查进度,然后从断点继续执行。但这带来了新的问题:

  • 进度状态表的 schema 会随着业务流程变化而频繁修改
  • 「执行逻辑」和「状态管理」紧密耦合,代码难以维护
  • 重试逻辑、超时处理、幂等性保证需要手写大量样板代码
  • 跨服务的事务一致性(Saga 模式)实现复杂,容易出错

1.2 Temporal 的解决思路

Temporal 的核心洞察是:把「执行状态」和「业务逻辑」完全分离

在 Temporal 中,你写的代码看起来完全是同步的、顺序的:

func OrderWorkflow(ctx workflow.Context, orderID string) error {
    // 1. 创建订单
    ao := workflow.ActivityOptions{
        StartToCloseTimeout: 10 * time.Second,
    }
    ctx = workflow.WithActivityOptions(ctx, ao)
    
    var createResult string
    err := workflow.ExecuteActivity(ctx, CreateOrderActivity, orderID).Get(ctx, &createResult)
    if err != nil {
        return err
    }
    
    // 2. 扣减库存
    var inventoryResult string
    err = workflow.ExecuteActivity(ctx, DeductInventoryActivity, orderID).Get(ctx, &inventoryResult)
    if err != nil {
        // 失败:需要回滚创建订单
        _ = workflow.ExecuteActivity(ctx, CancelOrderActivity, orderID).Get(ctx, nil)
        return err
    }
    
    // 3. 扣减余额
    err = workflow.ExecuteActivity(ctx, DeductBalanceActivity, orderID).Get(ctx, nil)
    if err != nil {
        // 失败:需要回滚库存和订单
        _ = workflow.ExecuteActivity(ctx, RestoreInventoryActivity, orderID).Get(ctx, nil)
        _ = workflow.ExecuteActivity(ctx, CancelOrderActivity, orderID).Get(ctx, nil)
        return err
    }
    
    // 4. 发送邮件
    _ = workflow.ExecuteActivity(ctx, SendEmailActivity, orderID).Get(ctx, nil)
    
    // 5. 完成
    _ = workflow.ExecuteActivity(ctx, MarkOrderCompleteActivity, orderID).Get(ctx, nil)
    
    return nil
}

关键点:这段代码在 Temporal 中运行,具有「故障自愈」能力:

  • 如果服务在步骤 2 之后崩溃,Temporal 会在另一个 Worker 上从步骤 3 继续执行(步骤 1 和 2 不会重跑)
  • 如果 Activity 执行失败(网络超时、服务宕机),Temporal 会自动重试(可配置重试策略)
  • 整个 Workflow 的执行状态由 Temporal 持久化存储,开发者无需手动管理

2. Temporal 是什么?——重新定义「可靠执行」

2.1 Temporal 的定义

Temporal 是一个持久化执行引擎(Durable Execution Engine)。它的核心能力是:

让任意代码(Workflow)在任何故障(进程崩溃、网络中断、机器宕机)的情况下,都能从最后一个成功的断点继续执行,且保证执行结果的最终一致性。

Temporal 不是:

  • ❌ 消息队列(RabbitMQ、Kafka)—— Temporal 不负责消息路由,而是负责「有状态的长时运行流程」
  • ❌ 任务调度器(Celery、Quartz)—— Temporal 不仅调度任务,还保证任务的完整执行(包括失败恢复)
  • ❌ 状态机框架(AWS Step Functions)—— Temporal 更灵活,支持复杂的编程逻辑(循环、条件、错误处理)
  • ❌ 工作流引擎(Camunda、Zeebe)—— Temporal 更底层,直接以代码定义工作流(而非 XML/BPMN)

2.2 Temporal 的核心架构

Temporal 采用「控制平面 + 数据平面」分离的架构:

┌─────────────────────────────────────────────────────────────┐
│                      Temporal Cluster                       │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐      │
│  │   Frontend   │  │    History   │  │   Matching   │      │
│  │   Service    │  │   Service    │  │   Service    │      │
│  └──────────────┘  └──────────────┘  └──────────────┘      │
│         │                  │                  │              │
│         └──────────────────┼──────────────────┘              │
│                            │                                 │
│                    ┌───────┴────────┐                        │
│                    │  Persistence   │                        │
│                    │  (PostgreSQL/  │                        │
│                    │   MySQL/Cassandra)                       │
│                    └────────────────┘                        │
└─────────────────────────────────────────────────────────────┘
         │                            │
         │  gRPC                     │  Task Queue
         ▼                            ▼
┌─────────────┐              ┌─────────────┐
│   Worker    │              │   Worker    │
│  (Go/Java/  │              │  (Go/Java/  │
│   Python)   │              │   Python)   │
└─────────────┘              └─────────────┘

核心组件

  1. Frontend Service:接收来自客户端的请求(启动 Workflow、发送 Signal、查询状态)
  2. History Service:核心组件,负责记录 Workflow 执行的历史事件(Event Sourcing),并在 Worker 失败时调度重试
  3. Matching Service:将 Task(Activity Task、Workflow Task)分发给可用的 Worker
  4. Persistence Layer:持久化存储 Workflow 状态(事件历史、当前状态)

Worker

  • Worker 是执行 Workflow 和 Activity 的进程
  • Worker 可以水平扩展(添加更多 Worker 节点)
  • Worker 从 Task Queue 拉取任务执行

3. 核心概念速览

3.1 Workflow

Workflow 是 Temporal 的核心抽象。它定义了「要做什么」的业务逻辑。

特性

  • 持久化执行:即使 Worker 崩溃,Workflow 也会从断点继续执行
  • 确定性:同样的输入,必须产生同样的执行路径(不能调用随机数、当前时间等不确定函数)
  • 可重放:Temporal 会根据事件历史「重放」Workflow 代码,以恢复内存状态
func MyWorkflow(ctx workflow.Context, input string) (string, error) {
    // Workflow 代码:定义执行逻辑
    // 不能调用:time.Now()、math.Random()、goroutine、channel
    // 只能调用:workflow.Sleep()、workflow.ExecuteActivity()、workflow.WaitCondition()
    
    var result string
    err := workflow.ExecuteActivity(ctx, MyActivity, input).Get(ctx, &result)
    if err != nil {
        return "", err
    }
    return result, nil
}

3.2 Activity

Activity 是 Workflow 中执行「副作用」的单元。比如:调用外部 API、写入数据库、发送邮件。

特性

  • 可重试:Activity 失败后可以自动重试(可配置重试策略)
  • 幂等性:Activity 可能被多次执行(由于重试),必须是幂等的
  • 超时控制:可以设置 StartToCloseTimeout、HeartbeatTimeout
func MyActivity(ctx context.Context, input string) (string, error) {
    // Activity 代码:执行副作用
    // 可以调用:任何 Go 代码(time.Now()、goroutine、第三方库)
    
    result, err := callExternalAPI(input)
    return result, err
}

3.3 Task Queue

Task Queue 是 Worker 拉取任务的队列。

  • Workflow Task Queue:用于调度 Workflow 执行
  • Activity Task Queue:用于调度 Activity 执行
// 启动 Workflow,指定 Task Queue
client.ExecuteWorkflow(context.Background(), client.StartWorkflowOptions{
    ID:        "workflow-id-123",
    TaskQueue: "my-task-queue",
}, MyWorkflow, "input")

3.4 Namespace

Namespace 是 Temporal 中的多租户隔离单元。

  • 不同 Namespace 的 Workflow 完全隔离
  • 可以用于:环境隔离(dev/staging/prod)、团队隔离、客户隔离
// 连接到指定 Namespace
client, err := client.Dial(client.Options{
    HostPort:  "temporal:7233",
    Namespace: "production",
})

3.5 Worker

Worker 是执行 Workflow 和 Activity 的进程。

func main() {
    c, err := client.Dial(client.Options{HostPort: "temporal:7233"})
    if err != nil {
        log.Fatalln("Unable to create Temporal client", err)
    }
    defer c.Close()

    w := worker.New(c, "my-task-queue", worker.Options{})
    
    // 注册 Workflow 和 Activity
    w.RegisterWorkflow(MyWorkflow)
    w.RegisterActivity(MyActivity)
    
    // 启动 Worker(阻塞)
    err = w.Run(worker.InterruptCh())
    if err != nil {
        log.Fatalln("Unable to start Worker", err)
    }
}

4. 架构深度解析

4.1 Temporal 的执行模型

Temporal 的执行模型基于事件溯源(Event Sourcing)命令模式(Command Pattern)

执行流程

  1. 客户端调用 ExecuteWorkflow(),Temporal 记录 WorkflowExecutionStarted 事件
  2. History Service 调度第一个 WorkflowTask 到 Task Queue
  3. Worker 拉取 WorkflowTask,执行 Workflow 代码
  4. Workflow 代码调用 ExecuteActivity(),Temporal 记录 ActivityTaskScheduled 事件
  5. History Service 调度 ActivityTask 到 Task Queue
  6. Worker 拉取 ActivityTask,执行 Activity 代码
  7. Activity 执行完成,Worker 返回结果
  8. Temporal 记录 ActivityTaskCompleted 事件
  9. History Service 调度下一个 WorkflowTask
  10. Worker 重放 Workflow 代码(从事件历史恢复状态),继续执行
  11. 重复步骤 4-10,直到 Workflow 完成

关键点:Workflow 代码会被执行多次

  • 第一次:正常执行
  • 后续:Worker 失败重启后,Temporal 会「重放」Workflow 代码,从事件历史恢复状态

因此,Workflow 代码必须是确定性的(Deterministic)。

4.2 事件历史(Event History)

Temporal 将 Workflow 的每次执行都记录为一系列事件。

示例事件历史

1. WorkflowExecutionStarted
2. WorkflowTaskScheduled
3. WorkflowTaskStarted
4. WorkflowTaskCompleted
5. ActivityTaskScheduled (Activity: CreateOrder)
6. ActivityTaskStarted
7. ActivityTaskCompleted (Result: "order-123")
8. ActivityTaskScheduled (Activity: DeductInventory)
9. ActivityTaskStarted
10. ActivityTaskFailed (Error: "timeout")
11. ActivityTaskScheduled (Activity: DeductInventory)  # 重试
12. ActivityTaskStarted
13. ActivityTaskCompleted
...

事件历史的作用

  • 故障恢复:Worker 重启后,根据事件历史重放 Workflow 状态
  • 审计:可以查看 Workflow 的完整执行历史
  • 调试:可以「回放」Workflow 执行过程

4.3 确定性重放(Deterministic Replay)

问题:Workflow 代码在执行过程中会调用 ExecuteActivity()。如果 Worker 重启,Temporal 需要「恢复」Workflow 的内存状态。但 Activity 是异步的(需要等待完成),如何恢复?

解决:Temporal 使用事件历史来重放 Workflow 代码。

重放过程

  1. Worker 重启后,Temporal 将事件历史发送给 Worker
  2. Worker 重新执行 Workflow 代码
  3. 当代码执行到 ExecuteActivity() 时,Temporal 检查事件历史中是否已经有对应的 ActivityTaskCompleted 事件
    • 如果有:直接返回结果,不实际执行 Activity
    • 如果没有:实际执行 Activity
  4. 这样,Workflow 代码执行到「断点」时,内存状态就恢复了

要求:Workflow 代码必须是确定性的

错误示例(不确定性代码):

func MyWorkflow(ctx workflow.Context) error {
    // 错误:time.Now() 每次执行结果不同
    now := time.Now()
    
    // 错误:随机数
    randNum := rand.Intn(100)
    
    // 错误:goroutine(执行顺序不确定)
    go func() {
        fmt.Println("hello")
    }()
    
    return nil
}

正确示例

func MyWorkflow(ctx workflow.Context) error {
    // 正确:使用 workflow.Now()
    now := workflow.Now(ctx)
    
    // 正确:使用 workflow.Random()
    randNum := workflow.Random(ctx, 100)
    
    // 正确:使用 workflow.Go()
    workflow.Go(ctx, func(ctx workflow.Context) {
        // ...
    })
    
    return nil
}

5. 事件溯源(Event Sourcing)原理

5.1 什么是事件溯源?

事件溯源是一种架构模式:系统的状态不是直接存储的,而是存储为一系列不可变的事件。当前状态是通过「重放」这些事件计算出来的。

传统方式(CRUD)

订单状态:{ "status": "PAID", "updated_at": "2026-01-01" }

事件溯源方式

事件 1:OrderCreated    { "order_id": "123", "amount": 100, "timestamp": "..." }
事件 2:PaymentDeducted { "order_id": "123", "amount": 100, "timestamp": "..." }
事件 3:OrderPaid       { "order_id": "123", "timestamp": "..." }

当前状态 = 重放事件 1 → 2 → 3。

5.2 Temporal 中的事件溯源

Temporal 使用事件溯源来实现「故障恢复」:

  • Workflow 的每次执行都记录为事件
  • Worker 重启后,根据事件历史「重放」Workflow 代码,恢复内存状态

优势

  1. 完整的审计日志:可以查看 Workflow 的完整执行历史
  2. 时间旅行调试:可以将 Workflow 状态「回滚」到任意历史事件
  3. 故障恢复:Worker 崩溃后,可以从事件历史恢复

劣势

  1. 事件历史可能很大:长时间运行的 Workflow 会产生大量事件
    • Temporal 解决方案:可以「截断」旧事件(保留最近 N 个事件)
  2. 重放开销:Worker 重启后需要重放事件历史
    • Temporal 解决方案:缓存(Worker 本地缓存事件历史)

6. 确定性重放(Deterministic Replay)

6.1 为什么需要确定性?

Workflow 代码会被执行多次(正常执行 + 重放)。如果代码是非确定性的(比如调用 time.Now()),那么重放时得到的结果会和第一次执行不同,导致状态不一致。

示例

func MyWorkflow(ctx workflow.Context) error {
    // 非确定性:每次执行结果不同
    waitTime := rand.Intn(10)  // 可能是 3,也可能是 7
    workflow.Sleep(ctx, time.Duration(waitTime)*time.Second)
    return nil
}
  • 第一次执行:waitTime = 3,记录事件 TimerScheduled (3s)
  • Worker 重启,重放:
    • 重放时 waitTime = 7(随机数种子不同)
    • 代码尝试记录 TimerScheduled (7s)
    • Temporal 发现事件历史中是 TimerScheduled (3s),不一致 → 报错

6.2 Temporal 如何保证确定性?

Temporal 提供了一套确定性的 API 来替代不确定的标准库函数:

标准库Temporal API说明
time.Now()workflow.Now(ctx)返回事件发生时间(固定)
time.Sleep()workflow.Sleep()确定性的定时器
rand.Intn()workflow.Random()使用固定的随机数种子
go func()workflow.Go()确定性的 goroutine
select {}workflow.Select()确定性的 select
channelworkflow.Channel确定性的 channel

禁用的函数

  • time.Now()(使用 workflow.Now()
  • time.Sleep()(使用 workflow.Sleep()
  • rand.*(使用 workflow.Random()
  • go func()(使用 workflow.Go()
  • ❌ 直接创建 goroutine
  • ❌ 直接操作 channel

6.3 重放模式(Replay Mode)

Temporal Worker 在执行 Workflow 时,有两种模式:

  1. 正常执行模式:第一次执行 Workflow 代码
  2. 重放模式:根据事件历史重放 Workflow 代码

如何判断当前是否处于重放模式?

func MyWorkflow(ctx workflow.Context) error {
    // 判断是否在重放
    if workflow.IsReplaying(ctx) {
        // 重放模式:不执行副作用(比如打印日志)
        // 因为这段代码在第一次执行时已经执行过了
    } else {
        // 正常模式:可以执行副作用
        fmt.Println("Workflow started")
    }
    
    // ...
    return nil
}

注意:通常不需要手动判断 IsReplaying(),Temporal 会自动处理。


7. 快速上手:第一个 Temporal Workflow

7.1 安装 Temporal

本地开发:使用 Temporal Standalone 模式(所有组件运行在一个进程中)

# 安装 Temporal CLI
brew install temporal

# 启动本地 Temporal 服务
temporal server start-dev

生产环境:使用 Temporal Cloud 或自托管 Temporal Cluster

# 使用 Docker Compose 部署 Temporal Cluster
git clone https://github.com/temporalio/docker-compose.git
cd docker-compose
docker-compose up

7.2 第一个 Workflow

项目结构

my-temporal-app/
├── go.mod
├── go.sum
├── worker/
│   └── main.go
└── workflow/
    ├── workflow.go
    └── activity.go

workflow/workflow.go

package workflow

import (
    "context"
    "time"
    
    "go.temporal.io/sdk/workflow"
)

// SayHelloWorkflow 是一个简单的 Workflow:说 Hello
func SayHelloWorkflow(ctx workflow.Context, name string) (string, error) {
    // 记录开始日志(仅在非重放模式下执行)
    workflow.GetLogger(ctx).Info("SayHelloWorkflow started", "name", name)
    
    // 调用 Activity
    ao := workflow.ActivityOptions{
        StartToCloseTimeout: 10 * time.Second,
    }
    ctx = workflow.WithActivityOptions(ctx, ao)
    
    var result string
    err := workflow.ExecuteActivity(ctx, SayHelloActivity, name).Get(ctx, &result)
    if err != nil {
        return "", err
    }
    
    // 记录完成日志
    workflow.GetLogger(ctx).Info("SayHelloWorkflow completed", "result", result)
    
    return result, nil
}

workflow/activity.go

package workflow

import (
    "context"
    "fmt"
)

// SayHelloActivity 是一个简单的 Activity:返回 "Hello, {name}!"
func SayHelloActivity(ctx context.Context, name string) (string, error) {
    // Activity 可以调用任意 Go 代码
    result := fmt.Sprintf("Hello, %s!", name)
    return result, nil
}

worker/main.go

package main

import (
    "log"
    
    "go.temporal.io/sdk/client"
    "go.temporal.io/sdk/worker"
    
    "my-temporal-app/workflow"
)

func main() {
    // 创建 Temporal 客户端
    c, err := client.Dial(client.Options{
        HostPort: "localhost:7233",
    })
    if err != nil {
        log.Fatalln("Unable to create Temporal client", err)
    }
    defer c.Close()
    
    // 创建 Worker
    w := worker.New(c, "hello-task-queue", worker.Options{})
    
    // 注册 Workflow 和 Activity
    w.RegisterWorkflow(workflow.SayHelloWorkflow)
    w.RegisterActivity(workflow.SayHelloActivity)
    
    // 启动 Worker(阻塞)
    err = w.Run(worker.InterruptCh())
    if err != nil {
        log.Fatalln("Unable to start Worker", err)
    }
}

启动 Workflow(客户端)

package main

import (
    "context"
    "log"
    "time"
    
    "go.temporal.io/sdk/client"
    
    "my-temporal-app/workflow"
)

func main() {
    // 创建 Temporal 客户端
    c, err := client.Dial(client.Options{
        HostPort: "localhost:7233",
    })
    if err != nil {
        log.Fatalln("Unable to create Temporal client", err)
    }
    defer c.Close()
    
    // 启动 Workflow
    workflowOptions := client.StartWorkflowOptions{
        ID:        "hello-workflow-1",
        TaskQueue: "hello-task-queue",
    }
    
    we, err := c.ExecuteWorkflow(context.Background(), workflowOptions, workflow.SayHelloWorkflow, "World")
    if err != nil {
        log.Fatalln("Unable to execute workflow", err)
    }
    
    // 等待 Workflow 完成
    var result string
    err = we.Get(context.Background(), &result)
    if err != nil {
        log.Fatalln("Workflow failed", err)
    }
    
    log.Println("Workflow completed", "result", result)
}

7.3 运行

# 终端 1:启动 Temporal 服务
temporal server start-dev

# 终端 2:启动 Worker
go run worker/main.go

# 终端 3:启动 Workflow
go run client/main.go

输出

2026/01/01 12:00:00 SayHelloWorkflow started {name: World}
2026/01/01 12:00:00 SayHelloWorkflow completed {result: Hello, World!}
2026/01/01 12:00:00 Workflow completed {result: Hello, World!}

8. Workflow 编写规范与陷阱

8.1 确定性规则

必须遵守

  1. ❌ 不要调用 time.Now() → ✅ 使用 workflow.Now(ctx)
  2. ❌ 不要调用 time.Sleep() → ✅ 使用 workflow.Sleep()
  3. ❌ 不要使用 rand 包 → ✅ 使用 workflow.Random()
  4. ❌ 不要直接创建 goroutine → ✅ 使用 workflow.Go()
  5. ❌ 不要直接操作 channel → ✅ 使用 workflow.NewChannel()
  6. ❌ 不要使用 select → ✅ 使用 workflow.Select()
  7. ❌ 不要调用外部 API → ✅ 使用 Activity
  8. ❌ 不要写文件/数据库 → ✅ 使用 Activity

示例:错误的 Workflow

func BadWorkflow(ctx workflow.Context) error {
    // 错误 1:调用 time.Now()
    now := time.Now()  // ❌
    
    // 错误 2:使用 rand
    num := rand.Intn(100)  // ❌
    
    // 错误 3:直接创建 goroutine
    go func() {  // ❌
        fmt.Println("hello")
    }()
    
    // 错误 4:调用外部 API
    resp, err := http.Get("https://api.example.com")  // ❌
    
    return nil
}

示例:正确的 Workflow

func GoodWorkflow(ctx workflow.Context) error {
    // 正确 1:使用 workflow.Now()
    now := workflow.Now(ctx)  // ✅
    
    // 正确 2:使用 workflow.Random()
    num := workflow.Random(ctx, 100)  // ✅
    
    // 正确 3:使用 workflow.Go()
    workflow.Go(ctx, func(ctx workflow.Context) {  // ✅
        workflow.GetLogger(ctx).Info("hello")
    })
    
    // 正确 4:使用 Activity 调用外部 API
    var result string
    err := workflow.ExecuteActivity(ctx, CallAPIActivity).Get(ctx, &result)  // ✅
    
    return nil
}

8.2 循环与条件

Workflow 中可以使用循环和条件,但必须是确定性的

示例:确定性循环

func LoopWorkflow(ctx workflow.Context, count int) error {
    for i := 0; i < count; i++ {
        // 确定性:循环次数由输入参数决定(固定)
        var result string
        err := workflow.ExecuteActivity(ctx, MyActivity, i).Get(ctx, &result)
        if err != nil {
            return err
        }
    }
    return nil
}

示例:不确定性循环(错误)

func BadLoopWorkflow(ctx workflow.Context) error {
    // 错误:循环次数取决于外部状态(不确定)
    count := getCountFromDatabase()  // ❌ 不能这样做
    
    for i := 0; i < count; i++ {
        // ...
    }
    return nil
}

8.3 异常处理

Workflow 中可以使用 deferrecover

func MyWorkflow(ctx workflow.Context) (err error) {
    // defer:无论是否 panic,都会执行
    defer func() {
        if r := recover(); r != nil {
            err = fmt.Errorf("workflow panicked: %v", r)
        }
    }()
    
    // 调用 Activity
    err = workflow.ExecuteActivity(ctx, MyActivity).Get(ctx, nil)
    if err != nil {
        // 处理错误
        return err
    }
    
    return nil
}

9. Activity:分布式系统中的「副作用」

9.1 Activity 的特性

Activity 是 Workflow 中执行「副作用」的单元。

特性

  1. 可重试:Activity 失败后可以自动重试
  2. 幂等性:Activity 可能被多次执行(由于重试),必须是幂等的
  3. 超时控制:可以设置 StartToCloseTimeout、HeartbeatTimeout
  4. 心跳(Heartbeat):长时间运行的 Activity 可以定期发送心跳,证明自己还活着

9.2 Activity 重试策略

func MyWorkflow(ctx workflow.Context) error {
    // 配置 Activity 重试策略
    ao := workflow.ActivityOptions{
        StartToCloseTimeout: 10 * time.Second,
        RetryPolicy: &temporal.RetryPolicy{
            InitialInterval:    1 * time.Second,  // 首次重试等待 1s
            MaximumInterval:    10 * time.Second, // 最多等待 10s
            BackoffCoefficient: 2.0,              // 退避系数:1s → 2s → 4s → 8s → 10s
            MaximumAttempts:    5,                 // 最多重试 5 次
        },
    }
    ctx = workflow.WithActivityOptions(ctx, ao)
    
    // 调用 Activity(失败会自动重试)
    err := workflow.ExecuteActivity(ctx, MyActivity).Get(ctx, nil)
    return err
}

9.3 心跳(Heartbeat)

长时间运行的 Activity 应该定期发送心跳,防止被判定为「超时」。

func LongRunningActivity(ctx context.Context) error {
    // 获取心跳详情
    heartbeatDetails := temporal.GetHeartbeatDetails(ctx)
    
    // 执行长时间任务
    for i := 0; i < 100; i++ {
        // 执行一部分工作
        // ...
        
        // 发送心跳(附带进度信息)
        temporal.Heartbeat(ctx, i)
        
        // 检查是否被取消
        if temporal.IsCanceled(ctx) {
            return fmt.Errorf("activity canceled")
        }
        
        time.Sleep(1 * time.Second)
    }
    
    return nil
}

9.4 Activity 超时控制

func MyWorkflow(ctx workflow.Context) error {
    ao := workflow.ActivityOptions{
        StartToCloseTimeout: 30 * time.Second,  // Activity 必须在 30s 内完成
        HeartbeatTimeout:    5 * time.Second,   // 每 5s 必须发送一次心跳
    }
    ctx = workflow.WithActivityOptions(ctx, ao)
    
    err := workflow.ExecuteActivity(ctx, LongRunningActivity).Get(ctx, nil)
    return err
}

10. 错误处理与重试策略

10.1 Workflow 级别的重试

Workflow 失败后,可以配置重试策略。

func main() {
    c, _ := client.Dial(client.Options{HostPort: "localhost:7233"})
    defer c.Close()
    
    workflowOptions := client.StartWorkflowOptions{
        ID:        "my-workflow",
        TaskQueue: "my-task-queue",
        RetryPolicy: &temporal.RetryPolicy{
            InitialInterval:    1 * time.Second,
            MaximumInterval:    10 * time.Second,
            BackoffCoefficient: 2.0,
            MaximumAttempts:    3,
        },
    }
    
    we, err := c.ExecuteWorkflow(context.Background(), workflowOptions, MyWorkflow)
    // ...
}

10.2 Activity 级别的重试

9.2 Activity 重试策略

10.3 自定义错误处理

func MyWorkflow(ctx workflow.Context) error {
    ao := workflow.ActivityOptions{
        StartToCloseTimeout: 10 * time.Second,
        RetryPolicy: &temporal.RetryPolicy{
            NonRetryableErrorTypes: []string{"InvalidInputError"},  // 这些错误不重试
        },
    }
    ctx = workflow.WithActivityOptions(ctx, ao)
    
    err := workflow.ExecuteActivity(ctx, MyActivity).Get(ctx, nil)
    if err != nil {
        var applicationErr *temporal.ApplicationError
        if errors.As(err, &applicationErr) {
            if applicationErr.Type() == "InvalidInputError" {
                // 不重试,直接返回
                return err
            }
        }
        // 其他错误会重试
        return err
    }
    
    return nil
}

11. Saga 模式:分布式事务的最终一致性

11.1 什么是 Saga 模式?

Saga 模式是一种分布式事务模式:将大事务拆分为一系列本地事务,每个本地事务都有对应的「补偿事务」。如果某个本地事务失败,则按顺序执行前面所有本地事务的补偿事务,实现回滚。

示例:电商订单流程

事务 1:创建订单
事务 2:扣减库存
事务 3:扣减余额
事务 4:发送确认邮件

如果事务 3 失败:
  补偿 2:恢复库存
  补偿 1:取消订单

11.2 Temporal 中实现 Saga

Temporal 提供了 workflow.Saga 来简化 Saga 模式的实现。

func OrderWorkflow(ctx workflow.Context, orderID string) error {
    // 创建 Saga
    saga := workflow.NewSaga(ctx)
    
    // 事务 1:创建订单
    var orderResult string
    err := workflow.ExecuteActivity(ctx, CreateOrderActivity, orderID).Get(ctx, &orderResult)
    if err != nil {
        return err
    }
    // 注册补偿事务
    saga.AddCompensation(ctx, CancelOrderActivity, orderID)
    
    // 事务 2:扣减库存
    var inventoryResult string
    err = workflow.ExecuteActivity(ctx, DeductInventoryActivity, orderID).Get(ctx, &inventoryResult)
    if err != nil {
        // 失败:执行补偿事务(取消订单)
        return saga.Compensate(ctx, nil)
    }
    // 注册补偿事务
    saga.AddCompensation(ctx, RestoreInventoryActivity, orderID)
    
    // 事务 3:扣减余额
    err = workflow.ExecuteActivity(ctx, DeductBalanceActivity, orderID).Get(ctx, nil)
    if err != nil {
        // 失败:执行补偿事务(恢复库存、取消订单)
        return saga.Compensate(ctx, nil)
    }
    // 注册补偿事务
    saga.AddCompensation(ctx, RefundBalanceActivity, orderID)
    
    // 事务 4:发送邮件
    _ = workflow.ExecuteActivity(ctx, SendEmailActivity, orderID).Get(ctx, nil)
    
    return nil
}

11.3 手动实现 Saga

如果不使用 workflow.Saga,也可以手动实现:

func OrderWorkflowManual(ctx workflow.Context, orderID string) (err error) {
    // defer:如果出错,执行补偿事务
    defer func() {
        if err != nil {
            // 执行补偿事务(逆序)
            _ = workflow.ExecuteActivity(ctx, RefundBalanceActivity, orderID).Get(ctx, nil)
            _ = workflow.ExecuteActivity(ctx, RestoreInventoryActivity, orderID).Get(ctx, nil)
            _ = workflow.ExecuteActivity(ctx, CancelOrderActivity, orderID).Get(ctx, nil)
        }
    }()
    
    // 事务 1
    err = workflow.ExecuteActivity(ctx, CreateOrderActivity, orderID).Get(ctx, nil)
    if err != nil {
        return err
    }
    
    // 事务 2
    err = workflow.ExecuteActivity(ctx, DeductInventoryActivity, orderID).Get(ctx, nil)
    if err != nil {
        return err
    }
    
    // 事务 3
    err = workflow.ExecuteActivity(ctx, DeductBalanceActivity, orderID).Get(ctx, nil)
    if err != nil {
        return err
    }
    
    return nil
}

12. Child Workflow 与 Parent-Child 关系

12.1 什么是 Child Workflow?

Child Workflow 是由 Parent Workflow 启动的 Workflow。

使用场景

  • 将一个大 Workflow 拆分为多个小 Workflow
  • 并行执行多个独立的子流程
  • 需要独立的重试策略、超时控制

12.2 启动 Child Workflow

func ParentWorkflow(ctx workflow.Context) error {
    // 启动 Child Workflow
    childCtx := workflow.WithChildOptions(ctx, workflow.ChildWorkflowOptions{
        WorkflowID:        "child-workflow-1",
        ParentClosePolicy: workflow.ParentClosePolicyTerminate,  // Parent 失败时,Child 也终止
    })
    
    var childResult string
    err := workflow.ExecuteChildWorkflow(childCtx, ChildWorkflow, "input").Get(ctx, &childResult)
    if err != nil {
        return err
    }
    
    workflow.GetLogger(ctx).Info("Child workflow completed", "result", childResult)
    return nil
}

func ChildWorkflow(ctx workflow.Context, input string) (string, error) {
    // Child Workflow 逻辑
    return "child result", nil
}

12.3 Parent-Child 关系

ParentClosePolicy

  • ParentClosePolicyTerminate:Parent 失败时,Child 也被终止
  • ParentClosePolicyAbandon:Parent 失败时,Child 继续执行
  • ParentClosePolicyRequestCancel:Parent 失败时,请求取消 Child

13. Signal:向运行中的 Workflow 发送消息

13.1 什么是 Signal?

Signal 是向运行中的 Workflow 发送消息的机制。

使用场景

  • 用户取消订单(发送 cancel 信号)
  • 管理员审批(发送 approve 信号)
  • 动态修改 Workflow 参数

13.2 接收 Signal

func MyWorkflow(ctx workflow.Context) error {
    // 注册 Signal 处理器
    selector := workflow.NewSelector(ctx)
    
    var cancelRequested bool
    signalChan := workflow.GetSignalChannel(ctx, "cancel")
    selector.AddReceive(signalChan, func(c workflow.ReceiveChannel, more bool) {
        c.Receive(ctx, nil)
        cancelRequested = true
        workflow.GetLogger(ctx).Info("Cancel signal received")
    })
    
    // 执行逻辑,检查 cancelRequested
    ao := workflow.ActivityOptions{
        StartToCloseTimeout: 10 * time.Second,
    }
    ctx = workflow.WithActivityOptions(ctx, ao)
    
    err := workflow.ExecuteActivity(ctx, MyActivity).Get(ctx, nil)
    if err != nil {
        return err
    }
    
    // 等待 Signal 或 Activity 完成
    for {
        if cancelRequested {
            return fmt.Errorf("workflow canceled")
        }
        
        // 等待 1s 或 Signal 到达
        selector.Select(ctx)
        
        // 检查 Activity 是否完成
        // ...
    }
}

13.3 发送 Signal

func main() {
    c, _ := client.Dial(client.Options{HostPort: "localhost:7233"})
    defer c.Close()
    
    // 发送 Signal 到运行中的 Workflow
    err := c.SignalWorkflow(context.Background(), "my-workflow-id", "", "cancel", nil)
    if err != nil {
        log.Fatalln("Unable to signal workflow", err)
    }
}

14. Query:查询 Workflow 状态

14.1 什么是 Query?

Query 是查询运行中 Workflow 状态的机制。与 Signal 不同,Query 不修改 Workflow 状态。

14.2 注册 Query

func MyWorkflow(ctx workflow.Context) error {
    // 状态
    var orderStatus string = "CREATED"
    
    // 注册 Query 处理器
    workflow.SetQueryHandler(ctx, "get_status", func() (string, error) {
        return orderStatus, nil
    })
    
    // 更新状态
    orderStatus = "PROCESSING"
    _ = workflow.ExecuteActivity(ctx, ProcessOrderActivity).Get(ctx, nil)
    orderStatus = "COMPLETED"
    
    return nil
}

14.3 发送 Query

func main() {
    c, _ := client.Dial(client.Options{HostPort: "localhost:7233"})
    defer c.Close()
    
    // 查询 Workflow 状态
    response, err := c.QueryWorkflow(context.Background(), "my-workflow-id", "", "get_status")
    if err != nil {
        log.Fatalln("Unable to query workflow", err)
    }
    
    var status string
    response.Get(&status)
    log.Println("Workflow status:", status)
}

15. 版本ing:不停机的代码升级

15.1 问题:Workflow 代码升级后,如何兼容旧实例?

假设你的 Workflow 代码已经部署,且有 100 个运行中的 Workflow 实例。现在你需要修改 Workflow 逻辑(比如增加一个新步骤)。如果直接升级代码,旧实例重放时会使用新逻辑,可能导致不一致。

15.2 使用 workflow.GetVersion()

Temporal 提供了 workflow.GetVersion() 来实现版本兼容。

func MyWorkflow(ctx workflow.Context) error {
    // 获取当前代码版本
    version := workflow.GetVersion(ctx, "add-new-step", workflow.DefaultVersion, 1)
    
    // 原有逻辑
    _ = workflow.ExecuteActivity(ctx, Step1Activity).Get(ctx, nil)
    
    // 新逻辑:仅在新版本中执行
    if version == 1 {
        _ = workflow.ExecuteActivity(ctx, NewStepActivity).Get(ctx, nil)
    }
    
    // 原有逻辑
    _ = workflow.ExecuteActivity(ctx, Step2Activity).Get(ctx, nil)
    
    return nil
}

原理

  • 旧实例(使用 DefaultVersion):GetVersion() 返回 workflow.DefaultVersion,不执行新步骤
  • 新实例(使用版本 1):GetVersion() 返回 1,执行新步骤

15.3 版本迁移最佳实践

  1. 先部署新代码:新代码同时支持旧版本和新版本
  2. 等待所有旧实例完成:确保没有运行中的旧实例
  3. 删除旧版本代码:清理 GetVersion() 中的旧版本分支

16. 生产级部署

16.1 Temporal Cluster 部署

生产环境推荐架构

┌─────────────────────────────────────────────────┐
│               Load Balancer                     │
│                   (HAProxy)                     │
└──────────────┬──────────────────────────────────┘
               │
       ┌───────┴───────┐
       │               │
┌──────▼─────┐  ┌──────▼─────┐
│ Frontend   │  │ Frontend   │  (2+ 实例,高可用)
│ Service    │  │ Service    │
└──────┬─────┘  └──────┬─────┘
       │               │
       └───────┬───────┘
               │
       ┌───────┴───────┐
       │               │
┌──────▼─────┐  ┌──────▼─────┐
│  History   │  │  History   │  (2+ 实例)
│  Service   │  │  Service   │
└──────┬─────┘  └──────┬─────┘
       │               │
       └───────┬───────┘
               │
       ┌───────┴───────┐
       │               │
┌──────▼─────┐  ┌──────▼─────┐
│  Matching  │  │  Matching  │  (2+ 实例)
│  Service   │  │  Service   │
└──────┬─────┘  └──────┬─────┘
       │               │
       └───────┬───────┘
               │
       ┌───────┴───────┐
       │               │
┌──────▼─────┐  ┌──────▼─────┐
│  Database  │  │  Database  │  (PostgreSQL/MySQL 主从复制)
│ (Primary)  │  │ (Replica)  │
└────────────┘  └────────────┘

使用 Docker Compose 部署(适用于小团队):

# docker-compose.yml
version: '3.8'
services:
  temporal:
    image: temporalio/auto-setup:latest
    ports:
      - "7233:7233"
    environment:
      - DB=postgresql
      - DB_PORT=5432
      - POSTGRES_USER=temporal
      - POSTGRES_PWD=temporal
      - POSTGRES_DB=temporal
    depends_on:
      - postgres
  
  postgres:
    image: postgres:15
    environment:
      POSTGRES_USER: temporal
      POSTGRES_PASSWORD: temporal
      POSTGRES_DB: temporal
    volumes:
      - postgres-data:/var/lib/postgresql/data
  
  prometheus:
    image: prom/prometheus:latest
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
  
  grafana:
    image: grafana/grafana:latest
    ports:
      - "3000:3000"
    depends_on:
      - prometheus

volumes:
  postgres-data:

16.2 Worker 部署

Worker 部署建议

  1. 水平扩展:根据 Task Queue 的 backlog 大小,动态添加/移除 Worker 实例
  2. 资源隔离:Workflow Worker 和 Activity Worker 分开部署(避免 Activity 长时间运行阻塞 Workflow)
  3. 健康检查:使用 /health 端点(Temporal SDK 提供)
  4. 优雅关闭:捕获 SIGTERM 信号,完成当前任务后再退出
func main() {
    c, _ := client.Dial(client.Options{HostPort: "temporal:7233"})
    defer c.Close()
    
    w := worker.New(c, "my-task-queue", worker.Options{})
    w.RegisterWorkflow(MyWorkflow)
    w.RegisterActivity(MyActivity)
    
    // 优雅关闭
    go func() {
        <-worker.InterruptCh()
        log.Println("Shutting down worker...")
        w.Stop()  // 完成当前任务后退出
    }()
    
    err := w.Run(worker.InterruptCh())
    if err != nil {
        log.Fatalln("Unable to start worker", err)
    }
}

16.3 数据库选择

Temporal 支持多种数据库:

数据库适用场景性能
PostgreSQL生产环境(推荐)
MySQL生产环境
Cassandra大规模部署(百万级 Workflow)最高
SQLite本地开发/测试

PostgreSQL 配置建议

# postgresql.conf
max_connections = 1000
shared_buffers = 4GB
effective_cache_size = 12GB
maintenance_work_mem = 1GB
checkpoint_completion_target = 0.9
wal_buffers = 16MB
default_statistics_target = 500

17. 可观测性:Metrics、Tracing、Logging

17.1 Metrics

Temporal 提供了丰富的 Prometheus Metrics。

关键指标

  • temporal_workflow_started:Workflow 启动次数
  • temporal_workflow_completed:Workflow 完成次数
  • temporal_workflow_failed:Workflow 失败次数
  • temporal_activity_execution_latency:Activity 执行延迟
  • temporal_worker_task_slots_available:Worker 可用任务槽位数

配置 Prometheus

# prometheus.yml
scrape_configs:
  - job_name: 'temporal'
    static_configs:
      - targets: ['temporal:8233']  # Temporal Metrics 端口
  - job_name: 'worker'
    static_configs:
      - targets: ['worker:8234']  # Worker Metrics 端口

17.2 Tracing

Temporal 支持 OpenTelemetry Tracing。

func main() {
    // 创建 OpenTelemetry Tracer
    tracerProvider, _ := newOtelTracerProvider()
    
    c, _ := client.Dial(client.Options{
        HostPort: "temporal:7233",
        Tracer:   tracerProvider.Tracer("my-app"),
    })
    defer c.Close()
    
    w := worker.New(c, "my-task-queue", worker.Options{
        Tracer: tracerProvider.Tracer("my-app"),
    })
    // ...
}

17.3 Logging

Workflow 中使用 workflow.GetLogger(ctx) 记录日志。

func MyWorkflow(ctx workflow.Context) error {
    logger := workflow.GetLogger(ctx)
    
    logger.Info("Workflow started", "input", "value")
    logger.Warn("Something is wrong")
    logger.Error("Activity failed", "error", err)
    
    return nil
}

日志最佳实践

  1. 使用结构化日志(key-value 格式)
  2. 包含 Workflow ID、Run ID
  3. 记录关键业务事件(订单创建、支付完成)

18. 性能优化实战

18.1 Workflow 性能优化

问题:Workflow 重放时,需要读取完整的事件历史。如果事件历史很大(比如 10 万事件),重放会很慢。

优化方案

  1. 使用 ContinueAsNew
    • 当事件历史超过一定大小(比如 1 万事件),创建新的 Workflow Run
    • 新 Run 的事件历史从空开始
func MyLongRunningWorkflow(ctx workflow.Context) error {
    for {
        // 执行一批任务
        // ...
        
        // 检查事件历史大小
        if workflow.GetInfo(ctx).GetCurrentHistoryLength() > 10000 {
            // 创建新 Run
            return workflow.NewContinueAsNewError(ctx, MyLongRunningWorkflow, newInput)
        }
    }
}
  1. 减少事件数量
    • 避免过多的小型 Activity(合并为一个大型 Activity)
    • 使用 workflow.ExecuteLocalActivity()(不记录事件)

18.2 Activity 性能优化

  1. 并行执行独立 Activity
func MyWorkflow(ctx workflow.Context) error {
    // 并行执行多个独立的 Activity
    selector := workflow.NewSelector(ctx)
    
    var result1 string
    future1 := workflow.ExecuteActivity(ctx, Activity1)
    selector.AddFuture(future1, func(f workflow.Future) {
        f.Get(ctx, &result1)
    })
    
    var result2 string
    future2 := workflow.ExecuteActivity(ctx, Activity2)
    selector.AddFuture(future2, func(f workflow.Future) {
        f.Get(ctx, &result2)
    })
    
    // 等待所有 Activity 完成
    selector.Select(ctx)
    selector.Select(ctx)
    
    return nil
}
  1. 使用 Local Activity
    • Local Activity 在 Worker 本地执行,不通过 Temporal Cluster
    • 适用于:执行时间短(< 1s)、失败影响小的任务
func MyWorkflow(ctx workflow.Context) error {
    // 使用 Local Activity(不记录事件,性能更高)
    var result string
    err := workflow.ExecuteLocalActivity(ctx, MyLocalActivity, "input").Get(ctx, &result)
    return err
}

18.3 Worker 性能优化

  1. 增加 Worker 数量:水平扩展
  2. 调整 Task Queue 并发度
w := worker.New(c, "my-task-queue", worker.Options{
    MaxConcurrentActivityExecution:       100,  // 最多同时执行 100 个 Activity
    MaxConcurrentWorkflowTaskExecution:   50,   // 最多同时执行 50 个 Workflow Task
    MaxConcurrentLocalActivityExecution:  200,  // 最多同时执行 200 个 Local Activity
})

19. Temporal vs 其他方案对比

19.1 Temporal vs 消息队列(RabbitMQ、Kafka)

特性Temporal消息队列
持久化执行✅ 原生支持❌ 需要手动实现
故障恢复✅ 自动❌ 手动(消费端 ACK)
分布式事务✅ Saga 模式❌ 需要手动实现
状态管理✅ 自动❌ 手动(数据库)
超时控制✅ 原生支持❌ 手动(延迟队列)
可观测性✅ Web UI❌ 需要第三方工具

选择建议

  • 需要有状态的长时运行流程:选择 Temporal
  • 简单的事件通知:选择消息队列

19.2 Temporal vs AWS Step Functions

特性TemporalAWS Step Functions
编程模型代码定义(Go/Java/Python)JSON/YAML 定义
灵活性✅ 高(支持循环、条件、错误处理)❌ 低(受限于状态机模型)
自托管✅ 支持❌ 仅云服务
成本✅ 开源免费❌ 按状态转换收费
多云支持✅ 支持❌ 仅 AWS

选择建议

  • 需要复杂编程逻辑:选择 Temporal
  • 简单流程 + 深度集成 AWS:选择 Step Functions

19.3 Temporal vs Camunda、Zeebe

特性TemporalCamunda/Zeebe
编程模型代码定义BPMN 图形化定义
目标用户开发者业务分析师 + 开发者
灵活性✅ 高❌ 受限于 BPMN
学习曲线❌ 陡峭(需要理解事件溯源)✅ 平缓(图形化)

选择建议

  • 开发者主导的项目:选择 Temporal
  • 需要业务分析师参与流程设计:选择 Camunda

20. 真实案例:Uber、Netflix、Box 如何使用 Temporal

20.1 Uber:支付流程编排

Uber 使用 Temporal(前身是 Cadence)来编排支付流程。

挑战

  • 支付涉及多个服务(银行卡、PayPal、Uber Cash)
  • 需要保证最终一致性(不能多扣、不能少扣)
  • 部分服务可能超时或失败

Temporal 解决方案

  • 每个支付流程定义为一个 Workflow
  • 使用 Saga 模式实现分布式事务
  • 失败自动重试,保证最终一致性

20.2 Netflix:内容转码流程

Netflix 使用 Temporal 来编排视频转码流程。

挑战

  • 视频转码是长时间运行的任务(几小时到几天)
  • 需要支持断点续传(Worker 崩溃后从中断点继续)
  • 需要并行处理多个视频片段

Temporal 解决方案

  • 每个视频转码任务定义为一个 Workflow
  • 并行转码多个视频片段(Child Workflow)
  • Worker 崩溃后,Temporal 自动调度到其他 Worker 继续执行

20.3 Box:文件处理流程

Box 使用 Temporal 来编排文件处理流程(病毒扫描、缩略图生成、元数据提取)。

挑战

  • 文件处理涉及多个步骤,每个步骤可能失败
  • 需要保证文件处理的原子性(要么全部成功,要么全部失败)
  • 需要支持手动审批(比如病毒扫描失败)

Temporal 解决方案

  • 每个文件处理任务定义为一个 Workflow
  • 使用 Signal 实现手动审批
  • 使用 Query 实现处理进度查询

21. 总结与展望

21.1 Temporal 的核心价值

  1. 简化分布式系统开发

    • 不需要手动管理状态、重试、超时
    • 代码看起来是同步的、顺序的,但实际是分布式的、容错的
  2. 提高系统可靠性

    • 自动故障恢复
    • 保证最终一致性
  3. 提高开发效率

    • 不需要写大量的样板代码(重试、超时、状态管理)
    • 专注于业务逻辑

21.2 何时应该使用 Temporal?

适合使用 Temporal 的场景

  • ✅ 长时运行的流程(几秒到几天)
  • ✅ 需要保证最终一致性的分布式事务
  • ✅ 需要故障恢复(从断点继续执行)
  • ✅ 需要人工审批(Signal/Query)
  • ✅ 需要查询流程状态(Query)

不适合使用 Temporal 的场景

  • ❌ 简单的 CRUD 操作(直接用数据库)
  • ❌ 高吞吐量、低延迟的请求(直接用 gRPC/HTTP)
  • ❌ 实时流处理(用 Flink/Spark)

21.3 Temporal 的未来

  1. 多云支持:Temporal Cloud 已经支持 AWS、GCP、Azure
  2. 更多语言 SDK:已经支持 Go、Java、Python、TypeScript,未来会支持更多语言
  3. 更好的可观测性:Temporal Web UI 正在改进(更好的可视化、调试工具)
  4. 性能优化:事件历史压缩、更快的重放

参考资源

  1. 官方文档:https://docs.temporal.io/
  2. GitHub:https://github.com/temporalio/temporal
  3. 社区论坛:https://community.temporal.io/
  4. 示例代码:https://github.com/temporalio/samples-go

作者注:本文基于 Temporal v1.25.0(2026 年 6 月)编写。Temporal 是一个快速发展的开源项目,具体 API 可能会有所变化,请以官方文档为准。


文章字数:约 18000 字

标签Temporal 工作流编排 分布式系统 持久化执行 事件溯源 Go 微服务 Saga模式 故障恢复 云原生

推荐文章

Vue3中的自定义指令有哪些变化?
2024-11-18 07:48:06 +0800 CST
html一个包含iPhoneX和MacBook模拟器
2024-11-19 08:03:47 +0800 CST
聚合支付管理系统
2025-07-23 13:33:30 +0800 CST
动态渐变背景
2024-11-19 01:49:50 +0800 CST
Vue3中如何处理组件的单元测试?
2024-11-18 15:00:45 +0800 CST
ElasticSearch简介与安装指南
2024-11-19 02:17:38 +0800 CST
程序员茄子在线接单