arrlio


Namearrlio JSON
Version 0.24.0 PyPI version JSON
download
home_pageNone
SummaryNone
upload_time2024-04-16 13:18:53
maintainerNone
docs_urlNone
authorRoman Koshel
requires_python<4.0,>=3.10
licenseMIT
keywords rabbitmq
VCS
bugtrack_url
requirements No requirements were recorded.
Travis-CI No Travis.
coveralls test coverage No coveralls.
            # Arrlio [WIP]

[Documentation](https://levsh.github.io/arrlio) (WIP)

Asyncio distributed task/workflow system with support for generators and graphs

![tests](https://github.com/levsh/arrlio/workflows/tests/badge.svg)
![coverage](https://img.shields.io/endpoint?url=https://gist.githubusercontent.com/levsh/727ed723ccaee0d5825513af6472e3a5/raw/coverage.json)
[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)

### Installation
```bash
pip install arrlio
```
Or, to use the latest development version:
```bash
pip install git+https://github.com/levsh/arrlio
```

### Usage

#### Create tasks file
```python
# tasks.py

import io

import arrlio
import invoke

@arrlio.task
async def hello_world():
    """Minimal example task: return a fixed greeting string."""
    greeting = "Hello World!"
    return greeting

# task custom name
@arrlio.task(name="foo")
async def foo():
    """Log a greeting through ``arrlio.logger``; the task is declared with the name "foo"."""
    message = "Hello from task 'foo'!"
    arrlio.logger.info(message)

# exception example
@arrlio.task
async def exception():
    """Example task that always raises ``ZeroDivisionError``."""
    raise ZeroDivisionError

# Arrlio supports generators and async generators
@arrlio.task
def xrange(count):
    """Generator task: yield each value of ``range(count)`` in order."""
    numbers = range(count)
    for number in numbers:
        yield number

@arrlio.task
async def add_one(value: str):
    """Convert ``value`` with ``int()`` and return the result plus one."""
    number = int(value)
    return number + 1


@arrlio.task
async def bash(cmd, stdin: str = None):
    """Run ``cmd`` with ``invoke.run`` and return the result's ``stdout``.

    ``stdin`` is wrapped in an in-memory text stream and passed as the
    command's input stream; a second in-memory stream is passed as the
    output stream.
    """
    streams = {
        "in_stream": io.StringIO(stdin),
        "out_stream": io.StringIO(),
    }
    return invoke.run(cmd, **streams).stdout
```

#### Create main file and run it

```python
import asyncio
import logging

import arrlio
import tasks

logger = logging.getLogger("arrlio")
logger.setLevel("INFO")

BACKEND = "arrlio.backends.local"
# BACKEND = "arrlio.backends.rabbitmq"

async def main():
    """Send each example task through an app configured with ``BACKEND`` and log the results."""
    config = arrlio.Config(backend={"module": BACKEND})
    app = arrlio.App(config)

    async with app:
        await app.consume_tasks()

        # call by task object
        hello_result = await app.send_task(tasks.hello_world)
        logger.info(await hello_result.get())

        # call by task name
        foo_result = await app.send_task("foo")
        logger.info(await foo_result.get())

        # task args example
        add_result = await app.send_task(tasks.add_one, args=(1,))
        logger.info(await add_result.get())

        # exception example
        try:
            failing_result = await app.send_task(tasks.exception)
            logger.info(await failing_result.get())
        except Exception as exc:
            print(f"\nThis is example exception for {app.backend}:\n")
            logger.exception(exc)
            print()

        # generator example: the task result is consumed with async iteration
        stream = await app.send_task(tasks.xrange, args=(3,))
        results = [item async for item in stream]
        logger.info(results)  # -> [0, 1, 2]


if __name__ == "__main__":
    asyncio.run(main())
```

#### Arrlio supports graph execution
```python
import asyncio
import logging

import arrlio
import tasks

logger = logging.getLogger("arrlio")
logger.setLevel("INFO")

BACKEND = "arrlio.backends.local"
# BACKEND = "arrlio.backends.rabbitmq"


async def main():
    """Build a three-node chain A -> B -> C of ``add_one`` tasks, run it, and log each node's result."""
    graph = arrlio.Graph("My Graph")
    graph.add_node("A", tasks.add_one, root=True)
    graph.add_node("B", tasks.add_one)
    graph.add_node("C", tasks.add_one)
    graph.add_edge("A", "B")
    graph.add_edge("B", "C")

    # arrlio.plugins.events and arrlio.plugins.graphs
    # plugins are required
    app = arrlio.App(
        arrlio.Config(
            backend={"module": BACKEND},
            plugins=[
                {"module": "arrlio.plugins.events"},
                {"module": "arrlio.plugins.graphs"},
            ],
        )
    )

    async with app:
        await app.consume_tasks()

        # execute graph with argument 0
        ars = await app.send_graph(graph, args=(0,))
        logger.info("A: %i", await ars["A"].get())  # -> A: 1
        logger.info("B: %i", await ars["B"].get())  # -> B: 2
        logger.info("C: %i", await ars["C"].get())  # -> C: 3


if __name__ == "__main__":
    asyncio.run(main())
```
#### Another graph example
```python
import asyncio
import logging

import arrlio
import tasks

logger = logging.getLogger("arrlio")
logger.setLevel("INFO")

BACKEND = "arrlio.backends.local"
# BACKEND = "arrlio.backends.rabbitmq"


async def main():
    """Build a two-node graph (A -> B) of ``bash`` tasks, run it with an ``echo`` command, and log node B's result."""
    graph = arrlio.Graph("My Graph")
    graph.add_node("A", tasks.bash, root=True)
    graph.add_node("B", tasks.bash, args=("wc -w",))
    graph.add_edge("A", "B")

    plugins = [
        {"module": "arrlio.plugins.events"},
        {"module": "arrlio.plugins.graphs"},
    ]
    config = arrlio.Config(backend={"module": BACKEND}, plugins=plugins)
    app = arrlio.App(config)

    async with app:
        await app.consume_tasks()

        command = 'echo "Number of words in this sentence:"'
        node_results = await app.send_graph(graph, args=(command,))
        # Give node B at most two seconds to produce its result.
        word_count = await asyncio.wait_for(node_results["B"].get(), timeout=2)
        logger.info(word_count)  # -> 6


if __name__ == "__main__":
    asyncio.run(main())
```

#### And more examples
```bash
poetry install
poetry run python examples/main.py
```

            

Raw data

            {
    "_id": null,
    "home_page": null,
    "name": "arrlio",
    "maintainer": null,
    "docs_url": null,
    "requires_python": "<4.0,>=3.10",
    "maintainer_email": null,
    "keywords": "rabbitmq",
    "author": "Roman Koshel",
    "author_email": "roma.koshel@gmail.com",
    "download_url": "https://files.pythonhosted.org/packages/13/56/9b41d594715ee27c3b393a9c86e90ef089b6f43f4ff1c73d0af5f3e471fd/arrlio-0.24.0.tar.gz",
    "platform": null,
    "description": "# Arrlio [WIP]\n\n[Documentation](https://levsh.github.io/arrlio) (WIP)\n\nAsyncio distributed task/workflow system with supports generators and graphs\n\n![tests](https://github.com/levsh/arrlio/workflows/tests/badge.svg)\n![coverage](https://img.shields.io/endpoint?url=https://gist.githubusercontent.com/levsh/727ed723ccaee0d5825513af6472e3a5/raw/coverage.json)\n[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)\n\n### Installation\n```bash\npip install arrlio\n```\nOr to use latest develop version\n```bash\npip install git+https://github.com/levsh/arrlio\n```\n\n### Usage\n\n#### Create tasks file\n```python\n# tasks.py\n\nimport io\n\nimport arrlio\nimport invoke\n\n@arrlio.task\nasync def hello_world():\n    return \"Hello World!\"\n\n# task custom name\n@arrlio.task(name=\"foo\")\nasync def foo():\n    arrlio.logger.info(\"Hello from task 'foo'!\")\n\n# exception example\n@arrlio.task\nasync def exception():\n    raise ZeroDivisionError\n\n# Arrlio supports generators and async generators\n@arrlio.task\ndef xrange(count):\n    for x in range(count):\n        yield x\n\n@arrlio.task\nasync def add_one(value: str):\n    return int(value) + 1\n\n\n@arrlio.task\nasync def bash(cmd, stdin: str = None):\n    in_stream = io.StringIO(stdin)\n    out_stream = io.StringIO()\n    result = invoke.run(\n        cmd,\n        in_stream=in_stream,\n        out_stream=out_stream\n    )\n    return result.stdout\n```\n\n#### Create main file and run it\n\n```python\nimport asyncio\nimport logging\n\nimport arrlio\nimport tasks\n\nlogger = logging.getLogger(\"arrlio\")\nlogger.setLevel(\"INFO\")\n\nBACKEND = \"arrlio.backends.local\"\n# BACKEND = \"arrlio.backends.rabbitmq\"\n\nasync def main():\n    app = arrlio.App(arrlio.Config(backend={\"module\": BACKEND}))\n\n    async with app:\n        await app.consume_tasks()\n\n        # call by task object\n        ar = await 
app.send_task(tasks.hello_world)\n        logger.info(await ar.get())\n\n        # call by task name\n        ar = await app.send_task(\"foo\")\n        logger.info(await ar.get())\n\n        # task args example\n        ar = await app.send_task(tasks.add_one, args=(1,))\n        logger.info(await ar.get())\n\n        # exception example\n        try:\n            ar = await app.send_task(tasks.exception)\n            logger.info(await ar.get())\n        except Exception as e:\n            print(f\"\\nThis is example exception for {app.backend}:\\n\")\n            logger.exception(e)\n            print()\n\n        # generator example\n        results = []\n        ar = await app.send_task(tasks.xrange, args=(3,))\n        async for result in ar:\n            results.append(result)\n        logger.info(results)  # -> [0, 1, 2]\n\n\nif __name__ == \"__main__\":\n    asyncio.run(main())\n```\n\n#### Arrlio supports graph execution\n```python\nimport asyncio\nimport logging\n\nimport arrlio\nimport tasks\n\nlogger = logging.getLogger(\"arrlio\")\nlogger.setLevel(\"INFO\")\n\nBACKEND = \"arrlio.backends.local\"\n# BACKEND = \"arrlio.backends.rabbitmq\"\n\n\nasync def main():\n    graph = arrlio.Graph(\"My Graph\")\n    graph.add_node(\"A\", tasks.add_one, root=True)\n    graph.add_node(\"B\", tasks.add_one)\n    graph.add_node(\"C\", tasks.add_one)\n    graph.add_edge(\"A\", \"B\")\n    graph.add_edge(\"B\", \"C\")\n\n\t# arrlio.plugins.events and arrlio.plugins.graphs\n\t# plugins are required\n    app = arrlio.App(\n        arrlio.Config(\n            backend={\"module\": BACKEND},\n            plugins=[\n                {\"module\": \"arrlio.plugins.events\"},\n                {\"module\": \"arrlio.plugins.graphs\"},\n            ],\n        )\n    )\n\n    async with app:\n        await app.consume_tasks()\n\n\t\t# execute graph with argument 0\n        ars = await app.send_graph(graph, args=(0,))\n        logger.info(\"A: %i\", await ars[\"A\"].get())  # -> A: 1\n 
       logger.info(\"B: %i\", await ars[\"B\"].get())  # -> B: 2\n        logger.info(\"C: %i\", await ars[\"C\"].get())  # -> C: 3\n\n\nif __name__ == \"__main__\":\n    asyncio.run(main())\n```\n#### Another graph example\n```python\nimport asyncio\nimport logging\n\nimport arrlio\nimport tasks\n\nlogger = logging.getLogger(\"arrlio\")\nlogger.setLevel(\"INFO\")\n\nBACKEND = \"arrlio.backends.local\"\n# BACKEND = \"arrlio.backends.rabbitmq\"\n\n\nasync def main():\n    graph = arrlio.Graph(\"My Graph\")\n    graph.add_node(\"A\", tasks.bash, root=True)\n    graph.add_node(\"B\", tasks.bash, args=(\"wc -w\",))\n    graph.add_edge(\"A\", \"B\")\n\n    app = arrlio.App(\n        arrlio.Config(\n            backend={\"module\": BACKEND},\n            plugins=[\n                {\"module\": \"arrlio.plugins.events\"},\n                {\"module\": \"arrlio.plugins.graphs\"},\n            ],\n        )\n    )\n\n    async with app:\n        await app.consume_tasks()\n\n        ars = await app.send_graph(\n            graph,\n            args=('echo \"Number of words in this sentence:\"',)\n        )\n        logger.info(await asyncio.wait_for(ars[\"B\"].get(), timeout=2))  # -> 6\n\n\nif __name__ == \"__main__\":\n    asyncio.run(main())\n```\n\n#### And more examples\n```bash\npoetry install\npoetry run python examples/main.py\n```\n",
    "bugtrack_url": null,
    "license": "MIT",
    "summary": null,
    "version": "0.24.0",
    "project_urls": null,
    "split_keywords": [
        "rabbitmq"
    ],
    "urls": [
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "267341f40d2652a64125788ff101aae52942ff2f5716ac8f9752346db6b9aa62",
                "md5": "555beb95fa314375076373d8936b345a",
                "sha256": "2c5510efc3a87afddf9fdb760215a99b82ef08a315bd18a35dfa946ffda0c28a"
            },
            "downloads": -1,
            "filename": "arrlio-0.24.0-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "555beb95fa314375076373d8936b345a",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": "<4.0,>=3.10",
            "size": 38613,
            "upload_time": "2024-04-16T13:18:51",
            "upload_time_iso_8601": "2024-04-16T13:18:51.409917Z",
            "url": "https://files.pythonhosted.org/packages/26/73/41f40d2652a64125788ff101aae52942ff2f5716ac8f9752346db6b9aa62/arrlio-0.24.0-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "13569b41d594715ee27c3b393a9c86e90ef089b6f43f4ff1c73d0af5f3e471fd",
                "md5": "8cdf442b670314811e0657e464ad85f6",
                "sha256": "21d97401ea758ee67057f5e4e968d48c9cc8d0408aa66c421c6852f4be902576"
            },
            "downloads": -1,
            "filename": "arrlio-0.24.0.tar.gz",
            "has_sig": false,
            "md5_digest": "8cdf442b670314811e0657e464ad85f6",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": "<4.0,>=3.10",
            "size": 31178,
            "upload_time": "2024-04-16T13:18:53",
            "upload_time_iso_8601": "2024-04-16T13:18:53.030367Z",
            "url": "https://files.pythonhosted.org/packages/13/56/9b41d594715ee27c3b393a9c86e90ef089b6f43f4ff1c73d0af5f3e471fd/arrlio-0.24.0.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2024-04-16 13:18:53",
    "github": false,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "lcname": "arrlio"
}
        
Elapsed time: 0.30883s