mx-rmq


Namemx-rmq JSON
Version 1.0.2 PyPI version JSON
download
home_pageNone
Summary基于Redis的异步消息队列系统
upload_time2025-07-12 10:01:35
maintainerNone
docs_urlNone
authorNone
requires_python>=3.12
licenseMIT
keywords async distributed message-queue redis
VCS
bugtrack_url
requirements No requirements were recorded.
Travis-CI No Travis.
coveralls test coverage No coveralls.
            # MX-RMQ 使用指南

[![Python 3.12+](https://img.shields.io/badge/python-3.12+-blue.svg)](https://www.python.org/downloads/)
[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)
[![Redis](https://img.shields.io/badge/redis-5.0+-red.svg)](https://redis.io/)

MX-RMQ 是一个高性能、可靠的基于Redis的分布式消息队列系统,支持普通消息、延时消息、优先级消息,具备完善的监控和重试机制。

## 目录

- [特性概览](#特性概览)
- [快速开始](#快速开始)
- [安装](#安装)
- [基本使用](#基本使用)
- [高级功能](#高级功能)
- [配置参考](#配置参考)
- [API 参考](#api-参考)
- [监控和管理](#监控和管理)
- [部署指南](#部署指南)
- [最佳实践](#最佳实践)
- [故障排除](#故障排除)

## 特性概览

- 🚀 **高性能**: 基于Redis的内存存储,支持10,000+消息/秒的吞吐量
- 🔄 **可靠性**: 原子性Lua脚本操作,保证消息不丢失
- ⏰ **延时消息**: 支持任意时间延迟的消息调度
- 🏷️ **优先级**: 支持高、中、低优先级消息处理
- 🔁 **自动重试**: 可配置的重试机制和指数退避
- 💀 **死信队列**: 失败消息自动进入死信队列,支持人工干预
- 📊 **监控指标**: 实时监控队列状态、处理时间、吞吐率等
- 🛑 **优雅停机**: 支持优雅停机,确保消息处理完成
- 🔧 **易于使用**: 简洁的API设计,开箱即用

## 快速开始

### 30秒快速体验

```python
import asyncio
from mx_rmq import MQConfig, RedisMessageQueue

async def handle_order(payload: dict) -> None:
    """处理订单消息"""
    print(f"处理订单: {payload['order_id']}")
    # 你的业务逻辑
    await asyncio.sleep(1)

async def main():
    # 创建消息队列
    mq = RedisMessageQueue()
    
    # 注册消息处理器
    mq.register("order_created", handle_order)
    
    # 生产消息
    await mq.produce("order_created", {
        "order_id": "ORD_123",
        "user_id": 456,
        "amount": 99.99
    })
    
    # 启动消费者(会阻塞)
    await mq.start_dispatch_consuming()

if __name__ == "__main__":
    asyncio.run(main())
```

## 安装

### 使用 uv (推荐)

```bash
# 添加到现有项目
uv add mx-rmq

# 或者从源码安装
git clone https://github.com/CodingOX/mx-rmq.git
cd mx-rmq
uv sync
```

### 使用 pip

```bash
pip install mx-rmq

# 或从源码安装
pip install git+https://github.com/CodingOX/mx-rmq.git
```

### 系统要求

- Python 3.12+
- Redis 5.0+
- 推荐:Redis 6.0+ (更好的性能)

## 基本使用

### 1. 创建消息队列

```python
from mx_rmq import MQConfig, RedisMessageQueue

# 使用默认配置
mq = RedisMessageQueue()

# 或自定义配置
config = MQConfig(
    redis_url="redis://localhost:6379",
    max_workers=10,
    task_queue_size=20
)
mq = RedisMessageQueue(config)
```

### 2. 注册消息处理器

```python
# 方式1: 使用装饰器
@mq.register("user_registration")
async def handle_user_registration(payload: dict) -> None:
    user_id = payload['user_id']
    email = payload['email']
    print(f"欢迎新用户: {user_id} ({email})")

# 方式2: 直接注册
async def handle_payment(payload: dict) -> None:
    print(f"处理支付: {payload}")

mq.register("payment_completed", handle_payment)
```

### 3. 生产消息

```python
# 生产普通消息
message_id = await mq.produce("user_registration", {
    "user_id": 12345,
    "email": "user@example.com",
    "timestamp": "2024-01-01T00:00:00Z"
})

print(f"消息已发送: {message_id}")
```

### 4. 启动消费者

```python
# 启动消费者(会阻塞,直到收到停机信号)
await mq.start_dispatch_consuming()
```

## 高级功能

### 延时消息

```python
# 5分钟后发送提醒
await mq.produce(
    topic="send_reminder",
    payload={"user_id": 123, "type": "payment_due"},
    delay=300  # 300秒后执行
)

# 1小时后发送邮件
await mq.produce(
    topic="send_email",
    payload={
        "to": "user@example.com",
        "subject": "订单确认",
        "body": "感谢您的订单..."
    },
    delay=3600  # 1小时后执行
)
```

### 优先级消息

```python
from mx_rmq import MessagePriority

# 高优先级消息(优先处理)
await mq.produce(
    topic="system_alert",
    payload={"level": "critical", "message": "系统告警"},
    priority=MessagePriority.HIGH
)

# 普通优先级(默认)
await mq.produce(
    topic="user_activity",
    payload={"user_id": 123, "action": "login"},
    priority=MessagePriority.NORMAL
)

# 低优先级消息(最后处理)
await mq.produce(
    topic="analytics_data",
    payload={"event": "page_view", "page": "/home"},
    priority=MessagePriority.LOW
)
```

### 自定义重试配置

```python
config = MQConfig(
    redis_url="redis://localhost:6379",
    max_retries=5,  # 最大重试5次
    retry_delays=[30, 60, 300, 900, 1800],  # 重试间隔:30s, 1m, 5m, 15m, 30m
    processing_timeout=300,  # 5分钟处理超时
)

mq = RedisMessageQueue(config)
```

### 消息生存时间(TTL)

```python
# 设置消息1小时后过期
await mq.produce(
    topic="temp_notification",
    payload={"message": "临时通知"},
    ttl=3600  # 1小时后过期
)
```

### 批量生产消息

```python
# 批量发送多个消息
messages = [
    {"topic": "order_created", "payload": {"order_id": f"ORD_{i}"}}
    for i in range(100)
]

for msg in messages:
    await mq.produce(msg["topic"], msg["payload"])
```

## 配置参考

### MQConfig 完整参数

```python
from mx_rmq import MQConfig

config = MQConfig(
    # Redis 连接配置
    redis_url="redis://localhost:6379",      # Redis连接URL
    redis_db=0,                              # Redis数据库编号 (0-15)
    redis_password=None,                     # Redis密码
    queue_prefix="",                         # 队列前缀,用于多环境隔离
    connection_pool_size=20,                 # 连接池大小
    
    # 消费者配置
    max_workers=5,                           # 最大工作协程数
    task_queue_size=8,                       # 本地任务队列大小
    
    # 消息生命周期配置
    message_ttl=86400,                       # 消息TTL(秒),默认24小时
    processing_timeout=180,                  # 消息处理超时(秒),默认3分钟
    
    # 重试配置
    max_retries=3,                           # 最大重试次数
    retry_delays=[60, 300, 1800],           # 重试延迟间隔(秒)
    
    # 死信队列配置
    enable_dead_letter=True,                 # 是否启用死信队列
    
    # 监控配置
    monitor_interval=30,                     # 监控检查间隔(秒)
    expired_check_interval=10,               # 过期消息检查间隔(秒)
    processing_monitor_interval=30,          # Processing队列监控间隔(秒)
    batch_size=100,                          # 批处理大小
)
```

### 环境变量配置

支持通过环境变量配置:

```bash
export REDIS_URL="redis://localhost:6379"
export REDIS_PASSWORD="your_password"
export MQ_MAX_WORKERS=10
export MQ_TASK_QUEUE_SIZE=20
export MQ_MESSAGE_TTL=86400
```

```python
import os
from mx_rmq import MQConfig

config = MQConfig(
    redis_url=os.getenv("REDIS_URL", "redis://localhost:6379"),
    redis_password=os.getenv("REDIS_PASSWORD"),
    max_workers=int(os.getenv("MQ_MAX_WORKERS", "5")),
    task_queue_size=int(os.getenv("MQ_TASK_QUEUE_SIZE", "8")),
    message_ttl=int(os.getenv("MQ_MESSAGE_TTL", "86400")),
)
```

## API 参考

### RedisMessageQueue 类

#### 初始化

```python
def __init__(self, config: MQConfig | None = None) -> None:
    """
    初始化消息队列
    
    Args:
        config: 消息队列配置,如为None则使用默认配置
    """
```

#### 核心方法

```python
async def produce(
    self,
    topic: str,
    payload: dict[str, Any],
    delay: int = 0,
    priority: MessagePriority = MessagePriority.NORMAL,
    ttl: int | None = None,
    message_id: str | None = None,
) -> str:
    """
    生产消息
    
    Args:
        topic: 主题名称
        payload: 消息负载(必须是可JSON序列化的字典)
        delay: 延迟执行时间(秒),0表示立即执行
        priority: 消息优先级
        ttl: 消息生存时间(秒),None使用配置默认值
        message_id: 消息ID,None则自动生成UUID
        
    Returns:
        消息ID(字符串)
        
    Raises:
        ValueError: 参数验证失败
        RedisError: Redis操作失败
    """

def register(self, topic: str, handler: Callable) -> None:
    """
    注册消息处理器
    
    Args:
        topic: 主题名称
        handler: 处理函数,必须是async函数,接受一个dict参数
        
    Raises:
        ValueError: 处理器不是可调用对象
    """

async def start_dispatch_consuming(self) -> None:
    """
    启动消息分发和消费
    
    此方法会阻塞,直到收到停机信号(SIGINT/SIGTERM)
    
    Raises:
        RuntimeError: 系统未正确初始化
        RedisError: Redis连接错误
    """

async def cleanup(self) -> None:
    """
    清理资源,关闭Redis连接池
    """
```

### Message 类

```python
@dataclass
class Message:
    """消息数据类"""
    id: str                    # 消息唯一ID
    version: str               # 消息格式版本
    topic: str                 # 主题名称  
    payload: dict[str, Any]    # 消息负载
    priority: MessagePriority  # 消息优先级
    created_at: int           # 创建时间戳(毫秒)
    meta: MessageMeta         # 消息元数据

@dataclass  
class MessageMeta:
    """消息元数据"""
    status: MessageStatus      # 消息状态
    retry_count: int          # 重试次数
    max_retries: int          # 最大重试次数
    retry_delays: list[int]   # 重试延迟配置
    last_error: str | None    # 最后一次错误信息
    expire_at: int            # 过期时间戳
    # ... 其他元数据字段
```

### 枚举类型

```python
class MessagePriority(str, Enum):
    """消息优先级"""
    HIGH = "high"      # 高优先级
    NORMAL = "normal"  # 普通优先级
    LOW = "low"        # 低优先级

class MessageStatus(str, Enum):
    """消息状态"""
    PENDING = "pending"        # 待处理
    PROCESSING = "processing"  # 处理中
    COMPLETED = "completed"    # 已完成
    RETRYING = "retrying"      # 重试中
    DEAD_LETTER = "dead_letter" # 死信
```

## 监控和管理

### 指标收集

```python
from mx_rmq import MetricsCollector

# 创建指标收集器
collector = MetricsCollector(redis=mq.redis, queue_prefix=config.queue_prefix)

# 收集所有指标
metrics = await collector.collect_all_metrics(["order_created", "user_registration"])

# 打印关键指标
print(f"待处理消息: {metrics['queue.order_created.pending']}")
print(f"处理中消息: {metrics['queue.order_created.processing']}")
print(f"总吞吐量: {metrics['throughput.messages_per_minute']}")
print(f"死信队列: {metrics['queue.dlq.count']}")
```

### 队列监控

```python
# 监控单个队列
queue_metrics = await collector.collect_queue_metrics(["order_created"])
print(f"订单队列状态: {queue_metrics}")

# 监控处理性能
processing_metrics = await collector.collect_processing_metrics(["order_created"])
print(f"平均处理时间: {processing_metrics['order_created.avg_processing_time']}ms")
```

### 死信队列管理

```python
# 查看死信队列
dlq_count = await mq.redis.llen("dlq:queue")
print(f"死信队列消息数: {dlq_count}")

# 获取死信消息列表
dlq_messages = await mq.redis.lrange("dlq:queue", 0, 9)  # 获取前10条
for msg_id in dlq_messages:
    payload = await mq.redis.hget("dlq:payload:map", msg_id)
    print(f"死信消息: {msg_id} - {payload}")

# 手动重试死信消息(需要自定义实现)
async def retry_dead_message(message_id: str):
    # 从死信队列获取消息
    payload_json = await mq.redis.hget("dlq:payload:map", message_id)
    if payload_json:
        # 解析消息并重新生产
        message = json.loads(payload_json)
        await mq.produce(message["topic"], message["payload"])
        # 从死信队列移除
        await mq.redis.lrem("dlq:queue", 1, message_id)
        await mq.redis.hdel("dlq:payload:map", message_id)
```

### 实时监控脚本

```python
import asyncio
import time

async def monitor_loop():
    """实时监控循环"""
    collector = MetricsCollector(redis=mq.redis)
    
    while True:
        try:
            # 收集指标
            metrics = await collector.collect_all_metrics(["order_created"])
            
            # 输出关键指标
            print(f"[{time.strftime('%H:%M:%S')}] 队列状态:")
            print(f"  待处理: {metrics.get('queue.order_created.pending', 0)}")
            print(f"  处理中: {metrics.get('queue.order_created.processing', 0)}")
            print(f"  死信队列: {metrics.get('queue.dlq.count', 0)}")
            
            # 检查告警条件
            pending = metrics.get('queue.order_created.pending', 0)
            if pending > 100:
                print(f"⚠️  告警: 待处理消息积压 ({pending})")
                
            dlq_count = metrics.get('queue.dlq.count', 0)
            if dlq_count > 10:
                print(f"🚨 告警: 死信队列消息过多 ({dlq_count})")
                
        except Exception as e:
            print(f"监控错误: {e}")
            
        await asyncio.sleep(10)  # 每10秒检查一次

# 启动监控
asyncio.create_task(monitor_loop())
```

## 部署指南

### 本地开发环境

```bash
# 1. 启动Redis
docker run -d --name redis -p 6379:6379 redis:7-alpine

# 2. 运行应用
python your_app.py
```

### Docker 部署

**Dockerfile:**
```dockerfile
FROM python:3.12-slim

WORKDIR /app

# 安装依赖
COPY requirements.txt .
RUN pip install -r requirements.txt

# 复制代码
COPY . .

# 启动应用
CMD ["python", "main.py"]
```

**docker-compose.yml:**
```yaml
version: '3.8'
services:
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data
    command: redis-server --appendonly yes
    
  app:
    build: .
    depends_on:
      - redis
    environment:
      - REDIS_URL=redis://redis:6379
      - MQ_MAX_WORKERS=10
    restart: unless-stopped
    
volumes:
  redis_data:
```

### 生产环境配置

**Redis 配置建议 (redis.conf):**
```conf
# 内存管理
maxmemory-policy allkeys-lru
maxmemory 2gb

# 连接管理
timeout 300
tcp-keepalive 300

# 持久化配置
save 900 1      # 900秒内至少1个key变化时保存
save 300 10     # 300秒内至少10个key变化时保存
save 60 10000   # 60秒内至少10000个key变化时保存

# AOF持久化
appendonly yes
appendfsync everysec

# 性能优化
hz 10
dynamic-hz yes
```

**应用配置:**
```python
# 生产环境配置
config = MQConfig(
    redis_url="redis://redis-cluster:6379",
    redis_password="your_secure_password",
    max_workers=20,
    task_queue_size=50,
    connection_pool_size=30,
    message_ttl=86400 * 7,  # 7天
    processing_timeout=600,  # 10分钟
    queue_prefix="prod",    # 环境隔离
)
```

### 高可用部署

**Redis Sentinel 配置:**
```python
import redis.sentinel

# 配置Sentinel
sentinels = [
    ('sentinel1', 26379),
    ('sentinel2', 26379), 
    ('sentinel3', 26379),
]

sentinel = redis.sentinel.Sentinel(sentinels, socket_timeout=0.1)

# 发现主节点
redis_master = sentinel.master_for('mymaster', socket_timeout=0.1)

# 自定义Redis连接
config = MQConfig(redis_url="")  # 留空,使用自定义连接
mq = RedisMessageQueue(config)
mq.redis = redis_master  # 使用Sentinel管理的连接
```

### 监控和告警

**Prometheus 指标暴露:**
```python
from prometheus_client import start_http_server, Gauge, Counter

# 定义指标
queue_size = Gauge('mq_queue_size', 'Queue size', ['topic', 'status'])
messages_processed = Counter('mq_messages_processed_total', 'Messages processed', ['topic', 'status'])

async def export_metrics():
    """导出Prometheus指标"""
    collector = MetricsCollector(redis=mq.redis)
    
    while True:
        metrics = await collector.collect_all_metrics(['order_created'])
        
        # 更新Prometheus指标
        queue_size.labels(topic='order_created', status='pending').set(
            metrics.get('queue.order_created.pending', 0)
        )
        queue_size.labels(topic='order_created', status='processing').set(
            metrics.get('queue.order_created.processing', 0)
        )
        
        await asyncio.sleep(30)

# 启动Prometheus HTTP服务器
start_http_server(8000)
asyncio.create_task(export_metrics())
```

## 最佳实践

### 1. 消息设计

**✅ 推荐做法:**
```python
# 消息结构清晰,包含必要的上下文信息
await mq.produce("order_created", {
    "order_id": "ORD_123456",
    "user_id": 789,
    "total_amount": 99.99,
    "currency": "USD",
    "timestamp": "2024-01-01T12:00:00Z",
    "metadata": {
        "source": "web",
        "version": "v1.0"
    }
})
```

**❌ 避免做法:**
```python
# 消息过于简单,缺少上下文
await mq.produce("process", {"id": 123})

# 消息过于复杂,包含大量数据
await mq.produce("user_update", {
    "user": {...},  # 包含用户的所有信息
    "history": [...],  # 包含完整历史记录
    "related_data": {...}  # 包含大量关联数据
})
```

### 2. 错误处理

**✅ 推荐做法:**
```python
async def handle_payment(payload: dict) -> None:
    try:
        order_id = payload["order_id"]
        amount = payload["amount"]
        
        # 参数验证
        if not order_id or amount <= 0:
            raise ValueError(f"无效的订单参数: {payload}")
            
        # 业务逻辑
        result = await process_payment(order_id, amount)
        
        # 记录成功日志
        logger.info("支付处理成功", order_id=order_id, amount=amount)
        
    except ValueError as e:
        # 参数错误,不重试
        logger.error("支付参数错误", error=str(e), payload=payload)
        raise  # 重新抛出,进入死信队列
        
    except PaymentGatewayError as e:
        # 外部服务错误,可重试
        logger.warning("支付网关错误", error=str(e), order_id=order_id)
        raise  # 重新抛出,触发重试
        
    except Exception as e:
        # 未知错误
        logger.error("支付处理失败", error=str(e), order_id=order_id)
        raise
```

### 3. 幂等性处理

```python
async def handle_order_created(payload: dict) -> None:
    order_id = payload["order_id"]
    
    # 检查是否已处理(幂等性保护)
    if await is_order_processed(order_id):
        logger.info("订单已处理,跳过", order_id=order_id)
        return
        
    try:
        # 处理订单
        await process_order(order_id)
        
        # 标记为已处理
        await mark_order_processed(order_id)
        
    except Exception as e:
        logger.error("订单处理失败", order_id=order_id, error=str(e))
        raise
```

### 4. 性能优化

**工作协程数调优:**
```python
import os
import multiprocessing

# 根据CPU核心数和IO特性调整工作协程数
cpu_count = multiprocessing.cpu_count()

config = MQConfig(
    # CPU密集型任务:工作协程数 = CPU核心数
    max_workers=cpu_count if is_cpu_intensive else cpu_count * 2,
    
    # IO密集型任务:工作协程数 = CPU核心数 * 2-4
    # max_workers=cpu_count * 3,
    
    # 任务队列大小应该大于工作协程数
    task_queue_size=max_workers * 2,
)
```

**批量处理优化:**
```python
async def handle_batch_emails(payload: dict) -> None:
    """批量处理邮件发送"""
    email_list = payload["emails"]
    
    # 分批处理,避免内存占用过大
    batch_size = 10
    for i in range(0, len(email_list), batch_size):
        batch = email_list[i:i + batch_size]
        
        # 并发发送邮件
        tasks = [send_email(email) for email in batch]
        await asyncio.gather(*tasks, return_exceptions=True)
        
        # 避免过快的请求
        await asyncio.sleep(0.1)
```

### 5. 监控和告警

```python
async def setup_monitoring():
    """设置监控和告警"""
    collector = MetricsCollector(redis=mq.redis)
    
    while True:
        try:
            metrics = await collector.collect_all_metrics(["order_created"])
            
            # 队列积压告警
            pending = metrics.get('queue.order_created.pending', 0)
            if pending > 1000:
                await send_alert(f"队列积压严重: {pending} 条消息待处理")
                
            # 死信队列告警  
            dlq_count = metrics.get('queue.dlq.count', 0)
            if dlq_count > 50:
                await send_alert(f"死信队列消息过多: {dlq_count} 条")
                
            # 处理时间告警
            avg_time = metrics.get('processing.order_created.avg_time', 0)
            if avg_time > 30000:  # 30秒
                await send_alert(f"消息处理时间过长: {avg_time}ms")
                
        except Exception as e:
            logger.error("监控检查失败", error=str(e))
            
        await asyncio.sleep(60)  # 每分钟检查一次
```

## 故障排除

### 常见问题

#### Q1: 消息丢失怎么办?

**症状:** 发送的消息没有被处理

**可能原因:**
1. Redis 连接中断
2. 消费者没有正确启动
3. 消息处理器抛出异常但没有正确处理

**解决方案:**
```python
# 1. 检查Redis连接
try:
    await mq.redis.ping()
    print("Redis连接正常")
except Exception as e:
    print(f"Redis连接失败: {e}")

# 2. 检查消息是否在队列中
pending_count = await mq.redis.llen("order_created:pending")
processing_count = await mq.redis.llen("order_created:processing")
print(f"待处理: {pending_count}, 处理中: {processing_count}")

# 3. 检查死信队列
dlq_count = await mq.redis.llen("dlq:queue")
print(f"死信队列: {dlq_count}")
```

#### Q2: 消息处理过慢

**症状:** 队列积压,消息处理不及时

**可能原因:**
1. 工作协程数不足
2. 处理函数执行时间过长
3. Redis性能瓶颈

**解决方案:**
```python
# 1. 增加工作协程数
config = MQConfig(max_workers=20)  # 增加到20个

# 2. 优化处理函数
async def optimized_handler(payload: dict) -> None:
    # 使用异步IO
    async with aiohttp.ClientSession() as session:
        response = await session.post(url, json=payload)
    
    # 避免阻塞操作
    await asyncio.to_thread(blocking_operation, payload)

# 3. 监控处理时间
import time

async def timed_handler(payload: dict) -> None:
    start_time = time.time()
    try:
        await actual_handler(payload)
    finally:
        processing_time = time.time() - start_time
        if processing_time > 5:  # 处理时间超过5秒
            logger.warning("处理时间过长", time=processing_time, payload=payload)
```

#### Q3: 内存使用过高

**症状:** 应用内存持续增长

**可能原因:**
1. 本地队列积压
2. 消息对象没有正确释放
3. Redis连接池过大

**解决方案:**
```python
# 1. 调整队列大小
config = MQConfig(
    task_queue_size=10,  # 减少本地队列大小
    connection_pool_size=10,  # 减少连接池大小
)

# 2. 监控内存使用
import psutil
import gc

async def memory_monitor():
    while True:
        process = psutil.Process()
        memory_mb = process.memory_info().rss / 1024 / 1024
        
        if memory_mb > 500:  # 内存超过500MB
            logger.warning("内存使用过高", memory_mb=memory_mb)
            gc.collect()  # 强制垃圾回收
            
        await asyncio.sleep(60)
```

#### Q4: Redis 连接错误

**症状:** `ConnectionError`, `TimeoutError`

**解决方案:**
```python
# 1. 检查Redis配置
config = MQConfig(
    redis_url="redis://localhost:6379",
    connection_pool_size=20,
    # 添加连接重试
)

# 2. 实现连接重试
async def create_redis_with_retry(config: MQConfig, max_retries: int = 3):
    for attempt in range(max_retries):
        try:
            redis = aioredis.from_url(config.redis_url)
            await redis.ping()
            return redis
        except Exception as e:
            if attempt == max_retries - 1:
                raise
            logger.warning(f"Redis连接失败,重试中 ({attempt + 1}/{max_retries})")
            await asyncio.sleep(2 ** attempt)
```

### 性能诊断

#### 延迟分析

```python
import time
from collections import defaultdict

class PerformanceAnalyzer:
    def __init__(self):
        self.metrics = defaultdict(list)
    
    async def analyze_handler(self, handler_name: str, handler_func):
        """分析处理器性能"""
        async def wrapped_handler(payload: dict):
            start_time = time.time()
            try:
                result = await handler_func(payload)
                return result
            finally:
                end_time = time.time()
                processing_time = (end_time - start_time) * 1000  # 毫秒
                self.metrics[handler_name].append(processing_time)
                
                # 定期输出统计信息
                if len(self.metrics[handler_name]) % 100 == 0:
                    times = self.metrics[handler_name]
                    avg_time = sum(times) / len(times)
                    max_time = max(times)
                    min_time = min(times)
                    
                    print(f"{handler_name} 性能统计 (最近100次):")
                    print(f"  平均时间: {avg_time:.2f}ms")
                    print(f"  最大时间: {max_time:.2f}ms") 
                    print(f"  最小时间: {min_time:.2f}ms")
        
        return wrapped_handler

# 使用示例
analyzer = PerformanceAnalyzer()

@mq.register("order_created")
async def handle_order(payload: dict):
    # 处理逻辑
    await process_order(payload)

# 包装处理器进行性能分析
mq.handlers["order_created"] = await analyzer.analyze_handler(
    "order_created", 
    handle_order
)
```


            

Raw data

            {
    "_id": null,
    "home_page": null,
    "name": "mx-rmq",
    "maintainer": null,
    "docs_url": null,
    "requires_python": ">=3.12",
    "maintainer_email": null,
    "keywords": "async, distributed, message-queue, redis",
    "author": null,
    "author_email": "Alistar Max <codingox@gmail.com>",
    "download_url": "https://files.pythonhosted.org/packages/6b/a9/4407c824c069f7920cbedb3f12342ae8414183b5b05022770ac016857b2e/mx_rmq-1.0.2.tar.gz",
    "platform": null,
    "description": "# MX-RMQ \u4f7f\u7528\u6307\u5357\n\n[![Python 3.12+](https://img.shields.io/badge/python-3.12+-blue.svg)](https://www.python.org/downloads/)\n[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)\n[![Redis](https://img.shields.io/badge/redis-5.0+-red.svg)](https://redis.io/)\n\nMX-RMQ \u662f\u4e00\u4e2a\u9ad8\u6027\u80fd\u3001\u53ef\u9760\u7684\u57fa\u4e8eRedis\u7684\u5206\u5e03\u5f0f\u6d88\u606f\u961f\u5217\u7cfb\u7edf\uff0c\u652f\u6301\u666e\u901a\u6d88\u606f\u3001\u5ef6\u65f6\u6d88\u606f\u3001\u4f18\u5148\u7ea7\u6d88\u606f\uff0c\u5177\u5907\u5b8c\u5584\u7684\u76d1\u63a7\u548c\u91cd\u8bd5\u673a\u5236\u3002\n\n## \u76ee\u5f55\n\n- [\u7279\u6027\u6982\u89c8](#\u7279\u6027\u6982\u89c8)\n- [\u5feb\u901f\u5f00\u59cb](#\u5feb\u901f\u5f00\u59cb)\n- [\u5b89\u88c5](#\u5b89\u88c5)\n- [\u57fa\u672c\u4f7f\u7528](#\u57fa\u672c\u4f7f\u7528)\n- [\u9ad8\u7ea7\u529f\u80fd](#\u9ad8\u7ea7\u529f\u80fd)\n- [\u914d\u7f6e\u53c2\u8003](#\u914d\u7f6e\u53c2\u8003)\n- [API \u53c2\u8003](#api-\u53c2\u8003)\n- [\u76d1\u63a7\u548c\u7ba1\u7406](#\u76d1\u63a7\u548c\u7ba1\u7406)\n- [\u90e8\u7f72\u6307\u5357](#\u90e8\u7f72\u6307\u5357)\n- [\u6700\u4f73\u5b9e\u8df5](#\u6700\u4f73\u5b9e\u8df5)\n- [\u6545\u969c\u6392\u9664](#\u6545\u969c\u6392\u9664)\n\n## \u7279\u6027\u6982\u89c8\n\n- \ud83d\ude80 **\u9ad8\u6027\u80fd**: \u57fa\u4e8eRedis\u7684\u5185\u5b58\u5b58\u50a8\uff0c\u652f\u630110,000+\u6d88\u606f/\u79d2\u7684\u541e\u5410\u91cf\n- \ud83d\udd04 **\u53ef\u9760\u6027**: \u539f\u5b50\u6027Lua\u811a\u672c\u64cd\u4f5c\uff0c\u4fdd\u8bc1\u6d88\u606f\u4e0d\u4e22\u5931\n- \u23f0 **\u5ef6\u65f6\u6d88\u606f**: \u652f\u6301\u4efb\u610f\u65f6\u95f4\u5ef6\u8fdf\u7684\u6d88\u606f\u8c03\u5ea6\n- \ud83c\udff7\ufe0f **\u4f18\u5148\u7ea7**: \u652f\u6301\u9ad8\u3001\u4e2d\u3001\u4f4e\u4f18\u5148\u7ea7\u6d88\u606f\u5904\u7406\n- \ud83d\udd01 **\u81ea\u52a8\u91cd\u8bd5**: \u53ef\u914d\u7f6e\u7684\u91cd\u8bd5\u673a\u5236\u548c\u6307\u6570\u9000\u907f\n- \ud83d\udc80 **\u6b7b\u4fe1\u961f\u5217**: \u5931\u8d25\u6d88\u606f\u81ea\u52a8\u8fdb\u5165\u6b7b\u4fe1\u961f\u5217\uff0c\u652f\u6301\u4eba\u5de5\u5e72\u9884\n- \ud83d\udcca **\u76d1\u63a7\u6307\u6807**: \u5b9e\u65f6\u76d1\u63a7\u961f\u5217\u72b6\u6001\u3001\u5904\u7406\u65f6\u95f4\u3001\u541e\u5410\u7387\u7b49\n- \ud83d\uded1 **\u4f18\u96c5\u505c\u673a**: \u652f\u6301\u4f18\u96c5\u505c\u673a\uff0c\u786e\u4fdd\u6d88\u606f\u5904\u7406\u5b8c\u6210\n- \ud83d\udd27 **\u6613\u4e8e\u4f7f\u7528**: \u7b80\u6d01\u7684API\u8bbe\u8ba1\uff0c\u5f00\u7bb1\u5373\u7528\n\n## \u5feb\u901f\u5f00\u59cb\n\n### 30\u79d2\u5feb\u901f\u4f53\u9a8c\n\n```python\nimport asyncio\nfrom mx_rmq import MQConfig, RedisMessageQueue\n\nasync def handle_order(payload: dict) -> None:\n    \"\"\"\u5904\u7406\u8ba2\u5355\u6d88\u606f\"\"\"\n    print(f\"\u5904\u7406\u8ba2\u5355: {payload['order_id']}\")\n    # \u4f60\u7684\u4e1a\u52a1\u903b\u8f91\n    await asyncio.sleep(1)\n\nasync def main():\n    # \u521b\u5efa\u6d88\u606f\u961f\u5217\n    mq = RedisMessageQueue()\n    \n    # \u6ce8\u518c\u6d88\u606f\u5904\u7406\u5668\n    mq.register(\"order_created\", handle_order)\n    \n    # \u751f\u4ea7\u6d88\u606f\n    await mq.produce(\"order_created\", {\n        \"order_id\": \"ORD_123\",\n        \"user_id\": 456,\n        \"amount\": 99.99\n    })\n    \n    # \u542f\u52a8\u6d88\u8d39\u8005\uff08\u4f1a\u963b\u585e\uff09\n    await mq.start_dispatch_consuming()\n\nif __name__ == \"__main__\":\n    asyncio.run(main())\n```\n\n## \u5b89\u88c5\n\n### \u4f7f\u7528 uv (\u63a8\u8350)\n\n```bash\n# \u6dfb\u52a0\u5230\u73b0\u6709\u9879\u76ee\nuv add mx-rmq\n\n# \u6216\u8005\u4ece\u6e90\u7801\u5b89\u88c5\ngit clone https://github.com/CodingOX/mx-rmq.git\ncd mx-rmq\nuv sync\n```\n\n### \u4f7f\u7528 pip\n\n```bash\npip install mx-rmq\n\n# \u6216\u4ece\u6e90\u7801\u5b89\u88c5\npip install git+https://github.com/CodingOX/mx-rmq.git\n```\n\n### \u7cfb\u7edf\u8981\u6c42\n\n- Python 3.12+\n- Redis 5.0+\n- \u63a8\u8350\uff1aRedis 6.0+ (\u66f4\u597d\u7684\u6027\u80fd)\n\n## \u57fa\u672c\u4f7f\u7528\n\n### 1. \u521b\u5efa\u6d88\u606f\u961f\u5217\n\n```python\nfrom mx_rmq import MQConfig, RedisMessageQueue\n\n# \u4f7f\u7528\u9ed8\u8ba4\u914d\u7f6e\nmq = RedisMessageQueue()\n\n# \u6216\u81ea\u5b9a\u4e49\u914d\u7f6e\nconfig = MQConfig(\n    redis_url=\"redis://localhost:6379\",\n    max_workers=10,\n    task_queue_size=20\n)\nmq = RedisMessageQueue(config)\n```\n\n### 2. \u6ce8\u518c\u6d88\u606f\u5904\u7406\u5668\n\n```python\n# \u65b9\u5f0f1: \u4f7f\u7528\u88c5\u9970\u5668\n@mq.register(\"user_registration\")\nasync def handle_user_registration(payload: dict) -> None:\n    user_id = payload['user_id']\n    email = payload['email']\n    print(f\"\u6b22\u8fce\u65b0\u7528\u6237: {user_id} ({email})\")\n\n# \u65b9\u5f0f2: \u76f4\u63a5\u6ce8\u518c\nasync def handle_payment(payload: dict) -> None:\n    print(f\"\u5904\u7406\u652f\u4ed8: {payload}\")\n\nmq.register(\"payment_completed\", handle_payment)\n```\n\n### 3. \u751f\u4ea7\u6d88\u606f\n\n```python\n# \u751f\u4ea7\u666e\u901a\u6d88\u606f\nmessage_id = await mq.produce(\"user_registration\", {\n    \"user_id\": 12345,\n    \"email\": \"user@example.com\",\n    \"timestamp\": \"2024-01-01T00:00:00Z\"\n})\n\nprint(f\"\u6d88\u606f\u5df2\u53d1\u9001: {message_id}\")\n```\n\n### 4. \u542f\u52a8\u6d88\u8d39\u8005\n\n```python\n# \u542f\u52a8\u6d88\u8d39\u8005\uff08\u4f1a\u963b\u585e\uff0c\u76f4\u5230\u6536\u5230\u505c\u673a\u4fe1\u53f7\uff09\nawait mq.start_dispatch_consuming()\n```\n\n## \u9ad8\u7ea7\u529f\u80fd\n\n### \u5ef6\u65f6\u6d88\u606f\n\n```python\n# 5\u5206\u949f\u540e\u53d1\u9001\u63d0\u9192\nawait mq.produce(\n    topic=\"send_reminder\",\n    payload={\"user_id\": 123, \"type\": \"payment_due\"},\n    delay=300  # 300\u79d2\u540e\u6267\u884c\n)\n\n# 1\u5c0f\u65f6\u540e\u53d1\u9001\u90ae\u4ef6\nawait mq.produce(\n    topic=\"send_email\",\n    payload={\n        \"to\": \"user@example.com\",\n        \"subject\": \"\u8ba2\u5355\u786e\u8ba4\",\n        \"body\": \"\u611f\u8c22\u60a8\u7684\u8ba2\u5355...\"\n    },\n    delay=3600  # 1\u5c0f\u65f6\u540e\u6267\u884c\n)\n```\n\n### \u4f18\u5148\u7ea7\u6d88\u606f\n\n```python\nfrom mx_rmq import MessagePriority\n\n# \u9ad8\u4f18\u5148\u7ea7\u6d88\u606f\uff08\u4f18\u5148\u5904\u7406\uff09\nawait mq.produce(\n    topic=\"system_alert\",\n    payload={\"level\": \"critical\", \"message\": \"\u7cfb\u7edf\u544a\u8b66\"},\n    priority=MessagePriority.HIGH\n)\n\n# \u666e\u901a\u4f18\u5148\u7ea7\uff08\u9ed8\u8ba4\uff09\nawait mq.produce(\n    topic=\"user_activity\",\n    payload={\"user_id\": 123, \"action\": \"login\"},\n    priority=MessagePriority.NORMAL\n)\n\n# \u4f4e\u4f18\u5148\u7ea7\u6d88\u606f\uff08\u6700\u540e\u5904\u7406\uff09\nawait mq.produce(\n    topic=\"analytics_data\",\n    payload={\"event\": \"page_view\", \"page\": \"/home\"},\n    priority=MessagePriority.LOW\n)\n```\n\n### \u81ea\u5b9a\u4e49\u91cd\u8bd5\u914d\u7f6e\n\n```python\nconfig = MQConfig(\n    redis_url=\"redis://localhost:6379\",\n    max_retries=5,  # \u6700\u5927\u91cd\u8bd55\u6b21\n    retry_delays=[30, 60, 300, 900, 1800],  # \u91cd\u8bd5\u95f4\u9694\uff1a30s, 1m, 5m, 15m, 30m\n    processing_timeout=300,  # 5\u5206\u949f\u5904\u7406\u8d85\u65f6\n)\n\nmq = RedisMessageQueue(config)\n```\n\n### \u6d88\u606f\u751f\u5b58\u65f6\u95f4(TTL)\n\n```python\n# \u8bbe\u7f6e\u6d88\u606f1\u5c0f\u65f6\u540e\u8fc7\u671f\nawait mq.produce(\n    topic=\"temp_notification\",\n    payload={\"message\": \"\u4e34\u65f6\u901a\u77e5\"},\n    ttl=3600  # 1\u5c0f\u65f6\u540e\u8fc7\u671f\n)\n```\n\n### \u6279\u91cf\u751f\u4ea7\u6d88\u606f\n\n```python\n# \u6279\u91cf\u53d1\u9001\u591a\u4e2a\u6d88\u606f\nmessages = [\n    {\"topic\": \"order_created\", \"payload\": {\"order_id\": f\"ORD_{i}\"}}\n    for i in range(100)\n]\n\nfor msg in messages:\n    await mq.produce(msg[\"topic\"], msg[\"payload\"])\n```\n\n## \u914d\u7f6e\u53c2\u8003\n\n### MQConfig \u5b8c\u6574\u53c2\u6570\n\n```python\nfrom mx_rmq import MQConfig\n\nconfig = MQConfig(\n    # Redis \u8fde\u63a5\u914d\u7f6e\n    redis_url=\"redis://localhost:6379\",      # Redis\u8fde\u63a5URL\n    redis_db=0,                              # Redis\u6570\u636e\u5e93\u7f16\u53f7 (0-15)\n    redis_password=None,                     # Redis\u5bc6\u7801\n    queue_prefix=\"\",                         # \u961f\u5217\u524d\u7f00\uff0c\u7528\u4e8e\u591a\u73af\u5883\u9694\u79bb\n    connection_pool_size=20,                 # \u8fde\u63a5\u6c60\u5927\u5c0f\n    \n    # \u6d88\u8d39\u8005\u914d\u7f6e\n    max_workers=5,                           # \u6700\u5927\u5de5\u4f5c\u534f\u7a0b\u6570\n    task_queue_size=8,                       # \u672c\u5730\u4efb\u52a1\u961f\u5217\u5927\u5c0f\n    \n    # \u6d88\u606f\u751f\u547d\u5468\u671f\u914d\u7f6e\n    message_ttl=86400,                       # \u6d88\u606fTTL\uff08\u79d2\uff09\uff0c\u9ed8\u8ba424\u5c0f\u65f6\n    processing_timeout=180,                  # \u6d88\u606f\u5904\u7406\u8d85\u65f6\uff08\u79d2\uff09\uff0c\u9ed8\u8ba43\u5206\u949f\n    \n    # \u91cd\u8bd5\u914d\u7f6e\n    max_retries=3,                           # \u6700\u5927\u91cd\u8bd5\u6b21\u6570\n    retry_delays=[60, 300, 1800],           # \u91cd\u8bd5\u5ef6\u8fdf\u95f4\u9694\uff08\u79d2\uff09\n    \n    # \u6b7b\u4fe1\u961f\u5217\u914d\u7f6e\n    enable_dead_letter=True,                 # \u662f\u5426\u542f\u7528\u6b7b\u4fe1\u961f\u5217\n    \n    # \u76d1\u63a7\u914d\u7f6e\n    monitor_interval=30,                     # \u76d1\u63a7\u68c0\u67e5\u95f4\u9694\uff08\u79d2\uff09\n    expired_check_interval=10,               # \u8fc7\u671f\u6d88\u606f\u68c0\u67e5\u95f4\u9694\uff08\u79d2\uff09\n    processing_monitor_interval=30,          # Processing\u961f\u5217\u76d1\u63a7\u95f4\u9694\uff08\u79d2\uff09\n    batch_size=100,                          # \u6279\u5904\u7406\u5927\u5c0f\n)\n```\n\n### \u73af\u5883\u53d8\u91cf\u914d\u7f6e\n\n\u652f\u6301\u901a\u8fc7\u73af\u5883\u53d8\u91cf\u914d\u7f6e\uff1a\n\n```bash\nexport REDIS_URL=\"redis://localhost:6379\"\nexport REDIS_PASSWORD=\"your_password\"\nexport MQ_MAX_WORKERS=10\nexport MQ_TASK_QUEUE_SIZE=20\nexport MQ_MESSAGE_TTL=86400\n```\n\n```python\nimport os\nfrom mx_rmq import MQConfig\n\nconfig = MQConfig(\n    redis_url=os.getenv(\"REDIS_URL\", \"redis://localhost:6379\"),\n    redis_password=os.getenv(\"REDIS_PASSWORD\"),\n    max_workers=int(os.getenv(\"MQ_MAX_WORKERS\", \"5\")),\n    task_queue_size=int(os.getenv(\"MQ_TASK_QUEUE_SIZE\", \"8\")),\n    message_ttl=int(os.getenv(\"MQ_MESSAGE_TTL\", \"86400\")),\n)\n```\n\n## API \u53c2\u8003\n\n### RedisMessageQueue \u7c7b\n\n#### \u521d\u59cb\u5316\n\n```python\ndef __init__(self, config: MQConfig | None = None) -> None:\n    \"\"\"\n    \u521d\u59cb\u5316\u6d88\u606f\u961f\u5217\n    \n    Args:\n        config: \u6d88\u606f\u961f\u5217\u914d\u7f6e\uff0c\u5982\u4e3aNone\u5219\u4f7f\u7528\u9ed8\u8ba4\u914d\u7f6e\n    \"\"\"\n```\n\n#### \u6838\u5fc3\u65b9\u6cd5\n\n```python\nasync def produce(\n    self,\n    topic: str,\n    payload: dict[str, Any],\n    delay: int = 0,\n    priority: MessagePriority = MessagePriority.NORMAL,\n    ttl: int | None = None,\n    message_id: str | None = None,\n) -> str:\n    \"\"\"\n    \u751f\u4ea7\u6d88\u606f\n    \n    Args:\n        topic: \u4e3b\u9898\u540d\u79f0\n        payload: \u6d88\u606f\u8d1f\u8f7d\uff08\u5fc5\u987b\u662f\u53efJSON\u5e8f\u5217\u5316\u7684\u5b57\u5178\uff09\n        delay: \u5ef6\u8fdf\u6267\u884c\u65f6\u95f4\uff08\u79d2\uff09\uff0c0\u8868\u793a\u7acb\u5373\u6267\u884c\n        priority: \u6d88\u606f\u4f18\u5148\u7ea7\n        ttl: \u6d88\u606f\u751f\u5b58\u65f6\u95f4\uff08\u79d2\uff09\uff0cNone\u4f7f\u7528\u914d\u7f6e\u9ed8\u8ba4\u503c\n        message_id: \u6d88\u606fID\uff0cNone\u5219\u81ea\u52a8\u751f\u6210UUID\n        \n    Returns:\n        \u6d88\u606fID\uff08\u5b57\u7b26\u4e32\uff09\n        \n    Raises:\n        ValueError: \u53c2\u6570\u9a8c\u8bc1\u5931\u8d25\n        RedisError: Redis\u64cd\u4f5c\u5931\u8d25\n    \"\"\"\n\ndef register(self, topic: str, handler: Callable) -> None:\n    \"\"\"\n    \u6ce8\u518c\u6d88\u606f\u5904\u7406\u5668\n    \n    Args:\n        topic: \u4e3b\u9898\u540d\u79f0\n        handler: \u5904\u7406\u51fd\u6570\uff0c\u5fc5\u987b\u662fasync\u51fd\u6570\uff0c\u63a5\u53d7\u4e00\u4e2adict\u53c2\u6570\n        \n    Raises:\n        ValueError: \u5904\u7406\u5668\u4e0d\u662f\u53ef\u8c03\u7528\u5bf9\u8c61\n    \"\"\"\n\nasync def start_dispatch_consuming(self) -> None:\n    \"\"\"\n    \u542f\u52a8\u6d88\u606f\u5206\u53d1\u548c\u6d88\u8d39\n    \n    \u6b64\u65b9\u6cd5\u4f1a\u963b\u585e\uff0c\u76f4\u5230\u6536\u5230\u505c\u673a\u4fe1\u53f7(SIGINT/SIGTERM)\n    \n    Raises:\n        RuntimeError: \u7cfb\u7edf\u672a\u6b63\u786e\u521d\u59cb\u5316\n        RedisError: Redis\u8fde\u63a5\u9519\u8bef\n    \"\"\"\n\nasync def cleanup(self) -> None:\n    \"\"\"\n    \u6e05\u7406\u8d44\u6e90\uff0c\u5173\u95edRedis\u8fde\u63a5\u6c60\n    \"\"\"\n```\n\n### Message \u7c7b\n\n```python\n@dataclass\nclass Message:\n    \"\"\"\u6d88\u606f\u6570\u636e\u7c7b\"\"\"\n    id: str                    # \u6d88\u606f\u552f\u4e00ID\n    version: str               # \u6d88\u606f\u683c\u5f0f\u7248\u672c\n    topic: str                 # \u4e3b\u9898\u540d\u79f0  \n    payload: dict[str, Any]    # \u6d88\u606f\u8d1f\u8f7d\n    priority: MessagePriority  # \u6d88\u606f\u4f18\u5148\u7ea7\n    created_at: int           # \u521b\u5efa\u65f6\u95f4\u6233\uff08\u6beb\u79d2\uff09\n    meta: MessageMeta         # \u6d88\u606f\u5143\u6570\u636e\n\n@dataclass  \nclass MessageMeta:\n    \"\"\"\u6d88\u606f\u5143\u6570\u636e\"\"\"\n    status: MessageStatus      # \u6d88\u606f\u72b6\u6001\n    retry_count: int          # \u91cd\u8bd5\u6b21\u6570\n    max_retries: int          # \u6700\u5927\u91cd\u8bd5\u6b21\u6570\n    retry_delays: list[int]   # \u91cd\u8bd5\u5ef6\u8fdf\u914d\u7f6e\n    last_error: str | None    # \u6700\u540e\u4e00\u6b21\u9519\u8bef\u4fe1\u606f\n    expire_at: int            # \u8fc7\u671f\u65f6\u95f4\u6233\n    # ... \u5176\u4ed6\u5143\u6570\u636e\u5b57\u6bb5\n```\n\n### \u679a\u4e3e\u7c7b\u578b\n\n```python\nclass MessagePriority(str, Enum):\n    \"\"\"\u6d88\u606f\u4f18\u5148\u7ea7\"\"\"\n    HIGH = \"high\"      # \u9ad8\u4f18\u5148\u7ea7\n    NORMAL = \"normal\"  # \u666e\u901a\u4f18\u5148\u7ea7\n    LOW = \"low\"        # \u4f4e\u4f18\u5148\u7ea7\n\nclass MessageStatus(str, Enum):\n    \"\"\"\u6d88\u606f\u72b6\u6001\"\"\"\n    PENDING = \"pending\"        # \u5f85\u5904\u7406\n    PROCESSING = \"processing\"  # \u5904\u7406\u4e2d\n    COMPLETED = \"completed\"    # \u5df2\u5b8c\u6210\n    RETRYING = \"retrying\"      # \u91cd\u8bd5\u4e2d\n    DEAD_LETTER = \"dead_letter\" # \u6b7b\u4fe1\n```\n\n## \u76d1\u63a7\u548c\u7ba1\u7406\n\n### \u6307\u6807\u6536\u96c6\n\n```python\nfrom mx_rmq import MetricsCollector\n\n# \u521b\u5efa\u6307\u6807\u6536\u96c6\u5668\ncollector = MetricsCollector(redis=mq.redis, queue_prefix=config.queue_prefix)\n\n# \u6536\u96c6\u6240\u6709\u6307\u6807\nmetrics = await collector.collect_all_metrics([\"order_created\", \"user_registration\"])\n\n# \u6253\u5370\u5173\u952e\u6307\u6807\nprint(f\"\u5f85\u5904\u7406\u6d88\u606f: {metrics['queue.order_created.pending']}\")\nprint(f\"\u5904\u7406\u4e2d\u6d88\u606f: {metrics['queue.order_created.processing']}\")\nprint(f\"\u603b\u541e\u5410\u91cf: {metrics['throughput.messages_per_minute']}\")\nprint(f\"\u6b7b\u4fe1\u961f\u5217: {metrics['queue.dlq.count']}\")\n```\n\n### \u961f\u5217\u76d1\u63a7\n\n```python\n# \u76d1\u63a7\u5355\u4e2a\u961f\u5217\nqueue_metrics = await collector.collect_queue_metrics([\"order_created\"])\nprint(f\"\u8ba2\u5355\u961f\u5217\u72b6\u6001: {queue_metrics}\")\n\n# \u76d1\u63a7\u5904\u7406\u6027\u80fd\nprocessing_metrics = await collector.collect_processing_metrics([\"order_created\"])\nprint(f\"\u5e73\u5747\u5904\u7406\u65f6\u95f4: {processing_metrics['order_created.avg_processing_time']}ms\")\n```\n\n### \u6b7b\u4fe1\u961f\u5217\u7ba1\u7406\n\n```python\n# \u67e5\u770b\u6b7b\u4fe1\u961f\u5217\ndlq_count = await mq.redis.llen(\"dlq:queue\")\nprint(f\"\u6b7b\u4fe1\u961f\u5217\u6d88\u606f\u6570: {dlq_count}\")\n\n# \u83b7\u53d6\u6b7b\u4fe1\u6d88\u606f\u5217\u8868\ndlq_messages = await mq.redis.lrange(\"dlq:queue\", 0, 9)  # \u83b7\u53d6\u524d10\u6761\nfor msg_id in dlq_messages:\n    payload = await mq.redis.hget(\"dlq:payload:map\", msg_id)\n    print(f\"\u6b7b\u4fe1\u6d88\u606f: {msg_id} - {payload}\")\n\n# \u624b\u52a8\u91cd\u8bd5\u6b7b\u4fe1\u6d88\u606f\uff08\u9700\u8981\u81ea\u5b9a\u4e49\u5b9e\u73b0\uff09\nasync def retry_dead_message(message_id: str):\n    # \u4ece\u6b7b\u4fe1\u961f\u5217\u83b7\u53d6\u6d88\u606f\n    payload_json = await mq.redis.hget(\"dlq:payload:map\", message_id)\n    if payload_json:\n        # \u89e3\u6790\u6d88\u606f\u5e76\u91cd\u65b0\u751f\u4ea7\n        message = json.loads(payload_json)\n        await mq.produce(message[\"topic\"], message[\"payload\"])\n        # \u4ece\u6b7b\u4fe1\u961f\u5217\u79fb\u9664\n        await mq.redis.lrem(\"dlq:queue\", 1, message_id)\n        await mq.redis.hdel(\"dlq:payload:map\", message_id)\n```\n\n### \u5b9e\u65f6\u76d1\u63a7\u811a\u672c\n\n```python\nimport asyncio\nimport time\n\nasync def monitor_loop():\n    \"\"\"\u5b9e\u65f6\u76d1\u63a7\u5faa\u73af\"\"\"\n    collector = MetricsCollector(redis=mq.redis)\n    \n    while True:\n        try:\n            # \u6536\u96c6\u6307\u6807\n            metrics = await collector.collect_all_metrics([\"order_created\"])\n            \n            # \u8f93\u51fa\u5173\u952e\u6307\u6807\n            print(f\"[{time.strftime('%H:%M:%S')}] \u961f\u5217\u72b6\u6001:\")\n            print(f\"  \u5f85\u5904\u7406: {metrics.get('queue.order_created.pending', 0)}\")\n            print(f\"  \u5904\u7406\u4e2d: {metrics.get('queue.order_created.processing', 0)}\")\n            print(f\"  \u6b7b\u4fe1\u961f\u5217: {metrics.get('queue.dlq.count', 0)}\")\n            \n            # \u68c0\u67e5\u544a\u8b66\u6761\u4ef6\n            pending = metrics.get('queue.order_created.pending', 0)\n            if pending > 100:\n                print(f\"\u26a0\ufe0f  \u544a\u8b66: \u5f85\u5904\u7406\u6d88\u606f\u79ef\u538b ({pending})\")\n                \n            dlq_count = metrics.get('queue.dlq.count', 0)\n            if dlq_count > 10:\n                print(f\"\ud83d\udea8 \u544a\u8b66: \u6b7b\u4fe1\u961f\u5217\u6d88\u606f\u8fc7\u591a ({dlq_count})\")\n                \n        except Exception as e:\n            print(f\"\u76d1\u63a7\u9519\u8bef: {e}\")\n            \n        await asyncio.sleep(10)  # \u6bcf10\u79d2\u68c0\u67e5\u4e00\u6b21\n\n# \u542f\u52a8\u76d1\u63a7\nasyncio.create_task(monitor_loop())\n```\n\n## \u90e8\u7f72\u6307\u5357\n\n### \u672c\u5730\u5f00\u53d1\u73af\u5883\n\n```bash\n# 1. \u542f\u52a8Redis\ndocker run -d --name redis -p 6379:6379 redis:7-alpine\n\n# 2. \u8fd0\u884c\u5e94\u7528\npython your_app.py\n```\n\n### Docker \u90e8\u7f72\n\n**Dockerfile:**\n```dockerfile\nFROM python:3.12-slim\n\nWORKDIR /app\n\n# \u5b89\u88c5\u4f9d\u8d56\nCOPY requirements.txt .\nRUN pip install -r requirements.txt\n\n# \u590d\u5236\u4ee3\u7801\nCOPY . .\n\n# \u542f\u52a8\u5e94\u7528\nCMD [\"python\", \"main.py\"]\n```\n\n**docker-compose.yml:**\n```yaml\nversion: '3.8'\nservices:\n  redis:\n    image: redis:7-alpine\n    ports:\n      - \"6379:6379\"\n    volumes:\n      - redis_data:/data\n    command: redis-server --appendonly yes\n    \n  app:\n    build: .\n    depends_on:\n      - redis\n    environment:\n      - REDIS_URL=redis://redis:6379\n      - MQ_MAX_WORKERS=10\n    restart: unless-stopped\n    \nvolumes:\n  redis_data:\n```\n\n### \u751f\u4ea7\u73af\u5883\u914d\u7f6e\n\n**Redis \u914d\u7f6e\u5efa\u8bae (redis.conf):**\n```conf\n# \u5185\u5b58\u7ba1\u7406\nmaxmemory-policy allkeys-lru\nmaxmemory 2gb\n\n# \u8fde\u63a5\u7ba1\u7406\ntimeout 300\ntcp-keepalive 300\n\n# \u6301\u4e45\u5316\u914d\u7f6e\nsave 900 1      # 900\u79d2\u5185\u81f3\u5c111\u4e2akey\u53d8\u5316\u65f6\u4fdd\u5b58\nsave 300 10     # 300\u79d2\u5185\u81f3\u5c1110\u4e2akey\u53d8\u5316\u65f6\u4fdd\u5b58\nsave 60 10000   # 60\u79d2\u5185\u81f3\u5c1110000\u4e2akey\u53d8\u5316\u65f6\u4fdd\u5b58\n\n# AOF\u6301\u4e45\u5316\nappendonly yes\nappendfsync everysec\n\n# \u6027\u80fd\u4f18\u5316\nhz 10\ndynamic-hz yes\n```\n\n**\u5e94\u7528\u914d\u7f6e:**\n```python\n# \u751f\u4ea7\u73af\u5883\u914d\u7f6e\nconfig = MQConfig(\n    redis_url=\"redis://redis-cluster:6379\",\n    redis_password=\"your_secure_password\",\n    max_workers=20,\n    task_queue_size=50,\n    connection_pool_size=30,\n    message_ttl=86400 * 7,  # 7\u5929\n    processing_timeout=600,  # 10\u5206\u949f\n    queue_prefix=\"prod\",    # \u73af\u5883\u9694\u79bb\n)\n```\n\n### \u9ad8\u53ef\u7528\u90e8\u7f72\n\n**Redis Sentinel \u914d\u7f6e:**\n```python\nimport redis.sentinel\n\n# \u914d\u7f6eSentinel\nsentinels = [\n    ('sentinel1', 26379),\n    ('sentinel2', 26379), \n    ('sentinel3', 26379),\n]\n\nsentinel = redis.sentinel.Sentinel(sentinels, socket_timeout=0.1)\n\n# \u53d1\u73b0\u4e3b\u8282\u70b9\nredis_master = sentinel.master_for('mymaster', socket_timeout=0.1)\n\n# \u81ea\u5b9a\u4e49Redis\u8fde\u63a5\nconfig = MQConfig(redis_url=\"\")  # \u7559\u7a7a\uff0c\u4f7f\u7528\u81ea\u5b9a\u4e49\u8fde\u63a5\nmq = RedisMessageQueue(config)\nmq.redis = redis_master  # \u4f7f\u7528Sentinel\u7ba1\u7406\u7684\u8fde\u63a5\n```\n\n### \u76d1\u63a7\u548c\u544a\u8b66\n\n**Prometheus \u6307\u6807\u66b4\u9732:**\n```python\nfrom prometheus_client import start_http_server, Gauge, Counter\n\n# \u5b9a\u4e49\u6307\u6807\nqueue_size = Gauge('mq_queue_size', 'Queue size', ['topic', 'status'])\nmessages_processed = Counter('mq_messages_processed_total', 'Messages processed', ['topic', 'status'])\n\nasync def export_metrics():\n    \"\"\"\u5bfc\u51faPrometheus\u6307\u6807\"\"\"\n    collector = MetricsCollector(redis=mq.redis)\n    \n    while True:\n        metrics = await collector.collect_all_metrics(['order_created'])\n        \n        # \u66f4\u65b0Prometheus\u6307\u6807\n        queue_size.labels(topic='order_created', status='pending').set(\n            metrics.get('queue.order_created.pending', 0)\n        )\n        queue_size.labels(topic='order_created', status='processing').set(\n            metrics.get('queue.order_created.processing', 0)\n        )\n        \n        await asyncio.sleep(30)\n\n# \u542f\u52a8Prometheus HTTP\u670d\u52a1\u5668\nstart_http_server(8000)\nasyncio.create_task(export_metrics())\n```\n\n## \u6700\u4f73\u5b9e\u8df5\n\n### 1. \u6d88\u606f\u8bbe\u8ba1\n\n**\u2705 \u63a8\u8350\u505a\u6cd5:**\n```python\n# \u6d88\u606f\u7ed3\u6784\u6e05\u6670\uff0c\u5305\u542b\u5fc5\u8981\u7684\u4e0a\u4e0b\u6587\u4fe1\u606f\nawait mq.produce(\"order_created\", {\n    \"order_id\": \"ORD_123456\",\n    \"user_id\": 789,\n    \"total_amount\": 99.99,\n    \"currency\": \"USD\",\n    \"timestamp\": \"2024-01-01T12:00:00Z\",\n    \"metadata\": {\n        \"source\": \"web\",\n        \"version\": \"v1.0\"\n    }\n})\n```\n\n**\u274c \u907f\u514d\u505a\u6cd5:**\n```python\n# \u6d88\u606f\u8fc7\u4e8e\u7b80\u5355\uff0c\u7f3a\u5c11\u4e0a\u4e0b\u6587\nawait mq.produce(\"process\", {\"id\": 123})\n\n# \u6d88\u606f\u8fc7\u4e8e\u590d\u6742\uff0c\u5305\u542b\u5927\u91cf\u6570\u636e\nawait mq.produce(\"user_update\", {\n    \"user\": {...},  # \u5305\u542b\u7528\u6237\u7684\u6240\u6709\u4fe1\u606f\n    \"history\": [...],  # \u5305\u542b\u5b8c\u6574\u5386\u53f2\u8bb0\u5f55\n    \"related_data\": {...}  # \u5305\u542b\u5927\u91cf\u5173\u8054\u6570\u636e\n})\n```\n\n### 2. \u9519\u8bef\u5904\u7406\n\n**\u2705 \u63a8\u8350\u505a\u6cd5:**\n```python\nasync def handle_payment(payload: dict) -> None:\n    try:\n        order_id = payload[\"order_id\"]\n        amount = payload[\"amount\"]\n        \n        # \u53c2\u6570\u9a8c\u8bc1\n        if not order_id or amount <= 0:\n            raise ValueError(f\"\u65e0\u6548\u7684\u8ba2\u5355\u53c2\u6570: {payload}\")\n            \n        # \u4e1a\u52a1\u903b\u8f91\n        result = await process_payment(order_id, amount)\n        \n        # \u8bb0\u5f55\u6210\u529f\u65e5\u5fd7\n        logger.info(\"\u652f\u4ed8\u5904\u7406\u6210\u529f\", order_id=order_id, amount=amount)\n        \n    except ValueError as e:\n        # \u53c2\u6570\u9519\u8bef\uff0c\u4e0d\u91cd\u8bd5\n        logger.error(\"\u652f\u4ed8\u53c2\u6570\u9519\u8bef\", error=str(e), payload=payload)\n        raise  # \u91cd\u65b0\u629b\u51fa\uff0c\u8fdb\u5165\u6b7b\u4fe1\u961f\u5217\n        \n    except PaymentGatewayError as e:\n        # \u5916\u90e8\u670d\u52a1\u9519\u8bef\uff0c\u53ef\u91cd\u8bd5\n        logger.warning(\"\u652f\u4ed8\u7f51\u5173\u9519\u8bef\", error=str(e), order_id=order_id)\n        raise  # \u91cd\u65b0\u629b\u51fa\uff0c\u89e6\u53d1\u91cd\u8bd5\n        \n    except Exception as e:\n        # \u672a\u77e5\u9519\u8bef\n        logger.error(\"\u652f\u4ed8\u5904\u7406\u5931\u8d25\", error=str(e), order_id=order_id)\n        raise\n```\n\n### 3. \u5e42\u7b49\u6027\u5904\u7406\n\n```python\nasync def handle_order_created(payload: dict) -> None:\n    order_id = payload[\"order_id\"]\n    \n    # \u68c0\u67e5\u662f\u5426\u5df2\u5904\u7406\uff08\u5e42\u7b49\u6027\u4fdd\u62a4\uff09\n    if await is_order_processed(order_id):\n        logger.info(\"\u8ba2\u5355\u5df2\u5904\u7406\uff0c\u8df3\u8fc7\", order_id=order_id)\n        return\n        \n    try:\n        # \u5904\u7406\u8ba2\u5355\n        await process_order(order_id)\n        \n        # \u6807\u8bb0\u4e3a\u5df2\u5904\u7406\n        await mark_order_processed(order_id)\n        \n    except Exception as e:\n        logger.error(\"\u8ba2\u5355\u5904\u7406\u5931\u8d25\", order_id=order_id, error=str(e))\n        raise\n```\n\n### 4. \u6027\u80fd\u4f18\u5316\n\n**\u5de5\u4f5c\u534f\u7a0b\u6570\u8c03\u4f18:**\n```python\nimport os\nimport multiprocessing\n\n# \u6839\u636eCPU\u6838\u5fc3\u6570\u548cIO\u7279\u6027\u8c03\u6574\u5de5\u4f5c\u534f\u7a0b\u6570\ncpu_count = multiprocessing.cpu_count()\n\nconfig = MQConfig(\n    # CPU\u5bc6\u96c6\u578b\u4efb\u52a1\uff1a\u5de5\u4f5c\u534f\u7a0b\u6570 = CPU\u6838\u5fc3\u6570\n    max_workers=cpu_count if is_cpu_intensive else cpu_count * 2,\n    \n    # IO\u5bc6\u96c6\u578b\u4efb\u52a1\uff1a\u5de5\u4f5c\u534f\u7a0b\u6570 = CPU\u6838\u5fc3\u6570 * 2-4\n    # max_workers=cpu_count * 3,\n    \n    # \u4efb\u52a1\u961f\u5217\u5927\u5c0f\u5e94\u8be5\u5927\u4e8e\u5de5\u4f5c\u534f\u7a0b\u6570\n    task_queue_size=max_workers * 2,\n)\n```\n\n**\u6279\u91cf\u5904\u7406\u4f18\u5316:**\n```python\nasync def handle_batch_emails(payload: dict) -> None:\n    \"\"\"\u6279\u91cf\u5904\u7406\u90ae\u4ef6\u53d1\u9001\"\"\"\n    email_list = payload[\"emails\"]\n    \n    # \u5206\u6279\u5904\u7406\uff0c\u907f\u514d\u5185\u5b58\u5360\u7528\u8fc7\u5927\n    batch_size = 10\n    for i in range(0, len(email_list), batch_size):\n        batch = email_list[i:i + batch_size]\n        \n        # \u5e76\u53d1\u53d1\u9001\u90ae\u4ef6\n        tasks = [send_email(email) for email in batch]\n        await asyncio.gather(*tasks, return_exceptions=True)\n        \n        # \u907f\u514d\u8fc7\u5feb\u7684\u8bf7\u6c42\n        await asyncio.sleep(0.1)\n```\n\n### 5. \u76d1\u63a7\u548c\u544a\u8b66\n\n```python\nasync def setup_monitoring():\n    \"\"\"\u8bbe\u7f6e\u76d1\u63a7\u548c\u544a\u8b66\"\"\"\n    collector = MetricsCollector(redis=mq.redis)\n    \n    while True:\n        try:\n            metrics = await collector.collect_all_metrics([\"order_created\"])\n            \n            # \u961f\u5217\u79ef\u538b\u544a\u8b66\n            pending = metrics.get('queue.order_created.pending', 0)\n            if pending > 1000:\n                await send_alert(f\"\u961f\u5217\u79ef\u538b\u4e25\u91cd: {pending} \u6761\u6d88\u606f\u5f85\u5904\u7406\")\n                \n            # \u6b7b\u4fe1\u961f\u5217\u544a\u8b66  \n            dlq_count = metrics.get('queue.dlq.count', 0)\n            if dlq_count > 50:\n                await send_alert(f\"\u6b7b\u4fe1\u961f\u5217\u6d88\u606f\u8fc7\u591a: {dlq_count} \u6761\")\n                \n            # \u5904\u7406\u65f6\u95f4\u544a\u8b66\n            avg_time = metrics.get('processing.order_created.avg_time', 0)\n            if avg_time > 30000:  # 30\u79d2\n                await send_alert(f\"\u6d88\u606f\u5904\u7406\u65f6\u95f4\u8fc7\u957f: {avg_time}ms\")\n                \n        except Exception as e:\n            logger.error(\"\u76d1\u63a7\u68c0\u67e5\u5931\u8d25\", error=str(e))\n            \n        await asyncio.sleep(60)  # \u6bcf\u5206\u949f\u68c0\u67e5\u4e00\u6b21\n```\n\n## \u6545\u969c\u6392\u9664\n\n### \u5e38\u89c1\u95ee\u9898\n\n#### Q1: \u6d88\u606f\u4e22\u5931\u600e\u4e48\u529e\uff1f\n\n**\u75c7\u72b6:** \u53d1\u9001\u7684\u6d88\u606f\u6ca1\u6709\u88ab\u5904\u7406\n\n**\u53ef\u80fd\u539f\u56e0:**\n1. Redis \u8fde\u63a5\u4e2d\u65ad\n2. \u6d88\u8d39\u8005\u6ca1\u6709\u6b63\u786e\u542f\u52a8\n3. \u6d88\u606f\u5904\u7406\u5668\u629b\u51fa\u5f02\u5e38\u4f46\u6ca1\u6709\u6b63\u786e\u5904\u7406\n\n**\u89e3\u51b3\u65b9\u6848:**\n```python\n# 1. \u68c0\u67e5Redis\u8fde\u63a5\ntry:\n    await mq.redis.ping()\n    print(\"Redis\u8fde\u63a5\u6b63\u5e38\")\nexcept Exception as e:\n    print(f\"Redis\u8fde\u63a5\u5931\u8d25: {e}\")\n\n# 2. \u68c0\u67e5\u6d88\u606f\u662f\u5426\u5728\u961f\u5217\u4e2d\npending_count = await mq.redis.llen(\"order_created:pending\")\nprocessing_count = await mq.redis.llen(\"order_created:processing\")\nprint(f\"\u5f85\u5904\u7406: {pending_count}, \u5904\u7406\u4e2d: {processing_count}\")\n\n# 3. \u68c0\u67e5\u6b7b\u4fe1\u961f\u5217\ndlq_count = await mq.redis.llen(\"dlq:queue\")\nprint(f\"\u6b7b\u4fe1\u961f\u5217: {dlq_count}\")\n```\n\n#### Q2: \u6d88\u606f\u5904\u7406\u8fc7\u6162\n\n**\u75c7\u72b6:** \u961f\u5217\u79ef\u538b\uff0c\u6d88\u606f\u5904\u7406\u4e0d\u53ca\u65f6\n\n**\u53ef\u80fd\u539f\u56e0:**\n1. \u5de5\u4f5c\u534f\u7a0b\u6570\u4e0d\u8db3\n2. \u5904\u7406\u51fd\u6570\u6267\u884c\u65f6\u95f4\u8fc7\u957f\n3. Redis\u6027\u80fd\u74f6\u9888\n\n**\u89e3\u51b3\u65b9\u6848:**\n```python\n# 1. \u589e\u52a0\u5de5\u4f5c\u534f\u7a0b\u6570\nconfig = MQConfig(max_workers=20)  # \u589e\u52a0\u523020\u4e2a\n\n# 2. \u4f18\u5316\u5904\u7406\u51fd\u6570\nasync def optimized_handler(payload: dict) -> None:\n    # \u4f7f\u7528\u5f02\u6b65IO\n    async with aiohttp.ClientSession() as session:\n        response = await session.post(url, json=payload)\n    \n    # \u907f\u514d\u963b\u585e\u64cd\u4f5c\n    await asyncio.to_thread(blocking_operation, payload)\n\n# 3. \u76d1\u63a7\u5904\u7406\u65f6\u95f4\nimport time\n\nasync def timed_handler(payload: dict) -> None:\n    start_time = time.time()\n    try:\n        await actual_handler(payload)\n    finally:\n        processing_time = time.time() - start_time\n        if processing_time > 5:  # \u5904\u7406\u65f6\u95f4\u8d85\u8fc75\u79d2\n            logger.warning(\"\u5904\u7406\u65f6\u95f4\u8fc7\u957f\", time=processing_time, payload=payload)\n```\n\n#### Q3: \u5185\u5b58\u4f7f\u7528\u8fc7\u9ad8\n\n**\u75c7\u72b6:** \u5e94\u7528\u5185\u5b58\u6301\u7eed\u589e\u957f\n\n**\u53ef\u80fd\u539f\u56e0:**\n1. \u672c\u5730\u961f\u5217\u79ef\u538b\n2. \u6d88\u606f\u5bf9\u8c61\u6ca1\u6709\u6b63\u786e\u91ca\u653e\n3. Redis\u8fde\u63a5\u6c60\u8fc7\u5927\n\n**\u89e3\u51b3\u65b9\u6848:**\n```python\n# 1. \u8c03\u6574\u961f\u5217\u5927\u5c0f\nconfig = MQConfig(\n    task_queue_size=10,  # \u51cf\u5c11\u672c\u5730\u961f\u5217\u5927\u5c0f\n    connection_pool_size=10,  # \u51cf\u5c11\u8fde\u63a5\u6c60\u5927\u5c0f\n)\n\n# 2. \u76d1\u63a7\u5185\u5b58\u4f7f\u7528\nimport psutil\nimport gc\n\nasync def memory_monitor():\n    while True:\n        process = psutil.Process()\n        memory_mb = process.memory_info().rss / 1024 / 1024\n        \n        if memory_mb > 500:  # \u5185\u5b58\u8d85\u8fc7500MB\n            logger.warning(\"\u5185\u5b58\u4f7f\u7528\u8fc7\u9ad8\", memory_mb=memory_mb)\n            gc.collect()  # \u5f3a\u5236\u5783\u573e\u56de\u6536\n            \n        await asyncio.sleep(60)\n```\n\n#### Q4: Redis \u8fde\u63a5\u9519\u8bef\n\n**\u75c7\u72b6:** `ConnectionError`, `TimeoutError`\n\n**\u89e3\u51b3\u65b9\u6848:**\n```python\n# 1. \u68c0\u67e5Redis\u914d\u7f6e\nconfig = MQConfig(\n    redis_url=\"redis://localhost:6379\",\n    connection_pool_size=20,\n    # \u6dfb\u52a0\u8fde\u63a5\u91cd\u8bd5\n)\n\n# 2. \u5b9e\u73b0\u8fde\u63a5\u91cd\u8bd5\nasync def create_redis_with_retry(config: MQConfig, max_retries: int = 3):\n    for attempt in range(max_retries):\n        try:\n            redis = aioredis.from_url(config.redis_url)\n            await redis.ping()\n            return redis\n        except Exception as e:\n            if attempt == max_retries - 1:\n                raise\n            logger.warning(f\"Redis\u8fde\u63a5\u5931\u8d25\uff0c\u91cd\u8bd5\u4e2d ({attempt + 1}/{max_retries})\")\n            await asyncio.sleep(2 ** attempt)\n```\n\n### \u6027\u80fd\u8bca\u65ad\n\n#### \u5ef6\u8fdf\u5206\u6790\n\n```python\nimport time\nfrom collections import defaultdict\n\nclass PerformanceAnalyzer:\n    def __init__(self):\n        self.metrics = defaultdict(list)\n    \n    async def analyze_handler(self, handler_name: str, handler_func):\n        \"\"\"\u5206\u6790\u5904\u7406\u5668\u6027\u80fd\"\"\"\n        async def wrapped_handler(payload: dict):\n            start_time = time.time()\n            try:\n                result = await handler_func(payload)\n                return result\n            finally:\n                end_time = time.time()\n                processing_time = (end_time - start_time) * 1000  # \u6beb\u79d2\n                self.metrics[handler_name].append(processing_time)\n                \n                # \u5b9a\u671f\u8f93\u51fa\u7edf\u8ba1\u4fe1\u606f\n                if len(self.metrics[handler_name]) % 100 == 0:\n                    times = self.metrics[handler_name]\n                    avg_time = sum(times) / len(times)\n                    max_time = max(times)\n                    min_time = min(times)\n                    \n                    print(f\"{handler_name} \u6027\u80fd\u7edf\u8ba1 (\u6700\u8fd1100\u6b21):\")\n                    print(f\"  \u5e73\u5747\u65f6\u95f4: {avg_time:.2f}ms\")\n                    print(f\"  \u6700\u5927\u65f6\u95f4: {max_time:.2f}ms\") \n                    print(f\"  \u6700\u5c0f\u65f6\u95f4: {min_time:.2f}ms\")\n        \n        return wrapped_handler\n\n# \u4f7f\u7528\u793a\u4f8b\nanalyzer = PerformanceAnalyzer()\n\n@mq.register(\"order_created\")\nasync def handle_order(payload: dict):\n    # \u5904\u7406\u903b\u8f91\n    await process_order(payload)\n\n# \u5305\u88c5\u5904\u7406\u5668\u8fdb\u884c\u6027\u80fd\u5206\u6790\nmq.handlers[\"order_created\"] = await analyzer.analyze_handler(\n    \"order_created\", \n    handle_order\n)\n```\n\n",
    "bugtrack_url": null,
    "license": "MIT",
    "summary": "\u57fa\u4e8eRedis\u7684\u5f02\u6b65\u6d88\u606f\u961f\u5217\u7cfb\u7edf",
    "version": "1.0.2",
    "project_urls": {
        "Documentation": "https://github.com/CodingOX/mx-rmq#readme",
        "Homepage": "https://github.com/CodingOX/mx-rmq",
        "Issues": "https://github.com/CodingOX/mx-rmq/issues",
        "Repository": "https://github.com/CodingOX/mx-rmq"
    },
    "split_keywords": [
        "async",
        " distributed",
        " message-queue",
        " redis"
    ],
    "urls": [
        {
            "comment_text": null,
            "digests": {
                "blake2b_256": "41e64aedf312965f53a79e257f04561efa79fb39c2d68a02a8e3b1960f784c9a",
                "md5": "21e74a01a038656abbfaa67446aab2af",
                "sha256": "e0e4d53250c028387be73c7570ed343397619ac1c4a7da22e7663bc27f20d348"
            },
            "downloads": -1,
            "filename": "mx_rmq-1.0.2-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "21e74a01a038656abbfaa67446aab2af",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": ">=3.12",
            "size": 50126,
            "upload_time": "2025-07-12T10:01:34",
            "upload_time_iso_8601": "2025-07-12T10:01:34.169252Z",
            "url": "https://files.pythonhosted.org/packages/41/e6/4aedf312965f53a79e257f04561efa79fb39c2d68a02a8e3b1960f784c9a/mx_rmq-1.0.2-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": null,
            "digests": {
                "blake2b_256": "6ba94407c824c069f7920cbedb3f12342ae8414183b5b05022770ac016857b2e",
                "md5": "689a29d952044b78bd8f5875e32e7132",
                "sha256": "ccba71b367e284ac84f3beafc078bb0739310c740eb135607bbecdf31fd45251"
            },
            "downloads": -1,
            "filename": "mx_rmq-1.0.2.tar.gz",
            "has_sig": false,
            "md5_digest": "689a29d952044b78bd8f5875e32e7132",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": ">=3.12",
            "size": 54485,
            "upload_time": "2025-07-12T10:01:35",
            "upload_time_iso_8601": "2025-07-12T10:01:35.706139Z",
            "url": "https://files.pythonhosted.org/packages/6b/a9/4407c824c069f7920cbedb3f12342ae8414183b5b05022770ac016857b2e/mx_rmq-1.0.2.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2025-07-12 10:01:35",
    "github": true,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "github_user": "CodingOX",
    "github_project": "mx-rmq#readme",
    "github_not_found": true,
    "lcname": "mx-rmq"
}
        
Elapsed time: 0.42980s