task-helpers

Name: task-helpers
Version: 1.4.1
Home page: https://github.com/loievskyi/task_helpers
Summary: A package for creating task helpers.
Upload time: 2024-02-24 20:20:41
Author: Viacheslav Loievskyi
Requires Python: >=3.8
License: BSD
# Task helpers - a package for creating task helpers.

[![build](https://github.com/loievskyi/task_helpers/actions/workflows/build.yml/badge.svg?branch=master)](https://github.com/loievskyi/task_helpers/actions/workflows/build.yml)
[![pypi](https://img.shields.io/pypi/v/task_helpers.svg)](https://pypi.org/project/task-helpers/)
[![coverage](https://img.shields.io/codecov/c/github/loievskyi/task_helpers/master.svg)](https://codecov.io/github/loievskyi/task_helpers?branch=master)

The package allows you to work with tasks.\
The idea is that you can create a task and send it somewhere for execution / processing (to a worker) without waiting for the result in the same block of code.
Or, for example, different clients (from different threads) can send many tasks for processing, and each one waits only for its own result.

## Usage example. BaseWorker
```bash
# Run redis (This can be done in many ways, not necessarily through docker):
docker run -p 6379:6379 redis
```

### Client side:
```python3
import redis

from task_helpers.couriers.redis import RedisClientTaskCourier

task_courier = RedisClientTaskCourier(redis_connection=redis.Redis())
QUEUE_NAME = "bulk_data_saving"


def to_save(task_data):
    # Adding a task to the queue.
    task_id = task_courier.add_task_to_queue(
        queue_name=QUEUE_NAME,
        task_data=task_data)

    # Waiting for the task to complete in the worker.
    saved_object = task_courier.wait_for_task_result(
        queue_name=QUEUE_NAME,
        task_id=task_id)
    return saved_object


if __name__ == "__main__":
    # Many clients can add tasks to the queue at the same time.
    task_data = {
        "name": "tomato",
        "price": "12.45"
    }
    saved_object = to_save(task_data=task_data)
    print(saved_object)
    # {'name': 'tomato', 'price': '12.45', 'id': UUID('...'), 'status': 'active'}

```

### Worker side:
```python3
import uuid
import redis

from task_helpers.couriers.redis import RedisWorkerTaskCourier
from task_helpers.workers.base import BaseWorker

task_courier = RedisWorkerTaskCourier(redis_connection=redis.Redis())
QUEUE_NAME = "bulk_data_saving"


class BulkSaveWorker(BaseWorker):
    queue_name = QUEUE_NAME
    max_tasks_per_iteration = 500

    def bulk_saving_plug(self, tasks):
        for task_id, task_data in tasks:
            task_data["id"] = uuid.uuid4()
            task_data["status"] = "active"
        return tasks

    def perform_tasks(self, tasks):
        tasks = self.bulk_saving_plug(tasks)
        # Bulk saving of the data dicts (faster than saving them one at a time).

        print(f"saved {len(tasks)} objects.")
        # saved 1 objects.

        return tasks


if __name__ == "__main__":
    worker = BulkSaveWorker(task_courier=task_courier)
    worker.perform(total_iterations=500)
    # The worker will complete its work after 500 iterations
    # (limiting iterations like this is necessary to prevent memory leaks).

```

## Installation
```bash
pip install task_helpers
```

## The couriers module
The couriers module is responsible for passing tasks from the client to the worker and returning the results, as well as for checking the execution status. Short usage sketches follow the method lists below.

### Client side methods (ClientTaskCourier & AsyncClientTaskCourier):
- get_task_result - returns the result of the task, if it exists.
- wait_for_task_result - waits for the result of the task to appear, and then returns it.
- add_task_to_queue - adds one task to the queue for processing.
- bulk_add_tasks_to_queue - adds many tasks to the queue for processing.
- check_for_done - checks if the task has completed.
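
For example, a client that prefers polling to blocking can combine check_for_done and get_task_result. The sketch below assumes that the keyword arguments of these two methods mirror those of wait_for_task_result shown in the examples above; check the source for the exact signatures.

```python3
import time
import redis

from task_helpers.couriers.redis import RedisClientTaskCourier

task_courier = RedisClientTaskCourier(redis_connection=redis.Redis())
QUEUE_NAME = "bulk_data_saving"

task_id = task_courier.add_task_to_queue(
    queue_name=QUEUE_NAME,
    task_data={"name": "tomato", "price": "12.45"})

# Poll instead of blocking inside wait_for_task_result.
# Assumption: check_for_done returns True once the result is available.
while not task_courier.check_for_done(queue_name=QUEUE_NAME, task_id=task_id):
    time.sleep(0.1)

result = task_courier.get_task_result(queue_name=QUEUE_NAME, task_id=task_id)
print(result)
```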

### Worker side methods (WorkerTaskCourier & AsyncWorkerTaskCourier):
- get_task - pops one task from the queue and returns it.
- bulk_get_tasks - pops many tasks from the queue and returns them.
- wait_for_task - waits for a task to appear, pops it from the queue, and returns it.
- return_task_result - returns the result of the processing of the task to the client side.
- bulk_return_task_results - returns the results of processing multiple tasks to the client side.
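
On the worker side, the couriers alone are enough to hand-roll a processing loop without the workers module. A minimal sketch, assuming the keyword argument names mirror the client-side methods above and that a task is returned as a (task_id, task_data) tuple (as in the workers described below):

```python3
import redis

from task_helpers.couriers.redis import RedisWorkerTaskCourier

task_courier = RedisWorkerTaskCourier(redis_connection=redis.Redis())
QUEUE_NAME = "bulk_data_saving"

while True:
    # Blocks until a task appears, then pops it from the queue.
    task_id, task_data = task_courier.wait_for_task(queue_name=QUEUE_NAME)

    # "Process" the task; here we just tag it.
    task_result = {**task_data, "status": "active"}

    # Send the result back so the client's wait_for_task_result returns.
    task_courier.return_task_result(
        queue_name=QUEUE_NAME,
        task_id=task_id,
        task_result=task_result)
```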

### ClientWorkerTaskCourier & AsyncClientWorkerTaskCourier:
- all of the above

## The workers module
The workers module is intended for executing and processing tasks.

### BaseWorker & BaseAsyncWorker
A worker that can process many tasks in one iteration (this can be useful when the task_data items are objects that support bulk operations).
#### The default value of BaseAsyncWorker.max_tasks_per_iteration is 1. To process many tasks per iteration (similar to BaseWorker), override this field in a subclass, as in the sketch below.
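
For example (a minimal sketch; the class and queue names are illustrative):

```python3
from task_helpers.workers.base_async import BaseAsyncWorker


class BatchAsyncWorker(BaseAsyncWorker):
    queue_name = "batch_queue"
    # Raise the default of 1 so that the async worker batches tasks
    # per iteration, similar to BaseWorker.
    max_tasks_per_iteration = 100
```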

### BaseWorker methods:
- wait_for_tasks - waits for tasks to appear in the queue, then pops and returns them.
- perform_tasks - method for processing tasks. Should return a list of tasks.
- perform_single_task - abstract method for processing one task. Should return the result of the task. Not used if the "perform_tasks" method is overridden (see the sketch after this list).
- return_task_results - method for sending task results to the clients.
- destroy - method for destroying objects after performing (requests.Session().close, for example).
- perform - the main method that starts the task worker. The total_iterations argument is required (how many processing iterations the worker should do).
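
A minimal sketch of overriding perform_single_task and destroy (the class, queue name, and session handling are illustrative; the task is assumed to arrive as a (task_id, task_data) tuple, as in the BaseAsyncWorker example below):

```python3
import requests

from task_helpers.workers.base import BaseWorker


class StatusCheckWorker(BaseWorker):
    queue_name = "status_checks"

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.session = requests.Session()

    def perform_single_task(self, task):
        # task_data is assumed to be a URL string here.
        task_id, task_data = task
        return self.session.get(task_data).status_code

    def destroy(self):
        # Clean up after performing (see the method list above).
        self.session.close()
```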

### BaseAsyncWorker methods:
- async_init - async init method for initializing async objects (aiohttp.ClientSession, for example).
- async_destroy - async destroy method for destroying async objects (aiohttp.ClientSession().close, for example).

The other methods are similar to the BaseWorker methods, but they are asynchronous and have slightly different logic inside:
- A new task iteration starts as soon as the previous "perform_tasks" call has been started
(not after its completion, as in the synchronous BaseWorker).

### ClassicWorker & ClassicAsyncWorker
Classic worker, where the task is a tuple: (task_id, task_data).
task_data is a dictionary with keys "function", "args" and "kwargs".
Arguments "args" and "kwargs" are optional.

### ClassicWorker methods:
- perform_single_task - method for processing one task. Should return the result of the task. Not used if the "perform_tasks" method is overridden.
The task is a tuple: (task_id, task_data), where task_data is a dictionary with the keys "function", "args" and "kwargs" ("args" and "kwargs" are optional).
Calls the function with the unpacked "args" and "kwargs" and returns the execution result.

### ClassicAsyncWorker methods:
- perform_single_task - method for processing one task. Should return the result of the task. Not used if the "perform_tasks" method is overridden.
The task is a tuple: (task_id, task_data), where task_data is a dictionary with the keys "function", "args" and "kwargs" ("args" and "kwargs" are optional).
Calls the function asynchronously with the unpacked "args" and "kwargs" and returns the execution result.
If the function is not asynchronous, it is called via "loop.run_in_executor".

## One more usage example. BaseAsyncWorker
```bash
# Run redis (This can be done in many ways, not necessarily through docker):
docker run -p 6379:6379 redis
```

### Client side:
```python3
import time
import redis
import requests

from task_helpers.couriers.redis import RedisClientTaskCourier

task_courier = RedisClientTaskCourier(redis_connection=redis.Redis())
QUEUE_NAME = "async_data_downloading"


def download_with_async_worker(urls: list):
    # Adding tasks to the queue in bulk.
    task_ids = task_courier.bulk_add_tasks_to_queue(
        queue_name=QUEUE_NAME,
        tasks_data=urls)

    # Waiting for each task to complete in the worker.
    for task_id in task_ids:
        downloaded_data = task_courier.wait_for_task_result(
            queue_name=QUEUE_NAME,
            task_id=task_id)
        if isinstance(downloaded_data, dict):
            yield downloaded_data["name"]
        else:
            yield downloaded_data.exception


def download_with_sync_session(urls: list):
    with requests.Session() as session:
        for url in urls:
            yield session.get(url)


if __name__ == "__main__":
    # Many clients can add tasks to the queue at the same time.
    urls = [f"https://pokeapi.co/api/v2/pokemon/{num}/" for num in range(100)]

    # async worker
    # Waiting for the worker to start so that the execution time is measured correctly.
    list(download_with_async_worker(urls=urls[:1]))

    before_time = time.perf_counter()
    names = list(download_with_async_worker(urls=urls))
    after_time = time.perf_counter()
    print(f"names: {names} \n")
    print(f"Time for downloading {len(names)} urls with async worker: "
          f"{after_time-before_time} sec.")

    # sync session
    before_time = time.perf_counter()
    names = list(download_with_sync_session(urls=urls))
    after_time = time.perf_counter()
    print(f"Time for downloading {len(names)} urls with requests.session: "
          f"{after_time-before_time} sec.")

```

### Worker side:
```python3
import redis
import asyncio
import aiohttp

from task_helpers.couriers.redis import RedisWorkerTaskCourier
from task_helpers.workers.base_async import BaseAsyncWorker

task_courier = RedisWorkerTaskCourier(redis_connection=redis.Redis())
QUEUE_NAME = "async_data_downloading"


class AsyncDownloadingWorker(BaseAsyncWorker):
    queue_name = QUEUE_NAME
    empty_queue_sleep_time = 0.01

    async def async_init(self):
        self.async_session = aiohttp.ClientSession()

    async def async_destroy(self):
        await self.async_session.close()

    async def download(self, url):
        print(f"Start downloading {url}")
        async with self.async_session.get(url) as response:
            if response.status == 200:
                response_json = await response.json()
                print(f"Downloaded. Status: {response.status}. Url: {url}")
                return response_json
            elif response.status == 404:
                print(f"Predownloaded. Status: {response.status}. Url: {url}")
                raise Exception(404)
            else:
                await asyncio.sleep(0.1)
                return await self.download(url)

    async def perform_single_task(self, task):
        task_id, task_data = task
        return await self.download(url=task_data)


if __name__ == "__main__":
    worker = AsyncDownloadingWorker(task_courier=task_courier)
    asyncio.run(
        worker.perform(total_iterations=10_000)
    )

```

            
