# Jettask
基于asyncio和Redis Stream的高性能分布式任务队列系统,支持异步、线程和进程三种执行模式。
## 特性
- 支持异步(asyncio)、线程(thread)和进程(process)三种执行器
- 基于Redis Stream实现可靠的消息传递
- 支持任务路由和批处理
- 支持任务重试和延迟执行
- 支持文件变化自动重载
- 模块化设计,易于扩展
使用示例:
# 测试弹性恢复
python examples/resilience_test.py worker # 终端1:启动Worker
python examples/resilience_test.py test # 终端2:测试删除队列
python examples/resilience_test.py monitor # 终端3:监控队列状态
# 队列管理
python examples/queue_manager.py list # 列出所有队列
python examples/queue_manager.py info queue_name # 查看队列信息
python examples/queue_manager.py monitor # 实时监控
python examples/queue_manager.py delete queue_name # 删除队列
特性:
1. 高可用性 - Worker不会因为队列被删除而崩溃
2. 自动恢复 - 无需人工干预,自动重建必要的资源
3. 零消息丢失 - 新消息会在队列重建后正常处理
4. 完整监控 - 提供工具监控队列状态和健康度
## 项目结构
```
jettask/
├── jettask/
│ ├── __init__.py
│ ├── core/ # 核心功能
│ │ ├── app.py # 主应用类
│ │ ├── task.py # 任务相关类
│ │ └── event_pool.py # 事件池管理
│ ├── executors/ # 执行器
│ │ ├── base.py # 执行器基类
│ │ ├── asyncio.py # 异步执行器
│ │ ├── thread.py # 线程执行器
│ │ └── process.py # 进程执行器
│ ├── utils/ # 工具函数
│ └── monitoring/ # 监控相关
├── examples/ # 示例代码
└── requirements.txt
```
## 安装
### 从PyPI安装(推荐)
```bash
pip install jettask
```
### 从源码安装
```bash
# 克隆仓库
git clone https://github.com/yourusername/jettask.git
cd jettask
# 安装开发版本
pip install -e .
# 或者构建并安装
python setup.py install
```
### 安装额外依赖
```bash
# 安装开发依赖
pip install jettask[dev]
# 安装文档依赖
pip install jettask[docs]
```
## 快速开始
### 1. 定义任务
```python
from jettask import Jettask
app = Jettask(redis_url="redis://localhost:6379/0")
# 异步任务
@app.task(queue="async_queue")
async def async_task(name: str):
print(f"Processing {name} asynchronously")
return f"Result: {name}"
# 同步任务
@app.task(queue="sync_queue")
def sync_task(name: str):
print(f"Processing {name} synchronously")
return f"Result: {name}"
```
### 2. 发送任务
```python
# 发送单个任务
task_id = async_task.apply_async(args=("test",))
# 批量发送
tasks = []
for i in range(10):
task_msg = sync_task.apply_async(
kwargs={"name": f"task_{i}"},
at_once=False # 不立即发送
)
tasks.append(task_msg)
event_ids = app.bulk_write(tasks)
```
### 3. 启动Worker
```python
# 异步执行器
app.start(
execute_type="asyncio",
queues=["async_queue"],
concurrency=4,
reload=True
)
# 线程执行器
app.start(
execute_type="thread",
queues=["sync_queue"],
concurrency=4
)
# 进程执行器
app.start(
execute_type="process",
queues=["cpu_queue"],
concurrency=4
)
```
## 高级功能
### 任务路由
```python
# 使用路由键控制任务执行
task.apply_async(
args=("data",),
routing={
"routing_key": "important",
"agg_key": "batch_1",
"max_records": 10, # 批处理最大记录数
"max_wait_time": 5 # 批处理最大等待时间
}
)
```
### 任务回调
```python
@app.task(bind=True)
def task_with_callbacks(request, data):
print(f"Task ID: {request.id}")
# 任务逻辑
return result
# 覆盖生命周期方法
class CustomTask(Task):
def on_before(self, event_id, pedding_count, args, kwargs):
# 任务执行前
return ExecuteResponse()
def on_success(self, event_id, args, kwargs, result):
# 任务成功后
return ExecuteResponse()
def on_end(self, event_id, pedding_count, args, kwargs, result):
# 任务结束后
return ExecuteResponse()
```
## 执行器选择指南
### AsyncIO执行器
- **适用场景**: I/O密集型任务,如网络请求、数据库操作
- **特点**: 单进程内高并发,内存占用小
- **支持**: 异步任务(async/await)和同步任务
### Thread执行器
- **适用场景**: 需要使用同步库的I/O密集型任务
- **特点**: 真实线程,可以使用阻塞I/O库
- **限制**: 只支持同步任务,不支持async/await
### Process执行器
- **适用场景**: CPU密集型任务 + 高并发异步I/O
- **特点**:
- 多进程并行执行,绕过GIL限制
- 每个进程内运行事件循环
- 同时支持异步和同步任务
- 可配置进程数和每进程协程并发数
- **优势**: 最全能的执行器,适合混合工作负载
#### Process执行器配置
```python
app.start(
execute_type="process",
queues=["task_queue"],
concurrency=4, # 进程数
max_coroutines_per_process=100 # 每进程最大协程数
)
```
配置建议:
- **I/O密集型**: `concurrency=4, max_coroutines_per_process=100`
- **CPU密集型**: `concurrency=8, max_coroutines_per_process=10`
- **混合负载**: `concurrency=6, max_coroutines_per_process=50`
- **内存受限**: `concurrency=2, max_coroutines_per_process=20`
## 运行示例
```bash
# 启动Redis
redis-server
# 启动Worker (异步模式)
python examples/demo.py worker asyncio
# 启动Worker (线程模式)
python examples/demo.py worker thread
# 启动Worker (进程模式)
python examples/demo.py worker process
# 发送任务
python examples/demo.py
```
Raw data
{
"_id": null,
"home_page": "https://github.com/qiyuebuku/easy-task",
"name": "JetTask",
"maintainer": null,
"docs_url": null,
"requires_python": ">=3.8",
"maintainer_email": null,
"keywords": "asyncio, redis, task-queue, distributed, celery",
"author": "yuyang",
"author_email": "yuyang <1194681498@qq.com>",
"download_url": "https://files.pythonhosted.org/packages/bc/3f/67671b8a3a86f260a4c3fd05bcf91c0f304823b7c862017952c1e6f467bd/jettask-0.1.0.tar.gz",
"platform": null,
"description": "# Jettask\n\n\u57fa\u4e8easyncio\u548cRedis Stream\u7684\u9ad8\u6027\u80fd\u5206\u5e03\u5f0f\u4efb\u52a1\u961f\u5217\u7cfb\u7edf\uff0c\u652f\u6301\u5f02\u6b65\u3001\u7ebf\u7a0b\u548c\u8fdb\u7a0b\u4e09\u79cd\u6267\u884c\u6a21\u5f0f\u3002\n\n## \u7279\u6027\n\n- \u652f\u6301\u5f02\u6b65(asyncio)\u3001\u7ebf\u7a0b(thread)\u548c\u8fdb\u7a0b(process)\u4e09\u79cd\u6267\u884c\u5668\n- \u57fa\u4e8eRedis Stream\u5b9e\u73b0\u53ef\u9760\u7684\u6d88\u606f\u4f20\u9012\n- \u652f\u6301\u4efb\u52a1\u8def\u7531\u548c\u6279\u5904\u7406\n- \u652f\u6301\u4efb\u52a1\u91cd\u8bd5\u548c\u5ef6\u8fdf\u6267\u884c\n- \u652f\u6301\u6587\u4ef6\u53d8\u5316\u81ea\u52a8\u91cd\u8f7d\n- \u6a21\u5757\u5316\u8bbe\u8ba1\uff0c\u6613\u4e8e\u6269\u5c55\n\n\u4f7f\u7528\u793a\u4f8b\uff1a\n\n# \u6d4b\u8bd5\u5f39\u6027\u6062\u590d\npython examples/resilience_test.py worker # \u7ec8\u7aef1\uff1a\u542f\u52a8Worker\npython examples/resilience_test.py test # \u7ec8\u7aef2\uff1a\u6d4b\u8bd5\u5220\u9664\u961f\u5217\npython examples/resilience_test.py monitor # \u7ec8\u7aef3\uff1a\u76d1\u63a7\u961f\u5217\u72b6\u6001\n\n# \u961f\u5217\u7ba1\u7406\npython examples/queue_manager.py list # \u5217\u51fa\u6240\u6709\u961f\u5217\npython examples/queue_manager.py info queue_name # \u67e5\u770b\u961f\u5217\u4fe1\u606f\npython examples/queue_manager.py monitor # \u5b9e\u65f6\u76d1\u63a7\npython examples/queue_manager.py delete queue_name # \u5220\u9664\u961f\u5217\n\n\u7279\u6027\uff1a\n\n1. \u9ad8\u53ef\u7528\u6027 - Worker\u4e0d\u4f1a\u56e0\u4e3a\u961f\u5217\u88ab\u5220\u9664\u800c\u5d29\u6e83\n2. \u81ea\u52a8\u6062\u590d - \u65e0\u9700\u4eba\u5de5\u5e72\u9884\uff0c\u81ea\u52a8\u91cd\u5efa\u5fc5\u8981\u7684\u8d44\u6e90\n3. \u96f6\u6d88\u606f\u4e22\u5931 - \u65b0\u6d88\u606f\u4f1a\u5728\u961f\u5217\u91cd\u5efa\u540e\u6b63\u5e38\u5904\u7406\n4. \u5b8c\u6574\u76d1\u63a7 - \u63d0\u4f9b\u5de5\u5177\u76d1\u63a7\u961f\u5217\u72b6\u6001\u548c\u5065\u5eb7\u5ea6\n\n\n\n## \u9879\u76ee\u7ed3\u6784\n\n```\njettask/\n\u251c\u2500\u2500 jettask/\n\u2502 \u251c\u2500\u2500 __init__.py\n\u2502 \u251c\u2500\u2500 core/ # \u6838\u5fc3\u529f\u80fd\n\u2502 \u2502 \u251c\u2500\u2500 app.py # \u4e3b\u5e94\u7528\u7c7b\n\u2502 \u2502 \u251c\u2500\u2500 task.py # \u4efb\u52a1\u76f8\u5173\u7c7b\n\u2502 \u2502 \u2514\u2500\u2500 event_pool.py # \u4e8b\u4ef6\u6c60\u7ba1\u7406\n\u2502 \u251c\u2500\u2500 executors/ # \u6267\u884c\u5668\n\u2502 \u2502 \u251c\u2500\u2500 base.py # \u6267\u884c\u5668\u57fa\u7c7b\n\u2502 \u2502 \u251c\u2500\u2500 asyncio.py # \u5f02\u6b65\u6267\u884c\u5668\n\u2502 \u2502 \u251c\u2500\u2500 thread.py # \u7ebf\u7a0b\u6267\u884c\u5668\n\u2502 \u2502 \u2514\u2500\u2500 process.py # \u8fdb\u7a0b\u6267\u884c\u5668\n\u2502 \u251c\u2500\u2500 utils/ # \u5de5\u5177\u51fd\u6570\n\u2502 \u2514\u2500\u2500 monitoring/ # \u76d1\u63a7\u76f8\u5173\n\u251c\u2500\u2500 examples/ # \u793a\u4f8b\u4ee3\u7801\n\u2514\u2500\u2500 requirements.txt\n```\n\n## \u5b89\u88c5\n\n### \u4ecePyPI\u5b89\u88c5\uff08\u63a8\u8350\uff09\n\n```bash\npip install jettask\n```\n\n### \u4ece\u6e90\u7801\u5b89\u88c5\n\n```bash\n# \u514b\u9686\u4ed3\u5e93\ngit clone https://github.com/yourusername/jettask.git\ncd jettask\n\n# \u5b89\u88c5\u5f00\u53d1\u7248\u672c\npip install -e .\n\n# \u6216\u8005\u6784\u5efa\u5e76\u5b89\u88c5\npython setup.py install\n```\n\n### \u5b89\u88c5\u989d\u5916\u4f9d\u8d56\n\n```bash\n# \u5b89\u88c5\u5f00\u53d1\u4f9d\u8d56\npip install jettask[dev]\n\n# \u5b89\u88c5\u6587\u6863\u4f9d\u8d56\npip install jettask[docs]\n```\n\n## \u5feb\u901f\u5f00\u59cb\n\n### 1. \u5b9a\u4e49\u4efb\u52a1\n\n```python\nfrom jettask import Jettask\n\napp = Jettask(redis_url=\"redis://localhost:6379/0\")\n\n# \u5f02\u6b65\u4efb\u52a1\n@app.task(queue=\"async_queue\")\nasync def async_task(name: str):\n print(f\"Processing {name} asynchronously\")\n return f\"Result: {name}\"\n\n# \u540c\u6b65\u4efb\u52a1\n@app.task(queue=\"sync_queue\")\ndef sync_task(name: str):\n print(f\"Processing {name} synchronously\")\n return f\"Result: {name}\"\n```\n\n### 2. \u53d1\u9001\u4efb\u52a1\n\n```python\n# \u53d1\u9001\u5355\u4e2a\u4efb\u52a1\ntask_id = async_task.apply_async(args=(\"test\",))\n\n# \u6279\u91cf\u53d1\u9001\ntasks = []\nfor i in range(10):\n task_msg = sync_task.apply_async(\n kwargs={\"name\": f\"task_{i}\"},\n at_once=False # \u4e0d\u7acb\u5373\u53d1\u9001\n )\n tasks.append(task_msg)\n\nevent_ids = app.bulk_write(tasks)\n```\n\n### 3. \u542f\u52a8Worker\n\n```python\n# \u5f02\u6b65\u6267\u884c\u5668\napp.start(\n execute_type=\"asyncio\",\n queues=[\"async_queue\"],\n concurrency=4,\n reload=True\n)\n\n# \u7ebf\u7a0b\u6267\u884c\u5668\napp.start(\n execute_type=\"thread\",\n queues=[\"sync_queue\"],\n concurrency=4\n)\n\n# \u8fdb\u7a0b\u6267\u884c\u5668\napp.start(\n execute_type=\"process\",\n queues=[\"cpu_queue\"],\n concurrency=4\n)\n```\n\n## \u9ad8\u7ea7\u529f\u80fd\n\n### \u4efb\u52a1\u8def\u7531\n\n```python\n# \u4f7f\u7528\u8def\u7531\u952e\u63a7\u5236\u4efb\u52a1\u6267\u884c\ntask.apply_async(\n args=(\"data\",),\n routing={\n \"routing_key\": \"important\",\n \"agg_key\": \"batch_1\",\n \"max_records\": 10, # \u6279\u5904\u7406\u6700\u5927\u8bb0\u5f55\u6570\n \"max_wait_time\": 5 # \u6279\u5904\u7406\u6700\u5927\u7b49\u5f85\u65f6\u95f4\n }\n)\n```\n\n### \u4efb\u52a1\u56de\u8c03\n\n```python\n@app.task(bind=True)\ndef task_with_callbacks(request, data):\n print(f\"Task ID: {request.id}\")\n # \u4efb\u52a1\u903b\u8f91\n return result\n\n# \u8986\u76d6\u751f\u547d\u5468\u671f\u65b9\u6cd5\nclass CustomTask(Task):\n def on_before(self, event_id, pedding_count, args, kwargs):\n # \u4efb\u52a1\u6267\u884c\u524d\n return ExecuteResponse()\n \n def on_success(self, event_id, args, kwargs, result):\n # \u4efb\u52a1\u6210\u529f\u540e\n return ExecuteResponse()\n \n def on_end(self, event_id, pedding_count, args, kwargs, result):\n # \u4efb\u52a1\u7ed3\u675f\u540e\n return ExecuteResponse()\n```\n\n## \u6267\u884c\u5668\u9009\u62e9\u6307\u5357\n\n### AsyncIO\u6267\u884c\u5668\n- **\u9002\u7528\u573a\u666f**: I/O\u5bc6\u96c6\u578b\u4efb\u52a1\uff0c\u5982\u7f51\u7edc\u8bf7\u6c42\u3001\u6570\u636e\u5e93\u64cd\u4f5c\n- **\u7279\u70b9**: \u5355\u8fdb\u7a0b\u5185\u9ad8\u5e76\u53d1\uff0c\u5185\u5b58\u5360\u7528\u5c0f\n- **\u652f\u6301**: \u5f02\u6b65\u4efb\u52a1\uff08async/await\uff09\u548c\u540c\u6b65\u4efb\u52a1\n\n### Thread\u6267\u884c\u5668\n- **\u9002\u7528\u573a\u666f**: \u9700\u8981\u4f7f\u7528\u540c\u6b65\u5e93\u7684I/O\u5bc6\u96c6\u578b\u4efb\u52a1\n- **\u7279\u70b9**: \u771f\u5b9e\u7ebf\u7a0b\uff0c\u53ef\u4ee5\u4f7f\u7528\u963b\u585eI/O\u5e93\n- **\u9650\u5236**: \u53ea\u652f\u6301\u540c\u6b65\u4efb\u52a1\uff0c\u4e0d\u652f\u6301async/await\n\n### Process\u6267\u884c\u5668\n- **\u9002\u7528\u573a\u666f**: CPU\u5bc6\u96c6\u578b\u4efb\u52a1 + \u9ad8\u5e76\u53d1\u5f02\u6b65I/O\n- **\u7279\u70b9**: \n - \u591a\u8fdb\u7a0b\u5e76\u884c\u6267\u884c\uff0c\u7ed5\u8fc7GIL\u9650\u5236\n - \u6bcf\u4e2a\u8fdb\u7a0b\u5185\u8fd0\u884c\u4e8b\u4ef6\u5faa\u73af\n - \u540c\u65f6\u652f\u6301\u5f02\u6b65\u548c\u540c\u6b65\u4efb\u52a1\n - \u53ef\u914d\u7f6e\u8fdb\u7a0b\u6570\u548c\u6bcf\u8fdb\u7a0b\u534f\u7a0b\u5e76\u53d1\u6570\n- **\u4f18\u52bf**: \u6700\u5168\u80fd\u7684\u6267\u884c\u5668\uff0c\u9002\u5408\u6df7\u5408\u5de5\u4f5c\u8d1f\u8f7d\n\n#### Process\u6267\u884c\u5668\u914d\u7f6e\n\n```python\napp.start(\n execute_type=\"process\",\n queues=[\"task_queue\"],\n concurrency=4, # \u8fdb\u7a0b\u6570\n max_coroutines_per_process=100 # \u6bcf\u8fdb\u7a0b\u6700\u5927\u534f\u7a0b\u6570\n)\n```\n\n\u914d\u7f6e\u5efa\u8bae\uff1a\n- **I/O\u5bc6\u96c6\u578b**: `concurrency=4, max_coroutines_per_process=100`\n- **CPU\u5bc6\u96c6\u578b**: `concurrency=8, max_coroutines_per_process=10`\n- **\u6df7\u5408\u8d1f\u8f7d**: `concurrency=6, max_coroutines_per_process=50`\n- **\u5185\u5b58\u53d7\u9650**: `concurrency=2, max_coroutines_per_process=20`\n\n## \u8fd0\u884c\u793a\u4f8b\n\n```bash\n# \u542f\u52a8Redis\nredis-server\n\n# \u542f\u52a8Worker (\u5f02\u6b65\u6a21\u5f0f)\npython examples/demo.py worker asyncio\n\n# \u542f\u52a8Worker (\u7ebf\u7a0b\u6a21\u5f0f)\npython examples/demo.py worker thread\n\n# \u542f\u52a8Worker (\u8fdb\u7a0b\u6a21\u5f0f)\npython examples/demo.py worker process\n\n# \u53d1\u9001\u4efb\u52a1\npython examples/demo.py\n```\n",
"bugtrack_url": null,
"license": "MIT",
"summary": "\u57fa\u4e8easyncio\u548cRedis Stream\u7684\u9ad8\u6027\u80fd\u5206\u5e03\u5f0f\u4efb\u52a1\u961f\u5217\u7cfb\u7edf",
"version": "0.1.0",
"project_urls": {
"Bug Tracker": "https://github.com/qiyuebuku/easy-task/issues",
"Documentation": "https://github.com/qiyuebuku/easy-task#readme",
"Homepage": "https://github.com/qiyuebuku/easy-task",
"Source": "https://github.com/qiyuebuku/easy-task"
},
"split_keywords": [
"asyncio",
" redis",
" task-queue",
" distributed",
" celery"
],
"urls": [
{
"comment_text": null,
"digests": {
"blake2b_256": "840c66ea69b1d93352cb825918eada3129a1a1b74802e1fa253d0eb1961aeb6a",
"md5": "44515a107168908c28d2d54893c41488",
"sha256": "7f7b7305c2cca8f0fbcfbada9e09894c84156a00f292bf0ba53fbfe1236c0e41"
},
"downloads": -1,
"filename": "jettask-0.1.0-py3-none-any.whl",
"has_sig": false,
"md5_digest": "44515a107168908c28d2d54893c41488",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": ">=3.8",
"size": 82753,
"upload_time": "2025-08-01T11:48:36",
"upload_time_iso_8601": "2025-08-01T11:48:36.366359Z",
"url": "https://files.pythonhosted.org/packages/84/0c/66ea69b1d93352cb825918eada3129a1a1b74802e1fa253d0eb1961aeb6a/jettask-0.1.0-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": null,
"digests": {
"blake2b_256": "bc3f67671b8a3a86f260a4c3fd05bcf91c0f304823b7c862017952c1e6f467bd",
"md5": "416a9e32ab670de6ecd0b63da5420361",
"sha256": "4ca84cf652d9f52b58265d8f6c04a3634d809d497bc65581a2d2d1c185c3ca6b"
},
"downloads": -1,
"filename": "jettask-0.1.0.tar.gz",
"has_sig": false,
"md5_digest": "416a9e32ab670de6ecd0b63da5420361",
"packagetype": "sdist",
"python_version": "source",
"requires_python": ">=3.8",
"size": 77022,
"upload_time": "2025-08-01T11:48:37",
"upload_time_iso_8601": "2025-08-01T11:48:37.816982Z",
"url": "https://files.pythonhosted.org/packages/bc/3f/67671b8a3a86f260a4c3fd05bcf91c0f304823b7c862017952c1e6f467bd/jettask-0.1.0.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2025-08-01 11:48:37",
"github": true,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"github_user": "qiyuebuku",
"github_project": "easy-task",
"travis_ci": false,
"coveralls": false,
"github_actions": false,
"lcname": "jettask"
}