# Olympipe
![coverage](https://gitlab.com/gabraken/olympipe/badges/master/coverage.svg?job=tests)![status](https://gitlab.com/gabraken/olympipe/badges/master/pipeline.svg)
![Olympipe](https://gitlab.com/gabraken/olympipe/-/raw/master/Olympipe.png)
Olympipe makes pipelines easy to use, so you can take advantage of parallel computing with Python's built-in `multiprocessing` module. It uses type checking to validate your data processing from the start.
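As a rough illustration of what "type checking between steps" can mean, the sketch below compares the return annotation of one step with the annotation of the next step's first parameter. It is not olympipe's implementation. `stages_compatible` is a hypothetical helper, and it only accepts exact type matches (no subclasses or generics).

```python
import inspect
from typing import Callable, get_type_hints


def stages_compatible(upstream: Callable, downstream: Callable) -> bool:
    """Check that upstream's return annotation equals the annotation of
    downstream's first parameter. Unannotated steps are accepted."""
    out_type = get_type_hints(upstream).get("return")
    params = list(inspect.signature(downstream).parameters.values())
    if out_type is None or not params:
        return True  # nothing to compare against
    in_type = get_type_hints(downstream).get(params[0].name)
    if in_type is None:
        return True
    return out_type == in_type  # exact match only (a simplification)


def times_2(x: int) -> int:
    return x * 2


def describe(x: int) -> str:
    return f"value={x}"


print(stages_compatible(times_2, describe))  # True: an int feeds an int parameter
print(stages_compatible(describe, times_2))  # False: a str feeds an int parameter
```

Checking annotations before any packet flows lets a mismatch surface when the pipeline is declared rather than halfway through a long run.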
## Basic usage
Each pipeline starts from an iterable that provides the packets (a list, a tuple, or any more complex iterator). You then extend the pipeline with steps such as `.task(<function>)`, which applies `<function>` to every packet. The pipeline processes join the main process when you call `.wait_for_result()`, `.wait_for_results()` or `.wait_for_completion()`.
```python
from olympipe import Pipeline

def times_2(x: int) -> int:
    return x * 2

p = Pipeline(range(10))

p1 = p.task(times_2)  # Multiply each packet by 2
# or, equivalently, with a lambda function:
# p1 = p.task(lambda x: x * 2)

res = p1.wait_for_result()
print(res)  # [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
```
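One way to picture a single `.task` step is a worker that reads packets from an input queue, applies the function, and writes results to an output queue, with a sentinel marking the end of the stream. The sketch below shows that pattern with the standard library only, assuming this queue-per-step model; it is not olympipe's code. Threads stand in for processes to keep it short and portable, and `run_task_stage` and `_SENTINEL` are hypothetical names.

```python
import queue
import threading
from typing import Callable, Iterable, List

_SENTINEL = object()  # marks the end of the packet stream


def run_task_stage(source: Iterable, func: Callable) -> List:
    """Feed `source` through one worker that applies `func` to each packet."""
    inbox: queue.Queue = queue.Queue()
    outbox: queue.Queue = queue.Queue()

    def worker() -> None:
        while True:
            packet = inbox.get()
            if packet is _SENTINEL:
                outbox.put(_SENTINEL)  # propagate end-of-stream downstream
                return
            outbox.put(func(packet))

    thread = threading.Thread(target=worker)
    thread.start()
    for packet in source:
        inbox.put(packet)
    inbox.put(_SENTINEL)

    results = []
    while (item := outbox.get()) is not _SENTINEL:
        results.append(item)
    thread.join()  # comparable to waiting for the pipeline to finish
    return results


print(run_task_stage(range(10), lambda x: x * 2))  # [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
```

With a real process per step, the same sentinel idea is what lets each worker know when to stop and lets the caller know when all results are in.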
## Filtering
Use `.filter(<keep_function>)` to choose which packets continue down the pipeline: pass a function that returns `True` for packets to keep and `False` for packets to drop.
```python
from olympipe import Pipeline
p = Pipeline(range(20))
p1 = p.filter(lambda x: x % 2 == 0)  # Keep even numbers
p2 = p1.batch(2)  # Group into lists of 2 elements
res = p2.wait_for_result()
print(res) # [[0, 2], [4, 6], [8, 10], [12, 14], [16, 18]]
```
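The filter-then-batch example above can be reproduced sequentially with plain generators, which is handy for checking expected results in tests. This is a sketch, not olympipe's code: `batch` is a hypothetical helper, and yielding a trailing incomplete group is an assumption (check how `.batch` treats leftovers before relying on it).

```python
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def batch(packets: Iterable[T], size: int) -> Iterator[List[T]]:
    """Yield consecutive lists of `size` packets; a short final group is kept."""
    current: List[T] = []
    for packet in packets:
        current.append(packet)
        if len(current) == size:
            yield current
            current = []
    if current:
        yield current  # assumption: leftovers form a last, smaller batch


evens = (x for x in range(20) if x % 2 == 0)  # what the filter keeps
print(list(batch(evens, 2)))  # [[0, 2], [4, 6], [8, 10], [12, 14], [16, 18]]
```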
## Inline chaining
You can chain the calls into a single expression for a more compact, readable pipeline.
```python
from olympipe import Pipeline
[res] = Pipeline(range(20)).filter(lambda x: x % 2 == 0).batch(2).wait_for_results()
print(res) # [[0, 2], [4, 6], [8, 10], [12, 14], [16, 18]]
```
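Chaining works because each step returns a new pipeline object that wraps the previous one. The toy class below illustrates that design sequentially; `MiniPipe` and its `collect` method are hypothetical stand-ins, not part of olympipe, and `collect` merely plays the role of `.wait_for_result()`.

```python
from typing import Callable, Iterable, List


class MiniPipe:
    """Hypothetical, sequential stand-in for a chainable pipeline."""

    def __init__(self, packets: Iterable):
        self._packets = packets

    def task(self, func: Callable) -> "MiniPipe":
        # Wrap the previous step lazily and return a new object to chain on
        return MiniPipe(func(p) for p in self._packets)

    def filter(self, keep: Callable) -> "MiniPipe":
        return MiniPipe(p for p in self._packets if keep(p))

    def collect(self) -> List:
        # Nothing runs until the packets are actually consumed here
        return list(self._packets)


res = MiniPipe(range(10)).filter(lambda x: x % 2 == 0).task(lambda x: x * 10).collect()
print(res)  # [0, 20, 40, 60, 80]
```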
## Debugging
Insert a `.debug()` call anywhere in the pipeline to print packets as they pass through that point.
```python
from olympipe import Pipeline
p = Pipeline(range(20))
p1 = p.filter(lambda x: x % 2 == 0).debug()  # Keep even numbers, print them
p2 = p1.batch(2).debug()  # Group into lists of 2 elements, print the lists
p2.wait_for_completion()
```
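A debug step is essentially a pass-through: it logs each packet and forwards it unchanged, so inserting it does not alter the results. A minimal generator-based sketch of that idea follows (not olympipe's implementation; `debug` here is a hypothetical helper whose `log` parameter defaults to `print`).

```python
from typing import Callable, Iterable, Iterator, TypeVar

T = TypeVar("T")


def debug(packets: Iterable[T], log: Callable[[T], None] = print) -> Iterator[T]:
    """Forward every packet unchanged, logging it as it goes by."""
    for packet in packets:
        log(packet)
        yield packet


evens = debug(x for x in range(20) if x % 2 == 0)
total = sum(evens)  # prints 0, 2, 4, ..., 18 (one per line) while summing
print(total)  # 90
```

Because the generator is lazy, each value is printed only when the next step actually consumes it, which mirrors printing packets "as they arrive".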
## Real time processing (for sound, video...)
Use `.temporal_batch(<seconds_float>)` to group the packets that reach this point of the pipeline during each window of `<seconds_float>` seconds.
```python
import time
from olympipe import Pipeline
def delay(x: int) -> int:
    time.sleep(0.1)
    return x
p = Pipeline(range(20)).task(delay)  # Wait 0.1 s per packet
p1 = p.filter(lambda x: x % 2 == 0)  # Keep even numbers
p2 = p1.temporal_batch(1.0)  # Group the packets received during each 1.0 s window
[res] = p2.wait_for_results()
print(res)  # e.g. [[0, 2, 4, 6, 8], [10, 12, 14, 16, 18], []] (exact grouping depends on timing)
```
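The grouping rule behind a time-window batch can be shown offline by assigning each packet to the window its arrival time falls in. The sketch below is not olympipe's implementation: `temporal_batch` here is a hypothetical function that takes `(timestamp, packet)` pairs with timestamps measured in seconds from the start, and it keeps empty windows so there is one list per elapsed window.

```python
from typing import Iterable, List, Tuple, TypeVar

T = TypeVar("T")


def temporal_batch(events: Iterable[Tuple[float, T]], window: float) -> List[List[T]]:
    """Group (timestamp, packet) pairs into consecutive windows of `window` seconds."""
    batches: List[List[T]] = []
    for timestamp, packet in events:
        index = int(timestamp // window)  # which window this arrival falls in
        while len(batches) <= index:  # open any windows that passed with no arrivals
            batches.append([])
        batches[index].append(packet)
    return batches


arrivals = [(0.2, "a"), (0.4, "b"), (1.2, "c"), (1.8, "d"), (3.1, "e")]
print(temporal_batch(arrivals, 1.0))  # [['a', 'b'], ['c', 'd'], [], ['e']]
```

A live pipeline would instead flush a batch on a timer, which may explain why the example above can end with an empty batch.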
## Using classes in a pipeline
You can add a stateful class instance to a pipeline. The method you use is type-checked as well, to ensure data coherence. Use `.class_task(<Class>, <Class.method>, ...)`, where `Class.method` is the method applied to each packet. In the example below, the list `[3]` supplies the constructor arguments, so the instance is built as `StockPile(mul=3)`.
```python
from olympipe import Pipeline

item_count = 5

class StockPile:
    def __init__(self, mul: int):
        self.mul = mul
        self.last = 0

    def pile(self, num: int) -> int:
        # Return the previous packet (scaled by mul) and remember the current one
        out = self.last
        self.last = num * self.mul
        return out

p1 = Pipeline(range(item_count))
p2 = p1.class_task(StockPile, StockPile.pile, [3])  # [3] -> StockPile(mul=3)
[res] = p2.wait_for_results()
print(res)  # [0, 0, 3, 6, 9]
```
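The output above depends on a single instance seeing the packets in order. A sequential reference of that behavior, assuming one instance is built from the argument list and its method is called once per packet, looks like this. `run_class_task` and `RunningTotal` are hypothetical names used only for illustration.

```python
from typing import Any, Callable, Iterable, List, Sequence


def run_class_task(cls: type, method: Callable, init_args: Sequence[Any],
                   packets: Iterable[Any]) -> List[Any]:
    """Build one instance from init_args, then call method on every packet in order."""
    instance = cls(*init_args)
    return [method(instance, packet) for packet in packets]


class RunningTotal:
    """Stateful example: each output depends on all previous packets."""

    def __init__(self, start: int):
        self.total = start

    def add(self, num: int) -> int:
        self.total += num
        return self.total


print(run_class_task(RunningTotal, RunningTotal.add, [100], range(5)))  # [100, 101, 103, 106, 110]
```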
This project is still at an early stage; feedback is very welcome.
Raw data
{
"_id": null,
"home_page": "https://gitlab.com/gabraken/olympipe",
"name": "olympipe",
"maintainer": "",
"docs_url": null,
"requires_python": ">=3.8,<4.0",
"maintainer_email": "",
"keywords": "pipeline,multiprocessing",
"author": "Gabriel Kasser",
"author_email": "gabriel.kasser@gmail.com",
"download_url": "https://files.pythonhosted.org/packages/07/71/aa16adac918f964e0e3b3702df8e6634fe9a178eedcbcf63b7635d6616a8/olympipe-1.4.5.tar.gz",
"platform": null,
"bugtrack_url": null,
"license": "MIT",
"summary": "A powerful parallel pipelining tool",
"version": "1.4.5",
"project_urls": {
"Homepage": "https://gitlab.com/gabraken/olympipe",
"Repository": "https://gitlab.com/gabraken/olympipe"
},
"split_keywords": [
"pipeline",
"multiprocessing"
],
"urls": [
{
"comment_text": "",
"digests": {
"blake2b_256": "ad237f390c403d9ea5222b829a0f28479ffe9f3e144d6c35a3529123ab66c34b",
"md5": "ec4a5319038c5b4a498c461772997c06",
"sha256": "e7623c856cacfff0b0b536364e02df2ad8661bff03276edaef0125e89f454d7f"
},
"downloads": -1,
"filename": "olympipe-1.4.5-py3-none-any.whl",
"has_sig": false,
"md5_digest": "ec4a5319038c5b4a498c461772997c06",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": ">=3.8,<4.0",
"size": 17785,
"upload_time": "2023-12-10T13:10:36",
"upload_time_iso_8601": "2023-12-10T13:10:36.789487Z",
"url": "https://files.pythonhosted.org/packages/ad/23/7f390c403d9ea5222b829a0f28479ffe9f3e144d6c35a3529123ab66c34b/olympipe-1.4.5-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": "",
"digests": {
"blake2b_256": "0771aa16adac918f964e0e3b3702df8e6634fe9a178eedcbcf63b7635d6616a8",
"md5": "1215080be30f743d1884ef04539d9710",
"sha256": "8ae77f9499f942189256f11f7445fce3d976fcb576266a9ba6e562f002e1e34c"
},
"downloads": -1,
"filename": "olympipe-1.4.5.tar.gz",
"has_sig": false,
"md5_digest": "1215080be30f743d1884ef04539d9710",
"packagetype": "sdist",
"python_version": "source",
"requires_python": ">=3.8,<4.0",
"size": 12175,
"upload_time": "2023-12-10T13:10:38",
"upload_time_iso_8601": "2023-12-10T13:10:38.206707Z",
"url": "https://files.pythonhosted.org/packages/07/71/aa16adac918f964e0e3b3702df8e6634fe9a178eedcbcf63b7635d6616a8/olympipe-1.4.5.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2023-12-10 13:10:38",
"github": false,
"gitlab": true,
"bitbucket": false,
"codeberg": false,
"gitlab_user": "gabraken",
"gitlab_project": "olympipe",
"lcname": "olympipe"
}