celery-streaming-result


Name: celery-streaming-result JSON
Version: 0.1.3 PyPI version JSON
download
home_page: None
Summary: Celery任务结果分片管理
upload_time: 2024-05-12 12:25:19
maintainer: Zhou WeiKe
docs_url: None
author: Zhou WeiKe
requires_python: None
license: MIT
keywords celery_streaming_result celery streaming result
VCS
bugtrack_url
requirements No requirements were recorded.
Travis-CI No Travis.
coveralls test coverage No coveralls.
# celery-streaming-result

Celery任务结果分片管理。

## 安装

```
pip install celery-streaming-result
```

## 使用方法

### 服务端

```python
import time
import redis
from celery.app import app_or_default
from celery_streaming_result import CeleryStreamingResultManager

# Obtain the Celery application object via app_or_default() (called with no arguments).
app = app_or_default()
# Redis client created with redis-py's default connection arguments.
redis_instance = redis.Redis()
# Streaming-result manager, constructed around the Redis client above.
csrm = CeleryStreamingResultManager(redis_instance)

@app.task(bind=True)
def task1(celery_task):
    """Publish the integers 0..9 as result chunks and return them as a list.

    The task is registered with ``bind=True``, so the first argument
    (``celery_task``) is the task instance; it is forwarded to every
    ``csrm`` call.

    Returns:
        list: ``[0, 1, ..., 9]`` -- the same values that were published
        chunk by chunk.
    """
    values = list(range(10))
    for value in values:
        # Each value is pushed as its own chunk, in order.
        csrm.append_result_chunk(celery_task, value)
    # Presumably marks the chunk stream as finished for readers -- confirm
    # against the CeleryStreamingResultManager implementation.
    csrm.append_ended_chunk(celery_task)
    return values

```

### 客户端(同步)

```python
import redis
from celery.app import app_or_default
from celery_streaming_result import CeleryStreamingResultManager

# 根据你的task定义,正确引用
from test_server import task1

# Obtain the Celery application object and build the manager around a
# Redis client created with default connection arguments.
app = app_or_default()
redis_instance = redis.Redis()
csrm = CeleryStreamingResultManager(redis_instance)

# Create an asynchronous task.
atask1 = task1.delay()
# Read the result chunks of that asynchronous task; each chunk is printed
# followed by "-" and flushed immediately.
for chunk in csrm.get_result_chunks(atask1):
    print(chunk, end="-", flush=True)
```

### 客户端(异步)

```python
from redis import asyncio as aioredis
from celery.app import app_or_default
from celery_streaming_result import CeleryStreamingResultManager
from celery_streaming_result import start_celery_task_async
from celery_streaming_result import get_celery_task_result_async
from test_server import task1  # 根据你的task定义,正确引用

# Obtain the Celery application object via app_or_default() (called with no arguments).
app = app_or_default()
# Asyncio Redis client (redis.asyncio, imported as aioredis) with default arguments.
redis_instance = aioredis.Redis()
# Streaming-result manager, constructed around the asyncio Redis client above.
csrm = CeleryStreamingResultManager(redis_instance)


async def on_finished(celery_task, break_flag=False):
    """Example completion callback: print a marker, then await the task result.

    Args:
        celery_task: Passed straight through to
            ``get_celery_task_result_async``.
        break_flag: Accepted but not used in this example. Presumably set
            by the manager when streaming was interrupted -- confirm
            against the library.
    """
    print("on finished...")
    task_result = await get_celery_task_result_async(celery_task)
    # task_result is the return value of the Celery task.
    # Usually that is the collection of all result chunks, but in practice
    # it depends only on how the Celery task is implemented.


async def main():
    """Start ``task1`` and print its result chunks as they arrive.

    ``await`` and ``async for`` are only valid inside a coroutine, so the
    driver code lives in this function and is executed with
    ``asyncio.run`` instead of sitting at module top level.
    """
    # task1.delay() is a synchronous function and would have to be converted
    # with `sync_to_async`; start_celery_task_async is awaited instead.
    atask1 = await start_celery_task_async(task1)
    # Read the result chunks of that asynchronous task; when the task
    # finishes, the on_finished function is called back.
    async for chunk in csrm.get_result_chunks(
        atask1,
        on_finished=on_finished,
    ):
        print(chunk, end="-", flush=True)


if __name__ == "__main__":
    import asyncio

    asyncio.run(main())
```

## 版本记录

### v0.1.0

1. 首次发布。

### v0.1.1

1. 添加asyncio支持。
1. 获取结果支持on_finished回调。

### v0.1.3

1. 流式中断支持。

            

Raw data

            {
    "_id": null,
    "home_page": null,
    "name": "celery-streaming-result",
    "maintainer": "Zhou WeiKe",
    "docs_url": null,
    "requires_python": null,
    "maintainer_email": null,
    "keywords": "celery_streaming_result, celery, streaming result",
    "author": "Zhou WeiKe",
    "author_email": null,
    "download_url": "https://files.pythonhosted.org/packages/ad/ef/37cc840f6a3ed0927d8fe2d18445d26993c076a27b9bdabca9e8484ddddb/celery_streaming_result-0.1.3.tar.gz",
    "platform": null,
    "description": "# celery-streaming-result\n\nCelery\u4efb\u52a1\u7ed3\u679c\u5206\u7247\u7ba1\u7406\u3002\n\n## \u5b89\u88c5\n\n```\npip install celery-streaming-result\n```\n\n## \u4f7f\u7528\u65b9\u6cd5\n\n### \u670d\u52a1\u7aef\n\n```python\nimport time\nimport redis\nfrom celery.app import app_or_default\nfrom celery_streaming_result import CeleryStreamingResultManager\n\napp = app_or_default()\nredis_instance = redis.Redis()\ncsrm = CeleryStreamingResultManager(redis_instance)\n\n@app.task(bind=True)\ndef task1(celery_task):\n    result = []\n    for i in range(10):\n        csrm.append_result_chunk(celery_task, i)\n        result.append(i)\n    csrm.append_ended_chunk(celery_task)\n    return result\n\n```\n\n## \u5ba2\u6237\u7aef\uff08\u540c\u6b65\uff09\n\n```python\nimport redis\nfrom celery.app import app_or_default\nfrom celery_streaming_result import CeleryStreamingResultManager\n\n# \u6839\u636e\u4f60\u7684task\u5b9a\u4e49\uff0c\u6b63\u786e\u5f15\u7528\nfrom test_server import task1\n\napp = app_or_default()\nredis_instance = redis.Redis()\ncsrm = CeleryStreamingResultManager(redis_instance)\n\n# \u751f\u6210\u4e00\u4e2a\u5f02\u6b65\u4efb\u52a1\natask1 = task1.delay()\n# \u8bfb\u53d6\u8be5\u5f02\u6b65\u4efb\u52a1\u7684\u7ed3\u679c\u5206\u7247\nfor chunk in csrm.get_result_chunks(atask1):\n    print(chunk, end=\"-\", flush=True)\n```\n\n## \u5ba2\u6237\u7aef\uff08\u5f02\u6b65\uff09\n\n```python\nfrom redis import asyncio as aioredis\nfrom celery.app import app_or_default\nfrom celery_streaming_result import CeleryStreamingResultManager\nfrom celery_streaming_result import start_celery_task_async\nfrom celery_streaming_result import get_celery_task_result_async\nfrom test_server import task1  # \u6839\u636e\u4f60\u7684task\u5b9a\u4e49\uff0c\u6b63\u786e\u5f15\u7528\n\napp = app_or_default()\nredis_instance = aioredis.Redis()\ncsrm = CeleryStreamingResultManager(redis_instance)\n\n\nasync def on_finished(celery_task, break_flag=False):\n    print(\"on 
finished...\")\n    task_result = await get_celery_task_result_async(\n        celery_task,\n    )\n    # \u8fd9\u91cc\u7684task_result\u503c\u662fcelery\u4efb\u52a1\u7684\u8fd4\u56de\u503c\u3002\n    # \u4e00\u822c\u6765\u8bf4\u662f\u6240\u6709\u7ed3\u679c\u5206\u7247\u7684\u96c6\u5408\uff0c\u4f46\u5b9e\u9645\u53ea\u53d6\u51b3\u4e8ecelery\u4efb\u52a1\u7684\u5b9e\u73b0\u3002\n\n\natask1 = await start_celery_task_async(\n    task1\n)  # task1.delay()\u662f\u4e00\u4e2a\u540c\u6b65\u51fd\u6570\u3002\u9700\u8981\u4f7f\u7528`sync_to_async`\u8fdb\u884c\u8f6c\u5316\u3002\n# \u8bfb\u53d6\u8be5\u5f02\u6b65\u4efb\u52a1\u7684\u7ed3\u679c\u5206\u7247\uff0c\u5982\u679c\u4efb\u52a1\u7ed3\u679c\uff0c\u5219\u56de\u8c03on_finished\u51fd\u6570\u3002\nasync for chunk in csrm.get_result_chunks(\n    atask1,\n    on_finished=on_finished,\n):\n    print(chunk, end=\"-\", flush=True)\n```\n\n## \u7248\u672c\u8bb0\u5f55\n\n### v0.1.0\n\n1. \u9996\u6b21\u53d1\u5e03\u3002\n\n### v0.1.1\n\n1. \u6dfb\u52a0asyncio\u652f\u6301\u3002\n1. \u83b7\u53d6\u7ed3\u679c\u652f\u6301on_finished\u56de\u8c03\u3002\n\n### v0.1.3\n\n1. \u6d41\u5f0f\u4e2d\u65ad\u652f\u6301\u3002\n",
    "bugtrack_url": null,
    "license": "MIT",
    "summary": "Celery\u4efb\u52a1\u7ed3\u679c\u5206\u7247\u7ba1\u7406",
    "version": "0.1.3",
    "project_urls": null,
    "split_keywords": [
        "celery_streaming_result",
        " celery",
        " streaming result"
    ],
    "urls": [
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "627a694656ce345f6f3639aec6fd34bf615f736bf339731054254f3a788b6680",
                "md5": "aa441195bd156c1d9b2314d0e454595f",
                "sha256": "88651642c032c42b00136af41ab048cb3baafbabc4d281428a4abd5f25babb55"
            },
            "downloads": -1,
            "filename": "celery_streaming_result-0.1.3-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "aa441195bd156c1d9b2314d0e454595f",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": null,
            "size": 5483,
            "upload_time": "2024-05-12T12:25:17",
            "upload_time_iso_8601": "2024-05-12T12:25:17.879225Z",
            "url": "https://files.pythonhosted.org/packages/62/7a/694656ce345f6f3639aec6fd34bf615f736bf339731054254f3a788b6680/celery_streaming_result-0.1.3-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "adef37cc840f6a3ed0927d8fe2d18445d26993c076a27b9bdabca9e8484ddddb",
                "md5": "8c1f627cea0de228928b167e7c1eb5ca",
                "sha256": "9a2a7f60c60a02600143ae122e3ba92d57d23137d2c4a160a66b2d762ea59a1b"
            },
            "downloads": -1,
            "filename": "celery_streaming_result-0.1.3.tar.gz",
            "has_sig": false,
            "md5_digest": "8c1f627cea0de228928b167e7c1eb5ca",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": null,
            "size": 5392,
            "upload_time": "2024-05-12T12:25:19",
            "upload_time_iso_8601": "2024-05-12T12:25:19.346634Z",
            "url": "https://files.pythonhosted.org/packages/ad/ef/37cc840f6a3ed0927d8fe2d18445d26993c076a27b9bdabca9e8484ddddb/celery_streaming_result-0.1.3.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2024-05-12 12:25:19",
    "github": false,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "lcname": "celery-streaming-result"
}
        
Elapsed time: 0.25711s