# MX-RMQ User Guide
[Python](https://www.python.org/downloads/)
[MIT License](https://opensource.org/licenses/MIT)
[Redis](https://redis.io/)
MX-RMQ is a high-performance, reliable, Redis-based distributed message queue system. It supports regular, delayed, and priority messages, and ships with comprehensive monitoring and retry mechanisms.
## Table of Contents
- [Features](#features)
- [Quick Start](#quick-start)
- [Installation](#installation)
- [Basic Usage](#basic-usage)
- [Advanced Features](#advanced-features)
- [Configuration Reference](#configuration-reference)
- [API Reference](#api-reference)
- [Monitoring and Management](#monitoring-and-management)
- [Deployment Guide](#deployment-guide)
- [Best Practices](#best-practices)
- [Troubleshooting](#troubleshooting)
## Features
- 🚀 **High performance**: Redis-backed in-memory storage with throughput of 10,000+ messages/second
- 🔄 **Reliability**: atomic Lua script operations guarantee no message loss
- ⏰ **Delayed messages**: schedule messages with arbitrary delays
- 🏷️ **Priorities**: high, normal, and low priority message handling
- 🔁 **Automatic retry**: configurable retry policy with exponential backoff
- 💀 **Dead letter queue**: failed messages are moved to a dead letter queue for manual intervention
- 📊 **Monitoring metrics**: real-time queue status, processing times, throughput, and more
- 🛑 **Graceful shutdown**: in-flight messages finish processing before the process exits
- 🔧 **Easy to use**: clean API design, works out of the box
## Quick Start
### 30-Second Example
```python
import asyncio
from mx_rmq import MQConfig, RedisMessageQueue

async def handle_order(payload: dict) -> None:
    """Handle an order message."""
    print(f"Processing order: {payload['order_id']}")
    # Your business logic
    await asyncio.sleep(1)

async def main():
    # Create the message queue
    mq = RedisMessageQueue()

    # Register a message handler
    mq.register("order_created", handle_order)

    # Produce a message
    await mq.produce("order_created", {
        "order_id": "ORD_123",
        "user_id": 456,
        "amount": 99.99
    })

    # Start the consumer (blocks)
    await mq.start_dispatch_consuming()

if __name__ == "__main__":
    asyncio.run(main())
```
## Installation
### Using uv (recommended)
```bash
# Add to an existing project
uv add mx-rmq

# Or install from source
git clone https://github.com/CodingOX/mx-rmq.git
cd mx-rmq
uv sync
```
### Using pip
```bash
pip install mx-rmq

# Or install from source
pip install git+https://github.com/CodingOX/mx-rmq.git
```
### Requirements
- Python 3.12+
- Redis 5.0+
- Recommended: Redis 6.0+ (better performance)
## Basic Usage
### 1. Create a Message Queue
```python
from mx_rmq import MQConfig, RedisMessageQueue

# Use the default configuration
mq = RedisMessageQueue()

# Or customize it
config = MQConfig(
    redis_url="redis://localhost:6379",
    max_workers=10,
    task_queue_size=20
)
mq = RedisMessageQueue(config)
```
### 2. Register Message Handlers
```python
# Option 1: as a decorator
@mq.register("user_registration")
async def handle_user_registration(payload: dict) -> None:
    user_id = payload['user_id']
    email = payload['email']
    print(f"Welcome, new user: {user_id} ({email})")

# Option 2: register directly
async def handle_payment(payload: dict) -> None:
    print(f"Processing payment: {payload}")

mq.register("payment_completed", handle_payment)
```
### 3. Produce Messages
```python
# Produce a regular message
message_id = await mq.produce("user_registration", {
    "user_id": 12345,
    "email": "user@example.com",
    "timestamp": "2024-01-01T00:00:00Z"
})
print(f"Message sent: {message_id}")
```
### 4. Start the Consumer
```python
# Start the consumer (blocks until a shutdown signal is received)
await mq.start_dispatch_consuming()
```
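For a long-running service it is good practice to release the connection pool on the way out. A minimal sketch, using only the `RedisMessageQueue` API documented in this guide:

```python
import asyncio
from mx_rmq import RedisMessageQueue

async def handle_order(payload: dict) -> None:
    print(f"Processing order: {payload['order_id']}")

async def main() -> None:
    mq = RedisMessageQueue()
    mq.register("order_created", handle_order)
    try:
        # Blocks until SIGINT/SIGTERM triggers the graceful shutdown
        await mq.start_dispatch_consuming()
    finally:
        # Always release the Redis connection pool
        await mq.cleanup()

if __name__ == "__main__":
    asyncio.run(main())
```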
## Advanced Features
### Delayed Messages
```python
# Send a reminder after 5 minutes
await mq.produce(
    topic="send_reminder",
    payload={"user_id": 123, "type": "payment_due"},
    delay=300  # execute after 300 seconds
)

# Send an email after 1 hour
await mq.produce(
    topic="send_email",
    payload={
        "to": "user@example.com",
        "subject": "Order confirmation",
        "body": "Thank you for your order..."
    },
    delay=3600  # execute after 1 hour
)
```
### Priority Messages
```python
from mx_rmq import MessagePriority

# High priority (processed first)
await mq.produce(
    topic="system_alert",
    payload={"level": "critical", "message": "System alert"},
    priority=MessagePriority.HIGH
)

# Normal priority (the default)
await mq.produce(
    topic="user_activity",
    payload={"user_id": 123, "action": "login"},
    priority=MessagePriority.NORMAL
)

# Low priority (processed last)
await mq.produce(
    topic="analytics_data",
    payload={"event": "page_view", "page": "/home"},
    priority=MessagePriority.LOW
)
```
### Custom Retry Configuration
```python
config = MQConfig(
    redis_url="redis://localhost:6379",
    max_retries=5,                          # retry up to 5 times
    retry_delays=[30, 60, 300, 900, 1800],  # retry intervals: 30s, 1m, 5m, 15m, 30m
    processing_timeout=300,                 # 5-minute processing timeout
)
mq = RedisMessageQueue(config)
```
### Message Time-to-Live (TTL)
```python
# Expire the message after 1 hour
await mq.produce(
    topic="temp_notification",
    payload={"message": "Temporary notice"},
    ttl=3600  # expires after 1 hour
)
```
### Producing Messages in Bulk
```python
# Send many messages
messages = [
    {"topic": "order_created", "payload": {"order_id": f"ORD_{i}"}}
    for i in range(100)
]
for msg in messages:
    await mq.produce(msg["topic"], msg["payload"])
```
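The sequential loop above makes one round trip to Redis per message. If the producer side becomes the bottleneck, the calls can be issued concurrently. A sketch, assuming `produce` is safe to call concurrently (it shares a single connection pool):

```python
import asyncio

async def produce_all(mq, messages: list[dict], concurrency: int = 20) -> list[str]:
    """Fan out produce() calls, capped by a semaphore."""
    sem = asyncio.Semaphore(concurrency)

    async def _one(msg: dict) -> str:
        async with sem:
            return await mq.produce(msg["topic"], msg["payload"])

    # Returns the message IDs in the original order
    return await asyncio.gather(*(_one(m) for m in messages))
```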
## Configuration Reference
### Full MQConfig Parameters
```python
from mx_rmq import MQConfig

config = MQConfig(
    # Redis connection
    redis_url="redis://localhost:6379",  # Redis connection URL
    redis_db=0,                          # Redis database number (0-15)
    redis_password=None,                 # Redis password
    queue_prefix="",                     # queue prefix, for isolating environments
    connection_pool_size=20,             # connection pool size

    # Consumer
    max_workers=5,                       # maximum number of worker coroutines
    task_queue_size=8,                   # local task queue size

    # Message lifecycle
    message_ttl=86400,                   # message TTL in seconds, default 24 hours
    processing_timeout=180,              # processing timeout in seconds, default 3 minutes

    # Retry
    max_retries=3,                       # maximum retry count
    retry_delays=[60, 300, 1800],        # retry delay intervals in seconds

    # Dead letter queue
    enable_dead_letter=True,             # enable the dead letter queue

    # Monitoring
    monitor_interval=30,                 # monitoring check interval in seconds
    expired_check_interval=10,           # expired-message check interval in seconds
    processing_monitor_interval=30,      # processing-queue monitor interval in seconds
    batch_size=100,                      # batch size
)
```
### Configuration via Environment Variables
Configuration can also be supplied through environment variables:
```bash
export REDIS_URL="redis://localhost:6379"
export REDIS_PASSWORD="your_password"
export MQ_MAX_WORKERS=10
export MQ_TASK_QUEUE_SIZE=20
export MQ_MESSAGE_TTL=86400
```
```python
import os
from mx_rmq import MQConfig
config = MQConfig(
redis_url=os.getenv("REDIS_URL", "redis://localhost:6379"),
redis_password=os.getenv("REDIS_PASSWORD"),
max_workers=int(os.getenv("MQ_MAX_WORKERS", "5")),
task_queue_size=int(os.getenv("MQ_TASK_QUEUE_SIZE", "8")),
message_ttl=int(os.getenv("MQ_MESSAGE_TTL", "86400")),
)
```
## API Reference
### RedisMessageQueue
#### Constructor
```python
def __init__(self, config: MQConfig | None = None) -> None:
    """
    Initialize the message queue.

    Args:
        config: queue configuration; defaults are used when None
    """
```
#### Core Methods
```python
async def produce(
    self,
    topic: str,
    payload: dict[str, Any],
    delay: int = 0,
    priority: MessagePriority = MessagePriority.NORMAL,
    ttl: int | None = None,
    message_id: str | None = None,
) -> str:
    """
    Produce a message.

    Args:
        topic: topic name
        payload: message payload (must be a JSON-serializable dict)
        delay: delay before execution in seconds; 0 means immediate
        priority: message priority
        ttl: message time-to-live in seconds; None uses the configured default
        message_id: message ID; a UUID is generated when None

    Returns:
        The message ID (string).

    Raises:
        ValueError: parameter validation failed
        RedisError: Redis operation failed
    """

def register(self, topic: str, handler: Callable) -> None:
    """
    Register a message handler.

    Args:
        topic: topic name
        handler: handler function; must be async and accept a single dict argument

    Raises:
        ValueError: the handler is not callable
    """

async def start_dispatch_consuming(self) -> None:
    """
    Start dispatching and consuming messages.

    Blocks until a shutdown signal (SIGINT/SIGTERM) is received.

    Raises:
        RuntimeError: the system was not initialized correctly
        RedisError: Redis connection error
    """

async def cleanup(self) -> None:
    """
    Release resources and close the Redis connection pool.
    """
```
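Putting the `produce` signature to work, here is a call that sets every optional parameter (the `invoice_ready` topic and values are purely illustrative):

```python
import uuid
from mx_rmq import MessagePriority

msg_id = await mq.produce(
    topic="invoice_ready",
    payload={"invoice_id": "INV_42"},
    delay=60,                       # deliver after one minute
    priority=MessagePriority.HIGH,  # jump ahead of normal traffic
    ttl=600,                        # expire if unprocessed after 10 minutes
    message_id=str(uuid.uuid4()),   # or omit to have one generated
)
```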
### Message
```python
@dataclass
class Message:
    """Message data class."""
    id: str                    # unique message ID
    version: str               # message format version
    topic: str                 # topic name
    payload: dict[str, Any]    # message payload
    priority: MessagePriority  # message priority
    created_at: int            # creation timestamp (milliseconds)
    meta: MessageMeta          # message metadata

@dataclass
class MessageMeta:
    """Message metadata."""
    status: MessageStatus    # message status
    retry_count: int         # retry count
    max_retries: int         # maximum retry count
    retry_delays: list[int]  # retry delay configuration
    last_error: str | None   # last error message
    expire_at: int           # expiry timestamp
    # ... other metadata fields
```
### Enums
```python
class MessagePriority(str, Enum):
    """Message priority."""
    HIGH = "high"      # high priority
    NORMAL = "normal"  # normal priority
    LOW = "low"        # low priority

class MessageStatus(str, Enum):
    """Message status."""
    PENDING = "pending"          # waiting to be processed
    PROCESSING = "processing"    # being processed
    COMPLETED = "completed"      # finished
    RETRYING = "retrying"        # retrying
    DEAD_LETTER = "dead_letter"  # dead letter
```
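Because both enums subclass `str`, their members compare equal to their string values and serialize to JSON without extra handling:

```python
from mx_rmq import MessagePriority

assert MessagePriority.HIGH == "high"  # str subclass: compares as a plain string
priority = MessagePriority("low")      # parse back from its stored string form
```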
## Monitoring and Management
### Collecting Metrics
```python
from mx_rmq import MetricsCollector

# Create a metrics collector
collector = MetricsCollector(redis=mq.redis, queue_prefix=config.queue_prefix)

# Collect all metrics
metrics = await collector.collect_all_metrics(["order_created", "user_registration"])

# Print the key metrics
print(f"Pending messages: {metrics['queue.order_created.pending']}")
print(f"Processing messages: {metrics['queue.order_created.processing']}")
print(f"Total throughput: {metrics['throughput.messages_per_minute']}")
print(f"Dead letter queue: {metrics['queue.dlq.count']}")
```
### Queue Monitoring
```python
# Monitor a single queue
queue_metrics = await collector.collect_queue_metrics(["order_created"])
print(f"Order queue status: {queue_metrics}")

# Monitor processing performance
processing_metrics = await collector.collect_processing_metrics(["order_created"])
print(f"Average processing time: {processing_metrics['order_created.avg_processing_time']}ms")
```
### Dead Letter Queue Management
```python
import json

# Inspect the dead letter queue
dlq_count = await mq.redis.llen("dlq:queue")
print(f"Dead letter messages: {dlq_count}")

# Fetch a batch of dead letter messages
dlq_messages = await mq.redis.lrange("dlq:queue", 0, 9)  # first 10 entries
for msg_id in dlq_messages:
    payload = await mq.redis.hget("dlq:payload:map", msg_id)
    print(f"Dead letter: {msg_id} - {payload}")

# Manually retry a dead letter message (custom implementation required)
async def retry_dead_message(message_id: str):
    # Fetch the message from the dead letter queue
    payload_json = await mq.redis.hget("dlq:payload:map", message_id)
    if payload_json:
        # Parse the message and produce it again
        message = json.loads(payload_json)
        await mq.produce(message["topic"], message["payload"])
        # Remove it from the dead letter queue
        await mq.redis.lrem("dlq:queue", 1, message_id)
        await mq.redis.hdel("dlq:payload:map", message_id)
```
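To drain the whole queue rather than replay one ID at a time, the helper above can be driven in a loop. A sketch, assuming the `dlq:queue` / `dlq:payload:map` key layout shown above:

```python
async def replay_dead_letters(limit: int = 1000) -> int:
    """Replay up to `limit` dead letter messages; returns the number replayed."""
    replayed = 0
    # The limit also bounds the loop if a head entry has no payload
    # and therefore is never removed by retry_dead_message().
    for _ in range(limit):
        msg_ids = await mq.redis.lrange("dlq:queue", 0, 0)
        if not msg_ids:
            break
        await retry_dead_message(msg_ids[0])  # helper defined above
        replayed += 1
    return replayed
```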
### A Real-Time Monitoring Script
```python
import asyncio
import time

async def monitor_loop():
    """Real-time monitoring loop."""
    collector = MetricsCollector(redis=mq.redis)
    while True:
        try:
            # Collect metrics
            metrics = await collector.collect_all_metrics(["order_created"])

            # Print the key figures
            print(f"[{time.strftime('%H:%M:%S')}] Queue status:")
            print(f"  pending: {metrics.get('queue.order_created.pending', 0)}")
            print(f"  processing: {metrics.get('queue.order_created.processing', 0)}")
            print(f"  dead letters: {metrics.get('queue.dlq.count', 0)}")

            # Check alert conditions
            pending = metrics.get('queue.order_created.pending', 0)
            if pending > 100:
                print(f"⚠️ Alert: pending message backlog ({pending})")

            dlq_count = metrics.get('queue.dlq.count', 0)
            if dlq_count > 10:
                print(f"🚨 Alert: too many dead letter messages ({dlq_count})")
        except Exception as e:
            print(f"Monitoring error: {e}")

        await asyncio.sleep(10)  # check every 10 seconds

# Start monitoring (from inside a running event loop)
asyncio.create_task(monitor_loop())
```
## Deployment Guide
### Local Development
```bash
# 1. Start Redis
docker run -d --name redis -p 6379:6379 redis:7-alpine

# 2. Run the application
python your_app.py
```
### Docker Deployment
**Dockerfile:**
```dockerfile
FROM python:3.12-slim

WORKDIR /app

# Install dependencies
COPY requirements.txt .
RUN pip install -r requirements.txt

# Copy the code
COPY . .

# Start the application
CMD ["python", "main.py"]
```
**docker-compose.yml:**
```yaml
version: '3.8'
services:
redis:
image: redis:7-alpine
ports:
- "6379:6379"
volumes:
- redis_data:/data
command: redis-server --appendonly yes
app:
build: .
depends_on:
- redis
environment:
- REDIS_URL=redis://redis:6379
- MQ_MAX_WORKERS=10
restart: unless-stopped
volumes:
redis_data:
```
### Production Configuration
**Recommended Redis settings (redis.conf):**
```conf
# Memory management
maxmemory-policy allkeys-lru
maxmemory 2gb

# Connection management
timeout 300
tcp-keepalive 300

# RDB persistence: snapshot after N key changes within T seconds
save 900 1
save 300 10
save 60 10000

# AOF persistence
appendonly yes
appendfsync everysec

# Performance tuning
hz 10
dynamic-hz yes
```
**Application configuration:**
```python
# Production configuration
config = MQConfig(
    redis_url="redis://redis-cluster:6379",
    redis_password="your_secure_password",
    max_workers=20,
    task_queue_size=50,
    connection_pool_size=30,
    message_ttl=86400 * 7,   # 7 days
    processing_timeout=600,  # 10 minutes
    queue_prefix="prod",     # environment isolation
)
```
### High Availability
**Redis Sentinel:**
```python
import redis.sentinel

# Configure Sentinel
sentinels = [
    ('sentinel1', 26379),
    ('sentinel2', 26379),
    ('sentinel3', 26379),
]
sentinel = redis.sentinel.Sentinel(sentinels, socket_timeout=0.1)

# Discover the master node
redis_master = sentinel.master_for('mymaster', socket_timeout=0.1)

# Inject the custom Redis connection
config = MQConfig(redis_url="")  # left empty; a custom connection is used instead
mq = RedisMessageQueue(config)
mq.redis = redis_master  # use the Sentinel-managed connection
```
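Since the queue is fully async, the asyncio Sentinel client from redis-py is the more natural fit. A sketch of the same wiring (the `mq.redis` assignment mirrors the snippet above and assumes the attribute is writable):

```python
from redis.asyncio.sentinel import Sentinel
from mx_rmq import MQConfig, RedisMessageQueue

sentinel = Sentinel(
    [("sentinel1", 26379), ("sentinel2", 26379), ("sentinel3", 26379)],
    socket_timeout=0.1,
)
# master_for returns an async Redis client bound to the current master
redis_master = sentinel.master_for("mymaster", socket_timeout=0.1)

mq = RedisMessageQueue(MQConfig(redis_url=""))
mq.redis = redis_master  # same injection pattern as the sync example
```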
### Monitoring and Alerting
**Exposing Prometheus metrics:**
```python
import asyncio
from prometheus_client import start_http_server, Gauge, Counter

# Define the metrics
queue_size = Gauge('mq_queue_size', 'Queue size', ['topic', 'status'])
messages_processed = Counter('mq_messages_processed_total', 'Messages processed', ['topic', 'status'])

async def export_metrics():
    """Export Prometheus metrics."""
    collector = MetricsCollector(redis=mq.redis)
    while True:
        metrics = await collector.collect_all_metrics(['order_created'])

        # Update the Prometheus gauges
        queue_size.labels(topic='order_created', status='pending').set(
            metrics.get('queue.order_created.pending', 0)
        )
        queue_size.labels(topic='order_created', status='processing').set(
            metrics.get('queue.order_created.processing', 0)
        )

        await asyncio.sleep(30)

# Start the Prometheus HTTP server, then schedule the exporter
# (create_task must be called from inside a running event loop)
start_http_server(8000)
asyncio.create_task(export_metrics())
```
## Best Practices
### 1. Message Design
**✅ Do:**
```python
# A clear message structure with the necessary context
await mq.produce("order_created", {
    "order_id": "ORD_123456",
    "user_id": 789,
    "total_amount": 99.99,
    "currency": "USD",
    "timestamp": "2024-01-01T12:00:00Z",
    "metadata": {
        "source": "web",
        "version": "v1.0"
    }
})
```
**❌ Avoid:**
```python
# Too little context
await mq.produce("process", {"id": 123})

# Too much data in one message
await mq.produce("user_update", {
    "user": {...},         # every field of the user
    "history": [...],      # the complete history
    "related_data": {...}  # large amounts of related data
})
```
### 2. Error Handling
**✅ Do:**
```python
async def handle_payment(payload: dict) -> None:
    try:
        order_id = payload["order_id"]
        amount = payload["amount"]

        # Validate parameters
        if not order_id or amount <= 0:
            raise ValueError(f"Invalid order parameters: {payload}")

        # Business logic
        result = await process_payment(order_id, amount)

        # Log success
        logger.info("payment processed", order_id=order_id, amount=amount)

    except ValueError as e:
        # Parameter error: do not retry
        logger.error("invalid payment parameters", error=str(e), payload=payload)
        raise  # re-raise; the message ends up in the dead letter queue

    except PaymentGatewayError as e:
        # External service error: retryable
        logger.warning("payment gateway error", error=str(e), order_id=order_id)
        raise  # re-raise to trigger a retry

    except Exception as e:
        # Unknown error
        logger.error("payment processing failed", error=str(e), order_id=order_id)
        raise
```
### 3. Idempotency
```python
async def handle_order_created(payload: dict) -> None:
    order_id = payload["order_id"]

    # Skip if already handled (idempotency guard)
    if await is_order_processed(order_id):
        logger.info("order already processed, skipping", order_id=order_id)
        return

    try:
        # Process the order
        await process_order(order_id)

        # Mark as processed
        await mark_order_processed(order_id)

    except Exception as e:
        logger.error("order processing failed", order_id=order_id, error=str(e))
        raise
```
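The `is_order_processed` / `mark_order_processed` pair is left to the application. One lightweight way to implement it is a Redis key with a TTL, reusing the queue's own connection. A sketch (key name and TTL are illustrative):

```python
PROCESSED_TTL = 7 * 24 * 3600  # keep markers for 7 days (illustrative)

async def is_order_processed(order_id: str) -> bool:
    return await mq.redis.exists(f"processed:order:{order_id}") == 1

async def mark_order_processed(order_id: str) -> None:
    # SET NX: only the first writer succeeds, giving an atomic "done" marker
    await mq.redis.set(f"processed:order:{order_id}", "1", nx=True, ex=PROCESSED_TTL)
```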
### 4. Performance Tuning
**Sizing the worker pool:**
```python
import multiprocessing

# Size the worker pool from the CPU count and the workload's IO profile
cpu_count = multiprocessing.cpu_count()
is_cpu_intensive = False  # set according to your workload

# CPU-bound work: workers = CPU cores
# IO-bound work: workers = CPU cores * 2-4
workers = cpu_count if is_cpu_intensive else cpu_count * 2

config = MQConfig(
    max_workers=workers,
    # the task queue should be larger than the worker pool
    task_queue_size=workers * 2,
)
```
**Batching:**
```python
async def handle_batch_emails(payload: dict) -> None:
    """Send emails in batches."""
    email_list = payload["emails"]

    # Process in chunks to bound memory usage
    batch_size = 10
    for i in range(0, len(email_list), batch_size):
        batch = email_list[i:i + batch_size]

        # Send the batch concurrently
        tasks = [send_email(email) for email in batch]
        await asyncio.gather(*tasks, return_exceptions=True)

        # Throttle between batches
        await asyncio.sleep(0.1)
```
### 5. Monitoring and Alerting
```python
async def setup_monitoring():
    """Set up monitoring and alerting."""
    collector = MetricsCollector(redis=mq.redis)

    while True:
        try:
            metrics = await collector.collect_all_metrics(["order_created"])

            # Backlog alert
            pending = metrics.get('queue.order_created.pending', 0)
            if pending > 1000:
                await send_alert(f"Severe backlog: {pending} messages pending")

            # Dead letter queue alert
            dlq_count = metrics.get('queue.dlq.count', 0)
            if dlq_count > 50:
                await send_alert(f"Too many dead letters: {dlq_count}")

            # Processing time alert
            avg_time = metrics.get('processing.order_created.avg_time', 0)
            if avg_time > 30000:  # 30 seconds
                await send_alert(f"Slow message processing: {avg_time}ms")

        except Exception as e:
            logger.error("monitoring check failed", error=str(e))

        await asyncio.sleep(60)  # check every minute
```
## Troubleshooting
### FAQ
#### Q1: What if messages are lost?
**Symptom:** produced messages are never handled.
**Possible causes:**
1. The Redis connection dropped
2. The consumer never started correctly
3. The handler raised an exception that was not handled properly
**Solution:**
```python
# 1. Check the Redis connection
try:
    await mq.redis.ping()
    print("Redis connection OK")
except Exception as e:
    print(f"Redis connection failed: {e}")

# 2. Check whether the messages are still queued
pending_count = await mq.redis.llen("order_created:pending")
processing_count = await mq.redis.llen("order_created:processing")
print(f"pending: {pending_count}, processing: {processing_count}")

# 3. Check the dead letter queue
dlq_count = await mq.redis.llen("dlq:queue")
print(f"dead letters: {dlq_count}")
```
#### Q2: Message processing is slow
**Symptom:** the queue backs up and messages are not handled in time.
**Possible causes:**
1. Too few worker coroutines
2. Handlers take too long to run
3. Redis is the bottleneck
**Solution:**
```python
import time
import aiohttp

# 1. Increase the worker pool
config = MQConfig(max_workers=20)  # raise to 20 workers

# 2. Optimize the handler
async def optimized_handler(payload: dict) -> None:
    # Use async IO (url is whatever endpoint you call)
    async with aiohttp.ClientSession() as session:
        response = await session.post(url, json=payload)

    # Push blocking calls onto a thread
    await asyncio.to_thread(blocking_operation, payload)

# 3. Time the handler
async def timed_handler(payload: dict) -> None:
    start_time = time.time()
    try:
        await actual_handler(payload)
    finally:
        processing_time = time.time() - start_time
        if processing_time > 5:  # took longer than 5 seconds
            logger.warning("slow handler", time=processing_time, payload=payload)
```
#### Q3: Memory usage is too high
**Symptom:** the application's memory keeps growing.
**Possible causes:**
1. The local queue is backed up
2. Message objects are not being released
3. The Redis connection pool is too large
**Solution:**
```python
import psutil
import gc

# 1. Shrink the queues
config = MQConfig(
    task_queue_size=10,       # smaller local queue
    connection_pool_size=10,  # smaller connection pool
)

# 2. Monitor memory usage
async def memory_monitor():
    while True:
        process = psutil.Process()
        memory_mb = process.memory_info().rss / 1024 / 1024

        if memory_mb > 500:  # above 500 MB
            logger.warning("high memory usage", memory_mb=memory_mb)
            gc.collect()  # force a garbage collection

        await asyncio.sleep(60)
```
#### Q4: Redis connection errors
**Symptom:** `ConnectionError`, `TimeoutError`
**Solution:**
```python
import asyncio
import redis.asyncio as aioredis

# 1. Review the Redis configuration
config = MQConfig(
    redis_url="redis://localhost:6379",
    connection_pool_size=20,
)

# 2. Connect with retries
async def create_redis_with_retry(config: MQConfig, max_retries: int = 3):
    for attempt in range(max_retries):
        try:
            redis = aioredis.from_url(config.redis_url)
            await redis.ping()
            return redis
        except Exception as e:
            if attempt == max_retries - 1:
                raise
            logger.warning(f"Redis connection failed, retrying ({attempt + 1}/{max_retries})")
            await asyncio.sleep(2 ** attempt)  # exponential backoff
```
### Performance Diagnostics
#### Latency Analysis
```python
import time
from collections import defaultdict

class PerformanceAnalyzer:
    def __init__(self):
        self.metrics = defaultdict(list)

    async def analyze_handler(self, handler_name: str, handler_func):
        """Wrap a handler with timing instrumentation."""
        async def wrapped_handler(payload: dict):
            start_time = time.time()
            try:
                result = await handler_func(payload)
                return result
            finally:
                end_time = time.time()
                processing_time = (end_time - start_time) * 1000  # milliseconds
                self.metrics[handler_name].append(processing_time)

                # Print statistics periodically
                if len(self.metrics[handler_name]) % 100 == 0:
                    times = self.metrics[handler_name][-100:]  # last 100 calls
                    avg_time = sum(times) / len(times)
                    max_time = max(times)
                    min_time = min(times)

                    print(f"{handler_name} performance (last 100 calls):")
                    print(f"  avg: {avg_time:.2f}ms")
                    print(f"  max: {max_time:.2f}ms")
                    print(f"  min: {min_time:.2f}ms")

        return wrapped_handler

# Usage
analyzer = PerformanceAnalyzer()

@mq.register("order_created")
async def handle_order(payload: dict):
    # handler logic
    await process_order(payload)

# Wrap the handler for profiling
# (assumes handlers are stored in the queue's `handlers` mapping)
mq.handlers["order_created"] = await analyzer.analyze_handler(
    "order_created",
    handle_order
)
```