taskiq-pipelines


Nametaskiq-pipelines JSON
Version 0.1.2 PyPI version JSON
download
home_pagehttps://github.com/taskiq-python/taskiq-pipelines
SummaryTaskiq pipelines for task chaining.
upload_time2024-08-03 11:58:35
maintainerNone
docs_urlNone
authorPavel Kirilin
requires_python<4.0.0,>=3.8.1
licenseLICENSE
keywords taskiq pipelines tasks distributed async
VCS
bugtrack_url
requirements No requirements were recorded.
Travis-CI No Travis.
coveralls test coverage No coveralls.
            # Pipelines for taskiq

Taskiq pipelines is a `fire-and-forget` at its limit.

Imagine you have a really tough functions and you want
to call them sequentially one after one, but you don't want to wait for them
to complete. taskiq-pipeline solves this for you.

## Installation


You can install it from pypi:
```
pip install taskiq-pipelines
```

After you installed it you need to add our super clever middleware
to your broker.

This middleware actually decides what to do next, after current step
is completed.

```python
from taskiq_pipelines.middleware import PipelineMiddleware

my_super_broker = ...


my_super_broker.add_middlewares(
    [
        PipelineMiddleware(),
    ]
)
```

Also we have to admit that your broker MUST use result_backend that
can be read by all your workers. Pipelines work with inmemorybroker,
feel free to use it in local development.


### Example

For this example I'm going to use one single script file.

```python
import asyncio
from typing import Any, List
from taskiq.brokers.inmemory_broker import InMemoryBroker
from taskiq_pipelines import PipelineMiddleware, Pipeline

broker = InMemoryBroker()
broker.add_middlewares([PipelineMiddleware()])


@broker.task
def add_one(value: int) -> int:
    return value + 1


@broker.task
def repeat(value: Any, reps: int) -> List[Any]:
    return [value] * reps


@broker.task
def check(value: int) -> bool:
    return value >= 0


async def main():
    pipe = (
        Pipeline(
            broker,
            add_one,  # First of all we call add_one function.
        )
        # 2
        .call_next(repeat, reps=4)  #  Here we repeat our value 4 times
        # [2, 2, 2, 2]
        .map(add_one)  # Here we execute given function for each value.
        # [3, 3, 3, 3]
        .filter(check)  # Here we filter some values.
        # But sice our filter filters out all numbers less than zero,
        # our value won't change.
        # [3, 3, 3, 3]
    )
    task = await pipe.kiq(1)
    result = await task.wait_result()
    print("Calculated value:", result.return_value)


if __name__ == "__main__":
    asyncio.run(main())

```

If you run this example, it prints this:
```bash
$ python script.py
Calculated value: [3, 3, 3, 3]
```

Let's talk about this example.
Two notable things here:
1. We must add PipelineMiddleware in the list of our middlewares.
2. We can use only tasks as functions we wan to execute in pipeline.
    If you want to execute ordinary python function - you must wrap it in task.

Pipeline itself is just a convinient wrapper over list of steps.
Constructed pipeline has the same semantics as the ordinary task, and you can add steps
manually. But all steps of the pipeline must implement `taskiq_pipelines.abc.AbstractStep` class.

Pipelines can be serialized to strings with `dumps` method, and you can load them back with `Pipeline.loads` method. So you can share pipelines you want to execute as simple strings.

Pipeline assign `task_id` for each task when you call `kiq`, and executes every step with pre-calculated `task_id`,
so you know all task ids after you call kiq method.


## How does it work?

After you call `kiq` method of the pipeline it pre-calculates
all task_ids, serializes itself and adds serialized string to
the labels of the first task in the chain.

All the magic happens in the middleware.
After task is executed and result is saved, you can easily deserialize pipeline
back and calculate pipeline's next move. And that's the trick.
You can get more information from the source code of each pipeline step.

# Available steps

We have a few steps available for chaining calls:
1. Sequential
2. Mapper
3. Filter

### Sequential steps

This type of step is just an ordinary call of the function.
If you haven't specified `param_name` argument, then the result
of the previous step will be passed as the first argument of the function.
If you did specify the `param_name` argument, then the result of the previous
step can be found in key word arguments with the param name you specified.

You can add sequential steps with `.call_next` method of the pipeline.

If you don't want to pass the result of the previous step to the next one,
you can use `.call_after` method of the pipeline.

### Mapper step

This step runs specified task for each item of the previous task's result spawning
multiple tasks.
But I have to admit, that the result of the previous task must be iterable.
Otherwise it will mark the pipeline as failed.

After the execution you'll have mapped list.
You can add mappers by calling `.map` method of the pipeline.

### Filter step

This step runs specified task for each item of the previous task's result.
But I have to admit, that the result of the previous task must be iterable.
Otherwise it will mark the pipeline as failed.

If called tasks returned `True` for some element, this element will be added in the final list.

After the execution you'll get a list with filtered results.
You can add filters by calling `.filter` method of the pipeline.

            

Raw data

            {
    "_id": null,
    "home_page": "https://github.com/taskiq-python/taskiq-pipelines",
    "name": "taskiq-pipelines",
    "maintainer": null,
    "docs_url": null,
    "requires_python": "<4.0.0,>=3.8.1",
    "maintainer_email": null,
    "keywords": "taskiq, pipelines, tasks, distributed, async",
    "author": "Pavel Kirilin",
    "author_email": "win10@list.ru",
    "download_url": "https://files.pythonhosted.org/packages/75/53/4727eed5072a63609a5cb5b13da81e954dca0c063c7a75031859c244db06/taskiq_pipelines-0.1.2.tar.gz",
    "platform": null,
    "description": "# Pipelines for taskiq\n\nTaskiq pipelines is a `fire-and-forget` at its limit.\n\nImagine you have a really tough functions and you want\nto call them sequentially one after one, but you don't want to wait for them\nto complete. taskiq-pipeline solves this for you.\n\n## Installation\n\n\nYou can install it from pypi:\n```\npip install taskiq-pipelines\n```\n\nAfter you installed it you need to add our super clever middleware\nto your broker.\n\nThis middleware actually decides what to do next, after current step\nis completed.\n\n```python\nfrom taskiq_pipelines.middleware import PipelineMiddleware\n\nmy_super_broker = ...\n\n\nmy_super_broker.add_middlewares(\n    [\n        PipelineMiddleware(),\n    ]\n)\n```\n\nAlso we have to admit that your broker MUST use result_backend that\ncan be read by all your workers. Pipelines work with inmemorybroker,\nfeel free to use it in local development.\n\n\n### Example\n\nFor this example I'm going to use one single script file.\n\n```python\nimport asyncio\nfrom typing import Any, List\nfrom taskiq.brokers.inmemory_broker import InMemoryBroker\nfrom taskiq_pipelines import PipelineMiddleware, Pipeline\n\nbroker = InMemoryBroker()\nbroker.add_middlewares([PipelineMiddleware()])\n\n\n@broker.task\ndef add_one(value: int) -> int:\n    return value + 1\n\n\n@broker.task\ndef repeat(value: Any, reps: int) -> List[Any]:\n    return [value] * reps\n\n\n@broker.task\ndef check(value: int) -> bool:\n    return value >= 0\n\n\nasync def main():\n    pipe = (\n        Pipeline(\n            broker,\n            add_one,  # First of all we call add_one function.\n        )\n        # 2\n        .call_next(repeat, reps=4)  #  Here we repeat our value 4 times\n        # [2, 2, 2, 2]\n        .map(add_one)  # Here we execute given function for each value.\n        # [3, 3, 3, 3]\n        .filter(check)  # Here we filter some values.\n        # But sice our filter filters out all numbers less than zero,\n        # our value won't change.\n        # [3, 3, 3, 3]\n    )\n    task = await pipe.kiq(1)\n    result = await task.wait_result()\n    print(\"Calculated value:\", result.return_value)\n\n\nif __name__ == \"__main__\":\n    asyncio.run(main())\n\n```\n\nIf you run this example, it prints this:\n```bash\n$ python script.py\nCalculated value: [3, 3, 3, 3]\n```\n\nLet's talk about this example.\nTwo notable things here:\n1. We must add PipelineMiddleware in the list of our middlewares.\n2. We can use only tasks as functions we wan to execute in pipeline.\n    If you want to execute ordinary python function - you must wrap it in task.\n\nPipeline itself is just a convinient wrapper over list of steps.\nConstructed pipeline has the same semantics as the ordinary task, and you can add steps\nmanually. But all steps of the pipeline must implement `taskiq_pipelines.abc.AbstractStep` class.\n\nPipelines can be serialized to strings with `dumps` method, and you can load them back with `Pipeline.loads` method. So you can share pipelines you want to execute as simple strings.\n\nPipeline assign `task_id` for each task when you call `kiq`, and executes every step with pre-calculated `task_id`,\nso you know all task ids after you call kiq method.\n\n\n## How does it work?\n\nAfter you call `kiq` method of the pipeline it pre-calculates\nall task_ids, serializes itself and adds serialized string to\nthe labels of the first task in the chain.\n\nAll the magic happens in the middleware.\nAfter task is executed and result is saved, you can easily deserialize pipeline\nback and calculate pipeline's next move. And that's the trick.\nYou can get more information from the source code of each pipeline step.\n\n# Available steps\n\nWe have a few steps available for chaining calls:\n1. Sequential\n2. Mapper\n3. Filter\n\n### Sequential steps\n\nThis type of step is just an ordinary call of the function.\nIf you haven't specified `param_name` argument, then the result\nof the previous step will be passed as the first argument of the function.\nIf you did specify the `param_name` argument, then the result of the previous\nstep can be found in key word arguments with the param name you specified.\n\nYou can add sequential steps with `.call_next` method of the pipeline.\n\nIf you don't want to pass the result of the previous step to the next one,\nyou can use `.call_after` method of the pipeline.\n\n### Mapper step\n\nThis step runs specified task for each item of the previous task's result spawning\nmultiple tasks.\nBut I have to admit, that the result of the previous task must be iterable.\nOtherwise it will mark the pipeline as failed.\n\nAfter the execution you'll have mapped list.\nYou can add mappers by calling `.map` method of the pipeline.\n\n### Filter step\n\nThis step runs specified task for each item of the previous task's result.\nBut I have to admit, that the result of the previous task must be iterable.\nOtherwise it will mark the pipeline as failed.\n\nIf called tasks returned `True` for some element, this element will be added in the final list.\n\nAfter the execution you'll get a list with filtered results.\nYou can add filters by calling `.filter` method of the pipeline.\n",
    "bugtrack_url": null,
    "license": "LICENSE",
    "summary": "Taskiq pipelines for task chaining.",
    "version": "0.1.2",
    "project_urls": {
        "Homepage": "https://github.com/taskiq-python/taskiq-pipelines",
        "Repository": "https://github.com/taskiq-python/taskiq-pipelines"
    },
    "split_keywords": [
        "taskiq",
        " pipelines",
        " tasks",
        " distributed",
        " async"
    ],
    "urls": [
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "5ff1300ee824042ea56188fc084f7f0825cccb710b1b4c1da60978eb0630c1bb",
                "md5": "4afa3e6495945659d102ad6dab8a1434",
                "sha256": "c748e6deaf895d305a34bdc2575514cd5c712894e826f88b59d78776aa97e838"
            },
            "downloads": -1,
            "filename": "taskiq_pipelines-0.1.2-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "4afa3e6495945659d102ad6dab8a1434",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": "<4.0.0,>=3.8.1",
            "size": 15402,
            "upload_time": "2024-08-03T11:58:34",
            "upload_time_iso_8601": "2024-08-03T11:58:34.194226Z",
            "url": "https://files.pythonhosted.org/packages/5f/f1/300ee824042ea56188fc084f7f0825cccb710b1b4c1da60978eb0630c1bb/taskiq_pipelines-0.1.2-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "75534727eed5072a63609a5cb5b13da81e954dca0c063c7a75031859c244db06",
                "md5": "0555cde232de3b5e076aea76df380bc6",
                "sha256": "af887a0452b83a6f9c014c0c37e42fa3085bf7d244f0920ad628397bc65d1119"
            },
            "downloads": -1,
            "filename": "taskiq_pipelines-0.1.2.tar.gz",
            "has_sig": false,
            "md5_digest": "0555cde232de3b5e076aea76df380bc6",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": "<4.0.0,>=3.8.1",
            "size": 13039,
            "upload_time": "2024-08-03T11:58:35",
            "upload_time_iso_8601": "2024-08-03T11:58:35.570687Z",
            "url": "https://files.pythonhosted.org/packages/75/53/4727eed5072a63609a5cb5b13da81e954dca0c063c7a75031859c244db06/taskiq_pipelines-0.1.2.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2024-08-03 11:58:35",
    "github": true,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "github_user": "taskiq-python",
    "github_project": "taskiq-pipelines",
    "travis_ci": false,
    "coveralls": false,
    "github_actions": true,
    "lcname": "taskiq-pipelines"
}
        
Elapsed time: 0.59198s