# celery-streaming-result
Celery任务结果分片管理。
## 安装
```
pip install celery-streaming-result
```
## 使用方法
### 服务端
```python
import time
import redis
from celery.app import app_or_default
from celery_streaming_result import CeleryStreamingResultManager
app = app_or_default()
redis_instance = redis.Redis()
csrm = CeleryStreamingResultManager(redis_instance)
@app.task(bind=True)
def task1(celery_task):
result = []
for i in range(10):
csrm.append_result_chunk(celery_task, i)
result.append(i)
csrm.append_ended_chunk(celery_task)
return result
```
## 客户端(同步)
```python
import redis
from celery.app import app_or_default
from celery_streaming_result import CeleryStreamingResultManager
# 根据你的task定义,正确引用
from test_server import task1
app = app_or_default()
redis_instance = redis.Redis()
csrm = CeleryStreamingResultManager(redis_instance)
# 生成一个异步任务
atask1 = task1.delay()
# 读取该异步任务的结果分片
for chunk in csrm.get_result_chunks(atask1):
print(chunk, end="-", flush=True)
```
## 客户端(异步)
```python
from redis import asyncio as aioredis
from celery.app import app_or_default
from celery_streaming_result import CeleryStreamingResultManager
from celery_streaming_result import start_celery_task_async
from celery_streaming_result import get_celery_task_result_async
from test_server import task1 # 根据你的task定义,正确引用
app = app_or_default()
redis_instance = aioredis.Redis()
csrm = CeleryStreamingResultManager(redis_instance)
async def on_finished(celery_task, break_flag=False):
print("on finished...")
task_result = await get_celery_task_result_async(
celery_task,
)
# 这里的task_result值是celery任务的返回值。
# 一般来说是所有结果分片的集合,但实际只取决于celery任务的实现。
atask1 = await start_celery_task_async(
task1
) # task1.delay()是一个同步函数。需要使用`sync_to_async`进行转化。
# 读取该异步任务的结果分片,如果任务结果,则回调on_finished函数。
async for chunk in csrm.get_result_chunks(
atask1,
on_finished=on_finished,
):
print(chunk, end="-", flush=True)
```
## 版本记录
### v0.1.0
1. 首次发布。
### v0.1.1
1. 添加asyncio支持。
1. 获取结果支持on_finished回调。
### v0.1.3
1. 流式中断支持。
Raw data
{
"_id": null,
"home_page": null,
"name": "celery-streaming-result",
"maintainer": "Zhou WeiKe",
"docs_url": null,
"requires_python": null,
"maintainer_email": null,
"keywords": "celery_streaming_result, celery, streaming result",
"author": "Zhou WeiKe",
"author_email": null,
"download_url": "https://files.pythonhosted.org/packages/ad/ef/37cc840f6a3ed0927d8fe2d18445d26993c076a27b9bdabca9e8484ddddb/celery_streaming_result-0.1.3.tar.gz",
"platform": null,
"description": "# celery-streaming-result\n\nCelery\u4efb\u52a1\u7ed3\u679c\u5206\u7247\u7ba1\u7406\u3002\n\n## \u5b89\u88c5\n\n```\npip install celery-streaming-result\n```\n\n## \u4f7f\u7528\u65b9\u6cd5\n\n### \u670d\u52a1\u7aef\n\n```python\nimport time\nimport redis\nfrom celery.app import app_or_default\nfrom celery_streaming_result import CeleryStreamingResultManager\n\napp = app_or_default()\nredis_instance = redis.Redis()\ncsrm = CeleryStreamingResultManager(redis_instance)\n\n@app.task(bind=True)\ndef task1(celery_task):\n result = []\n for i in range(10):\n csrm.append_result_chunk(celery_task, i)\n result.append(i)\n csrm.append_ended_chunk(celery_task)\n return result\n\n```\n\n## \u5ba2\u6237\u7aef\uff08\u540c\u6b65\uff09\n\n```python\nimport redis\nfrom celery.app import app_or_default\nfrom celery_streaming_result import CeleryStreamingResultManager\n\n# \u6839\u636e\u4f60\u7684task\u5b9a\u4e49\uff0c\u6b63\u786e\u5f15\u7528\nfrom test_server import task1\n\napp = app_or_default()\nredis_instance = redis.Redis()\ncsrm = CeleryStreamingResultManager(redis_instance)\n\n# \u751f\u6210\u4e00\u4e2a\u5f02\u6b65\u4efb\u52a1\natask1 = task1.delay()\n# \u8bfb\u53d6\u8be5\u5f02\u6b65\u4efb\u52a1\u7684\u7ed3\u679c\u5206\u7247\nfor chunk in csrm.get_result_chunks(atask1):\n print(chunk, end=\"-\", flush=True)\n```\n\n## \u5ba2\u6237\u7aef\uff08\u5f02\u6b65\uff09\n\n```python\nfrom redis import asyncio as aioredis\nfrom celery.app import app_or_default\nfrom celery_streaming_result import CeleryStreamingResultManager\nfrom celery_streaming_result import start_celery_task_async\nfrom celery_streaming_result import get_celery_task_result_async\nfrom test_server import task1 # \u6839\u636e\u4f60\u7684task\u5b9a\u4e49\uff0c\u6b63\u786e\u5f15\u7528\n\napp = app_or_default()\nredis_instance = aioredis.Redis()\ncsrm = CeleryStreamingResultManager(redis_instance)\n\n\nasync def on_finished(celery_task, break_flag=False):\n print(\"on finished...\")\n task_result = await get_celery_task_result_async(\n celery_task,\n )\n # \u8fd9\u91cc\u7684task_result\u503c\u662fcelery\u4efb\u52a1\u7684\u8fd4\u56de\u503c\u3002\n # \u4e00\u822c\u6765\u8bf4\u662f\u6240\u6709\u7ed3\u679c\u5206\u7247\u7684\u96c6\u5408\uff0c\u4f46\u5b9e\u9645\u53ea\u53d6\u51b3\u4e8ecelery\u4efb\u52a1\u7684\u5b9e\u73b0\u3002\n\n\natask1 = await start_celery_task_async(\n task1\n) # task1.delay()\u662f\u4e00\u4e2a\u540c\u6b65\u51fd\u6570\u3002\u9700\u8981\u4f7f\u7528`sync_to_async`\u8fdb\u884c\u8f6c\u5316\u3002\n# \u8bfb\u53d6\u8be5\u5f02\u6b65\u4efb\u52a1\u7684\u7ed3\u679c\u5206\u7247\uff0c\u5982\u679c\u4efb\u52a1\u7ed3\u679c\uff0c\u5219\u56de\u8c03on_finished\u51fd\u6570\u3002\nasync for chunk in csrm.get_result_chunks(\n atask1,\n on_finished=on_finished,\n):\n print(chunk, end=\"-\", flush=True)\n```\n\n## \u7248\u672c\u8bb0\u5f55\n\n### v0.1.0\n\n1. \u9996\u6b21\u53d1\u5e03\u3002\n\n### v0.1.1\n\n1. \u6dfb\u52a0asyncio\u652f\u6301\u3002\n1. \u83b7\u53d6\u7ed3\u679c\u652f\u6301on_finished\u56de\u8c03\u3002\n\n### v0.1.3\n\n1. \u6d41\u5f0f\u4e2d\u65ad\u652f\u6301\u3002\n",
"bugtrack_url": null,
"license": "MIT",
"summary": "Celery\u4efb\u52a1\u7ed3\u679c\u5206\u7247\u7ba1\u7406",
"version": "0.1.3",
"project_urls": null,
"split_keywords": [
"celery_streaming_result",
" celery",
" streaming result"
],
"urls": [
{
"comment_text": "",
"digests": {
"blake2b_256": "627a694656ce345f6f3639aec6fd34bf615f736bf339731054254f3a788b6680",
"md5": "aa441195bd156c1d9b2314d0e454595f",
"sha256": "88651642c032c42b00136af41ab048cb3baafbabc4d281428a4abd5f25babb55"
},
"downloads": -1,
"filename": "celery_streaming_result-0.1.3-py3-none-any.whl",
"has_sig": false,
"md5_digest": "aa441195bd156c1d9b2314d0e454595f",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": null,
"size": 5483,
"upload_time": "2024-05-12T12:25:17",
"upload_time_iso_8601": "2024-05-12T12:25:17.879225Z",
"url": "https://files.pythonhosted.org/packages/62/7a/694656ce345f6f3639aec6fd34bf615f736bf339731054254f3a788b6680/celery_streaming_result-0.1.3-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": "",
"digests": {
"blake2b_256": "adef37cc840f6a3ed0927d8fe2d18445d26993c076a27b9bdabca9e8484ddddb",
"md5": "8c1f627cea0de228928b167e7c1eb5ca",
"sha256": "9a2a7f60c60a02600143ae122e3ba92d57d23137d2c4a160a66b2d762ea59a1b"
},
"downloads": -1,
"filename": "celery_streaming_result-0.1.3.tar.gz",
"has_sig": false,
"md5_digest": "8c1f627cea0de228928b167e7c1eb5ca",
"packagetype": "sdist",
"python_version": "source",
"requires_python": null,
"size": 5392,
"upload_time": "2024-05-12T12:25:19",
"upload_time_iso_8601": "2024-05-12T12:25:19.346634Z",
"url": "https://files.pythonhosted.org/packages/ad/ef/37cc840f6a3ed0927d8fe2d18445d26993c076a27b9bdabca9e8484ddddb/celery_streaming_result-0.1.3.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2024-05-12 12:25:19",
"github": false,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"lcname": "celery-streaming-result"
}