Temporal 深度解析:分布式持久化执行引擎如何终结微服务编排的地狱模式
写在前面
如果你在后端干过超过三年,大概率遇到过这样的场景:一个订单流程要调支付、扣库存、发通知、更新状态——每个步骤都是网络调用,都可能失败,都可能超时。你用消息队列串起来,加了重试,加了补偿,加了幂等,最后发现代码里 80% 的逻辑都在处理"万一失败了怎么办"。
更惨的是长流程。用户注册后要发邮件、初始化数据、触发推荐算法、同步到 BI 系统……任何一步挂了,你得知道挂在哪,从哪里恢复,中间状态怎么处理。于是你写了大量的状态机代码,数据库里存了一堆中间状态字段,代码变成了意大利面条。
Temporal 就是来终结这种地狱模式的。它不是又一个消息队列,也不是简单的任务调度器——它是一个持久化执行引擎(Durable Execution Engine)。你写的代码看起来就像同步代码,但它的执行是分布式的、持久的、容错的。进程挂了?重启后从断点继续。机器宕了?另一台机器接管执行。
这篇文章会从架构设计到底层实现,从 Go SDK 实战到性能调优,带你彻底搞懂 Temporal 到底是怎么做到"代码写起来像同步,跑起来像分布式"的。
一、为什么需要持久化执行引擎
1.1 微服务编排的三重困境
在深入 Temporal 之前,我们先搞清楚问题到底在哪。
困境一:失败处理
分布式系统中,失败是常态而非异常。网络分区、服务超时、进程崩溃、磁盘满——任何一次调用都可能失败。传统的处理方式:
// 传统的订单处理——地狱模式
func ProcessOrder(ctx context.Context, order *Order) error {
// 扣库存
if err := inventoryService.Deduct(ctx, order.Items); err != nil {
// 扣失败了,是重试还是放弃?如果部分扣了呢?
return fmt.Errorf("deduct inventory failed: %w", err)
}
// 发起支付
paymentResult, err := paymentService.Charge(ctx, order.Payment)
if err != nil {
// 支付失败了,库存要不要回滚?如果回滚也失败了呢?
_ = inventoryService.Compensate(ctx, order.Items) // 忽略错误?
return fmt.Errorf("payment failed: %w", err)
}
// 更新订单状态
if err := orderService.UpdateStatus(ctx, order.ID, "paid"); err != nil {
// 状态更新失败,但支付已经成功——怎么处理?
// 需要人工介入?
return fmt.Errorf("update status failed: %w", err)
}
// 发送通知
if err := notificationService.Send(ctx, order.UserID, "订单支付成功"); err != nil {
// 通知失败,但订单已经完成了
// 记日志?重试?不管了?
log.Printf("notification failed: %v", err)
}
return nil
}
看到问题了吗?每一步失败都需要不同的补偿逻辑,而补偿本身也可能失败。你的代码从"做业务"变成了"处理异常",而且越写越乱。
困境二:状态管理
长流程中的状态管理更是噩梦。一个审批流程可能持续数天,涉及多个角色:
type ApprovalStatus string
const (
ApprovalPending ApprovalStatus = "pending"
ApprovalApproved ApprovalStatus = "approved"
ApprovalRejected ApprovalStatus = "rejected"
ApprovalCancelled ApprovalStatus = "cancelled"
ApprovalCompensat ApprovalStatus = "compensating" // 补偿中
ApprovalTimeout ApprovalStatus = "timeout"
)
// 数据库里要存各种中间状态
type Approval struct {
ID string
Status ApprovalStatus
CurrentStep int
RetryCount int
LastError string
CompensationState string // 补偿状态
// ... 还有十几个字段
}
你需要一个状态机来管理这些状态的流转,但状态机本身也很复杂——尤其是涉及超时、重试、补偿的时候。
困境三:可观测性
出了问题之后,你怎么知道流程跑到哪了?日志散落在各个服务里,请求 ID 追踪链可能断裂。你得去 Elasticsearch 里搜日志,去数据库里查状态,然后人肉拼凑出整个流程的时间线。
1.2 现有方案的局限性
| 方案 | 优点 | 局限 |
|---|---|---|
| 消息队列(Kafka/RabbitMQ) | 解耦、异步、高吞吐 | 无内置状态管理、无长流程支持、消息丢失难以恢复 |
| Saga 模式 | 有补偿机制 | 每个补偿都要手写、状态管理复杂、调试困难 |
| 有限状态机(FSM) | 状态流转清晰 | 难以处理并行分支、超时逻辑复杂、代码膨胀 |
| AWS Step Functions | 托管、可视化 | 供应商锁定、冷启动延迟、自定义逻辑受限 |
| Cadence | Temporal 的前身 | 已停止维护、社区萎缩 |
Temporal 的核心洞察是:与其让你在各种基础设施上手动实现容错逻辑,不如让运行时帮你搞定。你写的代码就是业务逻辑,Temporal 保证这段逻辑最终一定会执行完成——不管遇到什么故障。
二、Temporal 核心架构
2.1 整体架构
Temporal 的架构可以用四个核心组件来理解:
┌──────────────────────────────────────────────────┐
│ Temporal Cluster │
│ │
│ ┌─────────┐ ┌─────────┐ ┌─────────────────┐ │
│ │ Frontend │ │ History │ │ Matching │ │
│ │ Service │ │ Service │ │ Service │ │
│ └────┬─────┘ └────┬────┘ └───────┬─────────┘ │
│ │ │ │ │
│ └──────────────┼───────────────┘ │
│ │ │
│ ┌───────┴───────┐ │
│ │ Persistence │ │
│ │ (Cassandra/ │ │
│ │ PostgreSQL/ │ │
│ │ MySQL) │ │
│ └───────────────┘ │
└──────────────────────────────────────────────────┘
│ │
┌────┴─────┐ ┌────┴─────┐
│ Worker 1 │ │ Worker N │
│ (你的代码) │ │ (你的代码) │
└──────────┘ └──────────┘
Frontend Service:统一的 API 网关,处理认证、限流、请求路由。所有 SDK 和 CLI 都通过 Frontend 与集群交互。
History Service:Temporal 的大脑。每个 Workflow 的执行状态由一个 History Shard 管理。它是事件溯源(Event Sourcing)的核心——Workflow 的完整状态由一系列不可变事件组成。
Matching Service:任务调度器。当 Workflow 需要执行一个 Activity 时,Matching Service 负责将任务分配给合适的 Worker。它维护任务队列,支持优先级和速率限制。
Worker:你的代码运行的地方。Worker 通过长轮询(Long Poll)从 Matching Service 拉取任务,执行后把结果回报给 History Service。Worker 是无状态的——可以随时启停,不影响 Workflow 的执行。
2.2 事件溯源:Temporal 的魔法基础
Temporal 最核心的设计决策是事件溯源(Event Sourcing)。Workflow 的状态不是直接存储的,而是通过重放一系列事件来重建的。
Workflow Execution 的事件流:
┌────────────────────────────────────────────┐
│ WorkflowExecutionStarted │
│ WorkflowTaskScheduled │
│ WorkflowTaskStarted │
│ ActivityTaskScheduled (activity=Charge) │
│ WorkflowTaskCompleted │
│ ActivityTaskStarted │
│ ActivityTaskCompleted (result=success) │
│ WorkflowTaskScheduled │
│ WorkflowTaskStarted │
│ ActivityTaskScheduled (activity=Notify) │
│ ... │
└────────────────────────────────────────────┘
每一个状态变更都是一个事件,事件按序号追加到事件日志中,不可修改。当需要恢复 Workflow 状态时,只需从头重放这些事件。
这就是为什么 Temporal 能做到"代码写起来像同步":
func OrderWorkflow(ctx workflow.Context, order Order) error {
// 当执行到这一行时,Temporal 会记录一个 ActivityTaskScheduled 事件
// 然后 Workflow 代码挂起,等待 Activity 完成
var chargeResult ChargeResult
err := workflow.ExecuteActivity(ctx, chargeActivity, order.Payment).Get(ctx, &chargeResult)
if err != nil {
// 这里的 err 是 Activity 执行失败
// Temporal 会根据重试策略自动重试
return err
}
// 如果 Worker 崩溃,重启后 Temporal 会重放事件到这一行
// Activity 的结果从历史事件中获取,不会重复执行
var notifyResult NotifyResult
err = workflow.ExecuteActivity(ctx, notifyActivity, order.UserID).Get(ctx, ¬ifyResult)
return err
}
关键点:Activity 的结果被持久化在事件日志中。重放时,Get() 不会再次执行 Activity,而是直接从历史事件中读取结果。这就是 Temporal 的"确定性重放"——同一个 Workflow 实例,无论重放多少次,都会走到同样的状态。
2.3 Workflow 与 Activity 的本质区别
很多初学者搞不清 Workflow 和 Activity 的边界。一句话概括:
Workflow 是确定性的编排逻辑,Activity 是有副作用的外部交互。
// ❌ 错误:在 Workflow 中做有副作用的操作
func BadWorkflow(ctx workflow.Context) error {
// 不能在 Workflow 中直接访问数据库
db, _ := sql.Open("postgres", "...") // 这会破坏确定性!
// 不能在 Workflow 中发 HTTP 请求
http.Get("https://api.example.com") // 重放时会重复执行!
// 不能在 Workflow 中用当前时间
now := time.Now() // 每次重放时间不同,结果就不同!
return nil
}
// ✅ 正确:副作用放在 Activity 中
func GoodWorkflow(ctx workflow.Context) error {
// 通过 Activity 访问数据库
var result DBResult
err := workflow.ExecuteActivity(ctx, queryDBActivity).Get(ctx, &result)
// 通过 Activity 发 HTTP 请求
err = workflow.ExecuteActivity(ctx, callAPIActivity).Get(ctx, nil)
// 使用 Temporal 提供的确定性时间
now := workflow.Now(ctx) // 重放时返回事件记录的时间
return err
}
为什么这么严格?因为 Workflow 的代码会被重放。如果代码中有不确定性操作(比如读取数据库、发 HTTP 请求、获取当前时间),每次重放的结果可能不同,整个系统就会崩溃。
Temporal SDK 通过以下机制保证确定性:
- 拦截所有非确定性操作:Go SDK 通过代码分析工具检查 Workflow 函数中的非法操作
- 提供确定性替代:
workflow.Now()替代time.Now(),workflow.NewTimer()替代time.After() - 禁止全局状态:Workflow 中不能使用全局变量、闭包捕获的可变状态等
- Side Effect:如果确实需要非确定性值(如 UUID),必须通过
workflow.SideEffect()包装
func OrderWorkflow(ctx workflow.Context, order Order) error {
// 需要生成唯一 ID?用 Side Effect
var orderID string
workflow.SideEffect(ctx, func(ctx workflow.Context) string {
return uuid.New().String() // 非确定性操作放在 SideEffect 中
}).Get(&orderID)
// orderID 的值会被记录在事件中,重放时不会重新生成
return nil
}
三、深入 Temporal 内部:从任务调度到事件持久化
3.1 Workflow Execution 的生命周期
一个 Workflow 从创建到完成,经历以下阶段:
1. Client 调用 StartWorkflow()
→ Frontend 接收请求
→ History Service 创建 WorkflowExecutionStarted 事件
2. History Service 将 WorkflowTask 放入任务队列
→ Matching Service 接收任务
3. Worker 通过 Long Poll 拉取 WorkflowTask
→ 执行 Workflow 函数
→ 遇到 Activity 调用时,产出 ActivityTaskScheduled 事件
→ Workflow 函数挂起,等待 Activity 完成
4. Worker 报告 WorkflowTaskCompleted + 新产生的事件
→ History Service 将新事件追加到事件日志
5. History Service 将 ActivityTask 放入 Matching Service
→ Worker 拉取 ActivityTask
→ 执行 Activity 函数(可以有任何副作用)
→ 报告 ActivityTaskCompleted + 结果
6. History Service 收到 Activity 完成事件
→ 触发新的 WorkflowTask(唤醒挂起的 Workflow)
→ Worker 重新执行 Workflow 函数(重放)
→ 重放到 Activity 调用点时,从事件中读取结果
→ 继续执行后续逻辑
7. Workflow 函数执行完毕
→ 产出 WorkflowExecutionCompleted 事件
这个"执行-挂起-重放-继续"的循环,就是 Temporal 的核心执行模型。每次 Workflow 被唤醒时,它都会从头重放整个事件历史,直到到达当前需要执行的代码点。
你可能会担心性能——每次都重放所有事件?实际上 Temporal 做了大量优化:
- Workflow Task 的连续执行:如果 Workflow 函数中连续执行的是纯本地逻辑(不涉及 Activity 调用),所有代码在一个 Workflow Task 中完成,不需要挂起和重放
- Event Trimming:对于已经确认的早期事件,Temporal 可以做检查点,避免每次都从头重放
- History 长度限制:Temporal 默认限制单个 Workflow 的事件数量(约 50K),超出需要 Continue-As-New
3.2 History Service 的分片设计
History Service 是 Temporal 最复杂的组件,它的性能直接决定了集群的吞吐量。
History Service 采用**分片(Sharding)**架构:
History Service
├── Shard 0 → Workflow Execution A, B, C ...
├── Shard 1 → Workflow Execution D, E, F ...
├── Shard 2 → Workflow Execution G, H, I ...
└── Shard N → Workflow Execution X, Y, Z ...
每个 Shard 独立管理一组 Workflow Execution,负责:
- 追加事件到事件日志
- 执行 Workflow Task 的调度
- 处理超时和重试逻辑
分片的关键设计决策:
Workflow 到 Shard 的映射是固定的:一旦一个 Workflow 被分配到某个 Shard,它就一直属于那个 Shard。映射算法基于 Workflow ID 和 Run ID 的哈希。
每个 Shard 独立持久化:Shard 的事件写入数据库时,使用数据库的行级锁保证顺序性。这意味着同一个 Shard 内的事件是严格有序的。
Shard 数量决定并发上限:一个 Shard 同时只能处理一个 Workflow Task(对同一个 Workflow)。如果你的 Workflow 执行频率非常高,可能需要增加 Shard 数量。
// Shard 数量在集群启动时配置,不可动态修改
// 通常是 4 的倍数,根据预期负载选择
// 每个 Shard 大约占 10-20MB 内存
// 推荐公式:Shard 数量 = 预期峰值 TPS × 10
3.3 Matching Service 的任务调度
Matching Service 是 Worker 和 History Service 之间的桥梁,它的核心职责是将任务(Activity Task、Workflow Task)分配给合适的 Worker。
┌─────────────────────────────────────┐
│ Matching Service │
│ │
│ Task Queue: "order-queue" │
│ ┌─────────────────────────────┐ │
│ │ Task 1 (Activity: Charge) │ │
│ │ Task 2 (Activity: Notify) │ │
│ │ Task 3 (Workflow: Process) │ │
│ └─────────────────────────────┘ │
│ │
│ Worker Pool: │
│ ┌─────────────────────────────┐ │
│ │ Worker A (capabilities: *) │ │
│ │ Worker B (capabilities: *) │ │
│ │ Worker C (capabilities: *) │ │
│ └─────────────────────────────┘ │
│ │
│ 匹配策略:FIFO + 优先级 + 速率限制 │
└─────────────────────────────────────┘
Matching Service 的调度策略:
- FIFO 优先:同一优先级的任务按先进先出分配
- 优先级调度:高优先级任务优先分配,支持 0-255 的优先级范围
- 速率限制:每个 Task Queue 可以配置最大调度速率(TPS)
- Sticky Execution:Workflow Task 倾向于分配给上次执行该 Workflow 的 Worker,减少重放开销
Worker 与 Matching Service 的通信采用 Long Polling 模式:
// Worker 的长轮询机制
func (w *Worker) pollTask(ctx context.Context) (*task, error) {
// 发起长轮询,最多等待 60 秒
// 如果有任务立即返回,否则等待直到有任务或超时
resp, err := w.client.PollWorkflowTaskQueue(ctx, &workflowservice.PollWorkflowTaskQueueRequest{
Namespace: w.namespace,
TaskQueue: &taskqueue.TaskQueue{Name: w.taskQueueName},
})
if resp == nil {
// 超时,重新轮询
return nil, nil
}
return resp, nil
}
四、Go SDK 实战:从零构建订单处理系统
4.1 项目结构
order-system/
├── cmd/
│ ├── worker/
│ │ └── main.go # Worker 入口
│ └── starter/
│ └── main.go # 启动 Workflow 的客户端
├── workflow/
│ ├── order_workflow.go # Workflow 定义
│ └── order_workflow_test.go
├── activity/
│ ├── inventory.go # 库存 Activity
│ ├── payment.go # 支付 Activity
│ └── notification.go # 通知 Activity
├── shared/
│ └── models.go # 共享数据结构
└── go.mod
4.2 数据模型
// shared/models.go
package shared
type Order struct {
ID string `json:"id"`
UserID string `json:"user_id"`
Items []OrderItem `json:"items"`
Payment PaymentInfo `json:"payment"`
TotalAmount float64 `json:"total_amount"`
}
type OrderItem struct {
ProductID string `json:"product_id"`
Quantity int `json:"quantity"`
Price float64 `json:"price"`
}
type PaymentInfo struct {
Method string `json:"method"` // credit_card, alipay, wechat
Token string `json:"token"`
}
type ChargeResult struct {
TransactionID string `json:"transaction_id"`
Amount float64 `json:"amount"`
Status string `json:"status"`
}
type DeductResult struct {
Success bool `json:"success"`
DeductedIDs []string `json:"deducted_ids"`
Insufficient []string `json:"insufficient"` // 库存不足的商品
}
type NotifyResult struct {
MessageID string `json:"message_id"`
Sent bool `json:"sent"`
}
4.3 Activity 实现
Activity 是实际的业务逻辑执行单元,可以有任何副作用:
// activity/inventory.go
package activity
import (
"context"
"fmt"
"log"
"time"
"order-system/shared"
)
// DeductInventory 扣减库存
// 注意:Activity 可以包含任何副作用——数据库操作、HTTP 请求等
func DeductInventory(ctx context.Context, items []shared.OrderItem) (*shared.DeductResult, error) {
log.Printf("扣除库存: %d 项商品", len(items))
result := &shared.DeductResult{
Success: true,
}
for _, item := range items {
// 模拟数据库操作
// 实际项目中这里会调用库存服务的 API
if item.ProductID == "OUT_OF_STOCK" {
result.Success = false
result.Insufficient = append(result.Insufficient, item.ProductID)
continue
}
result.DeductedIDs = append(result.DeductedIDs, item.ProductID)
log.Printf(" 扣除商品 %s x%d", item.ProductID, item.Quantity)
}
if !result.Success {
return result, fmt.Errorf("部分商品库存不足: %v", result.Insufficient)
}
return result, nil
}
// CompensateInventory 库存补偿(回滚)
func CompensateInventory(ctx context.Context, deductedIDs []string) error {
log.Printf("补偿库存: %v", deductedIDs)
for _, id := range deductedIDs {
// 实际项目中调用库存服务的回滚接口
log.Printf(" 回滚商品 %s", id)
}
return nil
}
// activity/payment.go
package activity
import (
"context"
"fmt"
"log"
"math/rand"
"time"
"order-system/shared"
)
// ChargePayment 发起支付
func ChargePayment(ctx context.Context, payment shared.PaymentInfo, amount float64) (*shared.ChargeResult, error) {
log.Printf("发起支付: method=%s, amount=%.2f", payment.Method, amount)
// 模拟支付网关调用
time.Sleep(100 * time.Millisecond)
// 模拟 5% 的失败率(用于测试重试逻辑)
if rand.Float64() < 0.05 {
return nil, fmt.Errorf("支付网关超时,请稍后重试")
}
result := &shared.ChargeResult{
TransactionID: fmt.Sprintf("TXN-%d", time.Now().UnixNano()),
Amount: amount,
Status: "success",
}
log.Printf("支付成功: txn=%s", result.TransactionID)
return result, nil
}
// RefundPayment 退款
func RefundPayment(ctx context.Context, transactionID string) error {
log.Printf("退款: txn=%s", transactionID)
// 实际项目中调用支付网关的退款接口
return nil
}
// activity/notification.go
package activity
import (
"context"
"fmt"
"log"
"order-system/shared"
)
// SendOrderNotification 发送订单通知
func SendOrderNotification(ctx context.Context, userID string, message string) (*shared.NotifyResult, error) {
log.Printf("发送通知: user=%s, msg=%s", userID, message)
// 实际项目中调用消息推送服务
result := &shared.NotifyResult{
MessageID: fmt.Sprintf("MSG-%d", ctx.Value("request_id")),
Sent: true,
}
return result, nil
}
4.4 Workflow 实现
这是核心——Workflow 的代码看起来就像同步代码,但实际上是分布式的、持久化的:
// workflow/order_workflow.go
package workflow
import (
"fmt"
"time"
"go.temporal.io/sdk/workflow"
"order-system/shared"
)
// OrderWorkflow 订单处理工作流
// 这段代码看起来是同步的,但每一步都是持久化的、容错的
func OrderWorkflow(ctx workflow.Context, order shared.Order) (string, error) {
// 设置 Activity 的默认选项
ao := workflow.ActivityOptions{
StartToCloseTimeout: time.Minute * 5, // Activity 执行超时
HeartbeatTimeout: time.Minute, // 心跳超时(用于长时间 Activity)
RetryPolicy: &workflow.RetryPolicy{
InitialInterval: time.Second * 5, // 首次重试等待
BackoffCoefficient: 2.0, // 指数退避系数
MaximumInterval: time.Minute, // 最大重试间隔
MaximumAttempts: 5, // 最大重试次数
NonRetryableErrorTypes: []string{ // 不可重试的错误类型
"InsufficientInventoryError",
},
},
}
ctx = workflow.WithActivityOptions(ctx, ao)
// ========== 第一步:扣减库存 ==========
var deductResult shared.DeductResult
err := workflow.ExecuteActivity(ctx, "DeductInventory", order.Items).Get(ctx, &deductResult)
if err != nil {
// 库存不足,直接结束(不可重试的错误)
return "", fmt.Errorf("库存扣减失败: %w", err)
}
// ========== 第二步:发起支付 ==========
var chargeResult shared.ChargeResult
err = workflow.ExecuteActivity(ctx, "ChargePayment", order.Payment, order.TotalAmount).Get(ctx, &chargeResult)
if err != nil {
// 支付失败,需要补偿库存
log := workflow.GetLogger(ctx)
log.Info("支付失败,开始补偿库存", "error", err)
// 补偿操作也需要重试策略
compensateCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: time.Minute * 2,
RetryPolicy: &workflow.RetryPolicy{
InitialInterval: time.Second * 3,
BackoffCoefficient: 2.0,
MaximumAttempts: 10, // 补偿操作要确保成功
},
})
var compensateErr error
workflow.ExecuteActivity(compensateCtx, "CompensateInventory", deductResult.DeductedIDs).
Get(compensateCtx, &compensateErr)
if compensateErr != nil {
// 补偿也失败了!需要人工介入
// Temporal 会将这个 Workflow 标记为需要人工处理
log.Error("库存补偿失败,需要人工介入", "error", compensateErr)
return "", fmt.Errorf("支付失败且库存补偿失败: payment_err=%v, compensate_err=%v",
err, compensateErr)
}
return "", fmt.Errorf("支付失败,已补偿库存: %w", err)
}
// ========== 第三步:发送支付成功通知 ==========
// 通知失败不应该影响主流程,使用独立的重试策略
notifyCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: time.Minute,
RetryPolicy: &workflow.RetryPolicy{
InitialInterval: time.Second * 2,
BackoffCoefficient: 1.5,
MaximumAttempts: 3,
},
})
var notifyResult shared.NotifyResult
// 使用 workflow.ExecuteActivity 的异步模式
notifyFuture := workflow.ExecuteActivity(notifyCtx, "SendOrderNotification",
order.UserID, fmt.Sprintf("订单 %s 支付成功,金额 %.2f", order.ID, order.TotalAmount))
// 不等通知完成,继续后续流程
// 在 Workflow 结束前等待通知结果即可
_ = notifyFuture
// ========== 第四步:更新订单状态 ==========
// ... 其他业务逻辑
// 最终等待通知完成
if err := notifyFuture.Get(ctx, ¬ifyResult); err != nil {
// 通知失败不影响订单状态
workflow.GetLogger(ctx).Warn("通知发送失败,但订单已处理完成", "error", err)
}
return chargeResult.TransactionID, nil
}
4.5 Worker 注册
// cmd/worker/main.go
package main
import (
"log"
"go.temporal.io/sdk/worker"
"go.temporal.io/sdk/client"
"order-system/activity"
"order-system/workflow"
)
func main() {
// 创建 Temporal 客户端
c, err := client.Dial(client.Options{
HostPort: "localhost:7233",
Namespace: "default",
})
if err != nil {
log.Fatalln("无法连接 Temporal 服务", err)
}
defer c.Close()
// 创建 Worker
w := worker.New(c, "order-task-queue", worker.Options{
MaxConcurrentWorkflowTaskExecutionSize: 10, // 最大并发 Workflow 执行数
MaxConcurrentActivityExecutionSize: 50, // 最大并发 Activity 执行数
MaxConcurrentLocalActivityExecutionSize: 20, // 最大并发 Local Activity 执行数
WorkerActivitiesPerSecond: 100, // Worker 级别速率限制
})
// 注册 Workflow
w.RegisterWorkflow(workflow.OrderWorkflow)
// 注册 Activity
w.RegisterActivity(activity.DeductInventory)
w.RegisterActivity(activity.CompensateInventory)
w.RegisterActivity(activity.ChargePayment)
w.RegisterActivity(activity.RefundPayment)
w.RegisterActivity(activity.SendOrderNotification)
// 启动 Worker
log.Println("启动 Worker...")
if err := w.Run(worker.InterruptCh()); err != nil {
log.Fatalln("Worker 运行失败", err)
}
}
4.6 启动 Workflow
// cmd/starter/main.go
package main
import (
"context"
"fmt"
"log"
"time"
"go.temporal.io/sdk/client"
"order-system/shared"
)
func main() {
c, err := client.Dial(client.Options{
HostPort: "localhost:7233",
Namespace: "default",
})
if err != nil {
log.Fatalln("无法连接 Temporal 服务", err)
}
defer c.Close()
order := shared.Order{
ID: "ORD-20260418-001",
UserID: "USR-12345",
Items: []shared.OrderItem{
{ProductID: "PRD-A001", Quantity: 2, Price: 99.9},
{ProductID: "PRD-B002", Quantity: 1, Price: 299.0},
},
Payment: shared.PaymentInfo{
Method: "alipay",
Token: "pay_token_xxx",
},
TotalAmount: 498.8,
}
// 启动 Workflow
workflowRun, err := c.ExecuteWorkflow(context.Background(), client.StartWorkflowOptions{
ID: order.ID, // 使用订单 ID 作为 Workflow ID
TaskQueue: "order-task-queue",
WorkflowExecutionTimeout: time.Hour * 24, // 整个 Workflow 最长执行时间
WorkflowRunTimeout: time.Hour * 2, // 单次 Run 最长执行时间
WorkflowTaskTimeout: time.Minute * 10, // 单个 Workflow Task 超时
CronSchedule: "", // 非 Cron 模式
}, workflow.OrderWorkflow, order)
if err != nil {
log.Fatalln("启动 Workflow 失败", err)
}
log.Printf("Workflow 已启动: ID=%s, RunID=%s", workflowRun.GetID(), workflowRun.GetRunID())
// 阻塞等待结果(可选,也可以异步查询)
var result string
err = workflowRun.Get(context.Background(), &result)
if err != nil {
log.Fatalln("Workflow 执行失败", err)
}
log.Printf("Workflow 执行完成: 结果=%s", result)
fmt.Printf("交易ID: %s\n", result)
}
五、高级模式:让 Temporal 发挥最大价值
5.1 Signal:从外部向 Workflow 注入事件
Signal 是 Temporal 最强大的特性之一——它允许外部系统在 Workflow 执行过程中向其发送消息,Workflow 可以在任意时刻接收并处理这些消息。
// 审批流程——人工审批场景
func ApprovalWorkflow(ctx workflow.Context, request ApprovalRequest) (string, error) {
// 创建 Channel 用于接收审批信号
approvalChannel := workflow.NewReceiveChannel(ctx)
rejectionChannel := workflow.NewReceiveChannel(ctx)
// 设置审批超时
timer := workflow.NewTimer(ctx, time.Hour*72) // 72小时超时
// 发送审批请求通知
err := workflow.ExecuteActivity(ctx, notifyApproverActivity, request).Get(ctx, nil)
if err != nil {
return "", err
}
// 等待审批结果或超时
var approved bool
var approver string
workflow.Select(ctx,
// 接收审批通过信号
workflow.Receive(approvalChannel, func(c workflow.ReceiveChannel, more bool) {
c.Receive(ctx, &approver)
approved = true
}),
// 接收审批拒绝信号
workflow.Receive(rejectionChannel, func(c workflow.ReceiveChannel, more bool) {
var reason string
c.Receive(ctx, &reason)
approved = false
}),
// 超时处理
workflow.Receive(timer.Done(), func(c workflow.ReceiveChannel, more bool) {
approved = false
}),
)
if approved {
// 执行通过后的逻辑
err = workflow.ExecuteActivity(ctx, processApprovalActivity, request, approver).Get(ctx, nil)
return "approved", err
}
return "rejected", nil
}
从外部发送 Signal:
// 外部服务发送审批信号
func approveOrder(c client.Client, workflowID string) error {
return c.SignalWorkflow(context.Background(), workflowID, "",
"approval-channel", "admin-user")
}
5.2 Query:实时查询 Workflow 状态
Query 允许外部系统查询 Workflow 的内部状态,而不会触发任何副作用:
func OrderWorkflow(ctx workflow.Context, order shared.Order) (string, error) {
// 维护内部状态
currentStep := "created"
// 注册 Query Handler
err := workflow.SetQueryHandler(ctx, "currentStep", func() (string, error) {
return currentStep, nil
})
if err != nil {
return "", err
}
err = workflow.SetQueryHandler(ctx, "orderInfo", func() (shared.Order, error) {
return order, nil
})
if err != nil {
return "", err
}
// 执行流程,更新状态
currentStep = "deducting_inventory"
// ... 扣减库存
currentStep = "charging_payment"
// ... 发起支付
currentStep = "completed"
return "done", nil
}
查询 Workflow 状态:
// 任何时刻都可以查询
resp, err := c.QueryWorkflow(context.Background(), workflowID, "", "currentStep")
var step string
resp.Get(&step)
fmt.Println("当前步骤:", step) // 输出: "当前步骤: charging_payment"
5.3 Child Workflow:嵌套编排
当你的流程变得复杂时,可以将子流程封装为独立的 Workflow:
// 主订单流程
func MasterOrderWorkflow(ctx workflow.Context, order shared.Order) error {
// 步骤1:库存预占
var inventoryResult string
err := workflow.ExecuteChildWorkflow(ctx, InventoryWorkflow, order.Items).
Get(ctx, &inventoryResult)
if err != nil {
return err
}
// 步骤2:支付处理
var paymentResult string
err = workflow.ExecuteChildWorkflow(ctx, PaymentWorkflow, order.Payment, order.TotalAmount).
Get(ctx, &paymentResult)
if err != nil {
// 支付失败,回滚库存
_ = workflow.ExecuteChildWorkflow(ctx, InventoryCompensateWorkflow, order.Items).
Get(ctx, nil)
return err
}
// 步骤3:物流创建
err = workflow.ExecuteChildWorkflow(ctx, ShippingWorkflow, order).
Get(ctx, nil)
return err
}
Child Workflow 的好处:
- 独立的历史记录:每个 Child Workflow 有自己的事件历史,不会撑爆 Parent 的历史
- 独立的重试策略:可以针对子流程设置不同的超时和重试策略
- 独立的状态查询:可以单独查询子流程的执行状态
- 资源隔离:不同的子流程可以使用不同的 Task Queue 和 Worker
5.4 Continue-As-New:解决历史膨胀问题
Temporal 的 Workflow 事件历史是无限增长的。对于长期运行的 Workflow(比如持续数月的订单跟踪),事件历史会越来越大,导致重放变慢。
Continue-As-New 是 Temporal 的解决方案——它会关闭当前 Workflow 执行,启动一个新的执行,同时传递必要的上下文:
// 长期运行的订单跟踪 Workflow
func OrderTrackingWorkflow(ctx workflow.Context, orderID string) error {
// 获取当前重放次数
info := workflow.GetInfo(ctx)
// 当历史事件接近限制时,使用 Continue-As-New
if info.GetCurrentHistoryLength() > 40000 {
// 传递必要的上下文到新的执行
return workflow.NewContinueAsNewError(ctx, OrderTrackingWorkflow, orderID)
}
// 正常的业务逻辑
for {
// 等待订单更新信号
var update OrderUpdate
workflow.NewReceiveChannel(ctx).Receive(ctx, &update)
// 处理更新...
}
}
5.5 Cron Workflow:定时调度
Temporal 原生支持 Cron 表达式,用于定时触发 Workflow:
// 每天 9:00 执行的数据同步 Workflow
workflowRun, err := c.ExecuteWorkflow(context.Background(), client.StartWorkflowOptions{
ID: "daily-sync",
TaskQueue: "sync-task-queue",
CronSchedule: "0 9 * * *", // 每天9点
}, DataSyncWorkflow, syncConfig)
Cron Workflow 的特殊性:
- 每次触发都是一个新的 Run,但共享同一个 Workflow ID
- 如果上一次 Run 还没完成,新的触发会被跳过
- 可以通过
client.StartWorkflowOptions.CronSchedule设置
六、Temporal 的可观测性:不只是日志
6.1 内置的执行历史可视化
Temporal Web UI 是调试分布式流程的利器。每个 Workflow 的完整事件历史都可以可视化查看:
Timeline 视图:
─────────────────────────────────────────────────
00:00 WorkflowExecutionStarted
00:01 WorkflowTaskScheduled
00:01 WorkflowTaskStarted
00:01 ActivityTaskScheduled (DeductInventory)
00:02 ActivityTaskStarted
00:03 ActivityTaskCompleted (result: {Success: true})
00:03 WorkflowTaskScheduled
00:03 WorkflowTaskStarted
00:03 ActivityTaskScheduled (ChargePayment)
00:04 ActivityTaskStarted
00:06 ActivityTaskCompleted (result: {TransactionID: "TXN-123"})
...
你可以看到每一步的输入输出、执行时间、重试次数。这在排查问题时比翻日志效率高 100 倍。
6.2 自定义指标
Temporal SDK 内置了 Prometheus 指标,同时你也可以在 Workflow 和 Activity 中记录自定义指标:
func ChargePayment(ctx context.Context, payment shared.PaymentInfo, amount float64) (*shared.ChargeResult, error) {
// 记录自定义指标
activity.GetMetricsHandler(ctx).Counter("payment_charges").
Inc(1)
start := time.Now()
result, err := doCharge(payment, amount)
// 记录耗时
activity.GetMetricsHandler(ctx).Timer("payment_duration").
Record(time.Since(start))
if err != nil {
activity.GetMetricsHandler(ctx).Counter("payment_failures").
Inc(1)
}
return result, err
}
6.3 与 OpenTelemetry 集成
Temporal 原生支持 OpenTelemetry,可以无缝接入你的分布式追踪系统:
import (
"go.temporal.io/sdk/worker"
"go.temporal.io/sdk/contrib/tally"
"go.temporal.io/sdk/contrib/opentelemetry"
)
func setupTracing() (client.Options, worker.Options) {
// 创建 OpenTelemetry TracerProvider
tp := sdktrace.NewTracerProvider(...)
// 创建 Temporal 的拦截器
tracingInterceptor, err := opentelemetry.NewTracingInterceptor(opentelemetry.TracerOptions{
Tracer: tp.Tracer("temporal-worker"),
})
clientOpts := client.Options{
HostPort: "localhost:7233",
Interceptors: []interceptor.ClientInterceptor{tracingInterceptor},
}
workerOpts := worker.Options{
Interceptors: []interceptor.WorkerInterceptor{tracingInterceptor},
}
return clientOpts, workerOpts
}
七、性能优化与生产实践
7.1 Local Activity:减少网络开销
如果你的 Activity 不需要持久化(比如纯计算、本地缓存读取),可以使用 Local Activity:
func OrderWorkflow(ctx workflow.Context, order shared.Order) error {
// Local Activity 在 Worker 进程内执行,不经过 Matching Service
// 适合轻量级操作:校验、计算、本地缓存读取
lao := workflow.LocalActivityOptions{
ScheduleToCloseTimeout: time.Second * 30,
}
ctx = workflow.WithLocalActivityOptions(ctx, lao)
var validationResult error
err := workflow.ExecuteLocalActivity(ctx, validateOrderActivity, order).Get(ctx, &validationResult)
if validationResult != nil {
return validationResult
}
// 重量级操作仍然使用普通 Activity
var deductResult shared.DeductResult
err = workflow.ExecuteActivity(ctx, "DeductInventory", order.Items).Get(ctx, &deductResult)
return err
}
Local Activity vs Regular Activity 的选择:
| 特性 | Local Activity | Regular Activity |
|---|---|---|
| 执行位置 | Worker 进程内 | 独立的 Worker 进程 |
| 网络开销 | 无 | 需要 Matching Service 中转 |
| 持久化 | 不单独持久化 | 每个结果独立持久化 |
| 超时 | 短(秒级) | 长(分钟级) |
| 重试 | 支持 | 支持 |
| 适用场景 | 校验、计算、缓存 | API调用、DB操作 |
7.2 批量处理与扇出模式
当你需要并行处理大量任务时,Temporal 的扇出(Fan-Out)模式非常高效:
// 批量处理订单
func BatchOrderWorkflow(ctx workflow.Context, orders []shared.Order) error {
// 创建子 Workflow 的 Future 列表
futures := make([]workflow.Future, len(orders))
// 扇出:同时启动所有子 Workflow
for i, order := range orders {
futures[i] = workflow.ExecuteChildWorkflow(ctx, OrderWorkflow, order)
}
// 扇入:等待所有子 Workflow 完成
var failedCount int
for i, future := range futures {
var result string
if err := future.Get(ctx, &result); err != nil {
workflow.GetLogger(ctx).Error("子流程执行失败",
"order_id", orders[i].ID, "error", err)
failedCount++
}
}
if failedCount > 0 {
return fmt.Errorf("%d 个订单处理失败", failedCount)
}
return nil
}
7.3 速率限制:保护下游服务
Temporal 提供了三层速率限制:
// 1. Task Queue 级别的速率限制
w := worker.New(c, "order-task-queue", worker.Options{
WorkerActivitiesPerSecond: 100, // 每个 Worker 每秒最多执行 100 个 Activity
TaskQueueActivitiesPerSecond: 500, // 整个 Task Queue 每秒最多 500 个 Activity
})
// 2. Activity 级别的速率限制
ao := workflow.ActivityOptions{
StartToCloseTimeout: time.Minute * 5,
// 通过 Task Queue 实现细粒度控制
}
7.4 数据库选择与调优
Temporal 支持三种数据库后端,各有特点:
| 数据库 | 优势 | 劣势 | 推荐场景 |
|---|---|---|---|
| Cassandra | 高写入吞吐、水平扩展 | 运维复杂、无 ACID | 大规模、高吞吐 |
| PostgreSQL | 成熟稳定、ACID、运维简单 | 单分片写入瓶颈 | 中小规模、强一致性 |
| MySQL | 团队熟悉、运维简单 | 性能不如 PG | 已有 MySQL 基础设施 |
PostgreSQL 调优建议:
-- 关键配置参数
shared_buffers = '4GB' -- 共享缓冲区
work_mem = '256MB' -- 排序/哈希内存
maintenance_work_mem = '1GB' -- 维护操作内存
effective_cache_size = '12GB' -- 查询规划器缓存估计
max_connections = 200 -- 最大连接数
random_page_cost = 1.1 -- SSD 存储优化
effective_io_concurrency = 200 -- SSD 并发 IO
7.5 生产部署最佳实践
集群规模估算:
假设:
- 峰值 1000 Workflow/秒
- 平均每个 Workflow 10 个 Activity
- 每个 Activity 平均 100ms
Shard 数量 = 峰值 TPS × 10 = 10000
History Service 节点 = Shard 数量 / 500(每节点约 500 Shard)
Matching Service 节点 = Worker 节点数 / 10
Frontend 节点 = History 节点数 / 2
高可用配置:
# docker-compose.yml(生产环境参考)
services:
temporal:
image: temporalio/auto-setup:latest
environment:
- DB=postgresql
- DB_PORT=5432
- POSTGRES_USER=temporal
- POSTGRES_PWD=${DB_PASSWORD}
- POSTGRES_SEEDS=postgresql
- NUM_HISTORY_SHARDS=1024
- ENABLE_ES=true
- ES_SEEDS=elasticsearch
depends_on:
- postgresql
- elasticsearch
postgresql:
image: postgres:15
environment:
- POSTGRES_USER=temporal
- POSTGRES_PASSWORD=${DB_PASSWORD}
volumes:
- pgdata:/var/lib/postgresql/data
elasticsearch:
image: elasticsearch:8.12.0
environment:
- discovery.type=single-node
- xpack.security.enabled=false
volumes:
- esdata:/usr/share/elasticsearch/data
监控指标:
| 指标 | 告警阈值 | 说明 |
|---|---|---|
service_errors | > 100/min | 服务端错误率 |
persistence_latency | p99 > 500ms | 数据库延迟 |
workflow_task_latency | p99 > 1s | Workflow Task 处理延迟 |
workflow_task_queue_latency | p99 > 5s | 任务排队延迟 |
num_pollers | < 1 | Worker 数量不足 |
八、Temporal vs 竞品:什么场景该选什么
8.1 Temporal vs AWS Step Functions
| 维度 | Temporal | Step Functions |
|---|---|---|
| 部署模式 | 自托管/云托管 | 纯托管 |
| 代码语言 | Go/Java/Python/TS/PHP | JSON/ASL |
| 本地开发 | 完整支持 | 需要模拟器 |
| 状态大小 | 50MB+ | 256KB |
| 执行时长 | 无限制 | 1年 |
| 调试体验 | 完整事件历史 | 有限的可视化 |
| 成本 | 基础设施成本 | 按状态转换计费 |
| 供应商锁定 | 无 | AWS 生态 |
选择建议:如果你的团队已经重度依赖 AWS,且流程不复杂,Step Functions 更省心。如果你需要复杂的编排逻辑、对代码有完整控制权、或者不想被供应商锁定,Temporal 是更好的选择。
8.2 Temporal vs Apache Airflow
| 维度 | Temporal | Airflow |
|---|---|---|
| 定位 | 应用级编排 | 数据管道编排 |
| 实时性 | 毫秒级调度 | 分钟级调度 |
| 状态管理 | 内置持久化 | 数据库存储 |
| 交互 | Signal/Query | 有限 |
| 代码复用 | SDK 直接集成 | Operator 封装 |
| 数据处理 | 需自行集成 | 内置丰富算子 |
选择建议:Airflow 适合数据工程场景(ETL、数据管道、批处理),Temporal 适合应用级编排(订单流程、审批流程、微服务编排)。
8.3 Temporal vs 自建消息队列方案
很多团队最初会选择 Kafka + 状态机的方式来实现分布式编排。这种方案的问题:
- 开发成本高:重试、超时、补偿、幂等——每个模式都要自己实现
- 可观测性差:流程状态散落在各个服务和数据库中,难以全局追踪
- 维护成本高:状态机逻辑与业务逻辑耦合,难以演进
- 测试困难:模拟分布式场景下的各种失败模式非常复杂
Temporal 的价值在于:把这些分布式系统的通用难题抽象成平台能力,让开发者专注于业务逻辑。
九、常见踩坑与解决方案
9.1 非确定性错误
问题:Workflow 代码在重放时产生不同的事件序列,导致 Non-Deterministic Error。
常见原因:
// ❌ 使用了 time.Now()
now := time.Now() // 每次重放时间不同
// ❌ 使用了随机数
rand.Seed(time.Now().UnixNano())
val := rand.Intn(100) // 每次重放结果不同
// ❌ 遍历 map(Go 中 map 遍历顺序不确定)
for k, v := range myMap { // 顺序不确定
// ...
}
// ❌ 条件分支中调用了 Activity
if someCondition { // 重放时 someCondition 可能为 true
workflow.ExecuteActivity(ctx, activityA)
}
解决方案:
// ✅ 使用 Temporal 提供的确定性 API
now := workflow.Now(ctx)
// ✅ 非确定性值使用 SideEffect
var val int
workflow.SideEffect(ctx, func(ctx workflow.Context) int {
return rand.Intn(100)
}).Get(&val)
// ✅ map 遍历前排序
keys := make([]string, 0, len(myMap))
for k := range myMap {
keys = append(keys, k)
}
sort.Strings(keys)
for _, k := range keys {
v := myMap[k]
// ...
}
9.2 历史事件过长
问题:Workflow 事件历史超过 50K,导致 Workflow Task 超时或内存溢出。
解决方案:
// 方案1:Continue-As-New
if workflow.GetInfo(ctx).GetCurrentHistoryLength() > 40000 {
return workflow.NewContinueAsNewError(ctx, MyWorkflow, state)
}
// 方案2:将循环拆分为 Child Workflow
for i := 0; i < 10000; i++ {
if i % 100 == 0 {
// 每 100 次迭代启动一个 Child Workflow
workflow.ExecuteChildWorkflow(ctx, BatchProcessWorkflow, batch)
}
}
9.3 Activity 超时配置不当
问题:Activity 执行时间超过了 StartToCloseTimeout,导致频繁重试。
解决方案:
ao := workflow.ActivityOptions{
// StartToCloseTimeout: Activity 从开始执行到完成的最长时间
// 这个值应该覆盖 99.9% 的正常执行时间
StartToCloseTimeout: time.Minute * 5,
// ScheduleToCloseTimeout: 从 Activity 被调度到完成的最长时间
// 包含重试时间,应该远大于 StartToCloseTimeout
ScheduleToCloseTimeout: time.Minute * 30,
// HeartbeatTimeout: 长时间运行的 Activity 需要定期心跳
// 如果 Activity 可能运行超过 1 分钟,必须设置心跳
HeartbeatTimeout: time.Second * 30,
}
心跳的实现:
func LongRunningActivity(ctx context.Context, task Task) error {
totalSteps := len(task.Steps)
for i, step := range task.Steps {
// 检查上下文是否已取消
if ctx.Err() != nil {
return ctx.Err()
}
// 报告心跳进度
activity.RecordHeartbeat(ctx, i, totalSteps)
// 执行步骤
if err := processStep(step); err != nil {
return err
}
}
return nil
}
9.4 Worker 优雅关闭
问题:Worker 直接被 kill,正在执行的 Activity 丢失进度。
解决方案:
// 使用 Interrupt Channel 实现优雅关闭
w := worker.New(c, "order-task-queue", worker.Options{})
// 注册信号处理
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
// 启动 Worker
go func() {
if err := w.Run(worker.InterruptCh()); err != nil {
log.Fatal(err)
}
}()
// 等待信号
<-sigCh
log.Println("收到停止信号,正在优雅关闭 Worker...")
// 停止 Worker(等待正在执行的任务完成)
w.Stop()
log.Println("Worker 已停止")
十、展望:Temporal 的未来方向
10.1 Nexus:跨 Namespace 编排
Temporal 正在开发 Nexus 功能,允许不同 Namespace(甚至不同集群)的 Workflow 互相调用。这对多团队协作场景非常有价值:
// 团队 A 的 Workflow 调用团队 B 的服务
func TeamAWorkflow(ctx workflow.Context, request Request) error {
// 通过 Nexus 调用另一个 Namespace 的 Workflow
result, err := workflow.ExecuteNexus(ctx, nexus.EndpointConfig{
Namespace: "team-b",
Workflow: "ProcessPayment",
}, request)
return err
}
10.2 与 AI Agent 的结合
随着 AI Agent 的兴起,Temporal 在 Agent 编排领域的应用越来越广泛。OpenAI Agents SDK 已经与 Temporal 集成(公测阶段),用 Temporal 来管理 Agent 的执行流程、工具调用和状态恢复:
// AI Agent 执行 Workflow
func AgentWorkflow(ctx workflow.Context, task string) (string, error) {
// 步骤1:LLM 推理
var thinking string
err := workflow.ExecuteActivity(ctx, llmInference, task).Get(ctx, &thinking)
// 步骤2:工具调用
var toolResult string
err = workflow.ExecuteActivity(ctx, callTool, thinking).Get(ctx, &toolResult)
// 步骤3:结果整合
var finalAnswer string
err = workflow.ExecuteActivity(ctx, llmInference,
fmt.Sprintf("基于工具结果 %s 回答问题 %s", toolResult, task)).Get(ctx, &finalAnswer)
return finalAnswer, err
}
这种模式下,每个 LLM 调用和工具调用都是持久化的、可追溯的、可恢复的。Agent 进程挂了?重启后继续推理。Token 用完了?下次从断点继续。这对构建可靠的 AI Agent 系统至关重要。
10.3 社区与生态
截至 2026 年 4 月,Temporal 在 GitHub 上已有超过 12K Star,贡献者超过 500 人。SDK 覆盖 Go、Java、Python、TypeScript、PHP 五种语言。核心团队已经拿到超过 1.3 亿美元的融资,商业模式清晰——开源核心 + 云托管(Temporal Cloud)。
写在最后
Temporal 不是银弹,它有自己的学习曲线和运维成本。但如果你正在处理以下场景,它值得认真考虑:
- 长流程编排:任何需要跨越多个服务、持续数分钟到数天的业务流程
- 高可靠性要求:不能接受流程中断、数据丢失的场景
- 复杂的补偿逻辑:需要 Saga 模式、人工干预、超时处理的场景
- 可观测性需求:需要完整追踪流程执行状态、快速定位问题的场景
Temporal 的核心价值在于一个范式转换:从"在业务代码中写容错逻辑"转变为"在容错引擎上写业务逻辑"。这个转变看似简单,但它从根本上改变了你思考和编写分布式系统的方式。
代码写起来像同步,跑起来像分布式,失败了自动恢复——这就是持久化执行引擎的承诺。Temporal 做到了。