事件溯源与CQRS深度实战:从状态存储到事件日志的架构革命
引言:为什么传统CRUD不够用了?
你有没有遇到过这样的问题:用户说"昨天数据明明是对的,今天怎么就变了?"你打开数据库,只看到当前状态,却无法追溯状态是如何一步步变成现在这样的。又或者,业务方要求"给我一个历史报表,统计上个月某一天的库存快照",你只能在心里默默叹气——因为你的系统只存了"当前状态",历史数据早就被覆盖了。
这不仅是数据追溯的问题。在高并发场景下,多个用户同时修改同一条记录,乐观锁、悲观锁、数据库行锁……各种锁机制让系统复杂度直线上升。更糟糕的是,当业务越来越复杂,数据库表越来越大,读写性能开始下降,你开始考虑读写分离,却发现主从延迟、数据一致性等问题接踵而来。
事件溯源(Event Sourcing)和CQRS(Command Query Responsibility Segregation)就是为解决这些问题而生的架构模式。它们不是银弹,但在特定场景下,确实能带来传统CRUD无法比拟的优势。本文将深入探讨这两个模式的原理、实现方式、适用场景以及生产级实践。
第一部分:事件溯源的核心思想
1.1 从"状态存储"到"事件日志"
传统CRUD模式的核心是状态存储:数据库中存储的是实体的当前状态,每次更新操作都会覆盖之前的状态。例如,一个订单从"创建"到"已支付"再到"已发货",数据库中只保留"已发货"这个最终状态。
-- 传统CRUD:覆盖状态
UPDATE orders SET status = 'PAID' WHERE id = 123;
UPDATE orders SET status = 'SHIPPED' WHERE id = 123;
-- 历史状态丢失
事件溯源的核心思想是用事件序列代替当前状态:不存储实体的当前状态,而是存储导致状态变化的所有事件。任何时刻的状态都可以通过重放事件序列来重建。
事件序列:
1. OrderCreated(orderId=123, items=[...], total=99.9)
2. OrderPaid(orderId=123, paymentId=pay_456, paidAt=2026-05-08 10:30:00)
3. OrderShipped(orderId=123, trackingNo=SF123456, shippedAt=2026-05-08 14:00:00)
当前状态:通过重放上述事件计算得出
这种方式的本质转变带来了一系列优势:
完整的审计日志:每个事件都是不可变的事实,天然形成完整的操作历史。
时间旅行能力:可以重建任意历史时刻的系统状态,支持"回放"和"撤销"操作。
业务语义明确:事件名称直接反映业务含义(如OrderCreated、OrderPaid),比单纯的状态字段更有表达力。
并发友好:事件只追加不修改,天然避免了更新冲突。
1.2 事件的本质
在事件溯源中,事件是不可变的事实记录。一个事件通常包含以下信息:
public abstract class DomainEvent {
private final String eventId; // 全局唯一ID
private final String aggregateId; // 聚合根ID
private final String eventType; // 事件类型
private final Instant timestamp; // 发生时间
private final int version; // 版本号(用于乐观锁)
private final Map<String, Object> metadata; // 元数据(如操作人、IP等)
}
// 具体事件
public class OrderCreatedEvent extends DomainEvent {
private final String orderId;
private final String customerId;
private final List<OrderItem> items;
private final BigDecimal totalAmount;
private final String shippingAddress;
}
事件的关键特性:
不可变性:事件一旦发生,就不能被修改或删除。如果需要"撤销"某个事件,只能发布一个补偿事件。
时序性:事件按时间顺序存储,顺序决定了状态演化的路径。
原子性:每个事件代表一个原子操作,要么发生,要么不发生。
幂等性:相同事件被处理多次,应该产生相同的结果。
1.3 聚合根与事件流
聚合根(Aggregate Root)是领域驱动设计(DDD)中的核心概念,在事件溯源中扮演着事件管理者的角色。每个聚合根管理自己的一组事件,通过应用事件来维护内部状态。
public class OrderAggregate {
private String orderId;
private OrderStatus status;
private List<OrderItem> items;
private BigDecimal totalAmount;
private String paymentId;
private String trackingNo;
// 当前版本号,用于乐观并发控制
private int version = 0;
// 待发布的新事件列表
private final List<DomainEvent> pendingEvents = new ArrayList<>();
// 创建订单(命令处理)
public void create(CreateOrderCommand command) {
if (status != null) {
throw new IllegalStateException("Order already exists");
}
// 验证业务规则
if (command.getItems().isEmpty()) {
throw new IllegalArgumentException("Order must have at least one item");
}
// 发布事件
applyChange(new OrderCreatedEvent(
command.getOrderId(),
command.getCustomerId(),
command.getItems(),
calculateTotal(command.getItems()),
command.getShippingAddress()
));
}
// 支付订单(命令处理)
public void pay(PayOrderCommand command) {
if (status != OrderStatus.CREATED) {
throw new IllegalStateException("Only created order can be paid");
}
applyChange(new OrderPaidEvent(
orderId,
command.getPaymentId(),
Instant.now()
));
}
// 发货(命令处理)
public void ship(ShipOrderCommand command) {
if (status != OrderStatus.PAID) {
throw new IllegalStateException("Only paid order can be shipped");
}
applyChange(new OrderShippedEvent(
orderId,
command.getTrackingNo(),
Instant.now()
));
}
// 应用事件(状态变更)
private void applyChange(DomainEvent event) {
apply(event);
pendingEvents.add(event);
}
// 重放事件(状态重建)
public void apply(DomainEvent event) {
if (event instanceof OrderCreatedEvent e) {
this.orderId = e.getOrderId();
this.items = e.getItems();
this.totalAmount = e.getTotalAmount();
this.status = OrderStatus.CREATED;
} else if (event instanceof OrderPaidEvent e) {
this.paymentId = e.getPaymentId();
this.status = OrderStatus.PAID;
} else if (event instanceof OrderShippedEvent e) {
this.trackingNo = e.getTrackingNo();
this.status = OrderStatus.SHIPPED;
}
this.version++;
}
// 从历史事件重建状态
public static OrderAggregate fromEvents(List<DomainEvent> events) {
OrderAggregate aggregate = new OrderAggregate();
events.forEach(aggregate::apply);
aggregate.pendingEvents.clear(); // 重建时不产生新事件
return aggregate;
}
}
1.4 事件存储(Event Store)
事件存储是专门用于存储事件序列的数据库,与传统数据库有以下不同:
- 只追加写入:事件只追加,不更新、不删除。
- 乐观并发控制:使用版本号而非锁机制。
- 流式组织:事件按聚合根组织成流。
public interface EventStore {
// 追加事件到指定流
void append(String streamId, List<DomainEvent> events, int expectedVersion);
// 加载指定流的所有事件
List<DomainEvent> load(String streamId);
// 从指定版本开始加载事件
List<DomainEvent> loadFromVersion(String streamId, int version);
// 订阅所有新事件
void subscribe(Consumer<DomainEvent> subscriber);
}
// 基于数据库的实现
public class JdbcEventStore implements EventStore {
private final JdbcTemplate jdbcTemplate;
private final ObjectMapper objectMapper;
@Override
public void append(String streamId, List<DomainEvent> events, int expectedVersion) {
jdbcTemplate.execute((ConnectionCallback<Void>) connection -> {
// 检查当前版本(乐观锁)
Integer currentVersion = getCurrentVersion(connection, streamId);
if (currentVersion != null && currentVersion != expectedVersion) {
throw new OptimisticConcurrencyException(
"Expected version " + expectedVersion +
" but was " + currentVersion
);
}
// 插入事件
PreparedStatement ps = connection.prepareStatement(
"INSERT INTO events (event_id, stream_id, event_type, event_data, version, timestamp) " +
"VALUES (?, ?, ?, ?, ?, ?)"
);
int version = expectedVersion;
for (DomainEvent event : events) {
ps.setString(1, event.getEventId());
ps.setString(2, streamId);
ps.setString(3, event.getEventType());
ps.setString(4, objectMapper.writeValueAsString(event));
ps.setInt(5, ++version);
ps.setTimestamp(6, Timestamp.from(event.getTimestamp()));
ps.addBatch();
}
ps.executeBatch();
return null;
});
}
@Override
public List<DomainEvent> load(String streamId) {
return jdbcTemplate.query(
"SELECT event_data, event_type FROM events WHERE stream_id = ? ORDER BY version",
(rs, rowNum) -> deserializeEvent(rs.getString("event_type"), rs.getString("event_data")),
streamId
);
}
}
事件表设计:
CREATE TABLE events (
event_id VARCHAR(36) PRIMARY KEY,
stream_id VARCHAR(64) NOT NULL,
event_type VARCHAR(128) NOT NULL,
event_data JSON NOT NULL,
version INT NOT NULL,
timestamp TIMESTAMP(6) NOT NULL,
metadata JSON,
INDEX idx_stream_version (stream_id, version)
);
第二部分:CQRS——读写分离的架构模式
2.1 CQRS的核心思想
CQRS(Command Query Responsibility Segregation)的核心思想是将**命令(写操作)和查询(读操作)**分离,使用不同的模型处理。
传统架构中,读和写共用同一个模型:
┌─────────────────────────────────────┐
│ 同一个模型 │
│ ┌─────────────────────────────┐ │
│ │ Order Entity │ │
│ │ - id, status, items... │ │
│ │ - create() update() find() │ │
│ └─────────────────────────────┘ │
└─────────────────────────────────────┘
↑ ↓
写操作 读操作
CQRS架构将读写分离:
┌──────────────────────┐ ┌──────────────────────┐
│ 写模型(命令端) │ │ 读模型(查询端) │
│ ┌────────────────┐ │ │ ┌────────────────┐ │
│ │ OrderAggregate │ │────→│ │ OrderSummary │ │
│ │ - 状态管理 │ │事件 │ │ - 查询优化 │ │
│ │ - 业务规则 │ │同步 │ │ - 投影视图 │ │
│ │ - 事件发布 │ │ │ │ - 缓存支持 │ │
│ └────────────────┘ │ │ └────────────────┘ │
└──────────────────────┘ └──────────────────────┘
↑ ↓
命令处理 查询处理
2.2 为什么要分离读写模型?
性能优化的差异:
写操作需要保证事务一致性、业务规则完整性,通常涉及复杂的验证逻辑。读操作则追求快速响应,可能需要不同的数据结构(如JOIN优化、预计算字段)。
// 写模型:保证业务一致性
public class OrderAggregate {
public void create(CreateOrderCommand cmd) {
// 验证库存
validateInventory(cmd.getItems());
// 验证价格
validatePricing(cmd.getItems(), cmd.getCoupons());
// 计算优惠
BigDecimal discount = calculateDiscount(cmd);
// 发布事件
applyChange(new OrderCreatedEvent(...));
}
}
// 读模型:优化查询性能
@Table("order_summaries")
public class OrderSummary {
private String orderId;
private String customerName; // 冗余存储,避免JOIN
private String status;
private Integer itemCount; // 预计算
private BigDecimal totalAmount;
private Instant createdAt;
// 不需要复杂的业务规则
}
扩展性的差异:
读操作的频率通常远高于写操作(读写比例可能达到10:1甚至更高)。分离后,可以独立扩展读写端:读端可以添加缓存、使用读副本、部署多个查询服务实例。
技术选型的灵活性:
写端可能选择关系型数据库保证事务,读端可能选择NoSQL数据库优化查询。例如:
- 写端:PostgreSQL + Event Store
- 读端:Elasticsearch(全文搜索)、Redis(缓存)、MongoDB(文档查询)
2.3 命令(Command)与命令处理器
命令是表达"意图"的对象,它描述了"用户想要做什么",而非"系统状态如何变化"。
// 命令定义
public record CreateOrderCommand(
String orderId,
String customerId,
List<OrderItem> items,
String shippingAddress,
String couponCode
) implements Command {}
public record PayOrderCommand(
String orderId,
String paymentId,
BigDecimal amount
) implements Command {}
// 命令处理器
@Component
public class OrderCommandHandler {
private final EventStore eventStore;
private final OrderRepository orderRepository;
@Transactional
public void handle(CreateOrderCommand command) {
// 加载聚合根(如果是新建,则为空)
OrderAggregate order = orderRepository.load(command.orderId())
.orElse(new OrderAggregate());
// 执行命令
order.create(command);
// 持久化事件
eventStore.append(
"order-" + command.orderId(),
order.getPendingEvents(),
order.getVersion() - order.getPendingEvents().size()
);
// 发布事件到消息队列(异步处理)
order.getPendingEvents().forEach(eventBus::publish);
}
@Transactional
public void handle(PayOrderCommand command) {
// 从事件存储重建聚合根
List<DomainEvent> events = eventStore.load("order-" + command.orderId());
OrderAggregate order = OrderAggregate.fromEvents(events);
// 执行命令
order.pay(command);
// 持久化新事件
eventStore.append(
"order-" + command.orderId(),
order.getPendingEvents(),
order.getVersion() - order.getPendingEvents().size()
);
}
}
2.4 投影(Projection)与读模型
投影是将事件流转换为读模型的过程。每当有新事件产生,投影处理器就会更新对应的读模型。
@Component
public class OrderSummaryProjection {
private final OrderSummaryRepository repository;
@EventHandler
public void on(OrderCreatedEvent event) {
OrderSummary summary = new OrderSummary();
summary.setOrderId(event.getOrderId());
summary.setCustomerId(event.getCustomerId());
summary.setStatus("CREATED");
summary.setItemCount(event.getItems().size());
summary.setTotalAmount(event.getTotalAmount());
summary.setCreatedAt(event.getTimestamp());
repository.save(summary);
}
@EventHandler
public void on(OrderPaidEvent event) {
repository.findByOrderId(event.getOrderId())
.ifPresent(summary -> {
summary.setStatus("PAID");
summary.setPaidAt(event.getPaidAt());
repository.save(summary);
});
}
@EventHandler
public void on(OrderShippedEvent event) {
repository.findByOrderId(event.getOrderId())
.ifPresent(summary -> {
summary.setStatus("SHIPPED");
summary.setTrackingNo(event.getTrackingNo());
summary.setShippedAt(event.getShippedAt());
repository.save(summary);
});
}
@EventHandler
public void on(OrderCancelledEvent event) {
repository.findByOrderId(event.getOrderId())
.ifPresent(summary -> {
summary.setStatus("CANCELLED");
summary.setCancelledAt(event.getCancelledAt());
summary.setCancelReason(event.getReason());
repository.save(summary);
});
}
}
2.5 最终一致性
CQRS的一个重要特点是读写模型的最终一致性。命令端发布事件后,读模型可能需要几毫秒到几秒的时间才能完成更新。这意味着:
- 用户提交命令后立即查询,可能看不到最新状态。
- 需要在UI层处理这种情况(如显示"处理中"状态,或使用WebSocket推送更新)。
// 处理最终一致性的策略
// 策略1:命令返回后等待读模型更新
@PostMapping("/orders")
public ResponseEntity<OrderResponse> createOrder(@RequestBody CreateOrderRequest request) {
String orderId = UUID.randomUUID().toString();
// 发送命令
commandBus.send(new CreateOrderCommand(orderId, ...));
// 等待读模型更新(最多等待5秒)
Awaitility.await()
.atMost(5, TimeUnit.SECONDS)
.until(() -> orderQueryService.findById(orderId).isPresent());
return ResponseEntity.ok(orderQueryService.findById(orderId).orElseThrow());
}
// 策略2:返回命令ID,客户端轮询
@PostMapping("/orders")
public ResponseEntity<CommandResult> createOrder(@RequestBody CreateOrderRequest request) {
String orderId = UUID.randomUUID().toString();
String commandId = UUID.randomUUID().toString();
// 异步处理命令
commandBus.sendAsync(new CreateOrderCommand(orderId, ..., commandId));
return ResponseEntity.accepted()
.body(new CommandResult(commandId, orderId, "PROCESSING"));
}
// 策略3:WebSocket推送更新
@EventHandler
public void on(OrderCreatedEvent event) {
// 更新读模型
updateReadModel(event);
// 推送给客户端
webSocketService.sendToUser(
event.getCustomerId(),
"order:created",
new OrderNotification(event.getOrderId(), "CREATED")
);
}
第三部分:事件溯源与CQRS的联合应用
3.1 为什么事件溯源常与CQRS结合?
事件溯源和CQRS是两个独立的模式,但它们经常一起使用,原因如下:
事件溯源产生事件流,CQRS需要事件流:事件溯源的核心输出是事件序列,而CQRS的投影处理器需要订阅事件来更新读模型。两者天然契合。
事件溯源优化写端,CQRS优化读端:事件溯源让写端通过追加事件获得高性能,CQRS让读端通过专用模型获得查询灵活性。
分离关注点,降低复杂度:写端专注于业务规则和状态一致性,读端专注于查询性能和用户体验。
3.2 完整架构设计
┌──────────────────────────────────────────────────────────────────────┐
│ 客户端层 │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────────────────────┐│
│ │ Web App │ │ Mobile App │ │ 第三方集成 ││
│ └─────────────┘ └─────────────┘ └─────────────────────────────┘│
└───────────────────────────────┬──────────────────────────────────────┘
│
┌───────────────────────────────┴──────────────────────────────────────┐
│ API网关层 │
│ ┌─────────────────────────────────────────────────────────────────┐│
│ │ REST API: POST /commands, GET /queries ││
│ │ WebSocket: 订阅事件更新 ││
│ └─────────────────────────────────────────────────────────────────┘│
└───────────────┬─────────────────────────────────────┬────────────────┘
│ 命令 │ 查询
↓ ↓
┌───────────────────────────────┐ ┌────────────────────────────────────┐
│ 命令端(写模型) │ │ 查询端(读模型) │
│ ┌─────────────────────────┐ │ │ ┌────────────────────────────┐ │
│ │ Command Handler │ │ │ │ Query Service │ │
│ │ - 接收命令 │ │ │ │ - 处理查询请求 │ │
│ │ - 加载聚合根 │ │ │ │ - 返回投影视图 │ │
│ │ - 执行业务规则 │ │ │ └────────────────────────────┘ │
│ │ - 发布事件 │ │ │ ↑ │
│ └─────────────────────────┘ │ │ │ │
│ ↓ │ │ ┌────────────────────────────┐ │
│ ┌─────────────────────────┐ │ │ │ Read Model Store │ │
│ │ Event Store │ │ │ │ - PostgreSQL (查询优化) │ │
│ │ - 追加事件 │──┼──→│ │ - Redis (缓存) │ │
│ │ - 乐观并发控制 │ │ │ │ - Elasticsearch (搜索) │ │
│ │ - 事件流管理 │ │ │ └────────────────────────────┘ │
│ └─────────────────────────┘ │ │ │
│ ↓ │ │ ┌────────────────────────────┐ │
│ ┌─────────────────────────┐ │ │ │ Projection Handlers │ │
│ │ Event Publisher │ │ │ │ - 订阅事件 │ │
│ │ - 发布到消息队列 │ │ │ │ - 更新读模型 │ │
│ └─────────────────────────┘ │ │ └────────────────────────────┘ │
└───────────────────────────────┘ └────────────────────────────────────┘
3.3 Spring Boot实现示例
项目结构:
src/main/java/com/example/ordersystem/
├── command/ # 命令端
│ ├── api/
│ │ └── OrderCommandController.java
│ ├── application/
│ │ └── OrderCommandService.java
│ ├── domain/
│ │ ├── OrderAggregate.java
│ │ ├── events/
│ │ │ ├── OrderCreatedEvent.java
│ │ │ ├── OrderPaidEvent.java
│ │ │ └── OrderShippedEvent.java
│ │ └── commands/
│ │ ├── CreateOrderCommand.java
│ │ └── PayOrderCommand.java
│ └── infrastructure/
│ ├── EventStore.java
│ └── JdbcEventStore.java
├── query/ # 查询端
│ ├── api/
│ │ └── OrderQueryController.java
│ ├── application/
│ │ └── OrderQueryService.java
│ ├── domain/
│ │ └── OrderSummary.java
│ └── infrastructure/
│ ├── OrderSummaryRepository.java
│ └── OrderSummaryProjection.java
└── shared/
└── EventBus.java
命令端实现:
// OrderCommandController.java
@RestController
@RequestMapping("/api/orders")
public class OrderCommandController {
private final OrderCommandService commandService;
@PostMapping
public ResponseEntity<CommandResponse> createOrder(
@RequestBody @Valid CreateOrderRequest request) {
String orderId = UUID.randomUUID().toString();
commandService.createOrder(new CreateOrderCommand(
orderId,
request.getCustomerId(),
request.getItems(),
request.getShippingAddress(),
request.getCouponCode()
));
return ResponseEntity.accepted()
.body(new CommandResponse(orderId, "CREATED"));
}
@PostMapping("/{orderId}/pay")
public ResponseEntity<CommandResponse> payOrder(
@PathVariable String orderId,
@RequestBody @Valid PayOrderRequest request) {
commandService.payOrder(new PayOrderCommand(
orderId,
request.getPaymentId(),
request.getAmount()
));
return ResponseEntity.accepted()
.body(new CommandResponse(orderId, "PAID"));
}
}
// OrderCommandService.java
@Service
@Transactional
public class OrderCommandService {
private final EventStore eventStore;
private final EventBus eventBus;
public void createOrder(CreateOrderCommand command) {
OrderAggregate order = new OrderAggregate();
order.create(command);
String streamId = "order-" + command.getOrderId();
eventStore.append(streamId, order.getPendingEvents(), 0);
order.getPendingEvents().forEach(eventBus::publish);
}
public void payOrder(PayOrderCommand command) {
String streamId = "order-" + command.getOrderId();
List<DomainEvent> events = eventStore.load(streamId);
OrderAggregate order = OrderAggregate.fromEvents(events);
order.pay(command);
eventStore.append(streamId, order.getPendingEvents(), order.getVersion() - order.getPendingEvents().size());
order.getPendingEvents().forEach(eventBus::publish);
}
}
查询端实现:
// OrderQueryController.java
@RestController
@RequestMapping("/api/orders")
public class OrderQueryController {
private final OrderQueryService queryService;
@GetMapping("/{orderId}")
public ResponseEntity<OrderSummary> getOrder(@PathVariable String orderId) {
return queryService.findById(orderId)
.map(ResponseEntity::ok)
.orElse(ResponseEntity.notFound().build());
}
@GetMapping
public Page<OrderSummary> listOrders(
@RequestParam(required = false) String customerId,
@RequestParam(required = false) String status,
Pageable pageable) {
return queryService.findByCriteria(customerId, status, pageable);
}
@GetMapping("/search")
public List<OrderSummary> searchOrders(@RequestParam String keyword) {
return queryService.search(keyword);
}
}
// OrderSummaryProjection.java
@Component
public class OrderSummaryProjection {
private final OrderSummaryRepository repository;
private final ElasticsearchTemplate elasticsearchTemplate;
@EventListener
@Async
public void on(OrderCreatedEvent event) {
// 保存到PostgreSQL
OrderSummary summary = OrderSummary.builder()
.orderId(event.getOrderId())
.customerId(event.getCustomerId())
.status("CREATED")
.itemCount(event.getItems().size())
.totalAmount(event.getTotalAmount())
.createdAt(event.getTimestamp())
.build();
repository.save(summary);
// 同步到Elasticsearch(用于搜索)
elasticsearchTemplate.save(buildSearchDocument(event));
}
@EventListener
@Async
public void on(OrderPaidEvent event) {
repository.findByOrderId(event.getOrderId())
.ifPresent(summary -> {
summary.setStatus("PAID");
summary.setPaidAt(event.getPaidAt());
summary.setPaymentId(event.getPaymentId());
repository.save(summary);
});
}
@EventListener
@Async
public void on(OrderShippedEvent event) {
repository.findByOrderId(event.getOrderId())
.ifPresent(summary -> {
summary.setStatus("SHIPPED");
summary.setTrackingNo(event.getTrackingNo());
summary.setShippedAt(event.getShippedAt());
repository.save(summary);
});
}
}
第四部分:高级话题与生产实践
4.1 快照(Snapshot)优化
当聚合根的事件数量非常大时,每次重建状态都需要重放所有事件,性能会下降。快照是一种优化手段:定期保存聚合根的完整状态,重建时只需加载最近的快照,然后重放快照之后的事件。
public class Snapshot {
private String aggregateId;
private String aggregateType;
private int version;
private String state; // JSON序列化的状态
private Instant timestamp;
}
public class OrderAggregate {
// 从快照+增量事件重建
public static OrderAggregate fromSnapshotAndEvents(
Snapshot snapshot,
List<DomainEvent> events) {
// 反序列化快照状态
OrderAggregate aggregate = objectMapper.readValue(
snapshot.getState(),
OrderAggregate.class
);
// 应用增量事件
events.forEach(aggregate::apply);
return aggregate;
}
}
// 快照策略
@Component
public class SnapshotStrategy {
private static final int SNAPSHOT_THRESHOLD = 100; // 每100个事件生成一次快照
public boolean shouldCreateSnapshot(List<DomainEvent> events, int currentVersion) {
return events.size() >= SNAPSHOT_THRESHOLD;
}
public Snapshot createSnapshot(OrderAggregate aggregate) {
return new Snapshot(
aggregate.getOrderId(),
"Order",
aggregate.getVersion(),
objectMapper.writeValueAsString(aggregate),
Instant.now()
);
}
}
4.2 Saga模式与分布式事务
在微服务架构中,一个业务操作可能涉及多个服务的事件溯源聚合根。Saga模式通过事件编排来协调跨服务的事务。
// 订单创建Saga
public class CreateOrderSaga {
private final EventBus eventBus;
public void start(CreateOrderCommand command) {
// 步骤1:创建订单
eventBus.publish(new OrderCreationStarted(command));
// 步骤2:预留库存(监听OrderCreated事件后执行)
// 步骤3:处理支付(监听InventoryReserved事件后执行)
// 步骤4:确认订单(监听PaymentProcessed事件后执行)
}
// 补偿逻辑
@EventHandler
public void on(PaymentFailedEvent event) {
// 补偿:释放库存
eventBus.publish(new ReleaseInventory(event.getOrderId()));
// 补偿:取消订单
eventBus.publish(new CancelOrder(event.getOrderId(), "Payment failed"));
}
}
// Saga状态机
public enum SagaState {
STARTED,
ORDER_CREATED,
INVENTORY_RESERVED,
PAYMENT_PROCESSING,
COMPLETED,
COMPENSATING,
FAILED
}
4.3 事件版本兼容性
随着业务演进,事件的schema可能发生变化。需要处理新旧版本事件的兼容性问题。
策略1:向上兼容
新版本事件保留所有旧字段,只添加新字段。
// V1事件
public class OrderCreatedEventV1 {
private String orderId;
private List<OrderItem> items;
}
// V2事件(兼容V1)
public class OrderCreatedEventV2 {
private String orderId;
private List<OrderItem> items;
private String couponCode; // 新增字段
private BigDecimal discount; // 新增字段
}
策略2:事件升级器
在加载事件时,自动将旧版本事件转换为新版本。
@Component
public class EventUpgrader {
private final ObjectMapper objectMapper;
public DomainEvent upgrade(String eventType, String eventData, int version) {
return switch (eventType) {
case "OrderCreatedEvent" -> {
if (version == 1) {
OrderCreatedEventV1 v1 = objectMapper.readValue(eventData, OrderCreatedEventV1.class);
yield new OrderCreatedEventV2(
v1.getOrderId(),
v1.getItems(),
null, // 默认值
BigDecimal.ZERO
);
}
yield objectMapper.readValue(eventData, OrderCreatedEventV2.class);
}
default -> objectMapper.readValue(eventData, DomainEvent.class);
};
}
}
4.4 事件重放与系统恢复
事件溯源系统的一个重要能力是可以重建整个系统状态。这在以下场景非常有用:
- 引入新的读模型投影
- 修复读模型的bug
- 系统迁移或灾备恢复
@Service
public class EventReplayService {
private final EventStore eventStore;
private final List<ProjectionHandler> projections;
public void replayAll() {
// 清空所有读模型
projections.forEach(ProjectionHandler::clear);
// 重放所有事件
eventStore.subscribeAll(event -> {
projections.forEach(p -> p.handle(event));
});
}
public void replayFrom(Instant from) {
eventStore.subscribeFrom(from, event -> {
projections.forEach(p -> p.handle(event));
});
}
public void rebuildProjection(String projectionName) {
ProjectionHandler projection = projections.stream()
.filter(p -> p.getName().equals(projectionName))
.findFirst()
.orElseThrow();
projection.clear();
eventStore.subscribeAll(projection::handle);
}
}
4.5 性能优化实践
事件存储优化:
-- 分区表(按时间分区)
CREATE TABLE events (
event_id VARCHAR(36) PRIMARY KEY,
stream_id VARCHAR(64) NOT NULL,
event_type VARCHAR(128) NOT NULL,
event_data JSON NOT NULL,
version INT NOT NULL,
timestamp TIMESTAMP(6) NOT NULL,
metadata JSON
) PARTITION BY RANGE (timestamp);
-- 创建分区
CREATE TABLE events_2026_01 PARTITION OF events
FOR VALUES FROM ('2026-01-01') TO ('2026-02-01');
CREATE TABLE events_2026_02 PARTITION OF events
FOR VALUES FROM ('2026-02-01') TO ('2026-03-01');
-- 索引优化
CREATE INDEX idx_stream_version ON events (stream_id, version);
CREATE INDEX idx_timestamp ON events (timestamp);
读模型缓存策略:
@Service
public class CachedOrderQueryService {
private final RedisTemplate<String, OrderSummary> redisTemplate;
private final OrderSummaryRepository repository;
@Cacheable(value = "order-summary", key = "#orderId")
public Optional<OrderSummary> findById(String orderId) {
return repository.findByOrderId(orderId);
}
@CacheEvict(value = "order-summary", key = "#event.orderId")
@EventListener
public void on(OrderUpdatedEvent event) {
// 事件更新时自动清除缓存
}
// 预热热点数据
@Scheduled(fixedRate = 300000) // 每5分钟
public void warmUpCache() {
List<String> hotOrderIds = findHotOrders();
hotOrderIds.forEach(this::findById);
}
}
第五部分:适用场景与决策指南
5.1 适合事件溯源+CQRS的场景
| 场景 | 原因 |
|---|---|
| 金融交易系统 | 需要完整的审计日志,任何状态变化都必须可追溯 |
| 订单履约系统 | 状态流转复杂,需要支持回滚、补偿等操作 |
| 协作编辑系统 | 需要支持撤销/重做、版本对比、冲突解决 |
| 审计合规系统 | 法规要求保留完整历史记录 |
| IoT数据处理 | 高写入频率,读查询模式多样化 |
| 事件驱动架构 | 已有事件基础设施,容易集成 |
5.2 不适合的场景
| 场景 | 原因 |
|---|---|
| 简单CRUD应用 | 引入事件溯源会增加复杂度,收益不大 |
| 报表查询为主 | 读操作远多于写操作,但不需要复杂的状态管理 |
| 团队不熟悉DDD | 学习曲线陡峭,容易误用 |
| 实时性要求极高 | 最终一致性可能导致用户体验问题 |
| 数据量极大的单实体 | 事件数量过多会影响重建性能 |
5.3 引入决策流程
开始
│
├─ 是否需要完整审计日志?
│ ├─ 是 → 考虑事件溯源
│ └─ 否 → 继续
│
├─ 是否需要时间旅行(重建历史状态)?
│ ├─ 是 → 事件溯源
│ └─ 否 → 继续
│
├─ 读写比例是否显著不平衡(>10:1)?
│ ├─ 是 → 考虑CQRS
│ └─ 否 → 传统架构可能足够
│
├─ 是否需要独立的读写扩展?
│ ├─ 是 → CQRS
│ └─ 否 → 继续
│
└─ 业务复杂度是否高(多聚合根协作)?
├─ 是 → DDD + 事件溯源
└─ 否 → 传统CRUD
结语:架构的本质是权衡
事件溯源和CQRS不是银弹,它们解决了一类问题,同时也引入了新的复杂度。选择它们意味着:
获得的好处:
- 完整的审计日志
- 时间旅行能力
- 读写独立扩展
- 更好的业务表达力
- 更容易实现事件驱动架构
付出的代价:
- 学习曲线陡峭
- 最终一致性的处理
- 事件版本管理的复杂度
- 调试和排障难度增加
- 需要额外的基础设施(事件存储、消息队列)
在决定采用之前,请认真评估你的业务场景、团队能力和运维能力。好的架构不是最先进的架构,而是最适合当前阶段的架构。
参考资料:
- Martin Fowler: Event Sourcing
- Greg Young: CQRS Documents
- Vaughn Vernon: Domain-Driven Design
- EventStoreDB Documentation
- Axon Framework Reference