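Message Queues and Task Scheduling: A Practical Guide from In-Memory Queues to Production-Grade Architecture
The core value of message queues and task scheduling systems is decoupling and peak shaving. You adopt them not to show off, but to solve concrete problems: processing slow operations asynchronously, decoupling dependencies between services, and smoothing traffic spikes. This article starts with the simplest in-memory queue and evolves step by step toward a production-grade messaging architecture, covering three options (Redis queues, RabbitMQ, and Kafka) with complete hands-on code and performance tuning notes.
1. Why Do You Need a Message Queue?
1.1 Three Classic Scenarios
Scenario 1: Asynchronous notifications after user registration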
# ❌ Synchronous: the user waits about 5 seconds before seeing "registration successful"
def register_user(email, password):
    user = create_user(email, password)
    send_welcome_email(email)       # ~2 s
    send_sms_notification(email)    # ~1 s
    sync_to_crm_system(email)       # ~2 s
    return {"status": "ok"}
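After clicking Register, the user has to wait until every notification has been sent before the success page appears. Worse, if the CRM system is down, the whole registration fails.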
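# ✅ Asynchronous: the user gets a response immediately
def register_user(email, password):
    user = create_user(email, password)
    # Enqueue the slow follow-up work; consumers process it in the background
    queue.push({
        "type": "welcome_email",
        "email": email
    })
    queue.push({
        "type": "sms_notification",
        "email": email
    })
    queue.push({
        "type": "crm_sync",
        "email": email
    })
    return {"status": "ok"}  # returned without waiting for the notifications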
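Scenario 2: Peak shaving for a flash sale
The moment the sale starts: 100,000 users send requests at once → the database collapses
Solution:
User requests → message queue (buffer) → backend consumes at a rate the database can sustain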
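The buffering idea can be sketched with the standard library alone. This is a minimal illustration, not a production design: the function name `run_flash_sale`, its parameters, and the `time.sleep` call that stands in for a database write are all assumptions made for this example.

```python
import queue
import threading
import time


def run_flash_sale(burst_size: int, capacity: int, consume_delay: float) -> dict:
    """Push a burst of requests through a bounded buffer and drain it at a fixed pace."""
    buffer: queue.Queue = queue.Queue(maxsize=capacity)
    processed: list[int] = []

    def consumer() -> None:
        while True:
            item = buffer.get()
            if item is None:            # sentinel: no more work
                break
            time.sleep(consume_delay)   # stands in for a database write
            processed.append(item)

    worker = threading.Thread(target=consumer)
    worker.start()

    accepted = rejected = 0
    for request_id in range(burst_size):
        try:
            buffer.put_nowait(request_id)   # never block the incoming request
            accepted += 1
        except queue.Full:
            rejected += 1                   # e.g. reply "busy, please retry"

    buffer.put(None)   # waits for free space, then tells the consumer to stop
    worker.join()
    return {"accepted": accepted, "rejected": rejected, "processed": len(processed)}


if __name__ == "__main__":
    print(run_flash_sale(burst_size=1000, capacity=100, consume_delay=0.001))
```

The bounded `maxsize` is the important design choice: when the buffer is full, excess requests are rejected quickly instead of piling up behind the database.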
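Scenario 3: Service decoupling
Order service ──depends on──> Inventory service
└──depends on──> Payment service
└──depends on──> Shipping service
Problem: if any downstream service goes down, the order service is affected
Improvement:
Order service ──publishes event──> Message queue
Inventory service ←──subscribes──┘
Payment service ←──subscribes──┘
Shipping service ←──subscribes──┘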
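The publish/subscribe shape above can be sketched in-process. `EventBus` is a hypothetical stand-in for a real broker, and the three handler functions are made up for illustration; the point is only that a failing subscriber does not propagate back to the publisher.

```python
from collections import defaultdict
from typing import Callable


class EventBus:
    """Hypothetical in-process stand-in for a message broker."""

    def __init__(self) -> None:
        self._subscribers: dict[str, list[Callable[[dict], None]]] = defaultdict(list)
        self.failures: list[tuple[str, str]] = []

    def subscribe(self, event_type: str, handler: Callable[[dict], None]) -> None:
        self._subscribers[event_type].append(handler)

    def publish(self, event_type: str, event: dict) -> int:
        """Deliver to every subscriber; return how many handled the event."""
        delivered = 0
        for handler in self._subscribers[event_type]:
            try:
                handler(event)
                delivered += 1
            except Exception as exc:
                # One broken subscriber must not affect the publisher or the others
                self.failures.append((handler.__name__, str(exc)))
        return delivered


def reserve_inventory(event: dict) -> None:
    pass  # placeholder for the inventory service


def charge_payment(event: dict) -> None:
    raise RuntimeError("payment service is down")  # simulated outage


def schedule_shipping(event: dict) -> None:
    pass  # placeholder for the shipping service
```

With all three handlers subscribed to `"order_created"`, publishing an order still reaches the inventory and shipping handlers while the payment failure is only recorded. A real broker would additionally keep the undelivered message for the payment service until it recovers.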
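1.2 The Core Value of a Message Queue
| Value | Description |
|---|---|
| Asynchronous processing | Slow operations go into the queue; the main flow responds quickly |
| Decoupling | Producers only publish messages and do not care who consumes them |
| Peak shaving | Traffic spikes are queued first; the backend consumes at its own pace |
| Fault tolerance | With a persistent queue, messages survive a consumer crash and are processed after recovery |
| Scalability | Adding consumers increases processing capacity |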
2. Starting with the Simplest In-Memory Queue
2.1 An Implementation with Python's queue.Queue
import logging
import queue
import threading
import time
from dataclasses import dataclass, field
from typing import Callable

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class Task:
    """Task definition."""
    task_type: str
    payload: dict
    # default_factory calls time.time() for every new Task; a plain
    # `= time.time()` default would be evaluated only once, at class definition
    created_at: float = field(default_factory=time.time)


class MemoryTaskQueue:
    """
    In-memory task queue (not recommended for production).
    Pros: simple, no dependencies, fast.
    Cons: messages are lost on restart, not distributed, no persistence.
    """

    def __init__(self, maxsize: int = 10000):
        self.queue = queue.Queue(maxsize=maxsize)
        self.handlers: dict[str, Callable] = {}
        self.workers: list[threading.Thread] = []
        self.running = False

    def register_handler(self, task_type: str, handler: Callable):
        """Register a handler for a task type."""
        self.handlers[task_type] = handler
        logger.info(f"Registered handler for task type: {task_type}")

    def submit(self, task_type: str, payload: dict) -> bool:
        """Submit a task; returns False if the queue is full."""
        try:
            task = Task(task_type=task_type, payload=payload)
            self.queue.put(task, block=False)
            logger.debug(f"Task submitted: {task_type}")
            return True
        except queue.Full:
            logger.error(f"Queue full, task rejected: {task_type}")
            return False

    def _worker(self, worker_id: int):
        """Worker thread loop."""
        logger.info(f"Worker {worker_id} started")
        while self.running:
            try:
                # The timeout lets the loop re-check self.running regularly
                task = self.queue.get(timeout=1.0)
            except queue.Empty:
                continue
            handler = self.handlers.get(task.task_type)
            if handler:
                try:
                    start = time.time()
                    handler(task.payload)
                    elapsed = time.time() - start
                    logger.info(
                        f"Worker {worker_id} processed {task.task_type} "
                        f"in {elapsed:.2f}s"
                    )
                except Exception as e:
                    # A failed task is only logged here; it is not retried
                    logger.error(
                        f"Handler error for {task.task_type}: {e}",
                        exc_info=True
                    )
            else:
                logger.warning(f"No handler for task type: {task.task_type}")
            self.queue.task_done()
        logger.info(f"Worker {worker_id} stopped")

    def start(self, num_workers: int = 4):
        """Start the consumer threads."""
        self.running = True
        for i in range(num_workers):
            worker = threading.Thread(
                target=self._worker,
                args=(i,),
                daemon=True
            )
            worker.start()
            self.workers.append(worker)
        logger.info(f"Started {num_workers} workers")

    def stop(self):
        """Stop the consumer threads."""
        self.running = False
        for worker in self.workers:
            worker.join(timeout=5)
        logger.info("All workers stopped")

    def get_stats(self) -> dict:
        """Return queue statistics."""
        return {
            "queue_size": self.queue.qsize(),
            "workers": len(self.workers),
            "handlers": list(self.handlers.keys())
        }


# ========== Usage example ==========
def handle_send_email(payload: dict):
    """Email handler."""
    email = payload.get("email")
    subject = payload.get("subject", "No Subject")
    # Simulate sending an email
    time.sleep(0.5)
    logger.info(f"Email sent to {email}: {subject}")


def handle_generate_report(payload: dict):
    """Report generation handler."""
    report_id = payload.get("report_id")
    # Simulate generating a report
    time.sleep(2.0)
    logger.info(f"Report generated: {report_id}")


def main():
    """Entry point."""
    task_queue = MemoryTaskQueue(maxsize=1000)
    # Register handlers
    task_queue.register_handler("send_email", handle_send_email)
    task_queue.register_handler("generate_report", handle_generate_report)
    # Start consumers
    task_queue.start(num_workers=3)
    # Submit tasks
    for i in range(10):
        task_queue.submit("send_email", {
            "email": f"user{i}@example.com",
            "subject": f"Welcome {i}"
        })
    task_queue.submit("generate_report", {
        "report_id": "report-001"
    })
    # Wait for processing to finish
    time.sleep(5)
    # Inspect statistics
    print(task_queue.get_stats())
    # Stop
    task_queue.stop()


if __name__ == "__main__":
    main()
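2.2 Problems with an In-Memory Queue
| Problem | Description | Impact |
|---|---|---|
| Message loss | A process restart wipes everything held in memory | Unacceptable for most production workloads |
| Not distributed | Usable only inside a single process | Cannot scale horizontally |
| No persistence | Nothing can be recovered after a restart | No guarantees for critical tasks |
| No acknowledgement | A message whose handler fails is simply gone | You must implement retries yourself |
| No monitoring | Queue state is hard to observe | Troubleshooting is difficult |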
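The "implement retries yourself" point can be illustrated with a small wrapper around a handler call. This is a sketch under stated assumptions: `run_with_retry`, its parameters, and the exponential backoff schedule are choices made for this example, not part of the queue class above.

```python
import time
from typing import Callable


def run_with_retry(
    handler: Callable[[dict], None],
    payload: dict,
    max_attempts: int = 3,
    base_delay: float = 0.1,
    sleep: Callable[[float], None] = time.sleep,
) -> bool:
    """Call handler(payload) up to max_attempts times with exponential backoff.

    Returns True on success and False once every attempt has failed.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            handler(payload)
            return True
        except Exception:
            if attempt == max_attempts:
                return False
            # Wait base_delay, 2 * base_delay, 4 * base_delay, ...
            sleep(base_delay * 2 ** (attempt - 1))
    return False
```

A worker could call `run_with_retry(handler, task.payload)` instead of `handler(task.payload)` and move the task to a failure list when it returns False. The retry state still lives in memory, so it does not survive a restart.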
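2.3 When Is an In-Memory Queue Acceptable?
- Losing tasks is acceptable (for example, log collection or real-time statistics)
- Single-machine deployment with no need to scale out
- Rapid prototyping
- Unit tests and development environments
For any task that must not be lost, a production system should use a persistent message queue.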
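3. A Lightweight Message Queue on Redis
3.1 Advantages of a Redis Queue
- Optional persistence (AOF / RDB); depending on the fsync policy, the most recent writes can still be lost in a crash
- Publish/subscribe support
- Delayed queues (Sorted Set)
- Simple to use
- High performance
3.2 A Simple Queue Based on List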
import json
import logging
import threading
import time
from typing import Callable

import redis

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

MAX_ATTEMPTS = 3  # total executions per task, including the first one


class RedisTaskQueue:
    """
    Task queue based on a Redis List.
    LPUSH enqueues, BRPOP dequeues (blocking), which gives FIFO order.
    """

    def __init__(
        self,
        redis_url: str = "redis://localhost:6379/0",
        queue_name: str = "tasks"
    ):
        self.redis = redis.from_url(redis_url)
        self.queue_name = queue_name
        self.handlers: dict[str, Callable] = {}
        self.workers: list[threading.Thread] = []
        self.running = False

    def register_handler(self, task_type: str, handler: Callable):
        """Register a handler for a task type."""
        self.handlers[task_type] = handler

    def submit(
        self,
        task_type: str,
        payload: dict,
        priority: int = 0
    ) -> bool:
        """
        Submit a task.
        priority: stored with the task as metadata only. A single List is
        strictly FIFO, so this queue does not reorder tasks by priority.
        """
        task = {
            "type": task_type,
            "payload": payload,
            "priority": priority,
            "created_at": time.time(),
            "attempts": 0
        }
        try:
            # LPUSH enqueues on the left side
            self.redis.lpush(self.queue_name, json.dumps(task))
            logger.debug(f"Task submitted: {task_type}")
            return True
        except Exception as e:
            logger.error(f"Failed to submit task: {e}")
            return False

    def _worker(self, worker_id: int):
        """Worker thread loop."""
        logger.info(f"Redis worker {worker_id} started")
        while self.running:
            try:
                # BRPOP blocks for up to 1 second. The task leaves Redis as soon
                # as it is popped, so a worker crash during handling loses it.
                result = self.redis.brpop(self.queue_name, timeout=1)
                if not result:
                    continue
                _, task_json = result
                task = json.loads(task_json)
                task_type = task.get("type")
                payload = task.get("payload", {})
                handler = self.handlers.get(task_type)
                if handler:
                    try:
                        start = time.time()
                        handler(payload)
                        elapsed = time.time() - start
                        logger.info(
                            f"Worker {worker_id} processed {task_type} "
                            f"in {elapsed:.2f}s"
                        )
                    except Exception as e:
                        logger.error(
                            f"Handler error for {task_type}: {e}",
                            exc_info=True
                        )
                        # Retry logic: at most MAX_ATTEMPTS executions in total
                        attempts = task.get("attempts", 0) + 1
                        if attempts < MAX_ATTEMPTS:
                            task["attempts"] = attempts
                            self.redis.lpush(
                                self.queue_name,
                                json.dumps(task)
                            )
                            logger.info(
                                f"Task {task_type} requeued (attempt {attempts})"
                            )
                        else:
                            logger.error(
                                f"Task {task_type} dropped after {attempts} attempts"
                            )
                else:
                    logger.warning(f"No handler for task type: {task_type}")
            except json.JSONDecodeError as e:
                logger.error(f"Invalid task JSON: {e}")
            except Exception as e:
                logger.error(f"Worker error: {e}", exc_info=True)
        logger.info(f"Redis worker {worker_id} stopped")

    def start(self, num_workers: int = 4):
        """Start the consumer threads."""
        self.running = True
        for i in range(num_workers):
            worker = threading.Thread(
                target=self._worker,
                args=(i,),
                daemon=True
            )
            worker.start()
            self.workers.append(worker)
        logger.info(f"Started {num_workers} Redis workers")

    def stop(self):
        """Stop the consumer threads."""
        self.running = False
        for worker in self.workers:
            worker.join(timeout=5)
        logger.info("All Redis workers stopped")

    def get_queue_size(self) -> int:
        """Return the queue length."""
        return self.redis.llen(self.queue_name)

    def purge(self):
        """Remove every task from the queue."""
        self.redis.delete(self.queue_name)
        logger.info(f"Queue {self.queue_name} purged")


# ========== Usage example ==========
def handle_process_order(payload: dict):
    """Order handler."""
    order_id = payload.get("order_id")
    time.sleep(1.0)  # simulate processing
    logger.info(f"Order processed: {order_id}")


def main():
    task_queue = RedisTaskQueue(
        redis_url="redis://localhost:6379/0",
        queue_name="orders"
    )
    task_queue.register_handler("process_order", handle_process_order)
    task_queue.start(num_workers=2)
    # Submit tasks
    for i in range(5):
        task_queue.submit("process_order", {"order_id": f"ORD-{i}"})
    time.sleep(10)
    print(f"Queue size: {task_queue.get_queue_size()}")
    task_queue.stop()


if __name__ == "__main__":
    main()
3.3 Delayed Queue: Based on Sorted Set
import json
import logging
import threading
import time
from typing import Callable

import redis

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class RedisDelayedQueue:
    """
    Delayed queue based on a Redis Sorted Set.
    score = timestamp at which the task should run.
    A consumer periodically scans for tasks with score <= now.
    """

    def __init__(
        self,
        redis_url: str = "redis://localhost:6379/0",
        queue_name: str = "delayed_tasks"
    ):
        self.redis = redis.from_url(redis_url)
        self.queue_name = queue_name
        self.handlers: dict[str, Callable] = {}
        self.running = False

    def register_handler(self, task_type: str, handler: Callable):
        """Register a handler for a task type."""
        self.handlers[task_type] = handler

    def submit(
        self,
        task_type: str,
        payload: dict,
        delay_seconds: float = 0
    ) -> bool:
        """
        Submit a delayed task.
        delay_seconds: delay in seconds (0 means run immediately).
        """
        execute_at = time.time() + delay_seconds
        task = {
            "type": task_type,
            "payload": payload,
            "execute_at": execute_at
        }
        try:
            # ZADD: score = execution timestamp
            self.redis.zadd(
                self.queue_name,
                {json.dumps(task): execute_at}
            )
            logger.info(
                f"Delayed task submitted: {task_type}, "
                f"execute in {delay_seconds}s"
            )
            return True
        except Exception as e:
            logger.error(f"Failed to submit delayed task: {e}")
            return False

    def _consumer(self):
        """Consumer thread loop."""
        logger.info("Delayed queue consumer started")
        while self.running:
            try:
                now = time.time()
                # Fetch due tasks (score <= now). ZRANGEBYSCORE only reads;
                # it does not remove anything from the set.
                results = self.redis.zrangebyscore(
                    self.queue_name,
                    min=0,
                    max=now,
                    start=0,
                    num=10  # at most 10 per scan
                )
                if not results:
                    time.sleep(0.1)  # sleep 100 ms when nothing is due
                    continue
                for task_json in results:
                    # ZREM returns the number of members removed. Only the
                    # consumer that actually removed the task may run it;
                    # otherwise two consumers could execute the same task.
                    if self.redis.zrem(self.queue_name, task_json) == 0:
                        continue
                    task = json.loads(task_json)
                    task_type = task.get("type")
                    payload = task.get("payload", {})
                    handler = self.handlers.get(task_type)
                    if handler:
                        try:
                            start = time.time()
                            handler(payload)
                            elapsed = time.time() - start
                            logger.info(
                                f"Delayed task {task_type} "
                                f"executed in {elapsed:.2f}s"
                            )
                        except Exception as e:
                            logger.error(
                                f"Handler error for {task_type}: {e}",
                                exc_info=True
                            )
                    else:
                        logger.warning(
                            f"No handler for delayed task: {task_type}"
                        )
            except Exception as e:
                logger.error(f"Delayed queue consumer error: {e}")
                time.sleep(1)
        logger.info("Delayed queue consumer stopped")

    def start(self):
        """Start the consumer thread."""
        self.running = True
        thread = threading.Thread(target=self._consumer, daemon=True)
        thread.start()
        logger.info("Delayed queue started")

    def stop(self):
        """Stop the consumer thread."""
        self.running = False

    def get_pending_count(self) -> int:
        """Return the number of tasks waiting to run."""
        return self.redis.zcard(self.queue_name)


# ========== Usage example ==========
def handle_reminder(payload: dict):
    """Reminder handler."""
    user_id = payload.get("user_id")
    message = payload.get("message")
    logger.info(f"Reminder for {user_id}: {message}")


def main():
    queue = RedisDelayedQueue(queue_name="reminders")
    queue.register_handler("reminder", handle_reminder)
    queue.start()
    # Submit delayed tasks
    queue.submit("reminder", {
        "user_id": "user-001",
        "message": "Meeting in 5 minutes"
    }, delay_seconds=5)
    queue.submit("reminder", {
        "user_id": "user-002",
        "message": "Take a break"
    }, delay_seconds=10)
    print(f"Pending tasks: {queue.get_pending_count()}")
    # Wait for execution
    time.sleep(15)
    queue.stop()


if __name__ == "__main__":
    main()
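3.4 Pros and Cons of Redis Queues
| Pros | Cons |
|---|---|
| Persistence support | No message acknowledgement with Lists (you must implement it yourself) |
| High performance | No complex routing rules |
| Delayed queues are easy to implement | No built-in dead-letter queue |
| Simple deployment | No built-in management UI |
| Publish/subscribe support | Message order is not strictly guaranteed once there are multiple consumers or retries |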
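One common way to add acknowledgement on top of Lists is the "processing list" pattern: atomically move a task into a second list while it is being handled, and remove it only after the handler succeeds. The sketch below assumes a client exposing redis-py's `lmove` and `lrem` methods (LMOVE requires Redis 6.2 or later). The function names and the `:processing` key suffix are made up for this example, and `InMemoryLists` is only a stand-in so the sketch runs without a server.

```python
import json
from typing import Callable, Optional


class InMemoryLists:
    """Stand-in for a Redis client; mimics only lpush, lmove and lrem."""

    def __init__(self) -> None:
        self.lists: dict[str, list[str]] = {}

    def lpush(self, name: str, *values: str) -> int:
        items = self.lists.setdefault(name, [])
        for value in values:
            items.insert(0, value)
        return len(items)

    def lmove(self, first_list: str, second_list: str,
              src: str = "LEFT", dest: str = "RIGHT") -> Optional[str]:
        source = self.lists.get(first_list, [])
        if not source:
            return None
        value = source.pop(0) if src == "LEFT" else source.pop()
        target = self.lists.setdefault(second_list, [])
        if dest == "LEFT":
            target.insert(0, value)
        else:
            target.append(value)
        return value

    def lrem(self, name: str, count: int, value: str) -> int:
        items = self.lists.get(name, [])
        if value in items:
            items.remove(value)
            return 1
        return 0


def consume_one(client, queue_name: str, handler: Callable[[dict], None]) -> bool:
    """Handle one task; it stays in the processing list until the handler succeeds."""
    processing = f"{queue_name}:processing"
    # Move the oldest task (right end) into the processing list in one atomic step
    raw = client.lmove(queue_name, processing, "RIGHT", "LEFT")
    if raw is None:
        return False
    handler(json.loads(raw))         # if this raises, raw remains in `processing`
    client.lrem(processing, 1, raw)  # acknowledge: the task is done
    return True


def requeue_stale(client, queue_name: str) -> int:
    """Move unacknowledged tasks back to the main queue, e.g. on worker startup."""
    processing = f"{queue_name}:processing"
    moved = 0
    while client.lmove(processing, queue_name, "RIGHT", "RIGHT") is not None:
        moved += 1
    return moved
```

With several workers, a shared processing list makes `requeue_stale` unsafe because it would also requeue tasks that other workers are still handling. One processing list per worker, or a timestamp per in-flight task, avoids that.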
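4. RabbitMQ: An Enterprise-Grade Message Queue
4.1 Core RabbitMQ Concepts
Producer → Exchange → Queue → Consumer
Exchange types:
- Direct: exact match on the routing key
- Topic: pattern match on the routing key
- Fanout: broadcast to all bound queues
- Headers: match on message headers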
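To make the Topic pattern rules concrete: in a binding pattern, words are separated by dots, `*` stands for exactly one word, and `#` stands for zero or more words. The function below is a plain-Python sketch of that matching rule for illustration only. `topic_matches` is a name invented here, and this is not how the broker implements routing internally.

```python
def topic_matches(pattern: str, routing_key: str) -> bool:
    """Return True if a topic binding pattern matches a routing key."""

    def match(p: list[str], k: list[str]) -> bool:
        if not p:
            return not k  # pattern exhausted: match only if the key is too
        head, rest = p[0], p[1:]
        if head == "#":
            # '#' may absorb zero or more words of the key
            return any(match(rest, k[i:]) for i in range(len(k) + 1))
        if not k:
            return False
        if head == "*" or head == k[0]:
            # '*' absorbs exactly one word; a literal must be equal
            return match(rest, k[1:])
        return False

    return match(pattern.split("."), routing_key.split("."))
```

For example, a queue bound with `order.*` receives `order.created` but not `order.created.eu`, while a queue bound with `order.#` receives both.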
4.2 Complete Hands-On Code
import functools
import json
import logging
import time
from dataclasses import dataclass
from typing import Callable, Optional

import pika

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class RabbitMQConfig:
    """RabbitMQ connection settings."""
    host: str = "localhost"
    port: int = 5672
    username: str = "guest"
    password: str = "guest"
    virtual_host: str = "/"


class RabbitMQProducer:
    """RabbitMQ producer."""

    def __init__(self, config: RabbitMQConfig):
        self.config = config
        self.connection: Optional[pika.BlockingConnection] = None
        self.channel: Optional[pika.adapters.blocking_connection.BlockingChannel] = None

    def connect(self):
        """Open the connection and a channel."""
        credentials = pika.PlainCredentials(
            self.config.username,
            self.config.password
        )
        parameters = pika.ConnectionParameters(
            host=self.config.host,
            port=self.config.port,
            virtual_host=self.config.virtual_host,
            credentials=credentials
        )
        self.connection = pika.BlockingConnection(parameters)
        self.channel = self.connection.channel()
        logger.info("RabbitMQ producer connected")

    def declare_queue(
        self,
        queue_name: str,
        durable: bool = True,
        arguments: Optional[dict] = None
    ):
        """
        Declare a queue.
        durable: whether the queue survives a broker restart
        arguments: extra arguments (TTL, max length, and so on)
        """
        args = arguments or {}
        self.channel.queue_declare(
            queue=queue_name,
            durable=durable,
            arguments=args
        )
        logger.info(f"Queue declared: {queue_name}")

    def declare_exchange(
        self,
        exchange_name: str,
        exchange_type: str = "direct",
        durable: bool = True
    ):
        """Declare an exchange."""
        self.channel.exchange_declare(
            exchange=exchange_name,
            exchange_type=exchange_type,
            durable=durable
        )
        logger.info(f"Exchange declared: {exchange_name}")

    def bind_queue(
        self,
        queue_name: str,
        exchange_name: str,
        routing_key: str = ""
    ):
        """Bind a queue to an exchange."""
        self.channel.queue_bind(
            queue=queue_name,
            exchange=exchange_name,
            routing_key=routing_key
        )
        logger.info(f"Queue {queue_name} bound to {exchange_name}")

    def publish(
        self,
        exchange: str,
        routing_key: str,
        message: dict,
        content_type: str = "application/json",
        delivery_mode: int = 2  # 2 = persistent
    ):
        """
        Publish a message.
        delivery_mode:
            1 = non-persistent
            2 = persistent
        """
        properties = pika.BasicProperties(
            content_type=content_type,
            delivery_mode=delivery_mode
        )
        self.channel.basic_publish(
            exchange=exchange,
            routing_key=routing_key,
            body=json.dumps(message),
            properties=properties
        )
        logger.debug(f"Message published to {exchange}/{routing_key}")

    def close(self):
        """Close the connection."""
        if self.connection:
            self.connection.close()
        logger.info("RabbitMQ producer disconnected")


class RabbitMQConsumer:
    """RabbitMQ consumer."""

    def __init__(self, config: RabbitMQConfig):
        self.config = config
        self.connection: Optional[pika.BlockingConnection] = None
        self.channel: Optional[pika.adapters.blocking_connection.BlockingChannel] = None
        # queue name -> {"handler": callable, "auto_ack": bool}
        self.handlers: dict[str, dict] = {}

    def connect(self):
        """Open the connection and a channel."""
        credentials = pika.PlainCredentials(
            self.config.username,
            self.config.password
        )
        parameters = pika.ConnectionParameters(
            host=self.config.host,
            port=self.config.port,
            virtual_host=self.config.virtual_host,
            credentials=credentials
        )
        self.connection = pika.BlockingConnection(parameters)
        self.channel = self.connection.channel()
        logger.info("RabbitMQ consumer connected")

    def register_handler(
        self,
        queue_name: str,
        handler: Callable,
        auto_ack: bool = False
    ):
        """Register the handler for a queue."""
        self.handlers[queue_name] = {
            "handler": handler,
            "auto_ack": auto_ack
        }

    def _on_message(self, queue_name, channel, method, properties, body):
        """
        Message callback. queue_name is bound in start() with functools.partial:
        method.routing_key is the key the message was published with (for
        example "payment", or "" for a fanout exchange), not the queue name,
        so it cannot be used to look up the handler.
        """
        info = self.handlers[queue_name]
        manual_ack = not info["auto_ack"]
        try:
            message = json.loads(body)
            info["handler"](message)
            logger.info(f"Message processed from {queue_name}")
            # Manual acknowledgement
            if manual_ack:
                channel.basic_ack(delivery_tag=method.delivery_tag)
        except json.JSONDecodeError as e:
            logger.error(f"Invalid JSON: {e}")
            # A malformed message will never succeed, so do not requeue it
            if manual_ack:
                channel.basic_nack(
                    delivery_tag=method.delivery_tag,
                    requeue=False
                )
        except Exception as e:
            logger.error(f"Handler error: {e}", exc_info=True)
            # Requeue. Note that a message which always fails is redelivered
            # indefinitely; pair this with a dead-letter queue (section 4.3).
            if manual_ack:
                channel.basic_nack(
                    delivery_tag=method.delivery_tag,
                    requeue=True
                )

    def start(self, prefetch_count: int = 10):
        """Start consuming (blocks until stopped)."""
        # Limit unacknowledged messages per consumer (fair dispatch)
        self.channel.basic_qos(prefetch_count=prefetch_count)
        for queue_name, info in self.handlers.items():
            self.channel.basic_consume(
                queue=queue_name,
                on_message_callback=functools.partial(self._on_message, queue_name),
                auto_ack=info["auto_ack"]
            )
            logger.info(f"Consuming from queue: {queue_name}")
        logger.info("RabbitMQ consumer started, waiting for messages...")
        self.channel.start_consuming()

    def stop(self):
        """Stop consuming."""
        if self.channel:
            self.channel.stop_consuming()
        if self.connection:
            self.connection.close()
        logger.info("RabbitMQ consumer stopped")


# ========== Usage example: order processing system ==========
def setup_rabbitmq_infrastructure():
    """Declare the exchanges, queues and bindings."""
    config = RabbitMQConfig()
    producer = RabbitMQProducer(config)
    producer.connect()
    # Declare exchanges
    producer.declare_exchange("orders", exchange_type="direct")
    producer.declare_exchange("notifications", exchange_type="fanout")
    # Declare queues
    producer.declare_queue(
        "order_payment",
        durable=True,
        arguments={
            "x-message-ttl": 86400000,  # 24-hour TTL, in milliseconds
            "x-max-length": 10000       # at most 10,000 messages
        }
    )
    producer.declare_queue(
        "order_shipping",
        durable=True
    )
    producer.declare_queue(
        "email_notifications",
        durable=True
    )
    producer.declare_queue(
        "sms_notifications",
        durable=True
    )
    # Bind queues
    producer.bind_queue("order_payment", "orders", "payment")
    producer.bind_queue("order_shipping", "orders", "shipping")
    producer.bind_queue("email_notifications", "notifications")
    producer.bind_queue("sms_notifications", "notifications")
    producer.close()
    logger.info("RabbitMQ infrastructure setup complete")


def handle_payment(message: dict):
    """Process a payment."""
    order_id = message.get("order_id")
    amount = message.get("amount")
    time.sleep(0.5)  # simulate processing
    logger.info(f"Payment processed: {order_id}, amount: {amount}")


def handle_shipping(message: dict):
    """Process a shipment."""
    order_id = message.get("order_id")
    address = message.get("address")
    time.sleep(1.0)
    logger.info(f"Shipping processed: {order_id}, address: {address}")


def handle_email_notification(message: dict):
    """Send an email notification."""
    email = message.get("email")
    subject = message.get("subject")
    time.sleep(0.3)
    logger.info(f"Email sent: {email} - {subject}")


def run_producer():
    """Run the producer."""
    config = RabbitMQConfig()
    producer = RabbitMQProducer(config)
    producer.connect()
    # Publish order messages
    for i in range(5):
        producer.publish(
            exchange="orders",
            routing_key="payment",
            message={
                "order_id": f"ORD-{i}",
                "amount": 100.0 * (i + 1)
            }
        )
        producer.publish(
            exchange="orders",
            routing_key="shipping",
            message={
                "order_id": f"ORD-{i}",
                "address": f"Address {i}"
            }
        )
        # Broadcast a notification
        producer.publish(
            exchange="notifications",
            routing_key="",
            message={
                "email": f"user{i}@example.com",
                "subject": f"Order {i} Confirmed"
            }
        )
    producer.close()


def run_consumer():
    """Run the consumer."""
    config = RabbitMQConfig()
    consumer = RabbitMQConsumer(config)
    consumer.connect()
    consumer.register_handler("order_payment", handle_payment)
    consumer.register_handler("order_shipping", handle_shipping)
    consumer.register_handler("email_notifications", handle_email_notification)
    try:
        consumer.start(prefetch_count=5)
    except KeyboardInterrupt:
        consumer.stop()


if __name__ == "__main__":
    import sys
    if len(sys.argv) < 2:
        print("Usage: python rabbitmq_demo.py [setup|producer|consumer]")
        sys.exit(1)
    command = sys.argv[1]
    if command == "setup":
        setup_rabbitmq_infrastructure()
    elif command == "producer":
        run_producer()
    elif command == "consumer":
        run_consumer()
    else:
        print(f"Unknown command: {command}")
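4.3 Advanced RabbitMQ Features
Dead Letter Queue
def setup_dead_letter_queue(producer: RabbitMQProducer):
    """Set up a dead-letter queue."""
    # Declare the dead-letter exchange
    producer.declare_exchange("dlx", "direct")
    # Declare the dead-letter queue
    producer.declare_queue("dead_letter_queue", durable=True)
    producer.bind_queue("dead_letter_queue", "dlx", "dead")
    # Declare the main queue and attach the dead-letter exchange to it.
    # Messages that expire, are rejected without requeue, or overflow the
    # queue are republished to "dlx" with routing key "dead".
    producer.channel.queue_declare(
        queue="main_queue",
        durable=True,
        arguments={
            "x-dead-letter-exchange": "dlx",
            "x-dead-letter-routing-key": "dead",
            "x-message-ttl": 60000  # 60-second TTL
        }
    )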
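Delayed messages (implemented with a dead-letter queue)
def setup_delayed_queue(producer: RabbitMQProducer, delay_ms: int = 5000):
    """Set up a delay queue with a fixed delay of delay_ms milliseconds."""
    # Delay queue: it has no consumers, so every message waits until its TTL
    # expires and is then dead-lettered to the target queue. The empty
    # exchange name is the default exchange, which routes by queue name.
    producer.channel.queue_declare(
        queue="delay_queue",
        durable=True,
        arguments={
            "x-dead-letter-exchange": "",
            "x-dead-letter-routing-key": "target_queue",
            "x-message-ttl": delay_ms
        }
    )
    # Target queue
    producer.declare_queue("target_queue", durable=True)


def send_delayed_message(producer: RabbitMQProducer, message: dict):
    """Send a delayed message."""
    producer.publish(
        exchange="",
        routing_key="delay_queue",
        message=message
    )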
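4.4 RabbitMQ Performance Tuning
| Setting | Recommendation |
|---|---|
| prefetch_count | Set it according to consumer throughput: too high lets one slow consumer hoard unacknowledged messages, too low leaves consumers idle |
| Message persistence | Use delivery_mode=2 on a durable queue; messages survive a broker restart at the cost of throughput |
| Queue size | Set an x-max-length limit to avoid exhausting memory |
| TTL | Set a reasonable message expiry time |
| Connection reuse | Reuse connections and channels instead of creating and closing them for every message |
| Batch publishing | Prefer publisher confirms, confirmed in batches; AMQP transactions also work but are much slower |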
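5. Kafka: A High-Throughput Message Queue
5.1 Core Kafka Concepts
Producer → Topic (Partition 0, Partition 1, ...) → Consumer Group
Key features:
- Partitions: increase parallelism; ordering is guaranteed within a partition
- Replicas: high availability
- Consumer groups: load balancing
- Persistence: configurable message retention policies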
5.2 Kafka in Practice with Python
import json
import logging
from dataclasses import dataclass
from typing import Callable, Optional

from kafka import KafkaConsumer, KafkaProducer

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class KafkaConfig:
    """Kafka settings."""
    bootstrap_servers: str = "localhost:9092"
    group_id: str = "default-group"
    auto_offset_reset: str = "earliest"
    enable_auto_commit: bool = False


class KafkaProducerClient:
    """Kafka producer."""

    def __init__(self, config: KafkaConfig):
        self.config = config
        self.producer: Optional[KafkaProducer] = None

    def connect(self):
        """Create the producer."""
        self.producer = KafkaProducer(
            bootstrap_servers=self.config.bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
            key_serializer=lambda k: k.encode('utf-8') if k else None,
            acks='all',  # wait for all in-sync replicas to acknowledge
            retries=3,
            max_in_flight_requests_per_connection=1  # keep ordering across retries
        )
        logger.info("Kafka producer connected")

    def send(
        self,
        topic: str,
        value: dict,
        key: Optional[str] = None,
        partition: Optional[int] = None,
        callback: Optional[Callable] = None
    ):
        """
        Send a message (asynchronously).
        key: messages with the same key go to the same partition
        partition: explicit partition (usually left unset so the
            partitioner decides)
        """
        future = self.producer.send(
            topic=topic,
            value=value,
            key=key,
            partition=partition
        )
        if callback:
            future.add_callback(callback)
        # Always log failures, whether or not a success callback was given
        future.add_errback(lambda e: logger.error(f"Send failed: {e}"))
        logger.debug(f"Message queued for topic {topic}")

    def flush(self):
        """Block until all buffered messages have been sent."""
        self.producer.flush()

    def close(self):
        """Flush and close the producer."""
        if self.producer:
            self.producer.flush()
            self.producer.close()
        logger.info("Kafka producer disconnected")


class KafkaConsumerClient:
    """Kafka consumer."""

    def __init__(self, config: KafkaConfig):
        self.config = config
        self.consumer: Optional[KafkaConsumer] = None
        self.handlers: dict[str, Callable] = {}

    def connect(self):
        """Create the consumer."""
        self.consumer = KafkaConsumer(
            bootstrap_servers=self.config.bootstrap_servers,
            group_id=self.config.group_id,
            auto_offset_reset=self.config.auto_offset_reset,
            enable_auto_commit=self.config.enable_auto_commit,
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
            key_deserializer=lambda m: m.decode('utf-8') if m else None
        )
        logger.info("Kafka consumer connected")

    def subscribe(self, topics: list[str]):
        """Subscribe to topics."""
        self.consumer.subscribe(topics)
        logger.info(f"Subscribed to topics: {topics}")

    def register_handler(self, topic: str, handler: Callable):
        """Register the handler for a topic."""
        self.handlers[topic] = handler

    def start(self):
        """Start consuming (blocks until interrupted)."""
        logger.info("Kafka consumer started, waiting for messages...")
        try:
            for message in self.consumer:
                topic = message.topic
                handler = self.handlers.get(topic)
                if handler:
                    try:
                        handler(message.value, message.key)
                        logger.info(
                            f"Processed message from {topic} "
                            f"[partition={message.partition}, offset={message.offset}]"
                        )
                    except Exception as e:
                        logger.error(
                            f"Handler error for {topic}: {e}",
                            exc_info=True
                        )
                        # The offset is still committed below, so a failed
                        # message is skipped. If it must not be lost, forward
                        # it to a retry or dead-letter topic here.
                else:
                    logger.warning(f"No handler for topic: {topic}")
                # Commit the offset manually (synchronous; simple but slow)
                self.consumer.commit()
        except KeyboardInterrupt:
            logger.info("Consumer interrupted")
        finally:
            self.close()

    def close(self):
        """Close the consumer."""
        if self.consumer:
            self.consumer.close()
        logger.info("Kafka consumer disconnected")


# ========== Usage example: event processing system ==========
def handle_order_event(value: dict, key: str):
    """Handle an order event."""
    event_type = value.get("event_type")
    order_id = value.get("order_id")
    logger.info(f"Order event: {event_type}, order_id: {order_id}")
    # Processing logic...


def handle_user_event(value: dict, key: str):
    """Handle a user event."""
    event_type = value.get("event_type")
    user_id = value.get("user_id")
    logger.info(f"User event: {event_type}, user_id: {user_id}")
    # Processing logic...


def run_kafka_producer():
    """Run the Kafka producer."""
    config = KafkaConfig()
    producer = KafkaProducerClient(config)
    producer.connect()
    # Send order events
    for i in range(10):
        producer.send(
            topic="order_events",
            key=f"order-{i}",
            value={
                "event_type": "created",
                "order_id": f"ORD-{i}",
                "amount": 100.0 * (i + 1),
                "timestamp": "2026-06-27T10:00:00Z"
            }
        )
    # Send user events
    for i in range(5):
        producer.send(
            topic="user_events",
            key=f"user-{i}",
            value={
                "event_type": "registered",
                "user_id": f"USR-{i}",
                "email": f"user{i}@example.com"
            }
        )
    producer.flush()
    producer.close()


def run_kafka_consumer():
    """Run the Kafka consumer."""
    config = KafkaConfig(group_id="order-processor")
    consumer = KafkaConsumerClient(config)
    consumer.connect()
    consumer.subscribe(["order_events", "user_events"])
    consumer.register_handler("order_events", handle_order_event)
    consumer.register_handler("user_events", handle_user_event)
    consumer.start()


if __name__ == "__main__":
    import sys
    if len(sys.argv) < 2:
        print("Usage: python kafka_demo.py [producer|consumer]")
        sys.exit(1)
    command = sys.argv[1]
    if command == "producer":
        run_kafka_producer()
    elif command == "consumer":
        run_kafka_consumer()
    else:
        print(f"Unknown command: {command}")
5.3 Kafka Consumer Groups and Partitions
Topic: orders (3 partitions)
Consumer Group A:
- Consumer 1: Partition 0
- Consumer 2: Partition 1
- Consumer 3: Partition 2
If Consumer 2 goes down:
- Consumer 1: Partition 0
- Consumer 3: Partition 1, Partition 2 (reassigned automatically; the exact split depends on the assignment strategy)
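The rebalancing idea can be simulated in a few lines. The sketch below uses a simple round-robin rule over sorted member names. That rule and the name `assign_partitions` are assumptions for illustration: Kafka's real assignors (range, round-robin, sticky, and others) can produce a different split, including the one shown above. What holds in every case is that each partition has exactly one owner within a group.

```python
def assign_partitions(partitions: list[int], consumers: list[str]) -> dict[str, list[int]]:
    """Assign each partition to exactly one consumer, round-robin."""
    if not consumers:
        raise ValueError("a consumer group needs at least one member")
    members = sorted(consumers)
    assignment: dict[str, list[int]] = {member: [] for member in members}
    for index, partition in enumerate(sorted(partitions)):
        assignment[members[index % len(members)]].append(partition)
    return assignment


if __name__ == "__main__":
    print(assign_partitions([0, 1, 2], ["consumer-1", "consumer-2", "consumer-3"]))
    # After consumer-2 leaves the group, its partition moves to a survivor
    print(assign_partitions([0, 1, 2], ["consumer-1", "consumer-3"]))
```

The same function also shows why adding consumers beyond the partition count does not help: with three partitions and four consumers, one consumer receives nothing.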
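Partitioning strategy
# With a key, Kafka picks the partition from a hash of the key
producer.send(topic="orders", key="user-123", value={...})
# All events with the same key (here, one user's orders) go to the same
# partition, which preserves their order

# Custom partitioner: kafka-python accepts any callable invoked as
# partitioner(key_bytes, all_partitions, available_partitions),
# so no base class is required
import random
import zlib

from kafka import KafkaProducer


class UserIdPartitioner:
    """Partition by user ID (the serialized key bytes)."""

    def __call__(self, key, all_partitions, available_partitions):
        if key is None:
            # No key: spread messages over the currently available partitions
            return random.choice(available_partitions)
        # Use a stable hash. Python's built-in hash() of str/bytes is
        # randomized per process, so the same user could land on a different
        # partition after a restart, which would break per-user ordering.
        partition_index = zlib.crc32(key) % len(all_partitions)
        return all_partitions[partition_index]


# Use the custom partitioner
producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    partitioner=UserIdPartitioner()
)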
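5.4 Kafka Message Retention Policies
# Time-based retention (default: 7 days)
log.retention.hours=168
# Size-based retention, applied per partition (1 GB here)
log.retention.bytes=1073741824
# Log segments: roll to a new segment at 1 GB or after 7 days
log.segment.bytes=1073741824
log.roll.ms=604800000
# Compaction (keep only the latest value for each key)
log.cleanup.policy=compact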
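What "keep only the latest value for each key" means can be shown on a small in-memory log. This is a conceptual sketch only: the `compact` function is invented for this example, and it ignores details of real Kafka compaction such as the uncompacted active segment and the delayed removal of tombstone records.

```python
from typing import Optional

Record = tuple[str, Optional[str]]  # (key, value)


def compact(log: list[Record]) -> list[Record]:
    """Keep only the last record for each key, preserving log order."""
    last_index = {key: index for index, (key, _) in enumerate(log)}
    return [record for index, record in enumerate(log)
            if last_index[record[0]] == index]


if __name__ == "__main__":
    log = [
        ("user-1", "email=a@example.com"),
        ("user-2", "email=b@example.com"),
        ("user-1", "email=c@example.com"),  # supersedes the first record
    ]
    print(compact(log))
```

A compacted topic therefore behaves like a changelog from which the current state of every key can be rebuilt, whereas time-based and size-based retention simply drop the oldest data.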
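6. Comparing the Three Options
6.1 Feature Comparison
| Feature | Redis | RabbitMQ | Kafka |
|---|---|---|---|
| Persistence | AOF/RDB | Supported | Supported |
| Message ordering | Per queue (single consumer) | Per queue (single consumer) | Within a partition |
| Acknowledgement | None for Lists (implement it yourself) | Supported | Offset commits |
| Delayed queue | Supported (ZSet) | Supported (TTL + DLX) | Not natively supported |
| Transactions | Supported (MULTI/EXEC) | Supported | Supported |
| Routing | Simple | Powerful | Simple |
| Stream processing | Not supported | Not supported | Supported (Kafka Streams) |
| Management UI | None built in | Yes (management plugin) | Third-party tools |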
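6.2 Performance Comparison
The figures below are rough orders of magnitude rather than benchmark results; real numbers depend heavily on hardware, message size, persistence settings, and cluster layout.
| Metric | Redis | RabbitMQ | Kafka |
|---|---|---|---|
| Throughput | 100k-500k msgs/s | 50k-200k msgs/s | 500k-1M msgs/s |
| Latency | < 1 ms | 1-10 ms | 5-20 ms |
| Resource usage | Low | Medium | High |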
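6.3 When to Use Which
Redis:
- Lightweight asynchronous tasks
- Delayed queues
- Moderate message volume (< 100k msgs/s)
- Redis infrastructure already in place
RabbitMQ:
- Complex routing requirements
- Message acknowledgement and dead-letter queues are needed
- Enterprise applications
- Medium throughput
Kafka:
- Big-data stream processing
- Log collection
- High throughput (> 100k msgs/s)
- Message replay is needed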
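7. Production Best Practices
7.1 Designing for Idempotency
Every task handler must be idempotent!
def handle_payment(message: dict):
    """Idempotent payment handler; redis_client is a redis.Redis instance."""
    order_id = message.get("order_id")
    key = f"payment:processed:{order_id}"
    # Claim the order atomically: SET with NX succeeds for only one consumer,
    # which closes the race in a separate "check, then mark" sequence.
    # The claim expires after 24 hours.
    if not redis_client.set(key, "1", nx=True, ex=86400):
        logger.info(f"Order {order_id} already processed, skipping")
        return
    try:
        # Process the payment
        process_payment(message)
    except Exception as e:
        # Release the claim so that a retry can process the order again.
        # If the worker crashes before reaching this point, the claim stays
        # until it expires; reconcile with the payment provider in that case.
        redis_client.delete(key)
        logger.error(f"Payment failed: {e}")
        raise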
7.2 Fallback Plans
Every stage needs a fallback plan; a single point of failure must not bring down the whole pipeline!
class PaymentService:
    """Payment service."""

    def process(self, order_id: str) -> bool:
        try:
            # Primary path
            return self._process_via_stripe(order_id)
        except Exception as e:
            logger.error(f"Stripe failed: {e}")
        try:
            # Fallback 1: backup payment channel
            return self._process_via_paypal(order_id)
        except Exception as e:
            logger.error(f"PayPal failed: {e}")
        # Fallback 2: save the order to a retry queue and try again later
        self._save_to_retry_queue(order_id)
        return False
7.3 Monitoring Metrics
Message backlog and consumer lag are the metrics you must watch!
from prometheus_client import Counter, Gauge, Histogram

# Message count
messages_total = Counter(
    'queue_messages_total',
    'Total messages processed',
    ['queue', 'status']
)
# Queue length
queue_size = Gauge(
    'queue_size',
    'Current queue size',
    ['queue']
)
# Processing time
processing_time = Histogram(
    'message_processing_seconds',
    'Time spent processing messages',
    ['queue']
)


def monitor_queue_size(redis_client, queue_name: str):
    """Record the queue length and warn when a backlog builds up."""
    size = redis_client.llen(queue_name)
    queue_size.labels(queue=queue_name).set(size)
    if size > 10000:
        logger.warning(f"Queue {queue_name} backlog: {size}")
    return size
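Backlog covers only half of the picture; consumer lag can be derived from values the queue already exposes. The two helpers below are a sketch with invented names: `consumer_lag` takes per-partition end offsets and committed offsets (for Kafka), and `oldest_message_age` works from the `created_at` timestamps that the task payloads in this article carry.

```python
import time
from typing import Iterable, Optional


def consumer_lag(end_offsets: dict[int, int], committed: dict[int, int]) -> dict[int, int]:
    """Per-partition lag: latest offset minus committed offset.

    A partition with no committed offset is treated as committed at 0.
    """
    return {
        partition: max(end - committed.get(partition, 0), 0)
        for partition, end in end_offsets.items()
    }


def oldest_message_age(created_at_values: Iterable[float],
                       now: Optional[float] = None) -> float:
    """Age in seconds of the oldest waiting message (0.0 for an empty queue)."""
    timestamps = list(created_at_values)
    if not timestamps:
        return 0.0
    current = time.time() if now is None else now
    return max(current - min(timestamps), 0.0)
```

Either value can be exported through a `Gauge` like `queue_size` above. An age that keeps growing while the queue length stays flat usually points to a stuck consumer rather than a traffic spike.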
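7.4 Principles for Evolving the Architecture
Stage 1: validate the business logic
↓
In-memory queue (development/testing)
Stage 2: small-scale production
↓
Redis queue
Stage 3: business growth
↓
RabbitMQ / Kafka
Principles:
1. Start with the simplest option
2. Migrate only after the business logic has been validated
3. Every step must bring a clear benefit
4. Do not start with a distributed messaging cluster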
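Migrating between stages is easier when handlers never talk to a concrete queue. The sketch below hides the backend behind a small interface; `TaskBackend`, `MemoryBackend`, and `drain` are names invented for this example, and a Redis-backed or RabbitMQ-backed class would implement the same two methods.

```python
import queue
from typing import Callable, Optional, Protocol


class TaskBackend(Protocol):
    """Minimal interface that business code depends on."""

    def push(self, task: dict) -> None: ...

    def pop(self, timeout: float) -> Optional[dict]: ...


class MemoryBackend:
    """Stage 1 backend: in-process queue for development and tests."""

    def __init__(self) -> None:
        self._queue: queue.Queue = queue.Queue()

    def push(self, task: dict) -> None:
        self._queue.put(task)

    def pop(self, timeout: float) -> Optional[dict]:
        try:
            return self._queue.get(timeout=timeout)
        except queue.Empty:
            return None


def drain(backend: TaskBackend, handler: Callable[[dict], None],
          timeout: float = 0.01) -> int:
    """Process tasks until the backend is empty; return how many were handled."""
    handled = 0
    while (task := backend.pop(timeout)) is not None:
        handler(task)
        handled += 1
    return handled
```

Because `drain` and the handlers only see `TaskBackend`, moving from stage 1 to stage 2 means writing one new backend class rather than touching the business logic.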
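8. Summary
Message queues are not a silver bullet; you adopt them to solve concrete problems:
- Asynchronous processing: slow operations go into the queue, and the main flow responds quickly
- Decoupling: producers and consumers are decoupled through the queue
- Peak shaving: traffic spikes are queued first, and the backend consumes at its own pace
- Fault tolerance: with a persistent queue, messages survive a consumer crash and are processed after recovery
Recommendations:
- Redis: lightweight, infrastructure already in place, moderate message volume
- RabbitMQ: enterprise-grade, complex routing, message acknowledgement
- Kafka: big data, high throughput, stream processing
Remember three iron rules:
- Every task handler must be idempotent
- Every stage needs a fallback plan
- Monitoring comes first; message backlog and consumer lag are the metrics you must watch
Start with the simplest in-memory queue, and migrate to Redis, RabbitMQ, or Kafka only after the business logic has been validated. Architecture should evolve incrementally, and every step should bring a clear benefit.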
References:
- Redis documentation: https://redis.io/docs/
- RabbitMQ documentation: https://www.rabbitmq.com/docs
- Kafka documentation: https://kafka.apache.org/documentation/
- Python Redis client: https://github.com/redis/redis-py
- Python Kafka client: https://github.com/dpkp/kafka-python