Message Queues and Task Scheduling: A Practical Guide from In-Memory Queues to Production-Grade Architecture

2026-06-27 13:44:00 +0800 CST


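The core value of message queues and task scheduling systems is decoupling and peak shaving. You introduce them not to show off, but to solve real problems: processing slow operations asynchronously, decoupling dependencies between services, and smoothing out traffic spikes. This article starts from the simplest in-memory queue and evolves step by step toward a production-grade messaging architecture, covering three options (Redis queues, RabbitMQ, and Kafka) with complete working code and performance tuning.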


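1. Why Do You Need a Message Queue?

1.1 Three Classic Scenarios

Scenario 1: Asynchronous notifications after user registration

# ❌ Synchronous: the user waits 5 seconds before seeing that registration succeeded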
def register_user(email, password):
    user = create_user(email, password)
    send_welcome_email(email)        # 2 seconds
    send_sms_notification(email)     # 1 second
    sync_to_crm_system(email)        # 2 seconds
    return {"status": "ok"}

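After clicking Register, the user has to wait until every notification has been sent before seeing the success page. If the CRM system is down, the registration flow fails outright.

# ✅ Asynchronous: the user gets a response immediately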
def register_user(email, password):
    user = create_user(email, password)
    queue.push({
        "type": "welcome_email",
        "email": email
    })
    queue.push({
        "type": "sms_notification",
        "email": email
    })
    queue.push({
        "type": "crm_sync",
        "email": email
    })
    return {"status": "ok"}  # the user gets a response immediately

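Scenario 2: Peak shaving for a flash sale

The moment the flash sale starts: 100,000 users send requests at once → the database collapses

Solution:
User requests → message queue (buffer) → the backend consumes at a rate the database can sustain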
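
The buffering effect can be illustrated with a small simulation. This is a minimal sketch, not a benchmark: `simulate_peak_shaving`, the tick model, and the capacity of 2,000 requests per tick are assumptions made for illustration, and a `deque` stands in for the message queue.

```python
from collections import deque


def simulate_peak_shaving(burst_size: int, db_capacity_per_tick: int) -> dict:
    """Push a burst into a buffer, then drain it at the database's pace."""
    if db_capacity_per_tick <= 0:
        raise ValueError("db_capacity_per_tick must be positive")

    buffer = deque(range(burst_size))  # every request arrives at once
    ticks = 0
    max_load = 0

    while buffer:
        # The consumer never takes more than the database can handle per tick
        batch = min(db_capacity_per_tick, len(buffer))
        for _ in range(batch):
            buffer.popleft()
        max_load = max(max_load, batch)
        ticks += 1

    return {"ticks": ticks, "max_db_load_per_tick": max_load}


if __name__ == "__main__":
    print(simulate_peak_shaving(100_000, 2_000))
    # → {'ticks': 50, 'max_db_load_per_tick': 2000}
```

The burst still takes time to drain, but the database never sees more than its per-tick capacity; the queue trades latency for stability.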

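Scenario 3: Service decoupling

Order service ──depends on──> Inventory service
              └──depends on──> Payment service
              └──depends on──> Shipping service

Problem: if any downstream service goes down, the order service is affected

Improvement:
Order service ──publishes events──> Message queue
Inventory service ←──subscribes──┘
Payment service ←──subscribes──┘
Shipping service ←──subscribes──┘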
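
The publish/subscribe shape above can be sketched in-process. `EventBus` and the handler names are hypothetical; a real broker would deliver events across processes and persist them, which this stand-in does not.

```python
from collections import defaultdict
from typing import Callable


class EventBus:
    """In-process stand-in for a broker: topics map to subscriber callbacks."""

    def __init__(self) -> None:
        self._subscribers: dict[str, list[Callable[[dict], None]]] = defaultdict(list)

    def subscribe(self, topic: str, handler: Callable[[dict], None]) -> None:
        self._subscribers[topic].append(handler)

    def publish(self, topic: str, event: dict) -> int:
        """Deliver the event to every subscriber; return how many succeeded."""
        delivered = 0
        for handler in self._subscribers.get(topic, []):
            try:
                handler(event)
                delivered += 1
            except Exception as exc:
                # A failing subscriber does not affect the publisher
                # or the remaining subscribers
                print(f"subscriber failed: {exc!r}")
        return delivered


def payment_down(event: dict) -> None:
    raise RuntimeError("payment service down")


if __name__ == "__main__":
    bus = EventBus()
    bus.subscribe("order.created", lambda e: print("inventory reserved", e["order_id"]))
    bus.subscribe("order.created", payment_down)
    bus.subscribe("order.created", lambda e: print("shipping scheduled", e["order_id"]))
    bus.publish("order.created", {"order_id": "ORD-1"})
```

The order service only calls `publish`; it holds no reference to the inventory, payment, or shipping code, so one of them failing does not fail the order.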

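1.2 The Core Value of a Message Queue

Value | Description
Asynchronous processing | Slow operations go into the queue so the main flow responds quickly
Decoupling | Producers only publish messages and do not care who consumes them
Peak shaving | Traffic spikes are queued first, and the backend consumes at its own capacity
Fault tolerance | If a consumer goes down, messages stay in the queue and processing resumes after recovery
Scalability | Adding consumers increases processing capacity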

2. Starting with the Simplest In-Memory Queue

2.1 An Implementation with Python's queue.Queue

import queue
import threading
import time
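from dataclasses import dataclass, field
from typing import Callable, Any
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class Task:
    """Task definition"""
    task_type: str
    payload: dict
    # default_factory runs once per instance; a plain time.time() default would
    # be evaluated only once, when the class is defined
    created_at: float = field(default_factory=time.time)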


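class MemoryTaskQueue:
    """
    In-memory task queue (not recommended for production)
    
    Pros: simple, no dependencies, fast
    Cons: messages are lost on process restart, cannot be distributed, no persistence
    """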
    
    def __init__(self, maxsize: int = 10000):
        self.queue = queue.Queue(maxsize=maxsize)
        self.handlers: dict[str, Callable] = {}
        self.workers: list[threading.Thread] = []
        self.running = False
    
    def register_handler(self, task_type: str, handler: Callable):
        """Register a task handler"""
        self.handlers[task_type] = handler
        logger.info(f"Registered handler for task type: {task_type}")
    
    def submit(self, task_type: str, payload: dict) -> bool:
        """Submit a task"""
        try:
            task = Task(task_type=task_type, payload=payload)
            self.queue.put(task, block=False)
            logger.debug(f"Task submitted: {task_type}")
            return True
        except queue.Full:
            logger.error(f"Queue full, task rejected: {task_type}")
            return False
    
    def _worker(self, worker_id: int):
        """Worker thread"""
        logger.info(f"Worker {worker_id} started")
        while self.running:
            try:
                task = self.queue.get(timeout=1.0)
                
                handler = self.handlers.get(task.task_type)
                if handler:
                    try:
                        start = time.time()
                        handler(task.payload)
                        elapsed = time.time() - start
                        logger.info(
                            f"Worker {worker_id} processed {task.task_type} "
                            f"in {elapsed:.2f}s"
                        )
                    except Exception as e:
                        logger.error(
                            f"Handler error for {task.task_type}: {e}",
                            exc_info=True
                        )
                else:
                    logger.warning(f"No handler for task type: {task.task_type}")
                
                self.queue.task_done()
            except queue.Empty:
                continue
        logger.info(f"Worker {worker_id} stopped")
    
    def start(self, num_workers: int = 4):
        """Start the consumers"""
        self.running = True
        for i in range(num_workers):
            worker = threading.Thread(
                target=self._worker,
                args=(i,),
                daemon=True
            )
            worker.start()
            self.workers.append(worker)
        logger.info(f"Started {num_workers} workers")
    
    def stop(self):
        """Stop the consumers"""
        self.running = False
        for worker in self.workers:
            worker.join(timeout=5)
        logger.info("All workers stopped")
    
    def get_stats(self) -> dict:
        """Return queue statistics"""
        return {
            "queue_size": self.queue.qsize(),
            "workers": len(self.workers),
            "handlers": list(self.handlers.keys())
        }


# ========== Usage example ==========

def handle_send_email(payload: dict):
    """Email-sending handler"""
    email = payload.get("email")
    subject = payload.get("subject", "No Subject")
    
    # Simulate sending an email
    time.sleep(0.5)
    logger.info(f"Email sent to {email}: {subject}")


def handle_generate_report(payload: dict):
    """Report-generation handler"""
    report_id = payload.get("report_id")
    
    # Simulate generating a report
    time.sleep(2.0)
    logger.info(f"Report generated: {report_id}")


def main():
    """Entry point"""
    task_queue = MemoryTaskQueue(maxsize=1000)
    
    # Register handlers
    task_queue.register_handler("send_email", handle_send_email)
    task_queue.register_handler("generate_report", handle_generate_report)
    
    # Start the consumers
    task_queue.start(num_workers=3)
    
    # Submit tasks
    for i in range(10):
        task_queue.submit("send_email", {
            "email": f"user{i}@example.com",
            "subject": f"Welcome {i}"
        })
    
    task_queue.submit("generate_report", {
        "report_id": "report-001"
    })
    
    # Wait for processing to finish
    time.sleep(5)
    
    # Inspect the statistics
    print(task_queue.get_stats())
    
    # Stop
    task_queue.stop()


if __name__ == "__main__":
    main()
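
The example above waits with a fixed `time.sleep(5)`, which can be either too long or too short. The standard library offers `Queue.join()`, which blocks until `task_done()` has been called for every item that was put. A minimal sketch follows; `run_until_drained` and the doubling "work" are placeholders for illustration.

```python
import queue
import threading


def run_until_drained(items: list[int], num_workers: int = 3) -> list[int]:
    """Process every item and return only after all are marked done."""
    q: queue.Queue = queue.Queue()
    results: list[int] = []
    lock = threading.Lock()

    def worker() -> None:
        while True:
            item = q.get()
            try:
                if item is None:  # sentinel: shut this worker down
                    return
                with lock:
                    results.append(item * 2)  # placeholder for real work
            finally:
                q.task_done()  # must be called once per get()

    threads = [threading.Thread(target=worker) for _ in range(num_workers)]
    for t in threads:
        t.start()

    for item in items:
        q.put(item)

    q.join()  # blocks until every queued item has been marked done

    for _ in threads:
        q.put(None)  # one sentinel per worker
    for t in threads:
        t.join()
    return results


if __name__ == "__main__":
    print(sorted(run_until_drained([1, 2, 3, 4, 5])))
    # → [2, 4, 6, 8, 10]
```

With `MemoryTaskQueue`, the same idea would be calling `task_queue.queue.join()` in place of the sleep, since its workers already call `task_done()` after each task.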

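2.2 Problems with In-Memory Queues

Problem | Description | Impact
Message loss | A process restart wipes everything held in memory | Unacceptable in production
Not distributed | Usable only within a single process | Cannot scale horizontally
No persistence | Nothing can be recovered after a restart | Critical tasks cannot be guaranteed
No acknowledgment | A message is lost when its consumer fails | You have to implement retries yourself
No monitoring | Queue state is hard to observe | Troubleshooting is difficult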

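2.3 When Is an In-Memory Queue Acceptable?

  • Losing tasks is acceptable (e.g. log collection, real-time statistics)
  • Single-machine deployment with no need to scale
  • Rapid prototyping
  • Unit tests and development environments

For production workloads where tasks must not be lost, use a persistent message queue!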


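3. A Lightweight Message Queue with Redis

3.1 Advantages of Redis Queues

  • Persistence (AOF / RDB)
  • Pub/Sub support
  • Delay queues (Sorted Set)
  • Simple to use
  • High performance

3.2 A Simple Queue Based on List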

import redis
import json
import threading
import time
import logging
from typing import Callable, Optional

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class RedisTaskQueue:
    """
    Task queue based on a Redis List
    
    LPUSH to enqueue, BRPOP to dequeue (blocking)
    """
    
    def __init__(
        self,
        redis_url: str = "redis://localhost:6379/0",
        queue_name: str = "tasks"
    ):
        self.redis = redis.from_url(redis_url)
        self.queue_name = queue_name
        self.handlers: dict[str, Callable] = {}
        self.workers: list[threading.Thread] = []
        self.running = False
    
    def register_handler(self, task_type: str, handler: Callable):
        """Register a task handler"""
        self.handlers[task_type] = handler
    
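    def submit(
        self,
        task_type: str,
        payload: dict,
        priority: int = 0
    ) -> bool:
        """
        Submit a task
        
        priority: stored on the task for reference only; a Redis List is
        strictly FIFO, so this queue does not reorder tasks by priority
        """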
        task = {
            "type": task_type,
            "payload": payload,
            "priority": priority,
            "created_at": time.time(),
            "attempts": 0
        }
        
        try:
            # LPUSH enqueues at the left end
            self.redis.lpush(self.queue_name, json.dumps(task))
            logger.debug(f"Task submitted: {task_type}")
            return True
        except Exception as e:
            logger.error(f"Failed to submit task: {e}")
            return False
    
    def _worker(self, worker_id: int):
        """Worker thread"""
        logger.info(f"Redis worker {worker_id} started")
        
        while self.running:
            try:
                # BRPOP: blocking pop with a 1-second timeout
                result = self.redis.brpop(self.queue_name, timeout=1)
                if not result:
                    continue
                
                _, task_json = result
                task = json.loads(task_json)
                task_type = task.get("type")
                payload = task.get("payload", {})
                
                handler = self.handlers.get(task_type)
                if handler:
                    try:
                        start = time.time()
                        handler(payload)
                        elapsed = time.time() - start
                        logger.info(
                            f"Worker {worker_id} processed {task_type} "
                            f"in {elapsed:.2f}s"
                        )
                    except Exception as e:
                        logger.error(
                            f"Handler error for {task_type}: {e}",
                            exc_info=True
                        )
                        
                        # Retry logic
                        attempts = task.get("attempts", 0) + 1
                        if attempts < 3:  # at most 3 attempts in total (2 retries)
                            task["attempts"] = attempts
                            self.redis.lpush(
                                self.queue_name,
                                json.dumps(task)
                            )
                            logger.info(
                                f"Task {task_type} requeued (attempt {attempts})"
                            )
                else:
                    logger.warning(f"No handler for task type: {task_type}")
                    
            except json.JSONDecodeError as e:
                logger.error(f"Invalid task JSON: {e}")
            except Exception as e:
                logger.error(f"Worker error: {e}", exc_info=True)
        
        logger.info(f"Redis worker {worker_id} stopped")
    
    def start(self, num_workers: int = 4):
        """Start the consumers"""
        self.running = True
        for i in range(num_workers):
            worker = threading.Thread(
                target=self._worker,
                args=(i,),
                daemon=True
            )
            worker.start()
            self.workers.append(worker)
        logger.info(f"Started {num_workers} Redis workers")
    
    def stop(self):
        """Stop the consumers"""
        self.running = False
        for worker in self.workers:
            worker.join(timeout=5)
        logger.info("All Redis workers stopped")
    
    def get_queue_size(self) -> int:
        """Return the queue length"""
        return self.redis.llen(self.queue_name)
    
    def purge(self):
        """Empty the queue"""
        self.redis.delete(self.queue_name)
        logger.info(f"Queue {self.queue_name} purged")


# ========== Usage example ==========

def handle_process_order(payload: dict):
    """Order processing"""
    order_id = payload.get("order_id")
    time.sleep(1.0)  # simulate processing
    logger.info(f"Order processed: {order_id}")


def main():
    task_queue = RedisTaskQueue(
        redis_url="redis://localhost:6379/0",
        queue_name="orders"
    )
    
    task_queue.register_handler("process_order", handle_process_order)
    task_queue.start(num_workers=2)
    
    # Submit tasks
    for i in range(5):
        task_queue.submit("process_order", {"order_id": f"ORD-{i}"})
    
    time.sleep(10)
    print(f"Queue size: {task_queue.get_queue_size()}")
    task_queue.stop()


if __name__ == "__main__":
    main()
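
With BRPOP, a task leaves Redis the moment a worker pops it, so a worker crash in the middle of processing loses that task. A common pattern for narrowing this gap moves each task into a second "processing" list atomically and removes it only after the handler succeeds. The sketch below assumes `client` is a redis-py client (`redis.Redis`); the function names and the `:processing` suffix are hypothetical. It uses `rpoplpush`, which Redis 6.2 deprecates in favor of LMOVE, and it omits the recovery job that would move stale entries back to the main queue.

```python
import json
from typing import Callable


def reserve(client, queue_name: str, processing_name: str):
    """Atomically move one task from the queue to the processing list.

    Returns the raw task, or None when the queue is empty.
    """
    return client.rpoplpush(queue_name, processing_name)


def ack(client, processing_name: str, raw) -> bool:
    """Remove a finished task from the processing list."""
    return client.lrem(processing_name, 1, raw) > 0


def process_one(client, queue_name: str, handler: Callable[[dict], None]) -> bool:
    """Reserve, handle, and acknowledge a single task.

    Returns False when there was nothing to do. If the handler raises, the
    task stays in the processing list, where a recovery job could find it.
    """
    processing_name = f"{queue_name}:processing"
    raw = reserve(client, queue_name, processing_name)
    if raw is None:
        return False

    task = json.loads(raw)
    handler(task.get("payload", {}))
    ack(client, processing_name, raw)
    return True
```

Compared with the retry logic in `RedisTaskQueue`, the difference is where the task lives while it is being handled: here it is always in one of the two lists, never only in the worker's memory.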

3.3 Delay Queue: Based on Sorted Set

import redis
import json
import time
import threading
import logging
from typing import Callable

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class RedisDelayedQueue:
    """
    Delay queue based on a Redis Sorted Set
    
    score = execution timestamp
    Periodically scans for tasks whose score <= current time
    """
    
    def __init__(
        self,
        redis_url: str = "redis://localhost:6379/0",
        queue_name: str = "delayed_tasks"
    ):
        self.redis = redis.from_url(redis_url)
        self.queue_name = queue_name
        self.handlers: dict[str, Callable] = {}
        self.running = False
    
    def register_handler(self, task_type: str, handler: Callable):
        """Register a handler"""
        self.handlers[task_type] = handler
    
    def submit(
        self,
        task_type: str,
        payload: dict,
        delay_seconds: float = 0
    ) -> bool:
        """
        Submit a delayed task
        
        delay_seconds: delay in seconds (0 means run immediately)
        """
        execute_at = time.time() + delay_seconds
        
        task = {
            "type": task_type,
            "payload": payload,
            "execute_at": execute_at
        }
        
        try:
            # ZADD: score = execution timestamp
            self.redis.zadd(
                self.queue_name,
                {json.dumps(task): execute_at}
            )
            logger.info(
                f"Delayed task submitted: {task_type}, "
                f"execute in {delay_seconds}s"
            )
            return True
        except Exception as e:
            logger.error(f"Failed to submit delayed task: {e}")
            return False
    
    def _consumer(self):
        """Consumer thread"""
        logger.info("Delayed queue consumer started")
        
        while self.running:
            try:
                now = time.time()
                
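                # Fetch tasks whose score <= now. ZRANGEBYSCORE only reads, so
                # the ZREM below is what actually claims each task.
                results = self.redis.zrangebyscore(
                    self.queue_name,
                    min=0,
                    max=now,
                    start=0,
                    num=10  # fetch at most 10 per scan
                )
                
                if not results:
                    time.sleep(0.1)  # sleep 100 ms when nothing is due
                    continue
                
                for task_json in results:
                    # ZREM returns 0 if another consumer already removed the
                    # task; skip it so it does not run twice
                    if not self.redis.zrem(self.queue_name, task_json):
                        continue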
                    
                    task = json.loads(task_json)
                    task_type = task.get("type")
                    payload = task.get("payload", {})
                    
                    handler = self.handlers.get(task_type)
                    if handler:
                        try:
                            start = time.time()
                            handler(payload)
                            elapsed = time.time() - start
                            logger.info(
                                f"Delayed task {task_type} "
                                f"executed in {elapsed:.2f}s"
                            )
                        except Exception as e:
                            logger.error(
                                f"Handler error for {task_type}: {e}",
                                exc_info=True
                            )
                    else:
                        logger.warning(
                            f"No handler for delayed task: {task_type}"
                        )
                        
            except Exception as e:
                logger.error(f"Delayed queue consumer error: {e}")
                time.sleep(1)
        
        logger.info("Delayed queue consumer stopped")
    
    def start(self):
        """Start the consumer"""
        self.running = True
        thread = threading.Thread(target=self._consumer, daemon=True)
        thread.start()
        logger.info("Delayed queue started")
    
    def stop(self):
        """Stop the consumer"""
        self.running = False
    
    def get_pending_count(self) -> int:
        """Return the number of pending tasks"""
        return self.redis.zcard(self.queue_name)


# ========== Usage example ==========

def handle_reminder(payload: dict):
    """Reminder task"""
    user_id = payload.get("user_id")
    message = payload.get("message")
    logger.info(f"Reminder for {user_id}: {message}")


def main():
    queue = RedisDelayedQueue(queue_name="reminders")
    queue.register_handler("reminder", handle_reminder)
    queue.start()
    
    # Submit delayed tasks
    queue.submit("reminder", {
        "user_id": "user-001",
        "message": "Meeting in 5 minutes"
    }, delay_seconds=5)
    
    queue.submit("reminder", {
        "user_id": "user-002",
        "message": "Take a break"
    }, delay_seconds=10)
    
    print(f"Pending tasks: {queue.get_pending_count()}")
    
    # Wait for execution
    time.sleep(15)
    queue.stop()


if __name__ == "__main__":
    main()
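
A delay queue also makes it possible to retry a failed task later instead of immediately. One common policy is exponential backoff, sketched below under assumptions: `backoff_delay` and its default `base` and `cap` values are illustrative and are not part of the queue above.

```python
import random


def backoff_delay(
    attempt: int,
    base: float = 1.0,
    cap: float = 60.0,
    jitter: float = 0.0,
) -> float:
    """Seconds to wait before retry number `attempt` (1-based).

    The delay doubles with each attempt and is capped at `cap`. A non-zero
    `jitter` adds up to that fraction of the delay at random, so that many
    failed tasks do not all retry at the same instant.
    """
    if attempt < 1:
        raise ValueError("attempt must be >= 1")
    delay = min(cap, base * 2 ** (attempt - 1))
    if jitter > 0:
        delay += random.uniform(0, jitter * delay)
    return delay


if __name__ == "__main__":
    print([backoff_delay(n) for n in range(1, 8)])
    # → [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0]
```

A handler wrapper could then resubmit a failed task with `queue.submit(task_type, payload, delay_seconds=backoff_delay(attempt))`, carrying `attempt` inside the payload.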

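3.4 Pros and Cons of Redis Queues

Pros | Cons
Persistence support | No message acknowledgment (you must implement it yourself)
High performance | No complex routing rules
Delay queues are simple to implement | No dead letter queue
Simple deployment | No management UI
Pub/Sub support | Strict message ordering is not guaranteed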

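4. RabbitMQ: An Enterprise-Grade Message Queue

4.1 RabbitMQ Core Concepts

Producer → Exchange → Queue → Consumer

Exchange types:
- Direct: exact match on the routing key
- Topic: pattern match on the routing key
- Fanout: broadcast to all bound queues
- Headers: match on message headers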
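
The routing rules can be sketched in plain Python to make the matching concrete. This models only the matching semantics (in a topic pattern, `*` matches exactly one dot-separated word and `#` matches zero or more); `topic_matches` and `route` are hypothetical helpers, not RabbitMQ or pika APIs, and the headers exchange is left out.

```python
def topic_matches(pattern: str, routing_key: str) -> bool:
    """Return True if a topic binding pattern matches the routing key."""
    p = pattern.split(".")
    k = routing_key.split(".")

    def match(i: int, j: int) -> bool:
        if i == len(p):
            return j == len(k)  # pattern used up: key must be used up too
        if p[i] == "#":
            # '#' may absorb zero or more words
            return any(match(i + 1, n) for n in range(j, len(k) + 1))
        if j == len(k):
            return False
        if p[i] == "*" or p[i] == k[j]:
            return match(i + 1, j + 1)
        return False

    return match(0, 0)


def route(
    exchange_type: str,
    bindings: list[tuple[str, str]],
    routing_key: str,
) -> list[str]:
    """Return the queues that would receive a message.

    bindings is a list of (queue_name, binding_key) pairs.
    """
    if exchange_type == "fanout":
        return [queue for queue, _ in bindings]
    if exchange_type == "direct":
        return [queue for queue, key in bindings if key == routing_key]
    if exchange_type == "topic":
        return [queue for queue, key in bindings if topic_matches(key, routing_key)]
    raise ValueError(f"unsupported exchange type: {exchange_type}")


if __name__ == "__main__":
    bindings = [("order_payment", "payment"), ("order_shipping", "shipping")]
    print(route("direct", bindings, "payment"))  # → ['order_payment']
    print(topic_matches("order.*", "order.created"))  # → True
    print(topic_matches("order.*", "order.created.eu"))  # → False
    print(topic_matches("order.#", "order.created.eu"))  # → True
```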

4.2 Complete Working Code

import pika
import json
import time
import logging
from typing import Callable, Optional
from dataclasses import dataclass

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class RabbitMQConfig:
    """RabbitMQ configuration"""
    host: str = "localhost"
    port: int = 5672
    username: str = "guest"
    password: str = "guest"
    virtual_host: str = "/"


class RabbitMQProducer:
    """RabbitMQ producer"""
    
    def __init__(self, config: RabbitMQConfig):
        self.config = config
        self.connection: Optional[pika.BlockingConnection] = None
        self.channel: Optional[pika.adapters.blocking_connection.BlockingChannel] = None
    
    def connect(self):
        """Establish the connection"""
        credentials = pika.PlainCredentials(
            self.config.username,
            self.config.password
        )
        parameters = pika.ConnectionParameters(
            host=self.config.host,
            port=self.config.port,
            virtual_host=self.config.virtual_host,
            credentials=credentials
        )
        self.connection = pika.BlockingConnection(parameters)
        self.channel = self.connection.channel()
        logger.info("RabbitMQ producer connected")
    
    def declare_queue(
        self,
        queue_name: str,
        durable: bool = True,
        arguments: Optional[dict] = None
    ):
        """
        Declare a queue
        
        durable: whether the queue survives a broker restart
        arguments: extra arguments (TTL, max length, etc.)
        """
        args = arguments or {}
        self.channel.queue_declare(
            queue=queue_name,
            durable=durable,
            arguments=args
        )
        logger.info(f"Queue declared: {queue_name}")
    
    def declare_exchange(
        self,
        exchange_name: str,
        exchange_type: str = "direct",
        durable: bool = True
    ):
        """Declare an exchange"""
        self.channel.exchange_declare(
            exchange=exchange_name,
            exchange_type=exchange_type,
            durable=durable
        )
        logger.info(f"Exchange declared: {exchange_name}")
    
    def bind_queue(
        self,
        queue_name: str,
        exchange_name: str,
        routing_key: str = ""
    ):
        """Bind a queue to an exchange"""
        self.channel.queue_bind(
            queue=queue_name,
            exchange=exchange_name,
            routing_key=routing_key
        )
        logger.info(f"Queue {queue_name} bound to {exchange_name}")
    
    def publish(
        self,
        exchange: str,
        routing_key: str,
        message: dict,
        content_type: str = "application/json",
        delivery_mode: int = 2  # 2 = persistent
    ):
        """
        Publish a message
        
        delivery_mode:
            1 = non-persistent
            2 = persistent
        """
        properties = pika.BasicProperties(
            content_type=content_type,
            delivery_mode=delivery_mode
        )
        
        self.channel.basic_publish(
            exchange=exchange,
            routing_key=routing_key,
            body=json.dumps(message),
            properties=properties
        )
        logger.debug(f"Message published to {exchange}/{routing_key}")
    
    def close(self):
        """Close the connection"""
        if self.connection:
            self.connection.close()
        logger.info("RabbitMQ producer disconnected")


class RabbitMQConsumer:
    """RabbitMQ consumer"""
    
    def __init__(self, config: RabbitMQConfig):
        self.config = config
        self.connection: Optional[pika.BlockingConnection] = None
        self.channel: Optional[pika.adapters.blocking_connection.BlockingChannel] = None
        self.handlers: dict[str, Callable] = {}
    
    def connect(self):
        """Establish the connection"""
        credentials = pika.PlainCredentials(
            self.config.username,
            self.config.password
        )
        parameters = pika.ConnectionParameters(
            host=self.config.host,
            port=self.config.port,
            virtual_host=self.config.virtual_host,
            credentials=credentials
        )
        self.connection = pika.BlockingConnection(parameters)
        self.channel = self.connection.channel()
        logger.info("RabbitMQ consumer connected")
    
    def register_handler(
        self,
        queue_name: str,
        handler: Callable,
        auto_ack: bool = False
    ):
        """Register a message handler"""
        self.handlers[queue_name] = {
            "handler": handler,
            "auto_ack": auto_ack
        }
    
    def _on_message(self, channel, method, properties, body, queue_name):
        """Message callback; queue_name is bound per queue in start()"""
        # method.routing_key is the publisher's routing key ("payment", or ""
        # for fanout), not the queue name, so it cannot be used for the lookup
        handler_info = self.handlers.get(queue_name, {})
        handler = handler_info.get("handler")
        manual_ack = not handler_info.get("auto_ack")
        
        try:
            message = json.loads(body)
            
            if handler:
                handler(message)
                logger.info(f"Message processed from {queue_name}")
            else:
                logger.warning(f"No handler for queue: {queue_name}")
            
            # Manual acknowledgment
            if manual_ack:
                channel.basic_ack(delivery_tag=method.delivery_tag)
                
        except json.JSONDecodeError as e:
            logger.error(f"Invalid JSON: {e}")
            if manual_ack:
                channel.basic_nack(
                    delivery_tag=method.delivery_tag,
                    requeue=False
                )
        except Exception as e:
            logger.error(f"Handler error: {e}", exc_info=True)
            # Requeue; a message that always fails will be redelivered forever
            if manual_ack:
                channel.basic_nack(
                    delivery_tag=method.delivery_tag,
                    requeue=True
                )
    
    def start(self, prefetch_count: int = 10):
        """Start consuming"""
        # Set the prefetch count (fair dispatch)
        self.channel.basic_qos(prefetch_count=prefetch_count)
        
        for queue_name, info in self.handlers.items():
            self.channel.basic_consume(
                queue=queue_name,
                # Bind the queue name now; q=queue_name avoids late binding
                on_message_callback=(
                    lambda ch, method, props, body, q=queue_name:
                        self._on_message(ch, method, props, body, q)
                ),
                auto_ack=info["auto_ack"]
            )
            logger.info(f"Consuming from queue: {queue_name}")
        
        logger.info("RabbitMQ consumer started, waiting for messages...")
        self.channel.start_consuming()
    
    def stop(self):
        """Stop consuming"""
        if self.channel:
            self.channel.stop_consuming()
        if self.connection:
            self.connection.close()
        logger.info("RabbitMQ consumer stopped")


# ========== Usage example: order processing system ==========

def setup_rabbitmq_infrastructure():
    """Set up the RabbitMQ infrastructure"""
    config = RabbitMQConfig()
    producer = RabbitMQProducer(config)
    producer.connect()
    
    # Declare exchanges
    producer.declare_exchange("orders", exchange_type="direct")
    producer.declare_exchange("notifications", exchange_type="fanout")
    
    # Declare queues
    producer.declare_queue(
        "order_payment",
        durable=True,
        arguments={
            "x-message-ttl": 86400000,  # 24-hour TTL
            "x-max-length": 10000       # at most 10,000 messages
        }
    )
    
    producer.declare_queue(
        "order_shipping",
        durable=True
    )
    
    producer.declare_queue(
        "email_notifications",
        durable=True
    )
    
    producer.declare_queue(
        "sms_notifications",
        durable=True
    )
    
    # Bind queues
    producer.bind_queue("order_payment", "orders", "payment")
    producer.bind_queue("order_shipping", "orders", "shipping")
    producer.bind_queue("email_notifications", "notifications")
    producer.bind_queue("sms_notifications", "notifications")
    
    producer.close()
    logger.info("RabbitMQ infrastructure setup complete")


def handle_payment(message: dict):
    """Process a payment"""
    order_id = message.get("order_id")
    amount = message.get("amount")
    
    time.sleep(0.5)  # simulate processing
    logger.info(f"Payment processed: {order_id}, amount: {amount}")


def handle_shipping(message: dict):
    """Process shipping"""
    order_id = message.get("order_id")
    address = message.get("address")
    
    time.sleep(1.0)
    logger.info(f"Shipping processed: {order_id}, address: {address}")


def handle_email_notification(message: dict):
    """Send an email notification"""
    email = message.get("email")
    subject = message.get("subject")
    
    time.sleep(0.3)
    logger.info(f"Email sent: {email} - {subject}")


def run_producer():
    """Run the producer"""
    config = RabbitMQConfig()
    producer = RabbitMQProducer(config)
    producer.connect()
    
    # Publish order messages
    for i in range(5):
        producer.publish(
            exchange="orders",
            routing_key="payment",
            message={
                "order_id": f"ORD-{i}",
                "amount": 100.0 * (i + 1)
            }
        )
        
        producer.publish(
            exchange="orders",
            routing_key="shipping",
            message={
                "order_id": f"ORD-{i}",
                "address": f"Address {i}"
            }
        )
        
        # Broadcast notifications
        producer.publish(
            exchange="notifications",
            routing_key="",
            message={
                "email": f"user{i}@example.com",
                "subject": f"Order {i} Confirmed"
            }
        )
    
    producer.close()


def run_consumer():
    """Run the consumer"""
    config = RabbitMQConfig()
    consumer = RabbitMQConsumer(config)
    consumer.connect()
    
    consumer.register_handler("order_payment", handle_payment)
    consumer.register_handler("order_shipping", handle_shipping)
    consumer.register_handler("email_notifications", handle_email_notification)
    
    try:
        consumer.start(prefetch_count=5)
    except KeyboardInterrupt:
        consumer.stop()


if __name__ == "__main__":
    import sys
    
    if len(sys.argv) < 2:
        print("Usage: python rabbitmq_demo.py [setup|producer|consumer]")
        sys.exit(1)
    
    command = sys.argv[1]
    
    if command == "setup":
        setup_rabbitmq_infrastructure()
    elif command == "producer":
        run_producer()
    elif command == "consumer":
        run_consumer()
    else:
        print(f"Unknown command: {command}")

4.3 Advanced RabbitMQ Features

Dead Letter Queue

def setup_dead_letter_queue(producer: RabbitMQProducer):
    """Set up a dead letter queue"""
    
    # Declare the dead letter exchange
    producer.declare_exchange("dlx", "direct")
    
    # Declare the dead letter queue
    producer.declare_queue("dead_letter_queue", durable=True)
    producer.bind_queue("dead_letter_queue", "dlx", "dead")
    
    # Declare the main queue and attach the dead letter exchange
    producer.channel.queue_declare(
        queue="main_queue",
        durable=True,
        arguments={
            "x-dead-letter-exchange": "dlx",
            "x-dead-letter-routing-key": "dead",
            "x-message-ttl": 60000  # 60-second TTL
        }
    )
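
When RabbitMQ dead-letters a message, it records the history in an `x-death` header: a list of entries that include the source `queue`, the `reason`, and a `count`. A consumer can read that header to decide when to stop retrying. The helper names and the threshold of 3 below are assumptions; `headers` is expected to be the `properties.headers` value passed to a pika callback, which may be `None`.

```python
from typing import Optional


def death_count(headers: Optional[dict], queue_name: str) -> int:
    """How many times the message was dead-lettered from `queue_name`."""
    if not headers:
        return 0
    total = 0
    for entry in headers.get("x-death") or []:
        if entry.get("queue") == queue_name:
            total += int(entry.get("count", 0))
    return total


def should_give_up(
    headers: Optional[dict],
    queue_name: str,
    max_deaths: int = 3,
) -> bool:
    """True once the message has been dead-lettered max_deaths times."""
    return death_count(headers, queue_name) >= max_deaths


if __name__ == "__main__":
    sample = {
        "x-death": [
            {"queue": "main_queue", "reason": "expired", "count": 2},
        ]
    }
    print(death_count(sample, "main_queue"))     # → 2
    print(should_give_up(sample, "main_queue"))  # → False
```

Note that `basic_nack(requeue=True)` puts the message back without dead-lettering it, so it does not add to this count; only dead-lettering events (rejection with `requeue=False`, TTL expiry, or queue length overflow) do.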

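Delayed Messages (Implemented with a Dead Letter Queue)

def setup_delayed_queue(producer: RabbitMQProducer, delay_ms: int = 5000):
    """Set up a delay queue"""
    
    # Delay queue (no consumers; expired messages are routed to the target queue)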
    producer.channel.queue_declare(
        queue="delay_queue",
        durable=True,
        arguments={
            "x-dead-letter-exchange": "",
            "x-dead-letter-routing-key": "target_queue",
            "x-message-ttl": delay_ms
        }
    )
    
    # Target queue
    producer.declare_queue("target_queue", durable=True)


def send_delayed_message(producer: RabbitMQProducer, message: dict):
    """Send a delayed message"""
    producer.publish(
        exchange="",
        routing_key="delay_queue",
        message=message
    )

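4.4 RabbitMQ Performance Tuning

Setting | Recommendation
prefetch_count | Set it according to consumer throughput to avoid a backlog of unacknowledged messages
Message persistence | delivery_mode=2, at the cost of lower throughput
Queue size | Set an x-max-length limit to avoid running out of memory
TTL | Set a reasonable message expiration time
Connection pooling | Reuse connections instead of creating and closing them frequently
Batch publishing | Use transactions or publisher confirms to send in batches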

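5. Kafka: A High-Throughput Message Queue

5.1 Kafka Core Concepts

Producer → Topic (Partition 0, Partition 1, ...) → Consumer Group

Key features:
- Partitions: increase parallelism and guarantee ordering within a partition
- Replicas: high availability
- Consumer groups: load balancing
- Persistence: message retention policies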
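
How keys map to partitions and how partitions are spread across a consumer group can be sketched as follows. Both functions are simplified models: Kafka's default partitioner hashes keys with murmur2, while this sketch substitutes CRC32 from the standard library, and real group assignment is negotiated through the group coordinator using a configurable assignor strategy.

```python
import zlib


def partition_for(key: str, num_partitions: int) -> int:
    """Map a key to a partition so the same key always lands together.

    Stand-in hash: Kafka's default partitioner uses murmur2, not CRC32.
    """
    if num_partitions <= 0:
        raise ValueError("num_partitions must be positive")
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def assign_round_robin(
    partitions: list[int],
    consumers: list[str],
) -> dict[str, list[int]]:
    """Spread partitions across the consumers of one group."""
    if not consumers:
        raise ValueError("at least one consumer is required")
    assignment: dict[str, list[int]] = {c: [] for c in consumers}
    for i, partition in enumerate(sorted(partitions)):
        assignment[consumers[i % len(consumers)]].append(partition)
    return assignment


if __name__ == "__main__":
    # Events for one order share a key, so they share a partition and
    # keep their relative order
    print(partition_for("order-1", 6) == partition_for("order-1", 6))  # → True

    print(assign_round_robin([0, 1, 2, 3, 4, 5], ["a", "b"]))
    # → {'a': [0, 2, 4], 'b': [1, 3, 5]}

    # More consumers than partitions leaves some consumers idle
    print(assign_round_robin([0, 1], ["a", "b", "c"]))
    # → {'a': [0], 'b': [1], 'c': []}
```

The last case is why the partition count sets the upper bound on parallelism within a single consumer group.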

5.2 Kafka in Practice with Python

from kafka import KafkaProducer, KafkaConsumer
from kafka.errors import KafkaError
import json
import logging
from typing import Callable, Optional
from dataclasses import dataclass

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class KafkaConfig:
    """Kafka configuration"""
    bootstrap_servers: str = "localhost:9092"
    group_id: str = "default-group"
    auto_offset_reset: str = "earliest"
    enable_auto_commit: bool = False


class KafkaProducerClient:
    """Kafka producer"""
    
    def __init__(self, config: KafkaConfig):
        self.config = config
        self.producer: Optional[KafkaProducer] = None
    
    def connect(self):
        """Establish the connection"""
        self.producer = KafkaProducer(
            bootstrap_servers=self.config.bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
            key_serializer=lambda k: k.encode('utf-8') if k else None,
            acks='all',  # wait for all in-sync replicas to acknowledge
            retries=3,
            max_in_flight_requests_per_connection=1  # preserve ordering across retries
        )
        logger.info("Kafka producer connected")
    
    def send(
        self,
        topic: str,
        value: dict,
        key: Optional[str] = None,
        partition: Optional[int] = None,
        callback: Optional[Callable] = None
    ):
        """
        Send a message
        
        key: messages with the same key are sent to the same partition
        partition: explicit partition (usually left unset so Kafka assigns one)
        """
        future = self.producer.send(
            topic=topic,
            value=value,
            key=key,
            partition=partition
        )
        
        if callback:
            future.add_callback(callback)
        
        future.add_errback(lambda e: logger.error(f"Send failed: {e}"))
        logger.debug(f"Message sent to topic {topic}")
    
    def flush(self):
        """Flush the send buffer"""
        self.producer.flush()
    
    def close(self):
        """Close the connection"""
        if self.producer:
            self.producer.flush()
            self.producer.close()
        logger.info("Kafka producer disconnected")


class KafkaConsumerClient:
    """Kafka consumer"""
    
    def __init__(self, config: KafkaConfig):
        self.config = config
        self.consumer: Optional[KafkaConsumer] = None
        self.handlers: dict[str, Callable] = {}
    
    def connect(self):
        """Establish the connection"""
        self.consumer = KafkaConsumer(
            bootstrap_servers=self.config.bootstrap_servers,
            group_id=self.config.group_id,
            auto_offset_reset=self.config.auto_offset_reset,
            enable_auto_commit=self.config.enable_auto_commit,
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
            key_deserializer=lambda m: m.decode('utf-8') if m else None
        )
        logger.info("Kafka consumer connected")
    
    def subscribe(self, topics: list[str]):
        """Subscribe to topics"""
        self.consumer.subscribe(topics)
        logger.info(f"Subscribed to topics: {topics}")
    
    def register_handler(self, topic: str, handler: Callable):
        """Register a message handler"""
        self.handlers[topic] = handler
    
    def start(self):
        """Start consuming"""
        logger.info("Kafka consumer started, waiting for messages...")
        
        try:
            for message in self.consumer:
                topic = message.topic
                handler = self.handlers.get(topic)
                
                if handler:
                    try:
                        handler(message.value, message.key)
                        logger.info(
                            f"Processed message from {topic} "
                            f"[partition={message.partition}, offset={message.offset}]"
                        )
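                    except Exception as e:
                        logger.error(
                            f"Handler error for {topic}: {e}",
                            exc_info=True
                        )
                        # The failed message is skipped: the commit below still
                        # advances the offset. To avoid losing it, retry here or
                        # forward the message elsewhere before committing.
                else:
                    logger.warning(f"No handler for topic: {topic}")
                
                # Commit the offset manually. With auto-commit enabled the client
                # already commits in the background, so the explicit call is skipped.
                if not self.config.enable_auto_commit:
                    self.consumer.commit()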
                
        except KeyboardInterrupt:
            logger.info("Consumer interrupted")
        finally:
            self.close()
    
    def close(self):
        """Close the connection."""
        if self.consumer:
            self.consumer.close()
        logger.info("Kafka consumer disconnected")


# ========== Usage example: order and user event processing ==========

def handle_order_event(value: dict, key: str):
    """Handle an order event."""
    event_type = value.get("event_type")
    order_id = value.get("order_id")
    
    logger.info(f"Order event: {event_type}, order_id: {order_id}")
    # Processing logic...


def handle_user_event(value: dict, key: str):
    """Handle a user event."""
    event_type = value.get("event_type")
    user_id = value.get("user_id")
    
    logger.info(f"User event: {event_type}, user_id: {user_id}")
    # Processing logic...


def run_kafka_producer():
    """Run the Kafka producer."""
    config = KafkaConfig()
    producer = KafkaProducerClient(config)
    producer.connect()
    
    # Send order events
    for i in range(10):
        producer.send(
            topic="order_events",
            key=f"order-{i}",
            value={
                "event_type": "created",
                "order_id": f"ORD-{i}",
                "amount": 100.0 * (i + 1),
                "timestamp": "2026-06-27T10:00:00Z"
            }
        )
    
    # Send user events
    for i in range(5):
        producer.send(
            topic="user_events",
            key=f"user-{i}",
            value={
                "event_type": "registered",
                "user_id": f"USR-{i}",
                "email": f"user{i}@example.com"
            }
        )
    
    # close() flushes pending messages before closing the connection
    producer.close()


def run_kafka_consumer():
    """Run the Kafka consumer."""
    config = KafkaConfig(group_id="order-processor")
    consumer = KafkaConsumerClient(config)
    consumer.connect()
    
    consumer.subscribe(["order_events", "user_events"])
    consumer.register_handler("order_events", handle_order_event)
    consumer.register_handler("user_events", handle_user_event)
    
    consumer.start()


if __name__ == "__main__":
    import sys
    
    if len(sys.argv) < 2:
        print("Usage: python kafka_demo.py [producer|consumer]")
        sys.exit(1)
    
    command = sys.argv[1]
    
    if command == "producer":
        run_kafka_producer()
    elif command == "consumer":
        run_kafka_consumer()
    else:
        print(f"Unknown command: {command}")
        sys.exit(1)
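
The consumer loop above logs a handler failure and moves on, so a failed message is effectively dropped once its offset is committed. One common alternative is to retry a few times and then park the message for later inspection. The following is a minimal sketch under stated assumptions: `process_with_retry` is a hypothetical helper, and the in-memory `dead_letters` list only stands in for a real dead-letter topic. Neither is part of kafka-python.

```python
import logging
import time
from typing import Any, Callable, Optional

logger = logging.getLogger(__name__)

# Hypothetical stand-in for a dead-letter topic; a real consumer would
# publish these records to a separate Kafka topic instead.
dead_letters: list[dict] = []


def process_with_retry(
    handler: Callable[[Any, Optional[str]], None],
    value: Any,
    key: Optional[str],
    max_attempts: int = 3,
    base_delay: float = 0.1,
) -> bool:
    """Call handler(value, key), retrying with exponential backoff.

    Returns True on success. After max_attempts failures the message is
    appended to dead_letters and False is returned, so the caller can
    commit the offset and keep consuming.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            handler(value, key)
            return True
        except Exception as exc:
            logger.warning("Attempt %d/%d failed: %s", attempt, max_attempts, exc)
            if attempt < max_attempts:
                # Back off: base_delay, 2 * base_delay, 4 * base_delay, ...
                time.sleep(base_delay * 2 ** (attempt - 1))
            else:
                dead_letters.append({"key": key, "value": value, "error": str(exc)})
    return False
```

Inside `start()`, the call `handler(message.value, message.key)` could be replaced with `process_with_retry(handler, message.value, message.key)`. Retries block the rest of the partition while they run, so `max_attempts` and `base_delay` should stay small.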

5.3 Kafka Consumer Groups and Partitions

Topic: orders (3 partitions)

Consumer Group A:
  - Consumer 1: Partition 0
  - Consumer 2: Partition 1
  - Consumer 3: Partition 2

If Consumer 2 goes down:
  - Consumer 1: Partition 0
  - Consumer 3: Partition 1, Partition 2  (reassigned automatically)
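
The reassignment shown above can be simulated without a broker. The sketch below is a simplified illustration, not Kafka's actual assignor: `rebalance` is a hypothetical function that lets surviving consumers keep their partitions and hands orphaned partitions to the least-loaded survivor. It only models consumers leaving the group. Which survivor receives the orphaned partition depends on the assignor in use, so the result may differ from the diagram.

```python
from typing import Optional


def rebalance(
    partitions: list[int],
    consumers: list[str],
    previous: Optional[dict[str, list[int]]] = None,
) -> dict[str, list[int]]:
    """Assign every partition to exactly one live consumer.

    Surviving consumers keep what they already own; partitions without a
    live owner go to whichever consumer currently holds the fewest.
    """
    if not consumers:
        raise ValueError("at least one consumer is required")
    previous = previous or {}
    assignment = {
        c: [p for p in previous.get(c, []) if p in partitions] for c in consumers
    }
    owned = {p for owned_by in assignment.values() for p in owned_by}
    for p in partitions:
        if p not in owned:
            target = min(consumers, key=lambda c: len(assignment[c]))
            assignment[target].append(p)
    return assignment


# Three consumers, three partitions: one partition each
group = rebalance([0, 1, 2], ["consumer-1", "consumer-2", "consumer-3"])
print(group)

# consumer-2 goes down: its partition moves to a surviving consumer
after_failure = rebalance([0, 1, 2], ["consumer-1", "consumer-3"], previous=group)
print(after_failure)
```

The property that matters is that every partition always has exactly one owner inside the group, which is why adding consumers beyond the partition count leaves the extra consumers idle.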

Partitioning strategy

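# With a key, Kafka picks the partition from a hash of the key
producer.send(topic="orders", key="user-123", value={...})

# All order events for the same user land in the same partition, which preserves their order

# Custom partitioner: kafka-python accepts any callable with this signature
import random
import zlib

class UserIdPartitioner:
    """Partitions messages by user ID."""
    
    def __call__(self, key, all_partitions, available_partitions):
        if key is None:
            # No key: spread messages instead of always using the first partition
            return random.choice(available_partitions or all_partitions)
        
        # `key` arrives as serialized bytes. Use a stable hash: the built-in
        # hash() is salted per process for str/bytes, so the same user could
        # map to different partitions in different producer processes.
        partition_index = zlib.crc32(key) % len(all_partitions)
        return all_partitions[partition_index]


# Use the custom partitioner
producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    partitioner=UserIdPartitioner()
)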
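
The ordering guarantee of key-based partitioning can be checked without a broker. The sketch below is an illustration under assumptions: `partition_for` is a hypothetical helper that uses CRC32 as a stand-in for a stable hash (Kafka's default partitioner uses murmur2), and the event list is made up. It shows that events sharing a key stay in send order inside their partition.

```python
import zlib
from collections import defaultdict


def partition_for(key: bytes, num_partitions: int) -> int:
    """Map a key to a partition with a hash that is stable across processes."""
    return zlib.crc32(key) % num_partitions


# Hypothetical event stream: (user key, event) pairs in the order they were sent
events = [
    (b"user-1", "created"),
    (b"user-2", "created"),
    (b"user-1", "paid"),
    (b"user-2", "cancelled"),
    (b"user-1", "shipped"),
]

# Append each event to the partition chosen by its key
partitions: dict[int, list[tuple[bytes, str]]] = defaultdict(list)
for key, event in events:
    partitions[partition_for(key, 3)].append((key, event))

for partition, records in sorted(partitions.items()):
    print(partition, records)
```

Ordering holds only within one partition. Events for different users may interleave freely, and changing the partition count remaps keys, so the count is best fixed before ordering-sensitive data is written.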

5.4 Kafka Message Retention Policies

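# Time-based retention (default: 7 days)
log.retention.hours=168

# Size-based retention, applied per partition (1073741824 bytes = 1 GB)
log.retention.bytes=1073741824

# Log segment rolling: by size, or after 7 days (604800000 ms)
log.segment.bytes=1073741824
log.roll.ms=604800000

# Compaction instead of deletion (keeps the latest value per key)
log.cleanup.policy=compact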
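
Retention settings translate directly into disk usage, so a rough estimate is worth doing before choosing values. The sketch below is back-of-the-envelope arithmetic with assumed numbers (1 MB/s of incoming data, three replicas). `retention_disk_bytes` is a hypothetical helper, and it ignores compression, index files, and the active segment, which is not eligible for deletion.

```python
def retention_disk_bytes(
    bytes_per_second: float,
    retention_hours: float,
    replication_factor: int = 1,
) -> float:
    """Approximate disk used by one topic under time-based retention."""
    return bytes_per_second * retention_hours * 3600 * replication_factor


# Assumed workload: 1 MB/s of incoming data, 168 hours of retention, 3 replicas
total = retention_disk_bytes(1_000_000, 168, replication_factor=3)
print(f"{total / 1e12:.2f} TB")  # prints "1.81 TB"
```

If the estimate exceeds the available disk, shortening `log.retention.hours` or adding a `log.retention.bytes` cap are the two direct levers.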

6. Comparing the Three Options

6.1 Feature Comparison

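| Feature | Redis | RabbitMQ | Kafka |
|---|---|---|---|
| Persistence | AOF/RDB | Supported | Supported |
| Message ordering | Guaranteed within a single queue | Guaranteed within a single queue | Guaranteed within a partition |
| Message acknowledgment | None (must be implemented) | Supported | Offset commit |
| Delayed queue | Supported (ZSet) | Supported (DLX) | Not natively supported |
| Transactions | Supported | Supported | Supported |
| Routing | Simple | Powerful | Simple |
| Stream processing | Not supported | Not supported | Supported (Kafka Streams) |
| Management UI | | | |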

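6.2 Performance Comparison

| Metric | Redis | RabbitMQ | Kafka |
|---|---|---|---|
| Throughput | 100k-500k msg/s | 50k-200k msg/s | 500k-1M msg/s |
| Latency | < 1 ms | 1-10 ms | 5-20 ms |
| Resource usage | | | |

These are rough ranges; actual numbers depend on hardware, message size, and configuration.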

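6.3 When to Use Which

Redis:
  - Lightweight asynchronous tasks
  - Delayed queues
  - Moderate message volume (< 100k msg/s)
  - Redis is already part of the infrastructure

RabbitMQ:
  - Complex routing requirements
  - Message acknowledgment and dead-letter queues are required
  - Enterprise applications
  - Medium throughput

Kafka:
  - Large-scale stream processing
  - Log collection
  - High throughput (> 100k msg/s)
  - Message replay is required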

7. Production Best Practices

7.1 Idempotent Design

Every task handler must be idempotent!

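def handle_payment(message: dict):
    """Idempotent payment handler."""
    order_id = message.get("order_id")
    
    # Skip orders that have already been processed.
    # Note: the check and the mark below are two separate steps, so two
    # consumers handling the same order concurrently can both pass the check.
    # `redis_client` is a redis.Redis instance created elsewhere.
    if redis_client.get(f"payment:processed:{order_id}"):
        logger.info(f"Order {order_id} already processed, skipping")
        return
    
    try:
        # Process the payment
        process_payment(message)
        
        # Mark the order as processed
        redis_client.setex(
            f"payment:processed:{order_id}",
            86400,  # expires after 24 hours
            "1"
        )
    except Exception as e:
        logger.error(f"Payment failed: {e}")
        raise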
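
A check followed by a separate write leaves a window in which two consumers can both process the same order. One way to close it is to claim the key atomically before doing the work, which in Redis corresponds to `SET key value NX EX ttl`. The sketch below is an illustration under assumptions: `handle_once` is a hypothetical helper, and `InMemoryStore` is a small stand-in that mimics only the `set(..., nx=True, ex=...)` and `delete` calls of redis-py so the example runs without a server.

```python
import time
from typing import Callable, Optional


class InMemoryStore:
    """Tiny stand-in for the subset of Redis used below (SET NX EX, DELETE)."""

    def __init__(self) -> None:
        self._data: dict[str, tuple[str, float]] = {}

    def set(self, key: str, value: str, nx: bool = False,
            ex: Optional[int] = None) -> Optional[bool]:
        now = time.monotonic()
        current = self._data.get(key)
        if nx and current is not None and current[1] > now:
            return None  # key exists and has not expired: claim refused
        expires_at = now + ex if ex is not None else float("inf")
        self._data[key] = (value, expires_at)
        return True

    def delete(self, key: str) -> int:
        return 1 if self._data.pop(key, None) is not None else 0


def handle_once(store, order_id: str, process: Callable[[str], None],
                ttl: int = 86400) -> bool:
    """Run process(order_id) at most once per TTL window.

    Returns True if this call did the work, False if it was a duplicate.
    """
    key = f"payment:processed:{order_id}"
    # Claim first: only one caller can create the key
    if not store.set(key, "1", nx=True, ex=ttl):
        return False
    try:
        process(order_id)
    except Exception:
        # Release the claim so a redelivered message can try again
        store.delete(key)
        raise
    return True
```

Passing a `redis.Redis` client as `store` should behave the same way. The trade-off is that a worker that crashes after claiming but before finishing leaves the key in place until the TTL expires, so the TTL should reflect how long a stuck order can be tolerated.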

7.2 Fallback Plans

Every stage needs a fallback plan; a single point of failure must not bring down the whole pipeline!

class PaymentService:
    """Payment service."""
    
    def process(self, order_id: str) -> bool:
        try:
            # Primary path
            return self._process_via_stripe(order_id)
        except Exception as e:
            logger.error(f"Stripe failed: {e}")
            
            try:
                # Fallback 1: backup payment channel
                return self._process_via_paypal(order_id)
            except Exception as e:
                logger.error(f"PayPal failed: {e}")
                
                # Fallback 2: save to the retry queue and retry later
                self._save_to_retry_queue(order_id)
                return False
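
Nested try/except blocks become hard to read once a third or fourth fallback is added. The same idea can be written as a loop over an ordered list of providers. This is a minimal sketch: `process_with_fallbacks`, the provider callables, and `on_exhausted` are hypothetical names introduced for illustration.

```python
import logging
from typing import Callable, Sequence

logger = logging.getLogger(__name__)


def process_with_fallbacks(
    order_id: str,
    providers: Sequence[tuple[str, Callable[[str], bool]]],
    on_exhausted: Callable[[str], None],
) -> bool:
    """Try each provider in order; hand the order to on_exhausted if all fail."""
    for name, provider in providers:
        try:
            return provider(order_id)
        except Exception as exc:
            logger.error("%s failed for %s: %s", name, order_id, exc)
    # Every provider raised: queue the order for a later retry
    on_exhausted(order_id)
    return False
```

A call might look like `process_with_fallbacks(order_id, [("stripe", pay_via_stripe), ("paypal", pay_via_paypal)], retry_queue.append)`, where the two payment functions are placeholders. A timeout does not prove the first channel failed to charge, so falling back to a second channel is only safe when the payment call itself is idempotent.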

7.3 Monitoring Metrics

Message backlog and consumption latency are the two metrics that must be watched!

from prometheus_client import Counter, Gauge, Histogram

# Message count
messages_total = Counter(
    'queue_messages_total',
    'Total messages processed',
    ['queue', 'status']
)

# Queue length
queue_size = Gauge(
    'queue_size',
    'Current queue size',
    ['queue']
)

# Processing time
processing_time = Histogram(
    'message_processing_seconds',
    'Time spent processing messages',
    ['queue']
)


def monitor_queue_size(redis_client, queue_name: str):
    """Monitor the queue length."""
    size = redis_client.llen(queue_name)
    queue_size.labels(queue=queue_name).set(size)
    
    if size > 10000:
        logger.warning(f"Queue {queue_name} backlog: {size}")
    
    return size
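
Queue length covers the Redis list case. For Kafka, backlog is usually expressed as consumer lag: the log end offset minus the committed offset, per partition. Consumption latency is the time between a message's timestamp and the moment it is processed. The sketch below computes both from plain values; `partition_lag` and `consume_latency_seconds` are hypothetical helpers, and the offsets are made up. With kafka-python, the inputs could likely be taken from `KafkaConsumer.end_offsets()`, `KafkaConsumer.committed()`, and the record's `timestamp` field (milliseconds).

```python
import time
from typing import Optional


def partition_lag(
    end_offsets: dict[int, int],
    committed: dict[int, Optional[int]],
) -> dict[int, int]:
    """Lag per partition = log end offset - committed offset.

    A partition with no committed offset yet is treated as fully unread
    (this assumes the log starts at offset 0).
    """
    return {p: end - (committed.get(p) or 0) for p, end in end_offsets.items()}


def consume_latency_seconds(message_timestamp_ms: int,
                            now: Optional[float] = None) -> float:
    """Seconds between the message's timestamp and the moment it is processed."""
    now = time.time() if now is None else now
    return max(0.0, now - message_timestamp_ms / 1000)


# Hypothetical offsets for a topic with three partitions
lag = partition_lag({0: 1500, 1: 1200, 2: 900}, {0: 1500, 1: 1000, 2: None})
print(lag, sum(lag.values()))  # prints "{0: 0, 1: 200, 2: 900} 1100"
```

Lag that keeps growing means consumers are slower than producers. Lag that is large but stable usually just reflects batch size.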

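7.4 Principles for Evolving the Architecture

Stage 1: validate the business logic
  ↓
In-memory queue (development/testing)

Stage 2: small-scale production
  ↓
Redis queue

Stage 3: business growth
  ↓
RabbitMQ / Kafka

Principles:
1. Start with the simplest solution
2. Migrate only after the business logic has been verified
3. Every step should bring a clear benefit
4. Don't start out with a distributed message cluster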
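
Migrating between stages is much cheaper when business code never touches a concrete queue. One way to arrange that is a small interface that each backend implements. The sketch below is an assumption-laden illustration: `TaskQueue` and `InMemoryTaskQueue` are hypothetical names, and a Redis or Kafka backend would implement the same two methods.

```python
import json
import queue
from typing import Optional, Protocol


class TaskQueue(Protocol):
    """Hypothetical minimal interface shared by every backend."""

    def push(self, task: dict) -> None: ...
    def pop(self, timeout: float = 1.0) -> Optional[dict]: ...


class InMemoryTaskQueue:
    """Stage 1 backend: in-process, lost on restart."""

    def __init__(self) -> None:
        self._queue: "queue.Queue[str]" = queue.Queue()

    def push(self, task: dict) -> None:
        # Serialize to JSON even in memory, so tasks that would not survive
        # a real broker fail here first
        self._queue.put(json.dumps(task))

    def pop(self, timeout: float = 1.0) -> Optional[dict]:
        try:
            return json.loads(self._queue.get(timeout=timeout))
        except queue.Empty:
            return None


def register_user(email: str, tasks: TaskQueue) -> dict:
    """Business code depends only on the interface, not on the backend."""
    tasks.push({"type": "welcome_email", "email": email})
    return {"status": "ok"}
```

Moving to stage 2 then means writing one new class with the same `push` and `pop` methods and changing the object passed to `register_user`, while the business logic verified in stage 1 stays untouched.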

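8. Summary

Message queues are not a silver bullet; introduce them to solve real problems:

  1. Asynchronous processing: put slow operations on the queue so the main flow responds quickly
  2. Decoupling: producers and consumers are decoupled through the queue
  3. Peak shaving: traffic spikes are queued first and consumed at the backend's own pace
  4. Fault tolerance: if a consumer goes down, messages are not lost and processing resumes after recovery

Selection guide

  • Redis: lightweight, infrastructure already in place, moderate message volume
  • RabbitMQ: enterprise-grade, complex routing, message acknowledgment
  • Kafka: big data, high throughput, stream processing

Three iron rules to remember

  1. Every task handler must be idempotent
  2. Every stage needs a fallback plan
  3. Monitoring comes first: message backlog and consumption latency must be watched

Start with the simplest in-memory queue, and migrate to Redis or Kafka only after the business logic has been verified. Architecture should evolve incrementally, with a clear benefit at every step.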


References

  • Redis documentation: https://redis.io/docs/
  • RabbitMQ documentation: https://www.rabbitmq.com/docs
  • Kafka documentation: https://kafka.apache.org/documentation/
  • Python Redis client: https://github.com/redis/redis-py
  • Python Kafka client: https://github.com/dpkp/kafka-python
