Kafka 4.0 彻底告别 ZooKeeper:KRaft 模式从原理到生产实战全解析
前言
2025 年,Apache Kafka 4.0 正式发布,这不仅仅是一个大版本更新——它标志着一个时代的终结。从 Kafka 诞生之初就如影随形的 ZooKeeper 依赖,终于被彻底移除。KRaft(Kafka Raft)成为唯一的集群协调模式。
如果你还在用 Kafka 3.x 带着 ZooKeeper 跑集群,是时候认真面对迁移这件事了。本文不会给你泛泛而谈的概述,而是从 Raft 共识协议的底层原理出发,结合 Docker 部署实战、Java/Go 客户端代码、Exactly-Once 语义实现、性能调优和生产级最佳实践,给你一份真正能落地的东西。
一、为什么必须告别 ZooKeeper?
1.1 ZooKeeper 的原罪
ZooKeeper 在 Kafka 中的角色是元数据管理——Broker 注册、Topic 分区分配、Controller 选举等核心协调工作都依赖它。但这个"外挂"带来了太多问题:
运维复杂度翻倍。 你不仅要管理 Kafka 集群,还要管理一套 ZooKeeper 集群。两套系统的版本兼容、配置调优、监控告警、故障排查,工作量直接翻倍。更别提 ZooKeeper 自己的 Observer、Snapshot 事务日志清理这些坑。
元数据瓶颈。 Kafka 集群规模上去之后,Topic 和 Partition 数量动辄上万。ZooKeeper 的 Watch 机制在这种规模下表现糟糕——每次元数据变更都会触发大量 Watch 通知,Controller 成为热点,整个集群的元数据更新延迟飙升。这个问题在 10 万+ Partition 的集群上尤为明显。
Controller 故障恢复慢。 旧架构下,Controller 故障需要通过 ZooKeeper 选举新 Controller,新 Controller 要从 ZooKeeper 加载全部元数据,然后才能开始工作。集群越大,恢复越慢,几十秒甚至几分钟的不可用窗口在生产环境中是致命的。
架构层面的割裂。 元数据存储在 ZooKeeper,实际数据在 Broker,两者之间的状态同步是一个天然的脆弱点。你一定遇到过 ZooKeeper 和 Kafka 元数据不一致的噩梦——删了 Topic 但 ZooKeeper 里还留着,或者反过来。
1.2 KRaft 不是"换了个 ZooKeeper"
一个常见的误解是:KRaft 就是把 ZooKeeper 的功能内嵌到 Kafka 里了。这个理解太浅。
KRaft 的本质是重新定义了 Kafka 的元数据架构。在旧架构中,元数据是 ZooKeeper 的"外部状态",Kafka 只是使用者;在 KRaft 架构中,元数据是一条条日志记录,存储在 Kafka 自身的内部 Topic(@metadata)中,通过 Raft 协议在 Controller Quorum 之间复制。
这意味着:
- 元数据变更变成了追加日志的操作,天然有序、可回放
- Broker 通过拉取元数据日志来同步状态,替代了推模式的 Watch
- Controller 不再需要"加载"元数据——它自己就是 Raft 日志的维护者
这不是简单的替换,这是架构范式的转变。
二、KRaft 核心原理:Raft 共识协议
2.1 Raft 协议三板斧
Raft 共识协议是 KRaft 的基石,由 Diego Ongaro 和 John Ousterhout 在 2014 年提出,以"易于理解"著称(相对于 Paxos)。它的核心机制有三个:
Leader Election(领导者选举)。 在任何时刻,Raft 集群中有且仅有一个 Leader 节点。所有写操作必须通过 Leader,Follower 只接受 Leader 的日志复制。当 Leader 故障时,Follower 通过超时机制触发选举,获得多数票的节点成为新 Leader。
选举过程的关键参数:
election.timeout.ms(默认 1000ms):Follower 在此时间内未收到 Leader 的心跳,发起选举request.timeout.ms(默认 2000ms):选举请求的超时时间- 一个节点要赢得选举,必须获得 Quorum(多数)节点的投票
Log Replication(日志复制)。 Leader 收到写请求后,将其追加到自己的日志中,然后并行地发送给所有 Follower。当多数 Follower 确认写入后,这条日志就被认为是"已提交"(Committed),Leader 通知客户端写入成功。
这个过程保证了:
- 已提交的日志不会丢失(多数节点已持久化)
- 所有节点的日志顺序一致(Leader 只有一个,日志只能从 Leader 追加)
Safety(安全性保证)。 Raft 保证了几个关键的不变量:
- Election Safety:每个 Term 最多一个 Leader
- Leader Append-Only:Leader 只追加日志,不删除不覆盖
- Log Matching:如果两条日志在同一 Index 上且 Term 相同,那么它们之前的所有日志也相同
- Leader Completeness:如果一个日志被提交,那么后续所有 Leader 都会包含这条日志
- State Machine Safety:任何节点在某个 Index 上应用的日志条目都相同
2.2 元数据日志机制
KRaft 中,所有的集群元数据变更——创建 Topic、修改配置、Broker 上下线、Partition Leader 切换——都被编码为一条条元数据日志记录(Metadata Record),追加到内部的 @metadata 日志中。
@metadata 日志结构(逻辑视图):
┌─────────┬──────────┬───────────────────────────────┐
│ Offset │ Term │ Record Type │
├─────────┼──────────┼───────────────────────────────┤
│ 0 │ 1 │ RegisterBrokerRecord(broker0) │
│ 1 │ 1 │ RegisterBrokerRecord(broker1) │
│ 2 │ 1 │ CreateTopicRecord(topicA) │
│ 3 │ 1 │ PartitionRecord(topicA-0) │
│ 4 │ 2 │ LeaderChangeRecord(broker1) │
│ ... │ ... │ ... │
└─────────┴──────────┴───────────────────────────────┘
每个 Broker 维护一个本地的元数据日志拉取偏移量。Broker 定期从 Controller 拉取自己缺失的元数据记录,然后重放到本地的元数据缓存中。这种拉模型(Pull Model)相比 ZooKeeper 的推模型(Push/Watch)有几个优势:
- 背压控制:Broker 可以按自己的节奏消费元数据,不会被海量 Watch 通知淹没
- 断点续传:Broker 重启后从上次的位置继续拉取,不需要全量加载
- 顺序保证:日志天然有序,不存在乱序更新的问题
2.3 Controller Quorum 与投票者机制
KRaft 集群中的 Controller 节点组成一个 Raft Quorum。通常部署 3 个或 5 个 Controller 节点,其中一个是活跃的 Leader Controller,其余是 Follower Controller。
投票者(Voter)机制是 KRaft 的一个重要设计。并非所有 Controller 都必须是投票者——你可以配置 Observer Controller,它们参与日志复制但不参与投票:
# server.properties - Controller 节点配置
node.id=1
process.roles=controller
controller.quorum.voters=1@host1:9093,2@host2:9093,3@host3:9093
controller.listener.names=CONTROLLER
# Observer Controller(不参与投票,只复制日志)
# 在 KRaft 中,可以通过不加入 voters 列表实现
Quorum 大小的选择遵循 Raft 的通用原则:N 个投票者可以容忍 (N-1)/2 个故障。
| 投票者数量 | 容忍故障数 | 推荐场景 |
|---|---|---|
| 3 | 1 | 中小规模集群 |
| 5 | 2 | 大规模关键集群 |
2.4 合并模式 vs 分离模式
KRaft 提供两种部署模式:
Combined 模式(合并模式)。 Broker 和 Controller 运行在同一个进程中。适合小规模集群或开发测试环境。
# server.properties - Combined 模式
process.roles=broker,controller
node.id=1
listeners=BROKER://:9092,CONTROLLER://:9093
controller.quorum.voters=1@localhost:9093,2@localhost:9095,3@localhost:9097
advertised.listeners=BROKER://localhost:9092
Separated 模式(分离模式)。 Broker 和 Controller 运行在不同的进程中,甚至不同的机器上。适合生产环境,职责隔离更清晰。
# Controller 节点 - controller.properties
process.roles=controller
node.id=1
listeners=CONTROLLER://:9093
controller.quorum.voters=1@ctrl1:9093,2@ctrl2:9093,3@ctrl3:9093
# Broker 节点 - broker.properties
process.roles=broker
node.id=4
listeners=BROKER://:9092
controller.quorum.voters=1@ctrl1:9093,2@ctrl2:9093,3@ctrl3:9093
advertised.listeners=BROKER://broker1:9092
生产环境强烈推荐分离模式。 原因很简单:Controller 负责元数据管理,对延迟敏感;Broker 负责数据读写,可能因为大流量导致 GC 或 CPU 飙升。合并在一起,Broker 的压力会直接影响 Controller 的稳定性,导致元数据操作延迟甚至超时。
2.5 Controller 与 Broker 职责分离
KRaft 架构下,Controller 和 Broker 的职责边界更加清晰:
Controller 的职责:
- 维护 Raft 日志和元数据状态机
- 处理管理操作(创建/删除 Topic、修改配置)
- 触发 Partition Leader 选举
- 监控 Broker 健康状态
- 向 Broker 推送元数据变更通知
Broker 的职责:
- 处理客户端的数据读写请求
- 存储消息数据
- 参与消费者组的协调
- 向 Controller 报告自身状态
关键变化:Broker 不再直接与 ZooKeeper 交互,所有元数据操作都通过 Controller 完成。这简化了 Broker 的代码路径,也消除了元数据不一致的根源。
三、Kafka 4.0 新特性详解
3.1 ZooKeeper 完全移除
这是最大的变更。Kafka 4.0 中:
- 所有 ZooKeeper 相关的配置参数和命令行工具被移除
--zookeeper参数不再被任何脚本接受- ZooKeeper 模式的 Broker 无法启动
- 迁移模式(从 ZooKeeper 迁移到 KRaft 的过渡模式)不再支持
如果你还在 Kafka 3.x 上使用 ZooKeeper 模式,升级到 4.0 之前必须先迁移到 KRaft 模式。没有捷径,没有回退。
3.2 新一代消费者重平衡协议
Kafka 4.0 将 Incremental Cooperative Rebalancing(增量协作重平衡)作为默认的消费者重平衡协议,替代了之前的 Eager(急切)协议。
旧协议的问题: Eager 重平衡时,所有消费者先 revoke 所有分区,然后重新分配。这导致整个消费者组在重平衡期间完全停止消费,即使最终分配结果没有变化。
新协议的改进:
- 只 revoke 需要移动的分区,未受影响的分区继续消费
- 分多轮完成重平衡,逐步调整分配
- 大幅减少"Stop-The-World"的时间窗口
// Kafka 4.0 中默认启用,无需额外配置
// 如果你需要显式指定:
properties.put("consumer.group.protocol", "consumer"); // 新协议
// 旧协议已移除,不可回退
3.3 点对点消息模型
Kafka 4.0 引入了更灵活的消费模式支持,允许更精细地控制分区与消费者的映射关系。这在某些场景下提供了类似点对点消息模型的能力,减少了不必要的分区复制开销。
3.4 移除旧协议 API
Kafka 4.0 移除了一批过时的 API 和功能:
- 旧版消费者 API(
kafka.consumer.SimpleConsumer) - 旧版生产者 API(
kafka.producer.Producer) - 基于 ZooKeeper 的管理工具
- 部分废弃的配置参数
这是 Kafka 社区的一贯风格——大版本不向后兼容,逼着你用新 API。虽然痛苦,但换来的是更干净的代码库和更简单的维护。
四、架构对比分析
4.1 旧架构 vs KRaft 架构
旧架构(ZooKeeper 模式):
┌──────────┐ ┌──────────┐ ┌──────────┐
│ Broker 0 │ │ Broker 1 │ │ Broker 2 │
└────┬─────┘ └────┬─────┘ └────┬─────┘
│ │ │
└────────────────┼────────────────┘
│ Watch/通知
┌───────▼───────┐
│ ZooKeeper │
│ Ensemble │
└───────────────┘
KRaft 架构:
┌──────────┐ ┌──────────┐ ┌──────────┐
│ Broker 0 │ │ Broker 1 │ │ Broker 2 │
└────┬─────┘ └────┬─────┘ └────┬─────┘
│ │ │
│ 拉取元数据日志 │
└────────────────┼────────────────┘
│
┌─────────────▼──────────────┐
│ Controller Quorum │
│ ┌───────┐ ┌───────┐ │
│ │Ctrl 0 │ │Ctrl 1 │ │
│ │Leader │ │Follower│ │
│ └───────┘ └───────┘ │
│ ┌───────┐ │
│ │Ctrl 2 │ │
│ │Follower│ │
│ └───────┘ │
└───────────────────────────┘
4.2 关键指标对比
| 维度 | ZooKeeper 模式 | KRaft 模式 |
|---|---|---|
| 组件数量 | Kafka + ZooKeeper | 仅 Kafka |
| Controller 故障恢复 | 10-60 秒 | 1-5 秒 |
| Partition 规模上限 | ~10 万 | ~200 万(理论值) |
| 元数据更新机制 | Watch 推送 | 日志拉取 |
| 运维复杂度 | 高(双系统) | 低(单系统) |
| 元数据一致性 | 可能不一致 | Raft 保证一致 |
| 集群启动时间 | 分钟级 | 秒级 |
五、Docker 部署实战
5.1 单机部署(Combined 模式)
适合本地开发和测试。
# docker-compose-single.yaml
version: '3.8'
services:
kafka:
image: apache/kafka:4.0.0
container_name: kafka-single
ports:
- "9092:9092"
- "9093:9093"
environment:
KAFKA_NODE_ID: 1
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_LISTENERS: BROKER://:9092,CONTROLLER://:9093
KAFKA_ADVERTISED_LISTENERS: BROKER://localhost:9092
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,BROKER:PLAINTEXT
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-single:9093
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
# 日志目录
KAFKA_LOG_DIRS: /var/lib/kafka/data
volumes:
- kafka-data:/var/lib/kafka/data
volumes:
kafka-data:
启动并验证:
# 启动
docker compose -f docker-compose-single.yaml up -d
# 等待启动完成
sleep 10
# 创建 Topic
docker exec kafka-single /opt/kafka/bin/kafka-topics.sh \
--bootstrap-server localhost:9092 \
--create --topic test-topic \
--partitions 3 --replication-factor 1
# 查看集群元数据
docker exec kafka-single /opt/kafka/bin/kafka-metadata.sh \
--snapshot /var/lib/kafka/data/meta.properties
# 生产消息
docker exec kafka-single /opt/kafka/bin/kafka-console-producer.sh \
--bootstrap-server localhost:9092 \
--topic test-topic
# 消费消息
docker exec kafka-single /opt/kafka/bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic test-topic --from-beginning
5.2 三节点集群部署(Separated 模式)
生产级部署推荐方案:
# docker-compose-cluster.yaml
version: '3.8'
networks:
kafka-net:
driver: bridge
services:
kafka-controller-1:
image: apache/kafka:4.0.0
container_name: kafka-ctrl-1
networks: [kafka-net]
environment:
KAFKA_NODE_ID: 1
KAFKA_PROCESS_ROLES: controller
KAFKA_LISTENERS: CONTROLLER://:9093
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-ctrl-1:9093,2@kafka-ctrl-2:9093,3@kafka-ctrl-3:9093
KAFKA_LOG_DIRS: /var/lib/kafka/data
volumes:
- ctrl1-data:/var/lib/kafka/data
kafka-controller-2:
image: apache/kafka:4.0.0
container_name: kafka-ctrl-2
networks: [kafka-net]
environment:
KAFKA_NODE_ID: 2
KAFKA_PROCESS_ROLES: controller
KAFKA_LISTENERS: CONTROLLER://:9093
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-ctrl-1:9093,2@kafka-ctrl-2:9093,3@kafka-ctrl-3:9093
KAFKA_LOG_DIRS: /var/lib/kafka/data
volumes:
- ctrl2-data:/var/lib/kafka/data
kafka-controller-3:
image: apache/kafka:4.0.0
container_name: kafka-ctrl-3
networks: [kafka-net]
environment:
KAFKA_NODE_ID: 3
KAFKA_PROCESS_ROLES: controller
KAFKA_LISTENERS: CONTROLLER://:9093
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-ctrl-1:9093,2@kafka-ctrl-2:9093,3@kafka-ctrl-3:9093
KAFKA_LOG_DIRS: /var/lib/kafka/data
volumes:
- ctrl3-data:/var/lib/kafka/data
kafka-broker-1:
image: apache/kafka:4.0.0
container_name: kafka-broker-1
networks: [kafka-net]
ports:
- "9092:9092"
depends_on:
- kafka-controller-1
- kafka-controller-2
- kafka-controller-3
environment:
KAFKA_NODE_ID: 4
KAFKA_PROCESS_ROLES: broker
KAFKA_LISTENERS: BROKER://:9092
KAFKA_ADVERTISED_LISTENERS: BROKER://kafka-broker-1:9092
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,BROKER:PLAINTEXT
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-ctrl-1:9093,2@kafka-ctrl-2:9093,3@kafka-ctrl-3:9093
KAFKA_LOG_DIRS: /var/lib/kafka/data
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
KAFKA_DEFAULT_REPLICATION_FACTOR: 3
KAFKA_MIN_INSYNC_REPLICAS: 2
KAFKA_NUM_NETWORK_THREADS: 4
KAFKA_NUM_IO_THREADS: 8
KAFKA_SOCKET_SEND_BUFFER_BYTES: 102400
KAFKA_SOCKET_RECEIVE_BUFFER_BYTES: 102400
KAFKA_LOG_RETENTION_HOURS: 168
KAFKA_LOG_SEGMENT_BYTES: 1073741824
KAFKA_NUM_PARTITIONS: 3
volumes:
- broker1-data:/var/lib/kafka/data
kafka-broker-2:
image: apache/kafka:4.0.0
container_name: kafka-broker-2
networks: [kafka-net]
ports:
- "9094:9092"
depends_on:
- kafka-controller-1
- kafka-controller-2
- kafka-controller-3
environment:
KAFKA_NODE_ID: 5
KAFKA_PROCESS_ROLES: broker
KAFKA_LISTENERS: BROKER://:9092
KAFKA_ADVERTISED_LISTENERS: BROKER://kafka-broker-2:9092
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,BROKER:PLAINTEXT
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-ctrl-1:9093,2@kafka-ctrl-2:9093,3@kafka-ctrl-3:9093
KAFKA_LOG_DIRS: /var/lib/kafka/data
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
KAFKA_DEFAULT_REPLICATION_FACTOR: 3
KAFKA_MIN_INSYNC_REPLICAS: 2
volumes:
- broker2-data:/var/lib/kafka/data
kafka-broker-3:
image: apache/kafka:4.0.0
container_name: kafka-broker-3
networks: [kafka-net]
ports:
- "9095:9092"
depends_on:
- kafka-controller-1
- kafka-controller-2
- kafka-controller-3
environment:
KAFKA_NODE_ID: 6
KAFKA_PROCESS_ROLES: broker
KAFKA_LISTENERS: BROKER://:9092
KAFKA_ADVERTISED_LISTENERS: BROKER://kafka-broker-3:9092
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,BROKER:PLAINTEXT
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-ctrl-1:9093,2@kafka-ctrl-2:9093,3@kafka-ctrl-3:9093
KAFKA_LOG_DIRS: /var/lib/kafka/data
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
KAFKA_DEFAULT_REPLICATION_FACTOR: 3
KAFKA_MIN_INSYNC_REPLICAS: 2
volumes:
- broker3-data:/var/lib/kafka/data
volumes:
ctrl1-data:
ctrl2-data:
ctrl3-data:
broker1-data:
broker2-data:
broker3-data:
5.3 关键配置参数详解
# ========== 核心身份配置 ==========
# 节点 ID,集群内唯一
node.id=1
# 进程角色:broker, controller, 或 broker,controller
process.roles=broker,controller
# ========== 监听器配置 ==========
# 监听器列表
listeners=BROKER://:9092,CONTROLLER://:9093
# 对外暴露的地址
advertised.listeners=BROKER://host1:9092
# 监听器名称映射
controller.listener.names=CONTROLLER
# 安全协议映射
listener.security.protocol.map=CONTROLLER:PLAINTEXT,BROKER:PLAINTEXT
# ========== Controller Quorum 配置 ==========
# Quorum 投票者列表,格式:id1@host1:port1,id2@host2:port2,...
controller.quorum.voters=1@ctrl1:9093,2@ctrl2:9093,3@ctrl3:9093
# Leader 选举超时
controller.quorum.election.timeout.ms=1000
# 选举请求超时
controller.quorum.request.timeout.ms=2000
# Follower 拉取超时
controller.quorum.fetch.timeout.ms=2000
# ========== 存储配置 ==========
log.dirs=/var/lib/kafka/data
log.retention.hours=168
log.segment.bytes=1073741824
log.segment.delete.delay.ms=60000
# ========== 复制配置 ==========
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
default.replication.factor=3
min.insync.replicas=2
5.4 故障切换测试
部署完成后,验证集群的高可用性:
# 1. 创建测试 Topic
docker exec kafka-broker-1 /opt/kafka/bin/kafka-topics.sh \
--bootstrap-server kafka-broker-1:9092 \
--create --topic ha-test \
--partitions 6 --replication-factor 3
# 2. 查看分区 Leader 分布
docker exec kafka-broker-1 /opt/kafka/bin/kafka-topics.sh \
--bootstrap-server kafka-broker-1:9092 \
--describe --topic ha-test
# 3. 模拟 Broker 故障
docker stop kafka-broker-1
# 4. 再次查看分区 Leader(应该自动切换到其他 Broker)
docker exec kafka-broker-2 /opt/kafka/bin/kafka-topics.sh \
--bootstrap-server kafka-broker-2:9092 \
--describe --topic ha-test
# 5. 模拟 Controller 故障
docker stop kafka-ctrl-1
# 6. 验证集群仍可正常工作
docker exec kafka-broker-2 /opt/kafka/bin/kafka-topics.sh \
--bootstrap-server kafka-broker-2:9092 \
--list
# 7. 恢复节点
docker start kafka-ctrl-1
docker start kafka-broker-1
六、客户端代码示例
6.1 Java(Spring Boot + Kafka)
Maven 依赖:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<version>3.4.0</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>3.3.0</version>
</dependency>
</dependencies>
生产者配置:
package com.example.kafka.config;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class KafkaProducerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// 性能优化参数
config.put(ProducerConfig.BATCH_SIZE_CONFIG, 32768); // 32KB 批次
config.put(ProducerConfig.LINGER_MS_CONFIG, 10); // 等待 10ms 凑批
config.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4"); // LZ4 压缩
config.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67108864L); // 64MB 缓冲区
config.put(ProducerConfig.ACKS_CONFIG, "all"); // 等待所有 ISR 确认
config.put(ProducerConfig.RETRIES_CONFIG, 3); // 重试 3 次
config.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // 幂等生产者
return new DefaultKafkaProducerFactory<>(config);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
消费者配置:
package com.example.kafka.config;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class KafkaConsumerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.GROUP_ID_CONFIG, "kafka-demo-group");
// 手动提交 Offset
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// 性能优化
config.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024); // 最少拉取 1KB
config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500); // 单次最多 500 条
config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000); // 会话超时 30s
config.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000); // 处理超时 5min
return new DefaultKafkaConsumerFactory<>(config);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
factory.setConcurrency(3); // 3 个消费线程
return factory;
}
}
生产者服务:
package com.example.kafka.service;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import java.util.concurrent.CompletableFuture;
@Slf4j
@Service
@RequiredArgsConstructor
public class MessageProducer {
private final KafkaTemplate<String, String> kafkaTemplate;
/**
* 同步发送消息
*/
public SendResult<String, String> sendSync(String topic, String key, String message) {
try {
CompletableFuture<SendResult<String, String>> future =
kafkaTemplate.send(topic, key, message);
return future.get(); // 阻塞等待确认
} catch (Exception e) {
log.error("同步发送消息失败: topic={}, key={}", topic, key, e);
throw new RuntimeException("消息发送失败", e);
}
}
/**
* 异步发送消息(带回调)
*/
public void sendAsync(String topic, String key, String message) {
CompletableFuture<SendResult<String, String>> future =
kafkaTemplate.send(topic, key, message);
future.whenComplete((result, ex) -> {
if (ex != null) {
log.error("异步发送消息失败: topic={}, key={}, partition={}",
topic, key,
result != null ? result.getRecordMetadata().partition() : "N/A",
ex);
} else {
log.debug("消息发送成功: topic={}, partition={}, offset={}",
topic,
result.getRecordMetadata().partition(),
result.getRecordMetadata().offset());
}
});
}
}
消费者服务(手动提交 Offset):
package com.example.kafka.service;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;
@Slf4j
@Service
public class MessageConsumer {
@KafkaListener(topics = "test-topic", groupId = "kafka-demo-group")
public void consume(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
try {
log.info("收到消息: topic={}, partition={}, offset={}, key={}, value={}",
record.topic(),
record.partition(),
record.offset(),
record.key(),
record.value());
// 处理业务逻辑
processMessage(record.value());
// 手动提交 Offset(处理成功后才提交)
acknowledgment.acknowledge();
log.debug("Offset 已提交: partition={}, offset={}",
record.partition(), record.offset());
} catch (Exception e) {
log.error("消息处理失败: partition={}, offset={}",
record.partition(), record.offset(), e);
// 不提交 Offset,下次重新消费
// 可以加入死信队列逻辑
}
}
private void processMessage(String message) {
// 你的业务逻辑
}
}
6.2 Go 客户端
使用 github.com/IBM/sarama 库(Kafka 4.0 兼容):
package main
import (
"context"
"fmt"
"log"
"os"
"os/signal"
"strings"
"syscall"
"time"
"github.com/IBM/sarama"
)
// ==================== 生产者 ====================
func main() {
producer, err := createProducer()
if err != nil {
log.Fatalf("创建生产者失败: %v", err)
}
defer producer.Close()
// 发送消息
for i := 0; i < 100; i++ {
key := fmt.Sprintf("key-%d", i)
value := fmt.Sprintf("message-%d at %s", i, time.Now().Format(time.RFC3339))
partition, offset, err := producer.SendMessage(&sarama.ProducerMessage{
Topic: "test-topic",
Key: sarama.StringEncoder(key),
Value: sarama.StringEncoder(value),
})
if err != nil {
log.Printf("发送消息失败: %v", err)
continue
}
log.Printf("消息已发送: partition=%d, offset=%d, key=%s", partition, offset, key)
}
}
func createProducer() (sarama.SyncProducer, error) {
config := sarama.NewConfig()
config.Producer.Return.Successes = true
config.Producer.RequiredAcks = sarama.WaitForAll // 等待所有 ISR 确认
config.Producer.Retry.Max = 3
config.Producer.Partitioner = sarama.NewHashPartitioner
// 性能优化
config.Producer.Flush.Messages = 100 // 凑满 100 条发送
config.Producer.Flush.Frequency = 10 * time.Millisecond // 或等待 10ms
config.Producer.Compression = sarama.CompressionLZ4
config.Net.MaxOpenRequests = 5
brokers := []string{"localhost:9092"}
return sarama.NewSyncProducer(brokers, config)
}
// ==================== 消费者 ====================
func startConsumer() {
config := sarama.NewConfig()
config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{
sarama.NewBalanceStrategyRange(), // 或 NewBalanceStrategyRoundRobin()
}
config.Consumer.Offsets.Initial = sarama.OffsetOldest
config.Consumer.Offsets.Retry.Max = 3
config.Consumer.Fetch.Min = 1024 // 最少 1KB
config.Consumer.Fetch.Default = 1048576 // 默认 1MB
config.Consumer.MaxProcessingTime = 100 * time.Millisecond
brokers := []string{"localhost:9092"}
group, err := sarama.NewConsumerGroup(brokers, "kafka-demo-group", config)
if err != nil {
log.Fatalf("创建消费者组失败: %v", err)
}
defer group.Close()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
handler := &consumerGroupHandler{}
// 优雅退出
sigchan := make(chan os.Signal, 1)
signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
go func() {
for {
if err := group.Consume(ctx, []string{"test-topic"}, handler); err != nil {
log.Printf("消费者组错误: %v", err)
}
if ctx.Err() != nil {
return
}
}
}()
<-sigchan
cancel()
log.Println("消费者已关闭")
}
type consumerGroupHandler struct{}
func (h *consumerGroupHandler) Setup(sarama.ConsumerGroupSession) error { return nil }
func (h *consumerGroupHandler) Cleanup(sarama.ConsumerGroupSession) error { return nil }
func (h *consumerGroupHandler) ConsumeClaim(
session sarama.ConsumerGroupSession,
claim sarama.ConsumerGroupClaim,
) error {
for message := range claim.Messages() {
log.Printf("收到消息: topic=%s, partition=%d, offset=%d, key=%s, value=%s",
message.Topic, message.Partition, message.Offset,
string(message.Key), string(message.Value))
// 处理业务逻辑
if err := processMessage(string(message.Value)); err != nil {
log.Printf("消息处理失败: offset=%d, err=%v", message.Offset, err)
// 不提交 Offset,等待重新消费
continue
}
// 手动提交 Offset
session.MarkMessage(message, "")
session.Commit()
}
return nil
}
func processMessage(value string) error {
// 你的业务逻辑
return nil
}
七、Exactly-Once 语义实现
Exactly-Once 是 Kafka 最强大的特性之一,也是最容易用错的。Kafka 4.0 中,Exactly-Once 语义的实现依赖三个机制的配合:
7.1 幂等生产者(Idempotent Producer)
幂等生产者保证单分区内消息不重复。开启后,Kafka 为每个生产者分配一个 Producer ID(PID)和递增的 Sequence Number,Broker 据此去重:
// 开启幂等生产者(Kafka 3.0+ 默认开启)
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
// 等价于:
// acks=all
// retries=Integer.MAX_VALUE
// max.in.flight.requests.per.connection <= 5
注意: 幂等生产者只保证单分区内的去重,跨分区不保证。而且只在单个 Producer 实例的生命周期内有效——Producer 重启后获得新的 PID,无法去重之前的消息。
7.2 事务(Transactions)
事务保证跨分区、跨 Topic 的原子写入。配合消费者的事务读取,可以实现端到端的 Exactly-Once:
// 事务生产者配置
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tx-producer-1"); // 必须!
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
props.put(ProducerConfig.ACKS_CONFIG, "all");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// 初始化事务(只需一次)
producer.initTransactions();
try {
// 开始事务
producer.beginTransaction();
// 发送多条消息到不同 Topic
producer.send(new ProducerRecord<>("topic-a", "key1", "value1"));
producer.send(new ProducerRecord<>("topic-b", "key2", "value2"));
// 提交消费者的 Offset(消费-处理-生产 模式)
producer.sendOffsetsToTransaction(
offsets, // Map<TopicPartition, OffsetAndMetadata>
consumerGroupId
);
// 提交事务
producer.commitTransaction();
} catch (ProducerFencedException e) {
// 另一个具有相同 transactional.id 的生产者已启动
producer.close();
} catch (KafkaException e) {
// 中止事务
producer.abortTransaction();
}
7.3 消费者端配置
// 事务消费者配置
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "tx-consumer-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); // 只读已提交的消息
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 手动提交
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
isolation.level=read_committed 是关键——它确保消费者只读到已提交事务的消息,不会读到中止事务的"脏数据"。
7.4 Exactly-Once 的陷阱
- transactional.id 必须唯一且稳定。 相同的 transactional.id 在新 Producer 启动时会 fence 掉旧的 Producer。如果你使用动态生成,重启后无法恢复事务状态。
- 事务超时默认 60 秒。 如果你的处理逻辑超过这个时间,事务会自动中止。可以通过
transaction.timeout.ms调整。 - read_committed 有性能开销。 Broker 需要维护事务状态,消费者需要过滤未提交的消息。高吞吐场景下,延迟会增加。
- 跨系统不支持。 Kafka 事务不能和数据库事务协调——如果你需要"Kafka 写入 + 数据库写入"的原子性,只能用两阶段提交或幂等补偿。
八、性能优化与调优
8.1 生产者优化
# 批次大小(字节),达到此大小才发送
batch.size=32768
# 等待时间(毫秒),即使批次未满也发送
linger.ms=10
# 压缩类型:none, gzip, snappy, lz4, zstd
# lz4 压缩比和速度平衡最好,zstd 压缩比更高但更耗 CPU
compression.type=lz4
# 生产者缓冲区大小(字节)
buffer.memory=67108864
# 确认机制:0(不等确认), 1(Leader 确认), all(所有 ISR 确认)
acks=all
# 重试次数
retries=3
# 单个请求最大字节数
max.request.size=1048576
# 连接上最大未确认请求数(开启幂等时 <= 5)
max.in.flight.requests.per.connection=5
调优建议:
batch.size和linger.ms是吞吐量 vs 延迟的权衡。高吞吐场景增大 batch.size,低延迟场景减小 linger.ms- 压缩类型选择:CPU 充裕选 zstd(压缩比最好),CPU 敏感选 lz4(速度最快)
acks=all保证可靠性,但增加延迟。如果可以容忍少量丢消息,acks=1可以显著提升吞吐
8.2 消费者优化
# 单次拉取最小字节数,不够就等
fetch.min.bytes=1024
# 单次拉取最大字节数
fetch.max.bytes=5242880
# 单次 poll 返回的最大记录数
max.poll.records=500
# 等待拉取响应的最长时间
fetch.max.wait.ms=500
# 会话超时(心跳超时后踢出消费者组)
session.timeout.ms=30000
# 两次 poll 的最大间隔(处理超时)
max.poll.interval.ms=300000
# 心跳间隔(通常为 session.timeout.ms 的 1/3)
heartbeat.interval.ms=10000
# 自动提交间隔
auto.commit.interval.ms=5000
调优建议:
fetch.min.bytes增大可以减少拉取次数,提升吞吐,但增加延迟max.poll.records要和你的处理能力匹配——设置太大但处理不过来,会触发 rebalancemax.poll.interval.ms根据你的消息处理耗时设置——设置太小会导致频繁 rebalance
8.3 Broker 优化
# 日志段大小,影响索引粒度和文件切换频率
log.segment.bytes=1073741824
# I/O 线程数(处理磁盘读写),建议 = 磁盘数
num.io.threads=8
# 网络线程数(处理网络请求),建议 = CPU 核数
num.network.threads=4
# 日志刷盘策略(依赖操作系统 page cache 通常更好)
# log.flush.interval.messages=10000
# log.flush.interval.ms=1000
# 后台线程数(日志清理、压缩等)
background.threads=4
# Socket 发送/接收缓冲区
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
调优建议:
- 不要手动配置
log.flush.interval,让操作系统的 Page Cache 来做。手动刷盘会严重降低吞吐 num.io.threads应该等于或略大于数据目录数(log.dirs中的目录数)num.network.threads在高并发场景下适当增大,但过多会导致上下文切换
8.4 KRaft 特定优化
# Controller 选举超时,越短故障恢复越快,但越容易误触发选举
controller.quorum.election.timeout.ms=1000
# 选举请求超时
controller.quorum.request.timeout.ms=2000
# Follower 拉取元数据日志超时
controller.quorum.fetch.timeout.ms=2000
# 元数据日志最大批次大小
controller.quorum.fetch.max.bytes=1048576
# 元数据日志同步频率
metadata.log.max.record.memory.bytes=5242880
# 元数据偏移量快照间隔(影响重启恢复速度)
metadata.log.offset.snapshot.interval.ms=10000
调优建议:
election.timeout.ms不要设太小(< 500ms),否则网络抖动会频繁触发选举- 元数据量大的集群(几千个 Topic),适当增大
fetch.max.bytes - 快照间隔影响重启恢复速度——间隔越大,重启后需要重放的日志越多
九、生产级最佳实践
9.1 集群规划建议
Controller 节点:
- 3 个 Controller(可容忍 1 个故障)或 5 个(可容忍 2 个故障)
- 专用机器,不要和 Broker 混部署
- 配置独立的高性能磁盘(SSD,用于元数据日志)
- CPU 和内存要求不高,但网络延迟要低
Broker 节点:
- 根据数据量和吞吐计算节点数
- 磁盘选择:JBOD(Just a Bunch Of Disks)优于 RAID,Kafka 自己管理分区分布
- 内存:足够的 Page Cache,通常 JVM 堆 6-8GB,剩余给 Page Cache
- 网络:万兆网卡在大规模集群中是必须的
容量估算公式:
每日数据量 = 每秒消息数 × 消息平均大小 × 86400
总存储需求 = 每日数据量 × 保留天数 × 副本数 × 1.1(预留 10%)
单 Broker 存储 = 总存储需求 / Broker 数
建议单 Broker 存储 < 磁盘容量的 70%
9.2 Topic 设计策略
分区数:
- 分区数 = 期望的并行消费者数
- 分区数 = 吞吐量需求 / 单分区吞吐量
- 分区数不宜过多——每个分区都有内存和文件句柄开销,且影响 Controller 元数据管理
- 单集群建议分区数 < 10 万(KRaft 模式下可以更高)
副本数:
- 生产环境最少 3 副本
min.insync.replicas=2(配合acks=all)- 跨机房部署考虑 Rack Awareness
Key 设计:
- 需要顺序保证的消息使用相同的 Key(路由到同一分区)
- 避免热点 Key——考虑添加随机后缀打散
- Key 为 null 时使用轮询分配,适合无序场景
9.3 安全配置
SASL/PLAIN 认证:
# Broker 配置
listeners=SASL_PLAINTEXT://:9092,CONTROLLER://:9093
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
# JAAS 配置(通过 JVM 参数传入)
# -Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf
# kafka_server_jaas.conf
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-secret"
user_admin="admin-secret"
user_producer="producer-secret"
user_consumer="consumer-secret";
};
SSL/TLS 加密:
# Broker SSL 配置
listeners=SSL://:9093
security.inter.broker.protocol=SSL
ssl.keystore.location=/etc/kafka/ssl/kafka.server.keystore.jks
ssl.keystore.password=keystore-pass
ssl.key.password=key-pass
ssl.truststore.location=/etc/kafka/ssl/kafka.server.truststore.jks
ssl.truststore.password=truststore-pass
ssl.client.auth=required
ACL 授权:
authorizer.class.name=org.apache.kafka.metadata.authorizer.StandardAuthorizer
super.users=User:admin
allow.everyone.if.no.acl.found=false
# 创建 ACL
kafka-acls.sh --bootstrap-server localhost:9092 \
--add --allow-principal User:producer \
--operation Write --topic test-topic
kafka-acls.sh --bootstrap-server localhost:9092 \
--add --allow-principal User:consumer \
--operation Read --topic test-topic \
--operation Read --group consumer-group
9.4 监控指标
必须监控的 Broker 指标:
| 指标 | JMX MBean | 告警阈值 |
|---|---|---|
| 在线 Broker 数 | kafka.controller:type=KafkaController,name=ActiveBrokerCount | < 预期值 |
| 离线分区数 | kafka.controller:type=KafkaController,name=OfflinePartitionsCount | > 0 |
| 活跃 Controller | kafka.controller:type=KafkaController,name=ActiveControllerCount | != 1 |
| 请求处理延迟 | kafka.network:type=RequestMetrics,name=RequestQueueSize | 持续增长 |
| 日志段大小 | kafka.log:type=Log,name=Size,topic=*,partition=* | 接近磁盘容量 |
| ISR 缩减 | kafka.cluster:type=Partition,name=UnderReplicated,topic=*,partition=* | > 0 |
必须监控的生产者指标:
| 指标 | 说明 |
|---|---|
record-send-rate | 每秒发送记录数 |
record-error-rate | 每秒发送失败数 |
batch-size-avg | 平均批次大小 |
record-queue-time-avg | 记录在缓冲区等待时间 |
compression-rate | 压缩率 |
必须监控的消费者指标:
| 指标 | 说明 |
|---|---|
records-consumed-rate | 每秒消费记录数 |
records-lag-max | 最大消费延迟(条数) |
commit-rate | Offset 提交速率 |
join-rate | 加入消费者组频率(频繁 = rebalance 问题) |
Prometheus + Grafana 监控方案:
# kafka-exporter部署
apiVersion: apps/v1
kind: Deployment
metadata:
name: kafka-exporter
spec:
replicas: 1
selector:
matchLabels:
app: kafka-exporter
template:
spec:
containers:
- name: kafka-exporter
image: danielqsj/kafka-exporter:latest
args:
- --kafka.server=kafka-broker-1:9092
- --kafka.server=kafka-broker-2:9092
- --kafka.server=kafka-broker-3:9092
- --web.listen-address=:9308
ports:
- containerPort: 9308
9.5 故障排查指南
问题 1:消费者频繁 Rebalance
症状:日志中出现 rebalance 相关信息,消费延迟飙升。
排查:
# 查看消费者组状态
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--describe --group your-group
# 检查消费者配置
# - max.poll.interval.ms 是否太小?
# - 消息处理是否太慢?
# - session.timeout.ms 是否合理?
解决:
- 增大
max.poll.interval.ms(如果处理耗时较长) - 减小
max.poll.records(减少单次处理量) - 检查 GC 问题(消费者进程是否频繁 Full GC)
问题 2:生产者发送超时
症状:TimeoutException 或 NotEnoughReplicasException。
排查:
# 检查 ISR 列表
kafka-topics.sh --bootstrap-server localhost:9092 \
--describe --topic your-topic
# 关注 ISR 是否少于预期
解决:
- ISR 不足:检查落后 Broker 的磁盘、网络、CPU
min.insync.replicas设置过高:评估是否可以降低- 网络延迟:检查 Broker 之间网络连通性
问题 3:KRaft Controller 选举异常
症状:集群无活跃 Controller,所有管理操作失败。
排查:
# 查看 Controller 日志
grep "KRaft" /var/log/kafka/server.log | tail -50
# 检查 Quorum 状态
kafka-metadata-quorum.sh --bootstrap-server localhost:9092 \
describe --status
# 检查投票者配置是否一致
grep "controller.quorum.voters" /etc/kafka/server.properties
解决:
- 确保所有节点的
controller.quorum.voters配置一致 - 检查 Controller 节点之间的网络连通性
- 检查磁盘空间(元数据日志无法写入会导致 Controller 退出)
问题 4:元数据不一致
KRaft 模式下元数据不一致比 ZooKeeper 模式少得多,但仍可能发生:
# 检查元数据快照
kafka-dump-log.sh --files /var/lib/kafka/data/@metadata-0/00000000000000000000.log \
--cluster-metadata-decoder
# 强制元数据同步(慎用!)
# 删除 Broker 的元数据缓存,重启后会从 Controller 全量拉取
十、从 ZooKeeper 迁移到 KRaft
如果你还在 Kafka 3.x + ZooKeeper 上,迁移路径如下:
10.1 迁移前准备
- 升级到 Kafka 3.6+:Kafka 3.6 开始支持从 ZooKeeper 模式迁移到 KRaft 模式
- 确认集群健康:所有 Broker 在线,ISR 完整,无未完成的分区重分配
- 备份 ZooKeeper 数据:以防万一需要回滚
- 测试环境演练:先在测试环境完整走一遍迁移流程
10.2 迁移步骤(Kafka 3.6+)
# 1. 在 ZooKeeper 模式下启动迁移
# 在每个 Broker 的配置中添加:
# zookeeper.metadata.migration.enable=true
# 2. 逐步启动 KRaft Controller(与 ZooKeeper 并行运行)
# Controller 配置:
# controller.quorum.voters=1@ctrl1:9093,2@ctrl2:9093,3@ctrl3:9093
# zookeeper.connect=zookeeper:2181 # 仍然连接 ZooKeeper
# 3. 验证迁移状态
kafka-metadata-quorum.sh --bootstrap-server localhost:9092 describe --status
# 4. 迁移完成后,移除 ZooKeeper 相关配置
# 删除 zookeeper.connect
# 删除 zookeeper.metadata.migration.enable
# 重启所有 Broker
# 5. 停止 ZooKeeper
10.3 注意事项
- 迁移过程中集群可以正常提供服务
- 迁移是不可逆的——一旦完成,不能回退到 ZooKeeper 模式
- Kafka 4.0 不支持迁移模式——如果你还在 ZooKeeper 模式,必须先在 3.6+ 上完成迁移,再升级到 4.0
十一、总结与展望
Kafka 4.0 移除 ZooKeeper 不是终点,而是一个新起点。
当前状态: KRaft 模式已经稳定,社区几年的打磨让它足够可靠。如果你是新项目,没有理由再碰 ZooKeeper。如果你是存量项目,迁移窗口已经打开——Kafka 3.6+ 提供了平滑迁移路径,没有理由再拖延。
未来方向:
- Partition 级别的元数据管理优化,支持百万级 Partition
- 更细粒度的 Controller 职责划分,降低单点压力
- 与 Kafka 生态更紧密的整合(Kafka Connect、Kafka Streams 都已经适配 KRaft)
- 云原生部署的持续优化
最后说一句: 技术选型从来不是选"最好的",而是选"最合适的"。但对于 Kafka + ZooKeeper 这个组合来说,KRaft 在几乎所有维度上都是更好的选择——更简单的运维、更快的恢复、更强的扩展性、更一致的元数据。这不是一个需要纠结的决定。
如果你的团队还在犹豫要不要迁移,我的建议很简单:开个测试环境,照着本文的步骤走一遍。 实际跑一遍,比看十篇文章都管用。