| Field | Value |
|---|---|
| Name | arrlio |
| Version | 0.24.0 |
| home_page | None |
| Summary | None |
| upload_time | 2024-04-16 13:18:53 |
| maintainer | None |
| docs_url | None |
| author | Roman Koshel |
| requires_python | <4.0,>=3.10 |
| license | MIT |
| keywords | rabbitmq |
| requirements | No requirements were recorded. |
| Travis-CI | No Travis. |
| coveralls test coverage | No coveralls. |
# Arrlio [WIP]
[Documentation](https://levsh.github.io/arrlio) (WIP)
Asyncio distributed task/workflow system with support for generators and graphs
![tests](https://github.com/levsh/arrlio/workflows/tests/badge.svg)
![coverage](https://img.shields.io/endpoint?url=https://gist.githubusercontent.com/levsh/727ed723ccaee0d5825513af6472e3a5/raw/coverage.json)
[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)
### Installation
```bash
pip install arrlio
```
Or, to use the latest development version:
```bash
pip install git+https://github.com/levsh/arrlio
```
### Usage
#### Create tasks file
```python
# tasks.py

import io

import arrlio
import invoke

@arrlio.task
async def hello_world():
    return "Hello World!"

# task custom name
@arrlio.task(name="foo")
async def foo():
    arrlio.logger.info("Hello from task 'foo'!")

# exception example
@arrlio.task
async def exception():
    raise ZeroDivisionError

# Arrlio supports generators and async generators
@arrlio.task
def xrange(count):
    for x in range(count):
        yield x

@arrlio.task
async def add_one(value: str):
    return int(value) + 1

@arrlio.task
async def bash(cmd, stdin: str = None):
    in_stream = io.StringIO(stdin)
    out_stream = io.StringIO()
    result = invoke.run(
        cmd,
        in_stream=in_stream,
        out_stream=out_stream
    )
    return result.stdout
```
#### Create main file and run it
```python
import asyncio
import logging

import arrlio
import tasks

logger = logging.getLogger("arrlio")
logger.setLevel("INFO")

BACKEND = "arrlio.backends.local"
# BACKEND = "arrlio.backends.rabbitmq"

async def main():
    app = arrlio.App(arrlio.Config(backend={"module": BACKEND}))

    async with app:
        await app.consume_tasks()

        # call by task object
        ar = await app.send_task(tasks.hello_world)
        logger.info(await ar.get())

        # call by task name
        ar = await app.send_task("foo")
        logger.info(await ar.get())

        # task args example
        ar = await app.send_task(tasks.add_one, args=(1,))
        logger.info(await ar.get())

        # exception example
        try:
            ar = await app.send_task(tasks.exception)
            logger.info(await ar.get())
        except Exception as e:
            print(f"\nThis is example exception for {app.backend}:\n")
            logger.exception(e)
            print()

        # generator example
        results = []
        ar = await app.send_task(tasks.xrange, args=(3,))
        async for result in ar:
            results.append(result)
        logger.info(results)  # -> [0, 1, 2]

if __name__ == "__main__":
    asyncio.run(main())
```
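The generator example above consumes results with `async for`. As a rough illustration of that consumption pattern only, here is a minimal sketch in plain `asyncio`, with no Arrlio involved. The async generator `xrange` and the helper `collect` are hypothetical stand-ins, not part of the Arrlio API, and nothing here reflects how Arrlio transports results between producer and consumer.

```python
import asyncio


async def xrange(count):
    # Hypothetical stand-in for tasks.xrange, written as an async generator.
    for x in range(count):
        await asyncio.sleep(0)  # hand control back to the event loop between items
        yield x


async def collect(stream):
    # Same loop shape as the "generator example": append each item as it arrives.
    results = []
    async for item in stream:
        results.append(item)
    return results


collected = asyncio.run(collect(xrange(3)))
print(collected)  # [0, 1, 2]
```

Items are appended in the order the generator yields them, which is why the README example expects `[0, 1, 2]`.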
#### Arrlio supports graph execution
```python
import asyncio
import logging

import arrlio
import tasks

logger = logging.getLogger("arrlio")
logger.setLevel("INFO")

BACKEND = "arrlio.backends.local"
# BACKEND = "arrlio.backends.rabbitmq"

async def main():
    graph = arrlio.Graph("My Graph")
    graph.add_node("A", tasks.add_one, root=True)
    graph.add_node("B", tasks.add_one)
    graph.add_node("C", tasks.add_one)
    graph.add_edge("A", "B")
    graph.add_edge("B", "C")

    # arrlio.plugins.events and arrlio.plugins.graphs
    # plugins are required
    app = arrlio.App(
        arrlio.Config(
            backend={"module": BACKEND},
            plugins=[
                {"module": "arrlio.plugins.events"},
                {"module": "arrlio.plugins.graphs"},
            ],
        )
    )

    async with app:
        await app.consume_tasks()

        # execute graph with argument 0
        ars = await app.send_graph(graph, args=(0,))
        logger.info("A: %i", await ars["A"].get())  # -> A: 1
        logger.info("B: %i", await ars["B"].get())  # -> B: 2
        logger.info("C: %i", await ars["C"].get())  # -> C: 3

if __name__ == "__main__":
    asyncio.run(main())
```
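The expected output above (`A: 1`, `B: 2`, `C: 3`) suggests that each node receives the result of its predecessor. The following is a minimal sketch of that data flow in plain `asyncio`, under the assumption that the graph is a simple chain. It is not Arrlio's implementation, and `run_chain` is a hypothetical helper introduced only for illustration.

```python
import asyncio


async def add_one(value):
    # Same body as tasks.add_one, without the arrlio decorator.
    return int(value) + 1


async def run_chain(nodes, edges, root, arg):
    """Run a linear graph: each node is called with its predecessor's result."""
    successors = dict(edges)  # assumption: at most one outgoing edge per node
    results = {}
    node, value = root, arg
    while node is not None:
        value = await nodes[node](value)
        results[node] = value
        node = successors.get(node)
    return results


async def main():
    nodes = {"A": add_one, "B": add_one, "C": add_one}
    edges = [("A", "B"), ("B", "C")]
    return await run_chain(nodes, edges, root="A", arg=0)


chain_results = asyncio.run(main())
print(chain_results)  # {'A': 1, 'B': 2, 'C': 3}
```

Starting from the argument `0`, node A produces 1, B receives 1 and produces 2, and C receives 2 and produces 3, matching the values logged in the example.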
#### Another graph example
```python
import asyncio
import logging

import arrlio
import tasks

logger = logging.getLogger("arrlio")
logger.setLevel("INFO")

BACKEND = "arrlio.backends.local"
# BACKEND = "arrlio.backends.rabbitmq"

async def main():
    graph = arrlio.Graph("My Graph")
    graph.add_node("A", tasks.bash, root=True)
    graph.add_node("B", tasks.bash, args=("wc -w",))
    graph.add_edge("A", "B")

    app = arrlio.App(
        arrlio.Config(
            backend={"module": BACKEND},
            plugins=[
                {"module": "arrlio.plugins.events"},
                {"module": "arrlio.plugins.graphs"},
            ],
        )
    )

    async with app:
        await app.consume_tasks()

        ars = await app.send_graph(
            graph,
            args=('echo "Number of words in this sentence:"',)
        )
        logger.info(await asyncio.wait_for(ars["B"].get(), timeout=2))  # -> 6

if __name__ == "__main__":
    asyncio.run(main())
```
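In this example, node B is declared with its own argument (`"wc -w"`), and the expected result of 6 suggests that the output of node A is passed to B as the next positional argument, that is, as `stdin`. The sketch below reproduces that pipeline without Arrlio, using the standard library's `subprocess` in place of `invoke`. It assumes a POSIX shell with `echo` and `wc` available, and the way the arguments are combined is inferred from the example rather than taken from Arrlio's documentation.

```python
import subprocess


def bash(cmd, stdin=None):
    # Stand-in for tasks.bash: run a shell command, optionally feeding it stdin.
    result = subprocess.run(
        cmd,
        shell=True,
        input=stdin,
        capture_output=True,
        text=True,
        check=True,
    )
    return result.stdout


# Node A: the root node, called with the argument given to the graph.
a_output = bash('echo "Number of words in this sentence:"')

# Node B: its own argument first, then A's result as stdin (assumed ordering).
b_output = bash("wc -w", a_output)

word_count = int(b_output.strip())
print(word_count)
```

The sentence echoed by node A contains six whitespace-separated words, so `wc -w` reports 6.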
#### And more examples
```bash
poetry install
poetry run python examples/main.py
```
Raw data
{
"_id": null,
"home_page": null,
"name": "arrlio",
"maintainer": null,
"docs_url": null,
"requires_python": "<4.0,>=3.10",
"maintainer_email": null,
"keywords": "rabbitmq",
"author": "Roman Koshel",
"author_email": "roma.koshel@gmail.com",
"download_url": "https://files.pythonhosted.org/packages/13/56/9b41d594715ee27c3b393a9c86e90ef089b6f43f4ff1c73d0af5f3e471fd/arrlio-0.24.0.tar.gz",
"platform": null,
"bugtrack_url": null,
"license": "MIT",
"summary": null,
"version": "0.24.0",
"project_urls": null,
"split_keywords": [
"rabbitmq"
],
"urls": [
{
"comment_text": "",
"digests": {
"blake2b_256": "267341f40d2652a64125788ff101aae52942ff2f5716ac8f9752346db6b9aa62",
"md5": "555beb95fa314375076373d8936b345a",
"sha256": "2c5510efc3a87afddf9fdb760215a99b82ef08a315bd18a35dfa946ffda0c28a"
},
"downloads": -1,
"filename": "arrlio-0.24.0-py3-none-any.whl",
"has_sig": false,
"md5_digest": "555beb95fa314375076373d8936b345a",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": "<4.0,>=3.10",
"size": 38613,
"upload_time": "2024-04-16T13:18:51",
"upload_time_iso_8601": "2024-04-16T13:18:51.409917Z",
"url": "https://files.pythonhosted.org/packages/26/73/41f40d2652a64125788ff101aae52942ff2f5716ac8f9752346db6b9aa62/arrlio-0.24.0-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": "",
"digests": {
"blake2b_256": "13569b41d594715ee27c3b393a9c86e90ef089b6f43f4ff1c73d0af5f3e471fd",
"md5": "8cdf442b670314811e0657e464ad85f6",
"sha256": "21d97401ea758ee67057f5e4e968d48c9cc8d0408aa66c421c6852f4be902576"
},
"downloads": -1,
"filename": "arrlio-0.24.0.tar.gz",
"has_sig": false,
"md5_digest": "8cdf442b670314811e0657e464ad85f6",
"packagetype": "sdist",
"python_version": "source",
"requires_python": "<4.0,>=3.10",
"size": 31178,
"upload_time": "2024-04-16T13:18:53",
"upload_time_iso_8601": "2024-04-16T13:18:53.030367Z",
"url": "https://files.pythonhosted.org/packages/13/56/9b41d594715ee27c3b393a9c86e90ef089b6f43f4ff1c73d0af5f3e471fd/arrlio-0.24.0.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2024-04-16 13:18:53",
"github": false,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"lcname": "arrlio"
}