# MX-RMQ User Guide
[Python 3.12+](https://www.python.org/downloads/)
[MIT License](https://opensource.org/licenses/MIT)
[Redis](https://redis.io/)
MX-RMQ is a high-performance, reliable, Redis-based distributed message queue system. It supports normal, delayed, and priority messages, and comes with comprehensive monitoring and retry mechanisms.

## Table of Contents

- [Feature Overview](#feature-overview)
- [Quick Start](#quick-start)
- [Installation](#installation)
- [Basic Usage](#basic-usage)
- [Advanced Features](#advanced-features)
- [Logging](#logging)
- [Configuration Reference](#configuration-reference)
- [API Reference](#api-reference)
- [Monitoring and Management](#monitoring-and-management)
- [Deployment Guide](#deployment-guide)
- [Best Practices](#best-practices)
- [Troubleshooting](#troubleshooting)

## Feature Overview

- 🚀 **High performance**: Redis-backed in-memory storage sustaining 10,000+ messages/second
- 🔄 **Reliability**: atomic Lua script operations guarantee that no message is lost
- ⏰ **Delayed messages**: schedule messages with arbitrary delays
- 🏷️ **Priorities**: high, normal, and low priority processing
- 🔁 **Automatic retry**: configurable retry mechanism with exponential backoff
- 💀 **Dead letter queue**: failed messages land in a dead letter queue for manual intervention
- 📊 **Metrics**: real-time visibility into queue state, processing time, throughput, and more
- 🛑 **Graceful shutdown**: in-flight messages finish processing before the process exits
- 🔧 **Ease of use**: clean API design, works out of the box

## Design Limitations

- ❌ **No consumer groups**: each topic is load-balanced across a single group of consumers. To have several groups consume the same message, create one topic per group and publish the message once per topic (see the sketch below).
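A minimal fan-out helper under this constraint (the topic names are illustrative; `mq` is a queue instance as created in the quick start below):

```python
import asyncio

async def fan_out(mq, topics: list[str], payload: dict) -> None:
    """Publish the same payload once per group-specific topic."""
    await asyncio.gather(*(mq.produce(topic, payload) for topic in topics))

# e.g. await fan_out(mq, ["order_created_billing", "order_created_audit"], {"order_id": "ORD_1"})
```

See Best Practices §5 for fuller fan-out patterns.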
## Quick Start

Start Redis:
```shell
docker run -d --name redis8 -p 6379:6379 redis:8 redis-server
```

### A 30-Second Tour

```python
import asyncio
from mx_rmq import MQConfig, RedisMessageQueue

async def handle_order(payload: dict) -> None:
    """Handle an order message."""
    print(f"Processing order: {payload['order_id']}")
    # Your business logic
    await asyncio.sleep(1)

async def main():
    # Create the message queue
    mq = RedisMessageQueue()

    # Register a message handler
    mq.register("order_created", handle_order)

    # Produce a message
    await mq.produce("order_created", {
        "order_id": "ORD_123",
        "user_id": 456,
        "amount": 99.99
    })

    # Does not block
    task = await mq.start_background()
    # Block here until shutdown
    await task

if __name__ == "__main__":
    asyncio.run(main())
```
## Installation

### With uv (recommended)

```bash
# Add to an existing project
uv add mx-rmq

# Or install from source
git clone https://github.com/CodingOX/mx-rmq.git
cd mx-rmq
uv sync
```

### With pip

```bash
pip install mx-rmq

# Or install from source
pip install git+https://github.com/CodingOX/mx-rmq.git
```

### System Requirements

- Python 3.12+
- Redis 5.0+
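To confirm the server side meets the requirement, you can query the running Redis instance directly (assumes `redis-cli` is available, e.g. via `docker exec`):

```bash
redis-cli INFO server | grep redis_version
```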
## Basic Usage

### 1. Create a Message Queue

```python
from mx_rmq import MQConfig, RedisMessageQueue

# Use the default configuration
mq = RedisMessageQueue()

# Or customize it
config = MQConfig(
    redis_host="redis://localhost:6379",
    max_workers=10,
    task_queue_size=20
)
mq = RedisMessageQueue(config)
```

### 2. Register Message Handlers

```python
# Option 1: as a decorator
@mq.register("user_registration")
async def handle_user_registration(payload: dict) -> None:
    user_id = payload['user_id']
    email = payload['email']
    print(f"Welcome, new user: {user_id} ({email})")

# Option 2: register directly
async def handle_payment(payload: dict) -> None:
    print(f"Processing payment: {payload}")

mq.register("payment_completed", handle_payment)
```

### 3. Produce Messages

```python
# Produce a normal message
message_id = await mq.produce("user_registration", {
    "user_id": 12345,
    "email": "user@example.com",
    "timestamp": "2024-01-01T00:00:00Z"
})

print(f"Message sent: {message_id}")
```

### 4. Start the Consumer

```python
# Start consuming (blocks until a shutdown signal is received)
await mq.start_dispatch_consuming()
```
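If the consumer should run alongside other work in the same process, a sketch that combines the non-blocking `start_background()` from the quick start with `cleanup()` (here `do_other_work` is a stand-in for your own coroutine):

```python
async def run() -> None:
    mq = RedisMessageQueue()
    mq.register("order_created", handle_order)
    try:
        task = await mq.start_background()  # returns immediately
        await do_other_work()               # your own coroutine runs meanwhile
        await task                          # then wait for shutdown
    finally:
        await mq.cleanup()                  # close the Redis connection pool
```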
## Advanced Features

### Delayed Messages

```python
# Send a reminder after 5 minutes
await mq.produce(
    topic="send_reminder",
    payload={"user_id": 123, "type": "payment_due"},
    delay=300  # execute after 300 seconds
)

# Send an email after 1 hour
await mq.produce(
    topic="send_email",
    payload={
        "to": "user@example.com",
        "subject": "Order confirmation",
        "body": "Thank you for your order..."
    },
    delay=3600  # execute after 1 hour
)
```

### Priority Messages

```python
from mx_rmq import MessagePriority

# High priority (processed first)
await mq.produce(
    topic="system_alert",
    payload={"level": "critical", "message": "System alert"},
    priority=MessagePriority.HIGH
)

# Normal priority (default)
await mq.produce(
    topic="user_activity",
    payload={"user_id": 123, "action": "login"},
    priority=MessagePriority.NORMAL
)

# Low priority (processed last)
await mq.produce(
    topic="analytics_data",
    payload={"event": "page_view", "page": "/home"},
    priority=MessagePriority.LOW
)
```

### Custom Retry Configuration

```python
config = MQConfig(
    redis_host="redis://localhost:6379",
    max_retries=5,                          # retry up to 5 times
    retry_delays=[30, 60, 300, 900, 1800],  # retry intervals: 30s, 1m, 5m, 15m, 30m
    processing_timeout=300,                 # 5-minute processing timeout
)

mq = RedisMessageQueue(config)
```

### Message Time-to-Live (TTL)

```python
# Expire the message after 1 hour
await mq.produce(
    topic="temp_notification",
    payload={"message": "Temporary notice"},
    ttl=3600  # expires after 1 hour
)
```

### Producing Messages in Bulk

```python
# Send many messages
messages = [
    {"topic": "order_created", "payload": {"order_id": f"ORD_{i}"}}
    for i in range(100)
]

for msg in messages:
    await mq.produce(msg["topic"], msg["payload"])
```
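The loop above awaits each `produce` call in turn. If ordering between messages does not matter, a sketch that overlaps the Redis round trips with `asyncio.gather`, capped by a semaphore (the limit of 20 concurrent calls is an arbitrary choice):

```python
import asyncio

async def produce_many(mq, messages: list[dict], limit: int = 20) -> list[str]:
    """Produce messages concurrently, with at most `limit` in flight."""
    sem = asyncio.Semaphore(limit)

    async def produce_one(msg: dict) -> str:
        async with sem:
            return await mq.produce(msg["topic"], msg["payload"])

    return await asyncio.gather(*(produce_one(m) for m in messages))
```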
## Logging

MX-RMQ uses the standard Python logging system and provides convenience setup functions plus colored log support.

### Quick Start

#### 1. Basic Logging Setup

```python
import logging
from mx_rmq import RedisMessageQueue

# Option 1: standard logging configuration
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

# Option 2: MX-RMQ's convenience setup
from mx_rmq.logging import setup_basic_logging
setup_basic_logging("INFO")

# Create the message queue
mq = RedisMessageQueue()

# All internal logs are now visible
await mq.start_dispatch_consuming()
```

#### 2. Colored Logging Setup

```python
from mx_rmq.logging import setup_colored_logging

# Configure colored log output
setup_colored_logging("INFO")

# Or use the minimal colored logs (no timestamps)
from mx_rmq.logging import setup_simple_colored_logging
setup_simple_colored_logging("INFO")

# Create the message queue
mq = RedisMessageQueue()
```

### Logging Setup Functions

MX-RMQ provides three convenience setup functions:

#### `setup_basic_logging(level="INFO")`
Basic logging to the console:

```python
from mx_rmq.logging import setup_basic_logging

setup_basic_logging("DEBUG")  # set the DEBUG level
```

#### `setup_colored_logging(level="INFO")`
Colored logging, one color per level:

```python
from mx_rmq.logging import setup_colored_logging

setup_colored_logging("INFO")
```

Color scheme:
- 🔵 **DEBUG**: cyan
- 🟢 **INFO**: green
- 🟡 **WARNING**: yellow
- 🔴 **ERROR**: red
- 🟣 **CRITICAL**: purple

#### `setup_simple_colored_logging(level="INFO")`
Minimal colored logging without timestamps:

```python
from mx_rmq.logging import setup_simple_colored_logging

setup_simple_colored_logging("INFO")
```
### Using Logging in Your Application

#### 1. The Standard Way (recommended)

```python
import logging
from mx_rmq import RedisMessageQueue
from mx_rmq.logging import setup_colored_logging

# Configure colored logging
setup_colored_logging("INFO")

# Use the standard pattern in each module
logger = logging.getLogger(__name__)

async def handle_order(payload: dict) -> None:
    order_id = payload.get("order_id")

    # Business logging
    logger.info("Processing order", extra={"order_id": order_id})

    try:
        # Order-handling logic
        await process_order(payload)
        logger.info("Order processed", extra={"order_id": order_id})

    except Exception as e:
        logger.error("Order processing failed", extra={"order_id": order_id}, exc_info=e)
        raise

# Create the message queue
mq = RedisMessageQueue()
mq.register("order_created", handle_order)
```

#### 2. Using LoggerService (backward compatible)

```python
from mx_rmq import LoggerService
from mx_rmq.logging import setup_colored_logging

# Configure colored logging
setup_colored_logging("INFO")

# Create a logger service
logger_service = LoggerService("PaymentService")

# The standard logging interface
logger_service.logger.info("Payment service started")

# Convenience methods
logger_service.log_message_event("payment started", "msg_123", "payments", user_id=456)
logger_service.log_error("payment failed", Exception("network error"), payment_id="pay_789")
logger_service.log_metric("processing latency", 150, unit="ms")
```

### Complete Example

```python
import asyncio
import logging
from mx_rmq import MQConfig, RedisMessageQueue
from mx_rmq.logging import setup_colored_logging

# Configure colored logging
setup_colored_logging("INFO")

# Application logger
logger = logging.getLogger("OrderApp")

async def handle_order(payload: dict) -> None:
    """Handle an order message."""
    order_id = payload.get("order_id")

    logger.info(f"Processing order: {order_id}")

    try:
        # Simulate order processing
        await asyncio.sleep(1)

        # Simulate different outcomes
        if order_id.endswith("error"):
            raise ValueError("Invalid order data")
        elif order_id.endswith("warn"):
            logger.warning(f"Order processed with a warning: {order_id}")

        logger.info(f"Order processed: {order_id}")

    except Exception as e:
        logger.error(f"Order processing failed: {order_id}", exc_info=e)
        raise

async def main():
    # Create the message queue
    mq = RedisMessageQueue()

    # Register the handler
    mq.register("order_created", handle_order)

    # Send some test messages
    await mq.produce("order_created", {"order_id": "ORD_001"})
    await mq.produce("order_created", {"order_id": "ORD_002_warn"})
    await mq.produce("order_created", {"order_id": "ORD_003_error"})

    # Start the consumer
    await mq.start_dispatch_consuming()

if __name__ == "__main__":
    asyncio.run(main())
```
### Log Level Guidelines

```python
import logging

logger = logging.getLogger("MyApp")

# DEBUG: detailed diagnostic information
logger.debug("Computing discount", extra={"original_price": 100, "discount_rate": 0.1})

# INFO: significant business events
logger.info("Order paid", extra={"order_id": "ORD_123", "payment_id": "PAY_456"})

# WARNING: potential problems that do not break functionality
logger.warning("Low stock", extra={"product_id": "PROD_789", "requested": 10, "available": 5})

# ERROR: errors that need attention
logger.error("Payment gateway error", extra={"order_id": "ORD_123"}, exc_info=True)

# CRITICAL: severe errors that need immediate action
logger.critical("Database connection failed", extra={"database": "order_db"})
```

### Environment-Variable Configuration

The log level can be set via an environment variable:

```bash
# Set the log level
export LOG_LEVEL=DEBUG

# Run your application
python your_app.py
```

```python
import os
from mx_rmq.logging import setup_colored_logging

# Read the log level from the environment
log_level = os.getenv("LOG_LEVEL", "INFO")
setup_colored_logging(log_level)
```
### Logging Best Practices

#### 1. Use Structured Logging

```python
# ✅ Recommended: pass structured data via the extra parameter
logger.info("Order created", extra={
    "order_id": "ORD_123",
    "user_id": 456,
    "amount": 99.99,
    "currency": "USD"
})

# ❌ Avoid: string interpolation
logger.info(f"User {user_id} created order {order_id} for {amount}")
```

#### 2. Logging in Error Handling

```python
async def handle_payment(payload: dict) -> None:
    order_id = payload.get("order_id")

    try:
        await process_payment(payload)

    except PaymentValidationError as e:
        # Business validation error: log it, do not retry
        logger.warning("Payment validation failed", extra={
            "order_id": order_id,
            "error": str(e),
            "error_type": "validation"
        })
        raise  # re-raise; goes to the dead letter queue

    except PaymentGatewayError as e:
        # External service error: retryable
        logger.error("Payment gateway error", extra={
            "order_id": order_id,
            "error": str(e),
            "error_type": "gateway",
            "retryable": True
        })
        raise  # re-raise; triggers a retry

    except Exception as e:
        # Unknown error: log full details
        logger.error("Payment processing exception", extra={
            "order_id": order_id,
            "error": str(e),
            "error_type": type(e).__name__,
            "payload": payload
        }, exc_info=True)
        raise
```

#### 3. Performance Monitoring Logs

```python
import time

async def timed_handler(payload: dict) -> None:
    start_time = time.time()
    order_id = payload.get("order_id")

    try:
        await process_order(payload)

    finally:
        processing_time = time.time() - start_time
        logger.info("Order handling finished", extra={
            "order_id": order_id,
            "processing_time": f"{processing_time:.2f}s"
        })

        # Performance alert
        if processing_time > 5:
            logger.warning("Order handling took too long", extra={
                "order_id": order_id,
                "processing_time": f"{processing_time:.2f}s"
            })
```
### Trying the Logging Features

You can exercise the different logging features with these commands:

```bash
# Test all logging features (recommended)
uv run python examples/usage_sample.py test_all_logging

# Test standard logging
uv run python examples/usage_sample.py logging

# Test colored logging
uv run python examples/usage_sample.py colored

# Test minimal colored logging
uv run python examples/usage_sample.py simple_colored

# Test LoggerService compatibility
uv run python examples/usage_sample.py logger

# Test the consumer with colored logging
uv run python examples/usage_sample.py consumer
```

### FAQ

#### Q: Why don't I see any log output?

**A:** Following Python library best practice, MX-RMQ installs a `NullHandler` by default, so you must configure a log handler yourself:

```python
# Solution 1: use a convenience setup function
from mx_rmq.logging import setup_colored_logging
setup_colored_logging("INFO")

# Solution 2: use standard logging configuration
import logging
logging.basicConfig(level=logging.INFO)
```
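To raise MX-RMQ's verbosity without making the rest of the application noisy (or the reverse), you can configure its logger hierarchy directly; this is plain standard-library `logging`:

```python
import logging

logging.basicConfig(level=logging.WARNING)           # quiet default everywhere
logging.getLogger("mx_rmq").setLevel(logging.DEBUG)  # verbose for MX-RMQ only
```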
#### Q: How do I log to files in production?

**A:** Use standard Python logging configuration:

```python
import logging
from logging.handlers import RotatingFileHandler

# File handler
file_handler = RotatingFileHandler(
    'app.log',
    maxBytes=10*1024*1024,  # 10MB
    backupCount=5
)
file_handler.setLevel(logging.INFO)

# Console handler
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)

# Shared format
formatter = logging.Formatter(
    '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
file_handler.setFormatter(formatter)
console_handler.setFormatter(formatter)

# Configure the mx_rmq logger
logger = logging.getLogger('mx_rmq')
logger.setLevel(logging.INFO)
logger.addHandler(file_handler)
logger.addHandler(console_handler)
```
## Configuration Reference

### Complete MQConfig Parameters

```python
from mx_rmq import MQConfig

config = MQConfig(
    # Redis connection
    redis_host="redis://localhost:6379",  # Redis connection URL
    redis_db=0,                           # Redis database number (0-15)
    redis_password=None,                  # Redis password
    queue_prefix="",                      # queue prefix, for multi-environment isolation
    connection_pool_size=20,              # connection pool size

    # Consumer
    max_workers=5,                        # maximum worker coroutines
    task_queue_size=8,                    # local task queue size

    # Message lifecycle
    message_ttl=86400,                    # message TTL (seconds), default 24 hours
    processing_timeout=180,               # processing timeout (seconds), default 3 minutes

    # Retry
    max_retries=3,                        # maximum retry attempts
    retry_delays=[60, 300, 1800],         # retry delay intervals (seconds)

    # Dead letter queue
    enable_dead_letter=True,              # enable the dead letter queue

    # Monitoring
    monitor_interval=30,                  # monitor check interval (seconds)
    expired_check_interval=10,            # expired-message check interval (seconds)
    processing_monitor_interval=30,       # processing-queue monitor interval (seconds)
    batch_size=100,                       # batch size
)
```

### Environment-Variable Configuration

Configuration can be driven by environment variables:

```bash
export REDIS_URL="redis://localhost:6379"
export REDIS_PASSWORD="your_password"
export MQ_MAX_WORKERS=10
export MQ_TASK_QUEUE_SIZE=20
export MQ_MESSAGE_TTL=86400
```

```python
import os
from mx_rmq import MQConfig

config = MQConfig(
    redis_host=os.getenv("REDIS_URL", "redis://localhost:6379"),
    redis_password=os.getenv("REDIS_PASSWORD"),
    max_workers=int(os.getenv("MQ_MAX_WORKERS", "5")),
    task_queue_size=int(os.getenv("MQ_TASK_QUEUE_SIZE", "8")),
    message_ttl=int(os.getenv("MQ_MESSAGE_TTL", "86400")),
)
```
## API Reference

### RedisMessageQueue

#### Initialization

```python
def __init__(self, config: MQConfig | None = None) -> None:
    """
    Initialize the message queue.

    Args:
        config: queue configuration; None uses the defaults
    """
```

#### Core Methods

```python
async def produce(
    self,
    topic: str,
    payload: dict[str, Any],
    delay: int = 0,
    priority: MessagePriority = MessagePriority.NORMAL,
    ttl: int | None = None,
    message_id: str | None = None,
) -> str:
    """
    Produce a message.

    Args:
        topic: topic name
        payload: message payload (must be a JSON-serializable dict)
        delay: delay before execution in seconds; 0 means immediately
        priority: message priority
        ttl: message time-to-live in seconds; None uses the configured default
        message_id: message ID; None generates a UUID

    Returns:
        The message ID (string)

    Raises:
        ValueError: parameter validation failed
        RedisError: Redis operation failed
    """

def register(self, topic: str, handler: Callable) -> None:
    """
    Register a message handler.

    Args:
        topic: topic name
        handler: handler function; must be async and take a single dict argument

    Raises:
        ValueError: the handler is not callable
    """

async def start_dispatch_consuming(self) -> None:
    """
    Start message dispatching and consumption.

    Blocks until a shutdown signal (SIGINT/SIGTERM) is received.

    Raises:
        RuntimeError: the system was not initialized correctly
        RedisError: Redis connection error
    """

async def cleanup(self) -> None:
    """
    Release resources and close the Redis connection pool.
    """
```
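As a quick illustration of the `produce` signature above, a call that supplies every optional parameter (the topic and ID here are made up):

```python
from mx_rmq import MessagePriority

message_id = await mq.produce(
    topic="invoice_ready",
    payload={"invoice_id": "INV_42"},
    delay=60,                      # deliver in one minute
    priority=MessagePriority.HIGH,
    ttl=3600,                      # drop if still unprocessed after an hour
    message_id="invoice-INV_42",   # custom ID, e.g. for tracing or deduplication
)
```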
### Message

```python
@dataclass
class Message:
    """Message data class."""
    id: str                    # unique message ID
    version: str               # message format version
    topic: str                 # topic name
    payload: dict[str, Any]    # message payload
    priority: MessagePriority  # message priority
    created_at: int            # creation timestamp (milliseconds)
    meta: MessageMeta          # message metadata

@dataclass
class MessageMeta:
    """Message metadata."""
    status: MessageStatus      # message status
    retry_count: int           # retry count
    max_retries: int           # maximum retries
    retry_delays: list[int]    # retry delay configuration
    last_error: str | None     # last error message
    expire_at: int             # expiry timestamp
    # ... other metadata fields
```

### Enums

```python
class MessagePriority(str, Enum):
    """Message priority."""
    HIGH = "high"        # high priority
    NORMAL = "normal"    # normal priority
    LOW = "low"          # low priority

class MessageStatus(str, Enum):
    """Message status."""
    PENDING = "pending"          # waiting to be processed
    PROCESSING = "processing"    # being processed
    COMPLETED = "completed"      # finished
    RETRYING = "retrying"        # retrying
    DEAD_LETTER = "dead_letter"  # dead letter
```
## Monitoring and Management

### Collecting Metrics

```python
from mx_rmq import MetricsCollector

# Create a metrics collector
collector = MetricsCollector(redis=mq.redis, queue_prefix=config.queue_prefix)

# Collect all metrics
metrics = await collector.collect_all_metrics(["order_created", "user_registration"])

# Print the key metrics
print(f"Pending messages: {metrics['queue.order_created.pending']}")
print(f"Processing messages: {metrics['queue.order_created.processing']}")
print(f"Throughput: {metrics['throughput.messages_per_minute']}")
print(f"Dead letter queue: {metrics['queue.dlq.count']}")
```

### Queue Monitoring

```python
# Monitor a single queue
queue_metrics = await collector.collect_queue_metrics(["order_created"])
print(f"Order queue state: {queue_metrics}")

# Monitor processing performance
processing_metrics = await collector.collect_processing_metrics(["order_created"])
print(f"Average processing time: {processing_metrics['order_created.avg_processing_time']}ms")
```

### Dead Letter Queue Management

```python
import json

# Inspect the dead letter queue
dlq_count = await mq.redis.llen("dlq:queue")
print(f"Dead letter count: {dlq_count}")

# List dead letter messages
dlq_messages = await mq.redis.lrange("dlq:queue", 0, 9)  # first 10 entries
for msg_id in dlq_messages:
    payload = await mq.redis.hget("dlq:payload:map", msg_id)
    print(f"Dead letter: {msg_id} - {payload}")

# Manually retry a dead letter message (roll your own)
async def retry_dead_message(message_id: str):
    # Fetch the message from the dead letter queue
    payload_json = await mq.redis.hget("dlq:payload:map", message_id)
    if payload_json:
        # Parse the message and produce it again
        message = json.loads(payload_json)
        await mq.produce(message["topic"], message["payload"])
        # Remove it from the dead letter queue
        await mq.redis.lrem("dlq:queue", 1, message_id)
        await mq.redis.hdel("dlq:payload:map", message_id)
```
### Real-Time Monitoring Script

```python
import asyncio
import time

async def monitor_loop():
    """Real-time monitoring loop."""
    collector = MetricsCollector(redis=mq.redis)

    while True:
        try:
            # Collect metrics
            metrics = await collector.collect_all_metrics(["order_created"])

            # Print the key figures
            print(f"[{time.strftime('%H:%M:%S')}] Queue state:")
            print(f"  pending: {metrics.get('queue.order_created.pending', 0)}")
            print(f"  processing: {metrics.get('queue.order_created.processing', 0)}")
            print(f"  dead letters: {metrics.get('queue.dlq.count', 0)}")

            # Check alert conditions
            pending = metrics.get('queue.order_created.pending', 0)
            if pending > 100:
                print(f"⚠️ Alert: pending backlog ({pending})")

            dlq_count = metrics.get('queue.dlq.count', 0)
            if dlq_count > 10:
                print(f"🚨 Alert: too many dead letters ({dlq_count})")

        except Exception as e:
            print(f"Monitoring error: {e}")

        await asyncio.sleep(10)  # check every 10 seconds

# Start monitoring (from within a running event loop)
asyncio.create_task(monitor_loop())
```
## Deployment Guide

### Local Development

```bash
# 1. Start Redis
docker run -d --name redis -p 6379:6379 redis:7-alpine

# 2. Run your application
python your_app.py
```

### Docker Deployment

**Dockerfile:**
```dockerfile
FROM python:3.12-slim

WORKDIR /app

# Install dependencies
COPY requirements.txt .
RUN pip install -r requirements.txt

# Copy the code
COPY . .

# Start the application
CMD ["python", "main.py"]
```

**docker-compose.yml:**
```yaml
version: '3.8'
services:
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data
    command: redis-server --appendonly yes

  app:
    build: .
    depends_on:
      - redis
    environment:
      - REDIS_URL=redis://redis:6379
      - MQ_MAX_WORKERS=10
    restart: unless-stopped

volumes:
  redis_data:
```
### Production Configuration

**Recommended Redis settings (redis.conf):**
```conf
# Memory management
maxmemory-policy allkeys-lru
maxmemory 2gb

# Connection management
timeout 300
tcp-keepalive 300

# RDB persistence: snapshot after 1 change in 900s,
# 10 changes in 300s, or 10000 changes in 60s
# (redis.conf does not allow inline comments after directives)
save 900 1
save 300 10
save 60 10000

# AOF persistence
appendonly yes
appendfsync everysec

# Performance tuning
hz 10
dynamic-hz yes
```

**Application configuration:**
```python
# Production configuration
config = MQConfig(
    redis_host="redis://redis-cluster:6379",
    redis_password="your_secure_password",
    max_workers=20,
    task_queue_size=50,
    connection_pool_size=30,
    message_ttl=86400 * 7,   # 7 days
    processing_timeout=600,  # 10 minutes
    queue_prefix="prod",     # environment isolation
)
```
### High-Availability Deployment

**Redis Sentinel:**
```python
import redis.sentinel

# Configure Sentinel
sentinels = [
    ('sentinel1', 26379),
    ('sentinel2', 26379),
    ('sentinel3', 26379),
]

sentinel = redis.sentinel.Sentinel(sentinels, socket_timeout=0.1)

# Discover the master
redis_master = sentinel.master_for('mymaster', socket_timeout=0.1)

# Inject the custom Redis connection
config = MQConfig(redis_host="")  # left empty; a custom connection is used
mq = RedisMessageQueue(config)
mq.redis = redis_master  # use the Sentinel-managed connection
```

### Monitoring and Alerting

**Exposing Prometheus metrics:**
```python
import asyncio
from prometheus_client import start_http_server, Gauge, Counter

# Define the metrics
queue_size = Gauge('mq_queue_size', 'Queue size', ['topic', 'status'])
messages_processed = Counter('mq_messages_processed_total', 'Messages processed', ['topic', 'status'])

async def export_metrics():
    """Export Prometheus metrics."""
    collector = MetricsCollector(redis=mq.redis)

    while True:
        metrics = await collector.collect_all_metrics(['order_created'])

        # Update the Prometheus gauges
        queue_size.labels(topic='order_created', status='pending').set(
            metrics.get('queue.order_created.pending', 0)
        )
        queue_size.labels(topic='order_created', status='processing').set(
            metrics.get('queue.order_created.processing', 0)
        )

        await asyncio.sleep(30)

# Start the Prometheus HTTP server and the export loop
# (create_task must be called from within a running event loop)
start_http_server(8000)
asyncio.create_task(export_metrics())
```
## Best Practices

### 1. Message Design

**✅ Recommended:**
```python
# A clear message structure carrying the necessary context
await mq.produce("order_created", {
    "order_id": "ORD_123456",
    "user_id": 789,
    "total_amount": 99.99,
    "currency": "USD",
    "timestamp": "2024-01-01T12:00:00Z",
    "metadata": {
        "source": "web",
        "version": "v1.0"
    }
})
```

**❌ Avoid:**
```python
# Too little context
await mq.produce("process", {"id": 123})

# Too much data in one message
await mq.produce("user_update", {
    "user": {...},          # every field of the user
    "history": [...],       # the full history
    "related_data": {...}   # piles of related data
})
```
### 2. Error Handling

**✅ Recommended:**
```python
async def handle_payment(payload: dict) -> None:
    try:
        order_id = payload["order_id"]
        amount = payload["amount"]

        # Parameter validation
        if not order_id or amount <= 0:
            raise ValueError(f"Invalid order parameters: {payload}")

        # Business logic
        result = await process_payment(order_id, amount)

        # Log the success
        logger.info("Payment processed", extra={"order_id": order_id, "amount": amount})

    except ValueError as e:
        # Bad parameters: do not retry
        logger.error("Invalid payment parameters", extra={"error": str(e), "payload": payload})
        raise  # re-raise; goes to the dead letter queue

    except PaymentGatewayError as e:
        # External service error: retryable
        logger.warning("Payment gateway error", extra={"error": str(e), "order_id": order_id})
        raise  # re-raise; triggers a retry

    except Exception as e:
        # Unknown error
        logger.error("Payment processing failed", extra={"error": str(e), "order_id": order_id})
        raise
```
### 3. Idempotency

```python
async def handle_order_created(payload: dict) -> None:
    order_id = payload["order_id"]

    # Skip if already handled (idempotency guard)
    if await is_order_processed(order_id):
        logger.info("Order already processed, skipping", extra={"order_id": order_id})
        return

    try:
        # Process the order
        await process_order(order_id)

        # Mark it as handled
        await mark_order_processed(order_id)

    except Exception as e:
        logger.error("Order processing failed", extra={"order_id": order_id, "error": str(e)})
        raise
```
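`is_order_processed` and `mark_order_processed` are left to the application. One minimal sketch backed by the queue's own Redis connection, assuming redis-py's asyncio client and a 24-hour deduplication window (the key naming is illustrative):

```python
DEDUP_TTL = 86400  # keep "seen" markers for 24 hours

async def is_order_processed(order_id: str) -> bool:
    return await mq.redis.exists(f"processed:order:{order_id}") == 1

async def mark_order_processed(order_id: str) -> None:
    # EX gives the marker a TTL so the keyspace cannot grow without bound
    await mq.redis.set(f"processed:order:{order_id}", "1", ex=DEDUP_TTL)
```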
### 4. Performance Tuning

**Sizing the worker pool:**
```python
import multiprocessing

# Size the worker pool from the CPU count and the workload's IO profile
cpu_count = multiprocessing.cpu_count()

# CPU-bound work: one worker per core
worker_count = cpu_count
# IO-bound work: 2-4x the core count, e.g.
# worker_count = cpu_count * 3

config = MQConfig(
    max_workers=worker_count,
    # The local task queue should be larger than the worker pool
    task_queue_size=worker_count * 2,
)
```

**Batch-processing optimization:**
```python
import asyncio

async def handle_batch_emails(payload: dict) -> None:
    """Send emails in batches."""
    email_list = payload["emails"]

    # Work in chunks to bound memory usage
    batch_size = 10
    for i in range(0, len(email_list), batch_size):
        batch = email_list[i:i + batch_size]

        # Send the batch concurrently
        tasks = [send_email(email) for email in batch]
        await asyncio.gather(*tasks, return_exceptions=True)

        # Avoid hammering the mail service
        await asyncio.sleep(0.1)
```
### 5. Simulating Multi-Group Consumption

Because the system has no consumer-group feature, fan out to multiple groups with one of these approaches:

**✅ Recommended:**
```python
# Option 1: one topic per group, publish once per topic
async def send_order_created(order_data: dict):
    """Publish the order-created event to every processing group."""
    await mq.produce("order_created_payment", order_data)       # payment group
    await mq.produce("order_created_inventory", order_data)     # inventory group
    await mq.produce("order_created_analytics", order_data)     # analytics group
    await mq.produce("order_created_notification", order_data)  # notification group

# Register a handler per group
@mq.register("order_created_payment")
async def handle_payment_processing(payload: dict):
    """Payment-side handling."""
    await process_payment(payload)

@mq.register("order_created_inventory")
async def handle_inventory_processing(payload: dict):
    """Inventory-side handling."""
    await update_inventory(payload)

@mq.register("order_created_analytics")
async def handle_analytics_processing(payload: dict):
    """Analytics-side handling."""
    await update_analytics(payload)

@mq.register("order_created_notification")
async def handle_notification_processing(payload: dict):
    """Notification-side handling."""
    await send_notifications(payload)
```

**Option 2: a single dispatcher topic**
```python
# One dispatcher topic fans out to the group topics
@mq.register("order_created")
async def order_dispatcher(payload: dict):
    """Order message dispatcher."""
    order_id = payload["order_id"]

    # Fan out to every group concurrently
    tasks = [
        mq.produce("order_payment", payload),
        mq.produce("order_inventory", payload),
        mq.produce("order_analytics", payload),
        mq.produce("order_notification", payload),
    ]

    try:
        await asyncio.gather(*tasks)
        logger.info("Order dispatched", extra={"order_id": order_id})
    except Exception as e:
        logger.error("Order dispatch failed", extra={"order_id": order_id, "error": str(e)})
        raise
```

**Option 3: a topic naming convention**
```python
# A shared naming convention
TOPIC_PATTERNS = {
    "order_created": [
        "order_created.payment",
        "order_created.inventory",
        "order_created.analytics",
        "order_created.notification"
    ]
}

async def broadcast_message(base_topic: str, payload: dict):
    """Broadcast a message to every related topic."""
    topics = TOPIC_PATTERNS.get(base_topic, [base_topic])
    tasks = [mq.produce(topic, payload) for topic in topics]
    await asyncio.gather(*tasks)
    logger.info("Broadcast finished", extra={"base_topic": base_topic, "target_topics": topics})

# Usage
await broadcast_message("order_created", order_data)
```
### 6. Monitoring and Alerting

```python
async def setup_monitoring():
    """Set up monitoring and alerting."""
    collector = MetricsCollector(redis=mq.redis)

    while True:
        try:
            metrics = await collector.collect_all_metrics(["order_created"])

            # Backlog alert
            pending = metrics.get('queue.order_created.pending', 0)
            if pending > 1000:
                await send_alert(f"Severe backlog: {pending} messages pending")

            # Dead letter alert
            dlq_count = metrics.get('queue.dlq.count', 0)
            if dlq_count > 50:
                await send_alert(f"Too many dead letters: {dlq_count}")

            # Processing time alert
            avg_time = metrics.get('processing.order_created.avg_time', 0)
            if avg_time > 30000:  # 30 seconds
                await send_alert(f"Slow message processing: {avg_time}ms")

        except Exception as e:
            logger.error("Monitoring check failed", extra={"error": str(e)})

        await asyncio.sleep(60)  # check once a minute
```
## Troubleshooting

### Common Problems

#### Q1: What if messages are lost?

**Symptom:** produced messages are never processed.

**Possible causes:**
1. The Redis connection dropped
2. The consumer never started correctly
3. A handler raised an exception that was not handled properly

**What to check:**
```python
# 1. Check the Redis connection
try:
    await mq.redis.ping()
    print("Redis connection OK")
except Exception as e:
    print(f"Redis connection failed: {e}")

# 2. Check whether the messages are still queued
pending_count = await mq.redis.llen("order_created:pending")
processing_count = await mq.redis.llen("order_created:processing")
print(f"pending: {pending_count}, processing: {processing_count}")

# 3. Check the dead letter queue
dlq_count = await mq.redis.llen("dlq:queue")
print(f"dead letters: {dlq_count}")
```

#### Q2: Messages process too slowly

**Symptom:** the queue backs up and messages are not handled promptly.

**Possible causes:**
1. Too few worker coroutines
2. Handlers take too long
3. Redis is the bottleneck

**What to try:**
```python
# 1. Increase the worker pool
config = MQConfig(max_workers=20)  # bump to 20

# 2. Optimize the handler
async def optimized_handler(payload: dict) -> None:
    # Use async IO
    async with aiohttp.ClientSession() as session:
        response = await session.post(url, json=payload)

    # Keep blocking calls off the event loop
    await asyncio.to_thread(blocking_operation, payload)

# 3. Measure processing time
import time

async def timed_handler(payload: dict) -> None:
    start_time = time.time()
    try:
        await actual_handler(payload)
    finally:
        processing_time = time.time() - start_time
        if processing_time > 5:  # longer than 5 seconds
            logger.warning("Slow handler", extra={"time": processing_time, "payload": payload})
```
#### Q3: Memory usage keeps climbing

**Symptom:** the application's memory grows steadily.

**Possible causes:**
1. The local task queue backs up
2. Message objects are not released
3. The Redis connection pool is oversized

**What to try:**
```python
# 1. Shrink the queues
config = MQConfig(
    task_queue_size=10,       # smaller local queue
    connection_pool_size=10,  # smaller connection pool
)

# 2. Monitor memory usage
import psutil
import gc

async def memory_monitor():
    while True:
        process = psutil.Process()
        memory_mb = process.memory_info().rss / 1024 / 1024

        if memory_mb > 500:  # over 500MB
            logger.warning("High memory usage", extra={"memory_mb": memory_mb})
            gc.collect()  # force a garbage collection

        await asyncio.sleep(60)
```

#### Q4: Redis connection errors

**Symptom:** `ConnectionError`, `TimeoutError`

**What to try:**
```python
# 1. Review the Redis configuration
config = MQConfig(
    redis_host="redis://localhost:6379",
    connection_pool_size=20,
)

# 2. Retry the connection with exponential backoff
import asyncio
import redis.asyncio as aioredis  # redis-py's asyncio client

async def create_redis_with_retry(config: MQConfig, max_retries: int = 3):
    for attempt in range(max_retries):
        try:
            redis = aioredis.from_url(config.redis_host)
            await redis.ping()
            return redis
        except Exception as e:
            if attempt == max_retries - 1:
                raise
            logger.warning(f"Redis connection failed, retrying ({attempt + 1}/{max_retries})")
            await asyncio.sleep(2 ** attempt)
```
### Performance Diagnostics

#### Latency Analysis

```python
import time
from collections import defaultdict

class PerformanceAnalyzer:
    def __init__(self):
        self.metrics = defaultdict(list)

    async def analyze_handler(self, handler_name: str, handler_func):
        """Wrap a handler with timing instrumentation."""
        async def wrapped_handler(payload: dict):
            start_time = time.time()
            try:
                result = await handler_func(payload)
                return result
            finally:
                end_time = time.time()
                processing_time = (end_time - start_time) * 1000  # milliseconds
                self.metrics[handler_name].append(processing_time)

                # Print statistics every 100 calls
                if len(self.metrics[handler_name]) % 100 == 0:
                    times = self.metrics[handler_name]
                    avg_time = sum(times) / len(times)
                    max_time = max(times)
                    min_time = min(times)
                    print(f"{handler_name} performance over {len(times)} calls:")
                    print(f"  avg: {avg_time:.2f}ms")
                    print(f"  max: {max_time:.2f}ms")
                    print(f"  min: {min_time:.2f}ms")

        return wrapped_handler

# Usage
analyzer = PerformanceAnalyzer()

@mq.register("order_created")
async def handle_order(payload: dict):
    # Handler logic
    await process_order(payload)

# Swap in the instrumented handler
# (assumes the handler registry is exposed as mq.handlers)
mq.handlers["order_created"] = await analyzer.analyze_handler(
    "order_created",
    handle_order
)
```
Raw data
{
"_id": null,
"home_page": null,
"name": "mx-rmq",
"maintainer": null,
"docs_url": null,
"requires_python": ">=3.10",
"maintainer_email": null,
"keywords": "async, distributed, message-queue, redis",
"author": null,
"author_email": "Alistar Max <codingox@gmail.com>",
"download_url": "https://files.pythonhosted.org/packages/ea/a4/d45e82c9dc6bae5175cc4c4d1c3d60470b70a6613032ba7edc2336784fad/mx_rmq-1.5.0.tar.gz",
"platform": null,
"description": "# MX-RMQ \u4f7f\u7528\u6307\u5357\n\n[](https://www.python.org/downloads/)\n[](https://opensource.org/licenses/MIT)\n[](https://redis.io/)\n\nMX-RMQ \u662f\u4e00\u4e2a\u9ad8\u6027\u80fd\u3001\u53ef\u9760\u7684\u57fa\u4e8eRedis\u7684\u5206\u5e03\u5f0f\u6d88\u606f\u961f\u5217\u7cfb\u7edf\uff0c\u652f\u6301\u666e\u901a\u6d88\u606f\u3001\u5ef6\u65f6\u6d88\u606f\u3001\u4f18\u5148\u7ea7\u6d88\u606f\uff0c\u5177\u5907\u5b8c\u5584\u7684\u76d1\u63a7\u548c\u91cd\u8bd5\u673a\u5236\u3002\n\n## \u76ee\u5f55\n\n- [\u7279\u6027\u6982\u89c8](#\u7279\u6027\u6982\u89c8)\n- [\u5feb\u901f\u5f00\u59cb](#\u5feb\u901f\u5f00\u59cb)\n- [\u5b89\u88c5](#\u5b89\u88c5)\n- [\u57fa\u672c\u4f7f\u7528](#\u57fa\u672c\u4f7f\u7528)\n- [\u9ad8\u7ea7\u529f\u80fd](#\u9ad8\u7ea7\u529f\u80fd)\n- [\u65e5\u5fd7\u7cfb\u7edf](#\u65e5\u5fd7\u7cfb\u7edf)\n- [\u914d\u7f6e\u53c2\u8003](#\u914d\u7f6e\u53c2\u8003)\n- [API \u53c2\u8003](#api-\u53c2\u8003)\n- [\u76d1\u63a7\u548c\u7ba1\u7406](#\u76d1\u63a7\u548c\u7ba1\u7406)\n- [\u90e8\u7f72\u6307\u5357](#\u90e8\u7f72\u6307\u5357)\n- [\u6700\u4f73\u5b9e\u8df5](#\u6700\u4f73\u5b9e\u8df5)\n- [\u6545\u969c\u6392\u9664](#\u6545\u969c\u6392\u9664)\n\n## \u7279\u6027\u6982\u89c8\n\n- \ud83d\ude80 **\u9ad8\u6027\u80fd**: \u57fa\u4e8eRedis\u7684\u5185\u5b58\u5b58\u50a8\uff0c\u652f\u630110,000+\u6d88\u606f/\u79d2\u7684\u541e\u5410\u91cf\n- \ud83d\udd04 **\u53ef\u9760\u6027**: \u539f\u5b50\u6027Lua\u811a\u672c\u64cd\u4f5c\uff0c\u4fdd\u8bc1\u6d88\u606f\u4e0d\u4e22\u5931\n- \u23f0 **\u5ef6\u65f6\u6d88\u606f**: \u652f\u6301\u4efb\u610f\u65f6\u95f4\u5ef6\u8fdf\u7684\u6d88\u606f\u8c03\u5ea6\n- \ud83c\udff7\ufe0f **\u4f18\u5148\u7ea7**: \u652f\u6301\u9ad8\u3001\u4e2d\u3001\u4f4e\u4f18\u5148\u7ea7\u6d88\u606f\u5904\u7406\n- \ud83d\udd01 **\u81ea\u52a8\u91cd\u8bd5**: \u53ef\u914d\u7f6e\u7684\u91cd\u8bd5\u673a\u5236\u548c\u6307\u6570\u9000\u907f\n- \ud83d\udc80 **\u6b7b\u4fe1\u961f\u5217**: \u5931\u8d25\u6d88\u606f\u81ea\u52a8\u8fdb\u5165\u6b7b\u4fe1\u961f\u5217\uff0c\u652f\u6301\u4eba\u5de5\u5e72\u9884\n- \ud83d\udcca **\u76d1\u63a7\u6307\u6807**: \u5b9e\u65f6\u76d1\u63a7\u961f\u5217\u72b6\u6001\u3001\u5904\u7406\u65f6\u95f4\u3001\u541e\u5410\u7387\u7b49\n- \ud83d\uded1 **\u4f18\u96c5\u505c\u673a**: \u652f\u6301\u4f18\u96c5\u505c\u673a\uff0c\u786e\u4fdd\u6d88\u606f\u5904\u7406\u5b8c\u6210\n- \ud83d\udd27 **\u6613\u4e8e\u4f7f\u7528**: \u7b80\u6d01\u7684API\u8bbe\u8ba1\uff0c\u5f00\u7bb1\u5373\u7528\n\n## \u8bbe\u8ba1\u9650\u5236\n\n- \u274c **\u4e0d\u652f\u6301\u6d88\u8d39\u8005\u7ec4**: \u6bcf\u4e2atopic\u53ea\u80fd\u88ab\u5355\u4e00\u6d88\u8d39\u8005\u7ec4\u8d1f\u8f7d\u5747\u8861\u6d88\u8d39\uff0c\u5982\u9700\u591a\u7ec4\u6d88\u8d39\u540c\u4e00\u6d88\u606f\uff0c\u8bf7\u521b\u5efa\u591a\u4e2atopic\u5e76\u6295\u9012\u591a\u6b21\u6d88\u606f\n\n## \u5feb\u901f\u5f00\u59cb\n\n\u542f\u52a8 redis\n```shell\ndocker run -d --name redis8 -p 6379:6379 redis:8 redis-server\n```\n### 30\u79d2\u5feb\u901f\u4f53\u9a8c\n\n```python\nimport asyncio\nfrom mx_rmq import MQConfig, RedisMessageQueue\n\nasync def handle_order(payload: dict) -> None:\n \"\"\"\u5904\u7406\u8ba2\u5355\u6d88\u606f\"\"\"\n print(f\"\u5904\u7406\u8ba2\u5355: {payload['order_id']}\")\n # \u4f60\u7684\u4e1a\u52a1\u903b\u8f91\n await asyncio.sleep(1)\n\nasync def main():\n # \u521b\u5efa\u6d88\u606f\u961f\u5217\n mq = RedisMessageQueue()\n \n # \u6ce8\u518c\u6d88\u606f\u5904\u7406\u5668\n mq.register(\"order_created\", handle_order)\n \n # \u751f\u4ea7\u6d88\u606f\n await mq.produce(\"order_created\", {\n \"order_id\": \"ORD_123\",\n 
\"user_id\": 456,\n \"amount\": 99.99\n })\n \n # \u4e0d\u4f1a\u963b\u585e\n task = await mq.start_background() \n #\u963b\u585e\u4e0b\n \n await task\nif __name__ == \"__main__\":\n asyncio.run(main())\n```\n\n## \u5b89\u88c5\n\n### \u4f7f\u7528 uv (\u63a8\u8350)\n\n```bash\n# \u6dfb\u52a0\u5230\u73b0\u6709\u9879\u76ee\nuv add mx-rmq\n\n# \u6216\u8005\u4ece\u6e90\u7801\u5b89\u88c5\ngit clone https://github.com/CodingOX/mx-rmq.git\ncd mx-rmq\nuv sync\n```\n\n### \u4f7f\u7528 pip\n\n```bash\npip install mx-rmq\n\n# \u6216\u4ece\u6e90\u7801\u5b89\u88c5\npip install git+https://github.com/CodingOX/mx-rmq.git\n```\n\n### \u7cfb\u7edf\u8981\u6c42\n\n- Python 3.12+\n- Redis 5.0+\n\n## \u57fa\u672c\u4f7f\u7528\n\n### 1. \u521b\u5efa\u6d88\u606f\u961f\u5217\n\n```python\nfrom mx_rmq import MQConfig, RedisMessageQueue\n\n# \u4f7f\u7528\u9ed8\u8ba4\u914d\u7f6e\nmq = RedisMessageQueue()\n\n# \u6216\u81ea\u5b9a\u4e49\u914d\u7f6e\nconfig = MQConfig(\n redis_host=\"redis://localhost:6379\",\n max_workers=10,\n task_queue_size=20\n)\nmq = RedisMessageQueue(config)\n```\n\n### 2. \u6ce8\u518c\u6d88\u606f\u5904\u7406\u5668\n\n```python\n# \u65b9\u5f0f1: \u4f7f\u7528\u88c5\u9970\u5668\n@mq.register(\"user_registration\")\nasync def handle_user_registration(payload: dict) -> None:\n user_id = payload['user_id']\n email = payload['email']\n print(f\"\u6b22\u8fce\u65b0\u7528\u6237: {user_id} ({email})\")\n\n# \u65b9\u5f0f2: \u76f4\u63a5\u6ce8\u518c\nasync def handle_payment(payload: dict) -> None:\n print(f\"\u5904\u7406\u652f\u4ed8: {payload}\")\n\nmq.register(\"payment_completed\", handle_payment)\n```\n\n### 3. \u751f\u4ea7\u6d88\u606f\n\n```python\n# \u751f\u4ea7\u666e\u901a\u6d88\u606f\nmessage_id = await mq.produce(\"user_registration\", {\n \"user_id\": 12345,\n \"email\": \"user@example.com\",\n \"timestamp\": \"2024-01-01T00:00:00Z\"\n})\n\nprint(f\"\u6d88\u606f\u5df2\u53d1\u9001: {message_id}\")\n```\n\n### 4. 
\u542f\u52a8\u6d88\u8d39\u8005\n\n```python\n# \u542f\u52a8\u6d88\u8d39\u8005\uff08\u4f1a\u963b\u585e\uff0c\u76f4\u5230\u6536\u5230\u505c\u673a\u4fe1\u53f7\uff09\nawait mq.start_dispatch_consuming()\n```\n\n## \u9ad8\u7ea7\u529f\u80fd\n\n### \u5ef6\u65f6\u6d88\u606f\n\n```python\n# 5\u5206\u949f\u540e\u53d1\u9001\u63d0\u9192\nawait mq.produce(\n topic=\"send_reminder\",\n payload={\"user_id\": 123, \"type\": \"payment_due\"},\n delay=300 # 300\u79d2\u540e\u6267\u884c\n)\n\n# 1\u5c0f\u65f6\u540e\u53d1\u9001\u90ae\u4ef6\nawait mq.produce(\n topic=\"send_email\",\n payload={\n \"to\": \"user@example.com\",\n \"subject\": \"\u8ba2\u5355\u786e\u8ba4\",\n \"body\": \"\u611f\u8c22\u60a8\u7684\u8ba2\u5355...\"\n },\n delay=3600 # 1\u5c0f\u65f6\u540e\u6267\u884c\n)\n```\n\n### \u4f18\u5148\u7ea7\u6d88\u606f\n\n```python\nfrom mx_rmq import MessagePriority\n\n# \u9ad8\u4f18\u5148\u7ea7\u6d88\u606f\uff08\u4f18\u5148\u5904\u7406\uff09\nawait mq.produce(\n topic=\"system_alert\",\n payload={\"level\": \"critical\", \"message\": \"\u7cfb\u7edf\u544a\u8b66\"},\n priority=MessagePriority.HIGH\n)\n\n# \u666e\u901a\u4f18\u5148\u7ea7\uff08\u9ed8\u8ba4\uff09\nawait mq.produce(\n topic=\"user_activity\",\n payload={\"user_id\": 123, \"action\": \"login\"},\n priority=MessagePriority.NORMAL\n)\n\n# \u4f4e\u4f18\u5148\u7ea7\u6d88\u606f\uff08\u6700\u540e\u5904\u7406\uff09\nawait mq.produce(\n topic=\"analytics_data\",\n payload={\"event\": \"page_view\", \"page\": \"/home\"},\n priority=MessagePriority.LOW\n)\n```\n\n### \u81ea\u5b9a\u4e49\u91cd\u8bd5\u914d\u7f6e\n\n```python\nconfig = MQConfig(\n redis_url=\"redis://localhost:6379\",\n max_retries=5, # \u6700\u5927\u91cd\u8bd55\u6b21\n retry_delays=[30, 60, 300, 900, 1800], # \u91cd\u8bd5\u95f4\u9694\uff1a30s, 1m, 5m, 15m, 30m\n processing_timeout=300, # 5\u5206\u949f\u5904\u7406\u8d85\u65f6\n)\n\nmq = RedisMessageQueue(config)\n```\n\n### \u6d88\u606f\u751f\u5b58\u65f6\u95f4(TTL)\n\n```python\n# \u8bbe\u7f6e\u6d88\u606f1\u5c0f\u65f6\u540e\u8fc7\u671f\nawait mq.produce(\n topic=\"temp_notification\",\n payload={\"message\": \"\u4e34\u65f6\u901a\u77e5\"},\n ttl=3600 # 1\u5c0f\u65f6\u540e\u8fc7\u671f\n)\n```\n\n### \u6279\u91cf\u751f\u4ea7\u6d88\u606f\n\n```python\n# \u6279\u91cf\u53d1\u9001\u591a\u4e2a\u6d88\u606f\nmessages = [\n {\"topic\": \"order_created\", \"payload\": {\"order_id\": f\"ORD_{i}\"}}\n for i in range(100)\n]\n\nfor msg in messages:\n await mq.produce(msg[\"topic\"], msg[\"payload\"])\n```\n\n## \u65e5\u5fd7\u7cfb\u7edf\n\nMX-RMQ \u4f7f\u7528\u6807\u51c6\u7684 Python logging \u7cfb\u7edf\uff0c\u5e76\u63d0\u4f9b\u4e86\u4fbf\u6377\u7684\u914d\u7f6e\u51fd\u6570\u548c\u5f69\u8272\u65e5\u5fd7\u652f\u6301\u3002\n\n### \u5feb\u901f\u5f00\u59cb\n\n#### 1. \u57fa\u672c\u65e5\u5fd7\u914d\u7f6e\n\n```python\nimport logging\nfrom mx_rmq import RedisMessageQueue\n\n# \u65b9\u5f0f1\uff1a\u4f7f\u7528\u6807\u51c6 logging \u914d\u7f6e\nlogging.basicConfig(\n level=logging.INFO,\n format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'\n)\n\n# \u65b9\u5f0f2\uff1a\u4f7f\u7528 MX-RMQ \u63d0\u4f9b\u7684\u4fbf\u6377\u914d\u7f6e\nfrom mx_rmq.logging import setup_basic_logging\nsetup_basic_logging(\"INFO\")\n\n# \u521b\u5efa\u6d88\u606f\u961f\u5217\nmq = RedisMessageQueue()\n\n# \u73b0\u5728\u53ef\u4ee5\u770b\u5230\u6240\u6709\u5185\u90e8\u65e5\u5fd7\nawait mq.start_dispatch_consuming()\n```\n\n#### 2. 
\u5f69\u8272\u65e5\u5fd7\u914d\u7f6e\n\n```python\nfrom mx_rmq.logging import setup_colored_logging\n\n# \u914d\u7f6e\u5f69\u8272\u65e5\u5fd7\u8f93\u51fa\nsetup_colored_logging(\"INFO\")\n\n# \u6216\u8005\u4f7f\u7528\u7b80\u6d01\u7684\u5f69\u8272\u65e5\u5fd7\uff08\u4e0d\u663e\u793a\u65f6\u95f4\u6233\uff09\nfrom mx_rmq.logging import setup_simple_colored_logging\nsetup_simple_colored_logging(\"INFO\")\n\n# \u521b\u5efa\u6d88\u606f\u961f\u5217\nmq = RedisMessageQueue()\n```\n\n### \u65e5\u5fd7\u914d\u7f6e\u51fd\u6570\n\nMX-RMQ \u63d0\u4f9b\u4e86\u4e09\u79cd\u4fbf\u6377\u7684\u65e5\u5fd7\u914d\u7f6e\u51fd\u6570\uff1a\n\n#### `setup_basic_logging(level=\"INFO\")`\n\u57fa\u672c\u7684\u65e5\u5fd7\u914d\u7f6e\uff0c\u8f93\u51fa\u5230\u63a7\u5236\u53f0\uff1a\n\n```python\nfrom mx_rmq.logging import setup_basic_logging\n\nsetup_basic_logging(\"DEBUG\") # \u8bbe\u7f6e\u4e3a DEBUG \u7ea7\u522b\n```\n\n#### `setup_colored_logging(level=\"INFO\")`\n\u5f69\u8272\u65e5\u5fd7\u914d\u7f6e\uff0c\u4e0d\u540c\u7ea7\u522b\u4f7f\u7528\u4e0d\u540c\u989c\u8272\uff1a\n\n```python\nfrom mx_rmq.logging import setup_colored_logging\n\nsetup_colored_logging(\"INFO\")\n```\n\n\u989c\u8272\u65b9\u6848\uff1a\n- \ud83d\udd35 **DEBUG**: \u9752\u8272\n- \ud83d\udfe2 **INFO**: \u7eff\u8272 \n- \ud83d\udfe1 **WARNING**: \u9ec4\u8272\n- \ud83d\udd34 **ERROR**: \u7ea2\u8272\n- \ud83d\udfe3 **CRITICAL**: \u7d2b\u8272\n\n#### `setup_simple_colored_logging(level=\"INFO\")`\n\u7b80\u6d01\u7684\u5f69\u8272\u65e5\u5fd7\uff0c\u4e0d\u663e\u793a\u65f6\u95f4\u6233\uff1a\n\n```python\nfrom mx_rmq.logging import setup_simple_colored_logging\n\nsetup_simple_colored_logging(\"INFO\")\n```\n\n### \u5728\u5e94\u7528\u4e2d\u4f7f\u7528\u65e5\u5fd7\n\n#### 1. \u6807\u51c6\u65b9\u5f0f\uff08\u63a8\u8350\uff09\n\n```python\nimport logging\nfrom mx_rmq import RedisMessageQueue\nfrom mx_rmq.logging import setup_colored_logging\n\n# \u914d\u7f6e\u5f69\u8272\u65e5\u5fd7\nsetup_colored_logging(\"INFO\")\n\n# \u5728\u6bcf\u4e2a\u6a21\u5757\u4e2d\u4f7f\u7528\u6807\u51c6\u65b9\u5f0f\nlogger = logging.getLogger(__name__)\n\nasync def handle_order(payload: dict) -> None:\n order_id = payload.get(\"order_id\")\n \n # \u8bb0\u5f55\u4e1a\u52a1\u65e5\u5fd7\n logger.info(\"\u5f00\u59cb\u5904\u7406\u8ba2\u5355\", extra={\"order_id\": order_id})\n \n try:\n # \u5904\u7406\u8ba2\u5355\u903b\u8f91\n await process_order(payload)\n logger.info(\"\u8ba2\u5355\u5904\u7406\u6210\u529f\", extra={\"order_id\": order_id})\n \n except Exception as e:\n logger.error(\"\u8ba2\u5355\u5904\u7406\u5931\u8d25\", extra={\"order_id\": order_id}, exc_info=e)\n raise\n\n# \u521b\u5efa\u6d88\u606f\u961f\u5217\nmq = RedisMessageQueue()\nmq.register(\"order_created\", handle_order)\n```\n\n#### 2. 
\u4f7f\u7528 LoggerService\uff08\u5411\u540e\u517c\u5bb9\uff09\n\n```python\nfrom mx_rmq import LoggerService\nfrom mx_rmq.logging import setup_colored_logging\n\n# \u914d\u7f6e\u5f69\u8272\u65e5\u5fd7\nsetup_colored_logging(\"INFO\")\n\n# \u521b\u5efa\u65e5\u5fd7\u670d\u52a1\nlogger_service = LoggerService(\"PaymentService\")\n\n# \u4f7f\u7528\u6807\u51c6\u7684\u65e5\u5fd7\u63a5\u53e3\nlogger_service.logger.info(\"\u652f\u4ed8\u670d\u52a1\u542f\u52a8\")\n\n# \u4f7f\u7528\u4fbf\u6377\u65b9\u6cd5\nlogger_service.log_message_event(\"\u652f\u4ed8\u5f00\u59cb\", \"msg_123\", \"payments\", user_id=456)\nlogger_service.log_error(\"\u652f\u4ed8\u5931\u8d25\", Exception(\"\u7f51\u7edc\u9519\u8bef\"), payment_id=\"pay_789\")\nlogger_service.log_metric(\"\u5904\u7406\u5ef6\u8fdf\", 150, unit=\"ms\")\n```\n\n### \u5b8c\u6574\u793a\u4f8b\n\n```python\nimport asyncio\nimport logging\nfrom mx_rmq import MQConfig, RedisMessageQueue\nfrom mx_rmq.logging import setup_colored_logging\n\n# \u914d\u7f6e\u5f69\u8272\u65e5\u5fd7\nsetup_colored_logging(\"INFO\")\n\n# \u83b7\u53d6\u5e94\u7528\u65e5\u5fd7\u5668\nlogger = logging.getLogger(\"OrderApp\")\n\nasync def handle_order(payload: dict) -> None:\n \"\"\"\u5904\u7406\u8ba2\u5355\u6d88\u606f\"\"\"\n order_id = payload.get(\"order_id\")\n \n logger.info(f\"\u5f00\u59cb\u5904\u7406\u8ba2\u5355: {order_id}\")\n \n try:\n # \u6a21\u62df\u8ba2\u5355\u5904\u7406\n await asyncio.sleep(1)\n \n # \u6a21\u62df\u4e0d\u540c\u7684\u5904\u7406\u7ed3\u679c\n if order_id.endswith(\"error\"):\n raise ValueError(\"\u8ba2\u5355\u6570\u636e\u65e0\u6548\")\n elif order_id.endswith(\"warn\"):\n logger.warning(f\"\u8ba2\u5355\u5904\u7406\u6709\u8b66\u544a: {order_id}\")\n \n logger.info(f\"\u8ba2\u5355\u5904\u7406\u6210\u529f: {order_id}\")\n \n except Exception as e:\n logger.error(f\"\u8ba2\u5355\u5904\u7406\u5931\u8d25: {order_id}\", exc_info=e)\n raise\n\nasync def main():\n # \u521b\u5efa\u6d88\u606f\u961f\u5217\n mq = RedisMessageQueue()\n \n # \u6ce8\u518c\u5904\u7406\u5668\n mq.register(\"order_created\", handle_order)\n \n # \u53d1\u9001\u4e00\u4e9b\u6d4b\u8bd5\u6d88\u606f\n await mq.produce(\"order_created\", {\"order_id\": \"ORD_001\"})\n await mq.produce(\"order_created\", {\"order_id\": \"ORD_002_warn\"})\n await mq.produce(\"order_created\", {\"order_id\": \"ORD_003_error\"})\n \n # \u542f\u52a8\u6d88\u8d39\u8005\n await mq.start_dispatch_consuming()\n\nif __name__ == \"__main__\":\n asyncio.run(main())\n```\n\n### \u65e5\u5fd7\u7ea7\u522b\u8bf4\u660e\n\n```python\nimport logging\n\nlogger = logging.getLogger(\"MyApp\")\n\n# DEBUG: \u8be6\u7ec6\u7684\u8c03\u8bd5\u4fe1\u606f\nlogger.debug(\"\u8ba1\u7b97\u6298\u6263\", extra={\"original_price\": 100, \"discount_rate\": 0.1})\n\n# INFO: \u91cd\u8981\u7684\u4e1a\u52a1\u4e8b\u4ef6\nlogger.info(\"\u8ba2\u5355\u652f\u4ed8\u6210\u529f\", extra={\"order_id\": \"ORD_123\", \"payment_id\": \"PAY_456\"})\n\n# WARNING: \u6f5c\u5728\u95ee\u9898\u4f46\u4e0d\u5f71\u54cd\u529f\u80fd\nlogger.warning(\"\u5e93\u5b58\u4e0d\u8db3\", extra={\"product_id\": \"PROD_789\", \"requested\": 10, \"available\": 5})\n\n# ERROR: \u9519\u8bef\u9700\u8981\u5173\u6ce8\nlogger.error(\"\u652f\u4ed8\u7f51\u5173\u9519\u8bef\", extra={\"order_id\": \"ORD_123\"}, exc_info=True)\n\n# CRITICAL: \u4e25\u91cd\u9519\u8bef\u9700\u8981\u7acb\u5373\u5904\u7406\nlogger.critical(\"\u6570\u636e\u5e93\u8fde\u63a5\u5931\u8d25\", extra={\"database\": \"order_db\"})\n```\n\n### 
\u73af\u5883\u53d8\u91cf\u914d\u7f6e\n\n\u53ef\u4ee5\u901a\u8fc7\u73af\u5883\u53d8\u91cf\u914d\u7f6e\u65e5\u5fd7\u7ea7\u522b\uff1a\n\n```bash\n# \u8bbe\u7f6e\u65e5\u5fd7\u7ea7\u522b\nexport LOG_LEVEL=DEBUG\n\n# \u5728\u5e94\u7528\u4e2d\u4f7f\u7528\npython your_app.py\n```\n\n```python\nimport os\nfrom mx_rmq.logging import setup_colored_logging\n\n# \u4ece\u73af\u5883\u53d8\u91cf\u8bfb\u53d6\u65e5\u5fd7\u7ea7\u522b\nlog_level = os.getenv(\"LOG_LEVEL\", \"INFO\")\nsetup_colored_logging(log_level)\n```\n\n### \u65e5\u5fd7\u6700\u4f73\u5b9e\u8df5\n\n#### 1. \u4f7f\u7528\u7ed3\u6784\u5316\u65e5\u5fd7\n\n```python\n# \u2705 \u63a8\u8350\uff1a\u4f7f\u7528 extra \u53c2\u6570\u4f20\u9012\u7ed3\u6784\u5316\u6570\u636e\nlogger.info(\"\u8ba2\u5355\u521b\u5efa\", extra={\n \"order_id\": \"ORD_123\",\n \"user_id\": 456, \n \"amount\": 99.99,\n \"currency\": \"USD\"\n})\n\n# \u274c \u907f\u514d\uff1a\u5b57\u7b26\u4e32\u62fc\u63a5\nlogger.info(f\"\u7528\u6237 {user_id} \u521b\u5efa\u4e86\u8ba2\u5355 {order_id}\uff0c\u91d1\u989d {amount}\")\n```\n\n#### 2. \u9519\u8bef\u5904\u7406\u4e2d\u7684\u65e5\u5fd7\n\n```python\nasync def handle_payment(payload: dict) -> None:\n order_id = payload.get(\"order_id\")\n \n try:\n await process_payment(payload)\n \n except PaymentValidationError as e:\n # \u4e1a\u52a1\u9a8c\u8bc1\u9519\u8bef\uff0c\u8bb0\u5f55\u4f46\u4e0d\u91cd\u8bd5\n logger.warning(\"\u652f\u4ed8\u9a8c\u8bc1\u5931\u8d25\", extra={\n \"order_id\": order_id, \n \"error\": str(e),\n \"error_type\": \"validation\"\n })\n raise # \u91cd\u65b0\u629b\u51fa\uff0c\u8fdb\u5165\u6b7b\u4fe1\u961f\u5217\n \n except PaymentGatewayError as e:\n # \u5916\u90e8\u670d\u52a1\u9519\u8bef\uff0c\u53ef\u91cd\u8bd5\n logger.error(\"\u652f\u4ed8\u7f51\u5173\u9519\u8bef\", extra={\n \"order_id\": order_id,\n \"error\": str(e),\n \"error_type\": \"gateway\",\n \"retryable\": True\n })\n raise # \u91cd\u65b0\u629b\u51fa\uff0c\u89e6\u53d1\u91cd\u8bd5\n \n except Exception as e:\n # \u672a\u77e5\u9519\u8bef\uff0c\u8bb0\u5f55\u8be6\u7ec6\u4fe1\u606f\n logger.error(\"\u652f\u4ed8\u5904\u7406\u5f02\u5e38\", extra={\n \"order_id\": order_id,\n \"error\": str(e),\n \"error_type\": type(e).__name__,\n \"payload\": payload\n }, exc_info=True)\n raise\n```\n\n#### 3. 
\u6027\u80fd\u76d1\u63a7\u65e5\u5fd7\n\n```python\nimport time\n\nasync def timed_handler(payload: dict) -> None:\n start_time = time.time()\n order_id = payload.get(\"order_id\")\n \n try:\n await process_order(payload)\n \n finally:\n processing_time = time.time() - start_time\n logger.info(\"\u8ba2\u5355\u5904\u7406\u5b8c\u6210\", extra={\n \"order_id\": order_id,\n \"processing_time\": f\"{processing_time:.2f}s\"\n })\n \n # \u6027\u80fd\u544a\u8b66\n if processing_time > 5:\n logger.warning(\"\u8ba2\u5355\u5904\u7406\u65f6\u95f4\u8fc7\u957f\", extra={\n \"order_id\": order_id,\n \"processing_time\": f\"{processing_time:.2f}s\"\n })\n```\n\n### \u6d4b\u8bd5\u65e5\u5fd7\u529f\u80fd\n\n\u60a8\u53ef\u4ee5\u4f7f\u7528\u4ee5\u4e0b\u547d\u4ee4\u6d4b\u8bd5\u4e0d\u540c\u7684\u65e5\u5fd7\u529f\u80fd\uff1a\n\n```bash\n# \u6d4b\u8bd5\u6240\u6709\u65e5\u5fd7\u529f\u80fd\uff08\u63a8\u8350\uff09\nuv run python examples/usage_sample.py test_all_logging\n\n# \u6d4b\u8bd5\u6807\u51c6\u65e5\u5fd7\nuv run python examples/usage_sample.py logging\n\n# \u6d4b\u8bd5\u5f69\u8272\u65e5\u5fd7\nuv run python examples/usage_sample.py colored\n\n# \u6d4b\u8bd5\u7b80\u6d01\u5f69\u8272\u65e5\u5fd7\nuv run python examples/usage_sample.py simple_colored\n\n# \u6d4b\u8bd5 LoggerService \u517c\u5bb9\u6027\nuv run python examples/usage_sample.py logger\n\n# \u6d4b\u8bd5\u5e26\u5f69\u8272\u65e5\u5fd7\u7684 consumer\nuv run python examples/usage_sample.py consumer\n```\n\n### \u5e38\u89c1\u95ee\u9898\n\n#### Q: \u4e3a\u4ec0\u4e48\u770b\u4e0d\u5230\u65e5\u5fd7\u8f93\u51fa\uff1f\n\n**A:** MX-RMQ \u9075\u5faa Python \u5e93\u6700\u4f73\u5b9e\u8df5\uff0c\u9ed8\u8ba4\u4f7f\u7528 `NullHandler`\uff0c\u9700\u8981\u7528\u6237\u914d\u7f6e\u65e5\u5fd7\u5904\u7406\u5668\uff1a\n\n```python\n# \u89e3\u51b3\u65b9\u68481\uff1a\u4f7f\u7528\u4fbf\u6377\u914d\u7f6e\u51fd\u6570\nfrom mx_rmq.logging import setup_colored_logging\nsetup_colored_logging(\"INFO\")\n\n# \u89e3\u51b3\u65b9\u68482\uff1a\u4f7f\u7528\u6807\u51c6 logging \u914d\u7f6e\nimport logging\nlogging.basicConfig(level=logging.INFO)\n```\n\n#### Q: \u5982\u4f55\u5728\u751f\u4ea7\u73af\u5883\u4e2d\u4f7f\u7528\u6587\u4ef6\u65e5\u5fd7\uff1f\n\n**A:** \u4f7f\u7528\u6807\u51c6\u7684 Python logging \u914d\u7f6e\uff1a\n\n```python\nimport logging\nfrom logging.handlers import RotatingFileHandler\n\n# \u914d\u7f6e\u6587\u4ef6\u65e5\u5fd7\nfile_handler = RotatingFileHandler(\n 'app.log', \n maxBytes=10*1024*1024, # 10MB\n backupCount=5\n)\nfile_handler.setLevel(logging.INFO)\n\n# \u914d\u7f6e\u63a7\u5236\u53f0\u65e5\u5fd7\nconsole_handler = logging.StreamHandler()\nconsole_handler.setLevel(logging.INFO)\n\n# \u8bbe\u7f6e\u683c\u5f0f\nformatter = logging.Formatter(\n '%(asctime)s - %(name)s - %(levelname)s - %(message)s'\n)\nfile_handler.setFormatter(formatter)\nconsole_handler.setFormatter(formatter)\n\n# \u914d\u7f6e\u6839\u65e5\u5fd7\u5668\nlogger = logging.getLogger('mx_rmq')\nlogger.setLevel(logging.INFO)\nlogger.addHandler(file_handler)\nlogger.addHandler(console_handler)\n```\n\n## \u914d\u7f6e\u53c2\u8003\n\n### MQConfig \u5b8c\u6574\u53c2\u6570\n\n```python\nfrom mx_rmq import MQConfig\n\nconfig = MQConfig(\n # Redis \u8fde\u63a5\u914d\u7f6e\n redis_host=\"redis://localhost:6379\", # Redis\u8fde\u63a5URL\n redis_db=0, # Redis\u6570\u636e\u5e93\u7f16\u53f7 (0-15)\n redis_password=None, # Redis\u5bc6\u7801\n queue_prefix=\"\", # \u961f\u5217\u524d\u7f00\uff0c\u7528\u4e8e\u591a\u73af\u5883\u9694\u79bb\n connection_pool_size=20, # \u8fde\u63a5\u6c60\u5927\u5c0f\n \n # 
\u6d88\u8d39\u8005\u914d\u7f6e\n max_workers=5, # \u6700\u5927\u5de5\u4f5c\u534f\u7a0b\u6570\n task_queue_size=8, # \u672c\u5730\u4efb\u52a1\u961f\u5217\u5927\u5c0f\n \n # \u6d88\u606f\u751f\u547d\u5468\u671f\u914d\u7f6e\n message_ttl=86400, # \u6d88\u606fTTL\uff08\u79d2\uff09\uff0c\u9ed8\u8ba424\u5c0f\u65f6\n processing_timeout=180, # \u6d88\u606f\u5904\u7406\u8d85\u65f6\uff08\u79d2\uff09\uff0c\u9ed8\u8ba43\u5206\u949f\n \n # \u91cd\u8bd5\u914d\u7f6e\n max_retries=3, # \u6700\u5927\u91cd\u8bd5\u6b21\u6570\n retry_delays=[60, 300, 1800], # \u91cd\u8bd5\u5ef6\u8fdf\u95f4\u9694\uff08\u79d2\uff09\n \n # \u6b7b\u4fe1\u961f\u5217\u914d\u7f6e\n enable_dead_letter=True, # \u662f\u5426\u542f\u7528\u6b7b\u4fe1\u961f\u5217\n \n # \u76d1\u63a7\u914d\u7f6e\n monitor_interval=30, # \u76d1\u63a7\u68c0\u67e5\u95f4\u9694\uff08\u79d2\uff09\n expired_check_interval=10, # \u8fc7\u671f\u6d88\u606f\u68c0\u67e5\u95f4\u9694\uff08\u79d2\uff09\n processing_monitor_interval=30, # Processing\u961f\u5217\u76d1\u63a7\u95f4\u9694\uff08\u79d2\uff09\n batch_size=100, # \u6279\u5904\u7406\u5927\u5c0f\n)\n```\n\n### \u73af\u5883\u53d8\u91cf\u914d\u7f6e\n\n\u652f\u6301\u901a\u8fc7\u73af\u5883\u53d8\u91cf\u914d\u7f6e\uff1a\n\n```bash\nexport REDIS_URL=\"redis://localhost:6379\"\nexport REDIS_PASSWORD=\"your_password\"\nexport MQ_MAX_WORKERS=10\nexport MQ_TASK_QUEUE_SIZE=20\nexport MQ_MESSAGE_TTL=86400\n```\n\n```python\nimport os\nfrom mx_rmq import MQConfig\n\nconfig = MQConfig(\n redis_host=os.getenv(\"REDIS_URL\", \"redis://localhost:6379\"),\n redis_password=os.getenv(\"REDIS_PASSWORD\"),\n max_workers=int(os.getenv(\"MQ_MAX_WORKERS\", \"5\")),\n task_queue_size=int(os.getenv(\"MQ_TASK_QUEUE_SIZE\", \"8\")),\n message_ttl=int(os.getenv(\"MQ_MESSAGE_TTL\", \"86400\")),\n)\n```\n\n## API \u53c2\u8003\n\n### RedisMessageQueue \u7c7b\n\n#### \u521d\u59cb\u5316\n\n```python\ndef __init__(self, config: MQConfig | None = None) -> None:\n \"\"\"\n \u521d\u59cb\u5316\u6d88\u606f\u961f\u5217\n \n Args:\n config: \u6d88\u606f\u961f\u5217\u914d\u7f6e\uff0c\u5982\u4e3aNone\u5219\u4f7f\u7528\u9ed8\u8ba4\u914d\u7f6e\n \"\"\"\n```\n\n#### \u6838\u5fc3\u65b9\u6cd5\n\n```python\nasync def produce(\n self,\n topic: str,\n payload: dict[str, Any],\n delay: int = 0,\n priority: MessagePriority = MessagePriority.NORMAL,\n ttl: int | None = None,\n message_id: str | None = None,\n) -> str:\n \"\"\"\n \u751f\u4ea7\u6d88\u606f\n \n Args:\n topic: \u4e3b\u9898\u540d\u79f0\n payload: \u6d88\u606f\u8d1f\u8f7d\uff08\u5fc5\u987b\u662f\u53efJSON\u5e8f\u5217\u5316\u7684\u5b57\u5178\uff09\n delay: \u5ef6\u8fdf\u6267\u884c\u65f6\u95f4\uff08\u79d2\uff09\uff0c0\u8868\u793a\u7acb\u5373\u6267\u884c\n priority: \u6d88\u606f\u4f18\u5148\u7ea7\n ttl: \u6d88\u606f\u751f\u5b58\u65f6\u95f4\uff08\u79d2\uff09\uff0cNone\u4f7f\u7528\u914d\u7f6e\u9ed8\u8ba4\u503c\n message_id: \u6d88\u606fID\uff0cNone\u5219\u81ea\u52a8\u751f\u6210UUID\n \n Returns:\n \u6d88\u606fID\uff08\u5b57\u7b26\u4e32\uff09\n \n Raises:\n ValueError: \u53c2\u6570\u9a8c\u8bc1\u5931\u8d25\n RedisError: Redis\u64cd\u4f5c\u5931\u8d25\n \"\"\"\n\ndef register(self, topic: str, handler: Callable) -> None:\n \"\"\"\n \u6ce8\u518c\u6d88\u606f\u5904\u7406\u5668\n \n Args:\n topic: \u4e3b\u9898\u540d\u79f0\n handler: \u5904\u7406\u51fd\u6570\uff0c\u5fc5\u987b\u662fasync\u51fd\u6570\uff0c\u63a5\u53d7\u4e00\u4e2adict\u53c2\u6570\n \n Raises:\n ValueError: \u5904\u7406\u5668\u4e0d\u662f\u53ef\u8c03\u7528\u5bf9\u8c61\n \"\"\"\n\nasync def start_dispatch_consuming(self) -> None:\n \"\"\"\n 
\u542f\u52a8\u6d88\u606f\u5206\u53d1\u548c\u6d88\u8d39\n \n \u6b64\u65b9\u6cd5\u4f1a\u963b\u585e\uff0c\u76f4\u5230\u6536\u5230\u505c\u673a\u4fe1\u53f7(SIGINT/SIGTERM)\n \n Raises:\n RuntimeError: \u7cfb\u7edf\u672a\u6b63\u786e\u521d\u59cb\u5316\n RedisError: Redis\u8fde\u63a5\u9519\u8bef\n \"\"\"\n\nasync def cleanup(self) -> None:\n \"\"\"\n \u6e05\u7406\u8d44\u6e90\uff0c\u5173\u95edRedis\u8fde\u63a5\u6c60\n \"\"\"\n```\n\n### Message \u7c7b\n\n```python\n@dataclass\nclass Message:\n \"\"\"\u6d88\u606f\u6570\u636e\u7c7b\"\"\"\n id: str # \u6d88\u606f\u552f\u4e00ID\n version: str # \u6d88\u606f\u683c\u5f0f\u7248\u672c\n topic: str # \u4e3b\u9898\u540d\u79f0 \n payload: dict[str, Any] # \u6d88\u606f\u8d1f\u8f7d\n priority: MessagePriority # \u6d88\u606f\u4f18\u5148\u7ea7\n created_at: int # \u521b\u5efa\u65f6\u95f4\u6233\uff08\u6beb\u79d2\uff09\n meta: MessageMeta # \u6d88\u606f\u5143\u6570\u636e\n\n@dataclass \nclass MessageMeta:\n \"\"\"\u6d88\u606f\u5143\u6570\u636e\"\"\"\n status: MessageStatus # \u6d88\u606f\u72b6\u6001\n retry_count: int # \u91cd\u8bd5\u6b21\u6570\n max_retries: int # \u6700\u5927\u91cd\u8bd5\u6b21\u6570\n retry_delays: list[int] # \u91cd\u8bd5\u5ef6\u8fdf\u914d\u7f6e\n last_error: str | None # \u6700\u540e\u4e00\u6b21\u9519\u8bef\u4fe1\u606f\n expire_at: int # \u8fc7\u671f\u65f6\u95f4\u6233\n # ... \u5176\u4ed6\u5143\u6570\u636e\u5b57\u6bb5\n```\n\n### \u679a\u4e3e\u7c7b\u578b\n\n```python\nclass MessagePriority(str, Enum):\n \"\"\"\u6d88\u606f\u4f18\u5148\u7ea7\"\"\"\n HIGH = \"high\" # \u9ad8\u4f18\u5148\u7ea7\n NORMAL = \"normal\" # \u666e\u901a\u4f18\u5148\u7ea7\n LOW = \"low\" # \u4f4e\u4f18\u5148\u7ea7\n\nclass MessageStatus(str, Enum):\n \"\"\"\u6d88\u606f\u72b6\u6001\"\"\"\n PENDING = \"pending\" # \u5f85\u5904\u7406\n PROCESSING = \"processing\" # \u5904\u7406\u4e2d\n COMPLETED = \"completed\" # \u5df2\u5b8c\u6210\n RETRYING = \"retrying\" # \u91cd\u8bd5\u4e2d\n DEAD_LETTER = \"dead_letter\" # \u6b7b\u4fe1\n```\n\n## \u76d1\u63a7\u548c\u7ba1\u7406\n\n### \u6307\u6807\u6536\u96c6\n\n```python\nfrom mx_rmq import MetricsCollector\n\n# \u521b\u5efa\u6307\u6807\u6536\u96c6\u5668\ncollector = MetricsCollector(redis=mq.redis, queue_prefix=config.queue_prefix)\n\n# \u6536\u96c6\u6240\u6709\u6307\u6807\nmetrics = await collector.collect_all_metrics([\"order_created\", \"user_registration\"])\n\n# \u6253\u5370\u5173\u952e\u6307\u6807\nprint(f\"\u5f85\u5904\u7406\u6d88\u606f: {metrics['queue.order_created.pending']}\")\nprint(f\"\u5904\u7406\u4e2d\u6d88\u606f: {metrics['queue.order_created.processing']}\")\nprint(f\"\u603b\u541e\u5410\u91cf: {metrics['throughput.messages_per_minute']}\")\nprint(f\"\u6b7b\u4fe1\u961f\u5217: {metrics['queue.dlq.count']}\")\n```\n\n### \u961f\u5217\u76d1\u63a7\n\n```python\n# \u76d1\u63a7\u5355\u4e2a\u961f\u5217\nqueue_metrics = await collector.collect_queue_metrics([\"order_created\"])\nprint(f\"\u8ba2\u5355\u961f\u5217\u72b6\u6001: {queue_metrics}\")\n\n# \u76d1\u63a7\u5904\u7406\u6027\u80fd\nprocessing_metrics = await collector.collect_processing_metrics([\"order_created\"])\nprint(f\"\u5e73\u5747\u5904\u7406\u65f6\u95f4: {processing_metrics['order_created.avg_processing_time']}ms\")\n```\n\n### \u6b7b\u4fe1\u961f\u5217\u7ba1\u7406\n\n```python\n# \u67e5\u770b\u6b7b\u4fe1\u961f\u5217\ndlq_count = await mq.redis.llen(\"dlq:queue\")\nprint(f\"\u6b7b\u4fe1\u961f\u5217\u6d88\u606f\u6570: {dlq_count}\")\n\n# \u83b7\u53d6\u6b7b\u4fe1\u6d88\u606f\u5217\u8868\ndlq_messages = await mq.redis.lrange(\"dlq:queue\", 0, 9) # 
\u83b7\u53d6\u524d10\u6761\nfor msg_id in dlq_messages:\n payload = await mq.redis.hget(\"dlq:payload:map\", msg_id)\n print(f\"\u6b7b\u4fe1\u6d88\u606f: {msg_id} - {payload}\")\n\n# \u624b\u52a8\u91cd\u8bd5\u6b7b\u4fe1\u6d88\u606f\uff08\u9700\u8981\u81ea\u5b9a\u4e49\u5b9e\u73b0\uff09\nasync def retry_dead_message(message_id: str):\n # \u4ece\u6b7b\u4fe1\u961f\u5217\u83b7\u53d6\u6d88\u606f\n payload_json = await mq.redis.hget(\"dlq:payload:map\", message_id)\n if payload_json:\n # \u89e3\u6790\u6d88\u606f\u5e76\u91cd\u65b0\u751f\u4ea7\n message = json.loads(payload_json)\n await mq.produce(message[\"topic\"], message[\"payload\"])\n # \u4ece\u6b7b\u4fe1\u961f\u5217\u79fb\u9664\n await mq.redis.lrem(\"dlq:queue\", 1, message_id)\n await mq.redis.hdel(\"dlq:payload:map\", message_id)\n```\n\n### \u5b9e\u65f6\u76d1\u63a7\u811a\u672c\n\n```python\nimport asyncio\nimport time\n\nasync def monitor_loop():\n \"\"\"\u5b9e\u65f6\u76d1\u63a7\u5faa\u73af\"\"\"\n collector = MetricsCollector(redis=mq.redis)\n \n while True:\n try:\n # \u6536\u96c6\u6307\u6807\n metrics = await collector.collect_all_metrics([\"order_created\"])\n \n # \u8f93\u51fa\u5173\u952e\u6307\u6807\n print(f\"[{time.strftime('%H:%M:%S')}] \u961f\u5217\u72b6\u6001:\")\n print(f\" \u5f85\u5904\u7406: {metrics.get('queue.order_created.pending', 0)}\")\n print(f\" \u5904\u7406\u4e2d: {metrics.get('queue.order_created.processing', 0)}\")\n print(f\" \u6b7b\u4fe1\u961f\u5217: {metrics.get('queue.dlq.count', 0)}\")\n \n # \u68c0\u67e5\u544a\u8b66\u6761\u4ef6\n pending = metrics.get('queue.order_created.pending', 0)\n if pending > 100:\n print(f\"\u26a0\ufe0f \u544a\u8b66: \u5f85\u5904\u7406\u6d88\u606f\u79ef\u538b ({pending})\")\n \n dlq_count = metrics.get('queue.dlq.count', 0)\n if dlq_count > 10:\n print(f\"\ud83d\udea8 \u544a\u8b66: \u6b7b\u4fe1\u961f\u5217\u6d88\u606f\u8fc7\u591a ({dlq_count})\")\n \n except Exception as e:\n print(f\"\u76d1\u63a7\u9519\u8bef: {e}\")\n \n await asyncio.sleep(10) # \u6bcf10\u79d2\u68c0\u67e5\u4e00\u6b21\n\n# \u542f\u52a8\u76d1\u63a7\nasyncio.create_task(monitor_loop())\n```\n\n## \u90e8\u7f72\u6307\u5357\n\n### \u672c\u5730\u5f00\u53d1\u73af\u5883\n\n```bash\n# 1. \u542f\u52a8Redis\ndocker run -d --name redis -p 6379:6379 redis:7-alpine\n\n# 2. \u8fd0\u884c\u5e94\u7528\npython your_app.py\n```\n\n### Docker \u90e8\u7f72\n\n**Dockerfile:**\n```dockerfile\nFROM python:3.12-slim\n\nWORKDIR /app\n\n# \u5b89\u88c5\u4f9d\u8d56\nCOPY requirements.txt .\nRUN pip install -r requirements.txt\n\n# \u590d\u5236\u4ee3\u7801\nCOPY . 
### A Real-Time Monitoring Script

```python
import asyncio
import time

async def monitor_loop():
    """Real-time monitoring loop"""
    collector = MetricsCollector(redis=mq.redis)

    while True:
        try:
            # Collect metrics
            metrics = await collector.collect_all_metrics(["order_created"])

            # Print the key metrics
            print(f"[{time.strftime('%H:%M:%S')}] Queue status:")
            print(f"  pending: {metrics.get('queue.order_created.pending', 0)}")
            print(f"  processing: {metrics.get('queue.order_created.processing', 0)}")
            print(f"  dead letters: {metrics.get('queue.dlq.count', 0)}")

            # Check alert conditions
            pending = metrics.get('queue.order_created.pending', 0)
            if pending > 100:
                print(f"⚠️ Alert: pending message backlog ({pending})")

            dlq_count = metrics.get('queue.dlq.count', 0)
            if dlq_count > 10:
                print(f"🚨 Alert: too many dead letter messages ({dlq_count})")

        except Exception as e:
            print(f"Monitoring error: {e}")

        await asyncio.sleep(10)  # check every 10 seconds

# Start monitoring
asyncio.create_task(monitor_loop())
```

## Deployment Guide

### Local Development

```bash
# 1. Start Redis
docker run -d --name redis -p 6379:6379 redis:7-alpine

# 2. Run the application
python your_app.py
```

### Docker Deployment

**Dockerfile:**
```dockerfile
FROM python:3.12-slim

WORKDIR /app

# Install dependencies
COPY requirements.txt .
RUN pip install -r requirements.txt

# Copy the source code
COPY . .

# Start the application
CMD ["python", "main.py"]
```

**docker-compose.yml:**
```yaml
version: '3.8'
services:
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data
    command: redis-server --appendonly yes

  app:
    build: .
    depends_on:
      - redis
    environment:
      - REDIS_URL=redis://redis:6379
      - MQ_MAX_WORKERS=10
    restart: unless-stopped

volumes:
  redis_data:
```

### Production Configuration

**Recommended Redis settings (redis.conf):**
```conf
# Memory management: never evict keys under memory pressure.
# Eviction policies such as allkeys-lru can silently drop queue data.
maxmemory-policy noeviction
maxmemory 2gb

# Connection management
timeout 300
tcp-keepalive 300

# RDB persistence
save 900 1      # save if at least 1 key changed within 900s
save 300 10     # save if at least 10 keys changed within 300s
save 60 10000   # save if at least 10000 keys changed within 60s

# AOF persistence
appendonly yes
appendfsync everysec

# Performance tuning
hz 10
dynamic-hz yes
```

**Application configuration:**
```python
# Production configuration
config = MQConfig(
    redis_url="redis://redis-cluster:6379",
    redis_password="your_secure_password",
    max_workers=20,
    task_queue_size=50,
    connection_pool_size=30,
    message_ttl=86400 * 7,   # 7 days
    processing_timeout=600,  # 10 minutes
    queue_prefix="prod",     # environment isolation
)
```
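The compose file above passes settings through the `REDIS_URL` and `MQ_MAX_WORKERS` environment variables; how the application consumes them is up to you. A minimal sketch (the variable names match the compose file, the parsing is illustrative):

```python
import os

from mx_rmq import MQConfig, RedisMessageQueue

def config_from_env() -> MQConfig:
    """Build an MQConfig from the environment variables set in docker-compose."""
    return MQConfig(
        redis_url=os.environ.get("REDIS_URL", "redis://localhost:6379"),
        max_workers=int(os.environ.get("MQ_MAX_WORKERS", "10")),
    )

mq = RedisMessageQueue(config_from_env())
```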
### High-Availability Deployment

**Redis Sentinel setup:**
```python
from redis.asyncio.sentinel import Sentinel

# Configure Sentinel (use the asyncio client, since the queue awaits Redis calls)
sentinels = [
    ('sentinel1', 26379),
    ('sentinel2', 26379),
    ('sentinel3', 26379),
]

sentinel = Sentinel(sentinels, socket_timeout=0.1)

# Discover the current master
redis_master = sentinel.master_for('mymaster', socket_timeout=0.1)

# Inject a custom Redis connection
config = MQConfig(redis_url="")  # left empty; a custom connection is injected below
mq = RedisMessageQueue(config)
mq.redis = redis_master  # use the Sentinel-managed connection
```

### Monitoring and Alerting

**Exposing Prometheus metrics:**
```python
from prometheus_client import start_http_server, Gauge, Counter

# Define the metrics
queue_size = Gauge('mq_queue_size', 'Queue size', ['topic', 'status'])
messages_processed = Counter('mq_messages_processed_total', 'Messages processed', ['topic', 'status'])

async def export_metrics():
    """Export queue metrics to Prometheus"""
    collector = MetricsCollector(redis=mq.redis)

    while True:
        metrics = await collector.collect_all_metrics(['order_created'])

        # Update the Prometheus gauges
        queue_size.labels(topic='order_created', status='pending').set(
            metrics.get('queue.order_created.pending', 0)
        )
        queue_size.labels(topic='order_created', status='processing').set(
            metrics.get('queue.order_created.processing', 0)
        )

        await asyncio.sleep(30)

# Start the Prometheus HTTP server
start_http_server(8000)
asyncio.create_task(export_metrics())
```

## Best Practices

### 1. Message Design

**✅ Recommended:**
```python
# A clear message structure that carries the necessary context
await mq.produce("order_created", {
    "order_id": "ORD_123456",
    "user_id": 789,
    "total_amount": 99.99,
    "currency": "USD",
    "timestamp": "2024-01-01T12:00:00Z",
    "metadata": {
        "source": "web",
        "version": "v1.0"
    }
})
```

**❌ Avoid:**
```python
# Too little context
await mq.produce("process", {"id": 123})

# Too much data packed into one message
await mq.produce("user_update", {
    "user": {...},         # the user's entire profile
    "history": [...],      # the full history
    "related_data": {...}  # large amounts of related data
})
```
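When a consumer genuinely needs large data, a common middle ground is the claim-check pattern: persist the heavy payload out of band and send only a reference. A sketch, assuming a hypothetical `save_user_snapshot` helper that stores the data and returns a key:

```python
async def produce_user_update(user: dict, history: list) -> None:
    # Store the heavy data out of band (database, object storage, ...)
    snapshot_key = await save_user_snapshot(user, history)  # hypothetical helper

    # The message carries only the reference plus minimal context
    await mq.produce("user_update", {
        "user_id": user["id"],
        "snapshot_key": snapshot_key,
        "timestamp": "2024-01-01T12:00:00Z",
    })
```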
### 2. Error Handling

**✅ Recommended:**
```python
async def handle_payment(payload: dict) -> None:
    try:
        order_id = payload["order_id"]
        amount = payload["amount"]

        # Validate the parameters
        if not order_id or amount <= 0:
            raise ValueError(f"Invalid order parameters: {payload}")

        # Business logic
        result = await process_payment(order_id, amount)

        # Log the success
        logger.info("Payment processed", order_id=order_id, amount=amount)

    except ValueError as e:
        # Parameter error, not worth retrying
        logger.error("Invalid payment parameters", error=str(e), payload=payload)
        raise  # re-raise; the message ends up in the dead letter queue

    except PaymentGatewayError as e:
        # External service error, retryable
        logger.warning("Payment gateway error", error=str(e), order_id=order_id)
        raise  # re-raise to trigger a retry

    except Exception as e:
        # Unknown error
        logger.error("Payment processing failed", error=str(e), order_id=order_id)
        raise
```

### 3. Idempotent Processing

```python
async def handle_order_created(payload: dict) -> None:
    order_id = payload["order_id"]

    # Skip if already processed (idempotency guard)
    if await is_order_processed(order_id):
        logger.info("Order already processed, skipping", order_id=order_id)
        return

    try:
        # Process the order
        await process_order(order_id)

        # Mark as processed
        await mark_order_processed(order_id)

    except Exception as e:
        logger.error("Order processing failed", order_id=order_id, error=str(e))
        raise
```
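`is_order_processed` and `mark_order_processed` are left to the application. A minimal sketch backed by the same Redis instance, using a flag key with a TTL (the key pattern and TTL are illustrative):

```python
PROCESSED_KEY = "processed:order:{}"  # illustrative key pattern
PROCESSED_TTL = 86400 * 7             # keep the flag for 7 days

async def is_order_processed(order_id: str) -> bool:
    return await mq.redis.exists(PROCESSED_KEY.format(order_id)) > 0

async def mark_order_processed(order_id: str) -> None:
    # SET NX EX: create the flag only if it is absent, with an expiry
    await mq.redis.set(PROCESSED_KEY.format(order_id), "1", nx=True, ex=PROCESSED_TTL)
```

Checking and marking in two steps still leaves a small race window; for strict exactly-once effects, write the flag in the same transaction as the business side effect.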
### 4. Performance Tuning

**Sizing the worker pool:**
```python
import multiprocessing

# Adjust the number of worker coroutines to the CPU core count and the IO profile
cpu_count = multiprocessing.cpu_count()
is_cpu_intensive = False  # set this according to your workload

# CPU-bound tasks: workers = number of cores
# IO-bound tasks: workers = 2-4x the number of cores
workers = cpu_count if is_cpu_intensive else cpu_count * 2

config = MQConfig(
    max_workers=workers,
    # The local task queue should be larger than the worker pool
    task_queue_size=workers * 2,
)
```

**Batch processing:**
```python
async def handle_batch_emails(payload: dict) -> None:
    """Send emails in batches"""
    email_list = payload["emails"]

    # Process in batches to keep memory usage bounded
    batch_size = 10
    for i in range(0, len(email_list), batch_size):
        batch = email_list[i:i + batch_size]

        # Send the batch concurrently
        tasks = [send_email(email) for email in batch]
        await asyncio.gather(*tasks, return_exceptions=True)

        # Avoid hammering the mail service
        await asyncio.sleep(0.1)
```

### 5. Implementing Multi-Group Consumption

Since the system does not support consumer groups, fanning one message out to several groups is done by producing to multiple topics:

**✅ Recommended:**
```python
# Option 1: create one topic per processing group and produce once per group
async def send_order_created(order_data: dict):
    """Send the order-created message to every processing group"""
    await mq.produce("order_created_payment", order_data)       # payment group
    await mq.produce("order_created_inventory", order_data)     # inventory group
    await mq.produce("order_created_analytics", order_data)     # analytics group
    await mq.produce("order_created_notification", order_data)  # notification group

# Register one handler per group
@mq.register("order_created_payment")
async def handle_payment_processing(payload: dict):
    """Payment-related logic"""
    await process_payment(payload)

@mq.register("order_created_inventory")
async def handle_inventory_processing(payload: dict):
    """Inventory-related logic"""
    await update_inventory(payload)

@mq.register("order_created_analytics")
async def handle_analytics_processing(payload: dict):
    """Analytics-related logic"""
    await update_analytics(payload)

@mq.register("order_created_notification")
async def handle_notification_processing(payload: dict):
    """Notification-related logic"""
    await send_notifications(payload)
```

**Option 2: a single dispatcher topic**
```python
# One dispatcher topic fans the message out to the group topics
@mq.register("order_created")
async def order_dispatcher(payload: dict):
    """Order message dispatcher"""
    order_id = payload["order_id"]

    # Fan out to the processing groups concurrently
    tasks = [
        mq.produce("order_payment", payload),
        mq.produce("order_inventory", payload),
        mq.produce("order_analytics", payload),
        mq.produce("order_notification", payload),
    ]

    try:
        await asyncio.gather(*tasks)
        logger.info("Order message dispatched", order_id=order_id)
    except Exception as e:
        logger.error("Order message dispatch failed", order_id=order_id, error=str(e))
        raise
```

**Option 3: a topic naming convention**
```python
# A single naming convention drives the fan-out
TOPIC_PATTERNS = {
    "order_created": [
        "order_created.payment",
        "order_created.inventory",
        "order_created.analytics",
        "order_created.notification"
    ]
}

async def broadcast_message(base_topic: str, payload: dict):
    """Broadcast a message to all topics derived from the base topic"""
    topics = TOPIC_PATTERNS.get(base_topic, [base_topic])

    tasks = [mq.produce(topic, payload) for topic in topics]
    await asyncio.gather(*tasks)

    logger.info("Broadcast complete", base_topic=base_topic, target_topics=topics)

# Usage
await broadcast_message("order_created", order_data)
```

### 6. Monitoring and Alerting

```python
async def setup_monitoring():
    """Set up monitoring and alerting"""
    collector = MetricsCollector(redis=mq.redis)

    while True:
        try:
            metrics = await collector.collect_all_metrics(["order_created"])

            # Queue backlog alert
            pending = metrics.get('queue.order_created.pending', 0)
            if pending > 1000:
                await send_alert(f"Severe queue backlog: {pending} messages pending")

            # Dead letter queue alert
            dlq_count = metrics.get('queue.dlq.count', 0)
            if dlq_count > 50:
                await send_alert(f"Too many dead letter messages: {dlq_count}")

            # Processing time alert
            avg_time = metrics.get('processing.order_created.avg_time', 0)
            if avg_time > 30000:  # 30 seconds
                await send_alert(f"Message processing too slow: {avg_time}ms")

        except Exception as e:
            logger.error("Monitoring check failed", error=str(e))

        await asyncio.sleep(60)  # check every minute
```
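`send_alert` above is application-specific. A minimal sketch posting to a webhook with aiohttp (the URL is a placeholder; any chat or paging integration works the same way):

```python
import aiohttp

ALERT_WEBHOOK_URL = "https://example.com/alert-webhook"  # placeholder

async def send_alert(text: str) -> None:
    """Deliver an alert to a webhook; delivery failures are logged, not raised."""
    try:
        async with aiohttp.ClientSession() as session:
            await session.post(
                ALERT_WEBHOOK_URL,
                json={"text": text},
                timeout=aiohttp.ClientTimeout(total=5),
            )
    except Exception as e:
        logger.error("Alert delivery failed", error=str(e))
```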
## Troubleshooting

### Common Issues

#### Q1: What if messages are lost?

**Symptom:** produced messages are never processed.

**Possible causes:**
1. The Redis connection was interrupted
2. The consumer was not started correctly
3. A handler raised an exception that was not handled properly

**Resolution:**
```python
# 1. Check the Redis connection
try:
    await mq.redis.ping()
    print("Redis connection OK")
except Exception as e:
    print(f"Redis connection failed: {e}")

# 2. Check whether the messages are still queued
pending_count = await mq.redis.llen("order_created:pending")
processing_count = await mq.redis.llen("order_created:processing")
print(f"pending: {pending_count}, processing: {processing_count}")

# 3. Check the dead letter queue
dlq_count = await mq.redis.llen("dlq:queue")
print(f"dead letters: {dlq_count}")
```

#### Q2: Message processing is too slow

**Symptom:** the queue backs up and messages are not processed in time.

**Possible causes:**
1. Too few worker coroutines
2. Handlers take too long to run
3. Redis is the bottleneck

**Resolution:**
```python
# 1. Increase the number of workers
config = MQConfig(max_workers=20)  # raise to 20

# 2. Optimize the handler
async def optimized_handler(payload: dict) -> None:
    # Use async IO ("url" is a placeholder for your endpoint)
    async with aiohttp.ClientSession() as session:
        response = await session.post(url, json=payload)

    # Move blocking work off the event loop
    await asyncio.to_thread(blocking_operation, payload)

# 3. Measure processing time
import time

async def timed_handler(payload: dict) -> None:
    start_time = time.time()
    try:
        await actual_handler(payload)
    finally:
        processing_time = time.time() - start_time
        if processing_time > 5:  # took longer than 5 seconds
            logger.warning("Slow handler", time=processing_time, payload=payload)
```

#### Q3: Memory usage is too high

**Symptom:** the application's memory keeps growing.

**Possible causes:**
1. The local task queue is backed up
2. Message objects are not released
3. The Redis connection pool is too large

**Resolution:**
```python
# 1. Shrink the queues
config = MQConfig(
    task_queue_size=10,       # smaller local task queue
    connection_pool_size=10,  # smaller connection pool
)

# 2. Monitor memory usage
import psutil
import gc

async def memory_monitor():
    while True:
        process = psutil.Process()
        memory_mb = process.memory_info().rss / 1024 / 1024

        if memory_mb > 500:  # more than 500MB
            logger.warning("High memory usage", memory_mb=memory_mb)
            gc.collect()  # force garbage collection

        await asyncio.sleep(60)
```

#### Q4: Redis connection errors

**Symptom:** `ConnectionError`, `TimeoutError`

**Resolution:**

```python
# 1. Review the Redis settings
config = MQConfig(
    redis_url="redis://localhost:6379",
    connection_pool_size=20,
)

# 2. Retry the connection on startup
import asyncio
import redis.asyncio as aioredis

async def create_redis_with_retry(config: MQConfig, max_retries: int = 3):
    for attempt in range(max_retries):
        try:
            redis = aioredis.from_url(config.redis_url)
            await redis.ping()
            return redis
        except Exception as e:
            if attempt == max_retries - 1:
                raise
            logger.warning(f"Redis connection failed, retrying ({attempt + 1}/{max_retries})")
            await asyncio.sleep(2 ** attempt)  # exponential backoff
```
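The verified connection can then be injected the same way as the Sentinel-managed client in the high-availability example. A sketch (the helper name is illustrative, and it assumes replacing `mq.redis` after construction is sufficient, as that example does):

```python
async def build_queue_with_retry(config: MQConfig) -> RedisMessageQueue:
    mq = RedisMessageQueue(config)
    # Swap in the connection that was verified via ping()
    mq.redis = await create_redis_with_retry(config)
    return mq
```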
### Performance Diagnosis

#### Latency Analysis

```python
import time
from collections import defaultdict

class PerformanceAnalyzer:
    def __init__(self):
        self.metrics = defaultdict(list)

    def analyze_handler(self, handler_name: str, handler_func):
        """Wrap a handler so its processing time is recorded"""
        async def wrapped_handler(payload: dict):
            start_time = time.time()
            try:
                return await handler_func(payload)
            finally:
                processing_time = (time.time() - start_time) * 1000  # milliseconds
                self.metrics[handler_name].append(processing_time)

                # Print statistics every 100 invocations
                if len(self.metrics[handler_name]) % 100 == 0:
                    times = self.metrics[handler_name][-100:]  # most recent 100
                    avg_time = sum(times) / len(times)

                    print(f"{handler_name} performance (last 100 calls):")
                    print(f"  average: {avg_time:.2f}ms")
                    print(f"  max: {max(times):.2f}ms")
                    print(f"  min: {min(times):.2f}ms")

        return wrapped_handler

# Usage
analyzer = PerformanceAnalyzer()

@mq.register("order_created")
async def handle_order(payload: dict):
    # Business logic
    await process_order(payload)

# Wrap the registered handler for profiling
mq.handlers["order_created"] = analyzer.analyze_handler(
    "order_created",
    handle_order
)
```
"bugtrack_url": null,
"license": "MIT",
"summary": "\u57fa\u4e8eRedis\u7684\u5f02\u6b65\u6d88\u606f\u961f\u5217\u7cfb\u7edf",
"version": "1.5.0",
"project_urls": {
"Documentation": "https://github.com/CodingOX/mx-rmq#readme",
"Homepage": "https://github.com/CodingOX/mx-rmq",
"Issues": "https://github.com/CodingOX/mx-rmq/issues",
"Repository": "https://github.com/CodingOX/mx-rmq"
},
"split_keywords": [
"async",
" distributed",
" message-queue",
" redis"
],
"urls": [
{
"comment_text": null,
"digests": {
"blake2b_256": "00f83411fea1b74918361c202270370572bcae6490442d5144bdee7e9045a4b2",
"md5": "27dc2e4698681ea27c8356449a4c2bb0",
"sha256": "1700f87ef505cf2ece8106775d2a42b30ba0d4afc4b533396120c822960019a4"
},
"downloads": -1,
"filename": "mx_rmq-1.5.0-py3-none-any.whl",
"has_sig": false,
"md5_digest": "27dc2e4698681ea27c8356449a4c2bb0",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": ">=3.10",
"size": 60048,
"upload_time": "2025-07-31T10:35:56",
"upload_time_iso_8601": "2025-07-31T10:35:56.072945Z",
"url": "https://files.pythonhosted.org/packages/00/f8/3411fea1b74918361c202270370572bcae6490442d5144bdee7e9045a4b2/mx_rmq-1.5.0-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": null,
"digests": {
"blake2b_256": "eaa4d45e82c9dc6bae5175cc4c4d1c3d60470b70a6613032ba7edc2336784fad",
"md5": "c52b72d478748e011e1a789d7cd50e7d",
"sha256": "1437c67f666747a6ed09c2b20a6eaf7502d859b9bc2b7d265324cab7ac035a97"
},
"downloads": -1,
"filename": "mx_rmq-1.5.0.tar.gz",
"has_sig": false,
"md5_digest": "c52b72d478748e011e1a789d7cd50e7d",
"packagetype": "sdist",
"python_version": "source",
"requires_python": ">=3.10",
"size": 82539,
"upload_time": "2025-07-31T10:35:57",
"upload_time_iso_8601": "2025-07-31T10:35:57.496725Z",
"url": "https://files.pythonhosted.org/packages/ea/a4/d45e82c9dc6bae5175cc4c4d1c3d60470b70a6613032ba7edc2336784fad/mx_rmq-1.5.0.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2025-07-31 10:35:57",
"github": true,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"github_user": "CodingOX",
"github_project": "mx-rmq#readme",
"travis_ci": false,
"coveralls": false,
"github_actions": false,
"lcname": "mx-rmq"
}