olympipe


Nameolympipe JSON
Version 1.4.5 PyPI version JSON
download
home_pagehttps://gitlab.com/gabraken/olympipe
SummaryA powerful parallel pipelining tool
upload_time2023-12-10 13:10:38
maintainer
docs_urlNone
authorGabriel Kasser
requires_python>=3.8,<4.0
licenseMIT
keywords pipeline multiprocessing
VCS
bugtrack_url
requirements No requirements were recorded.
Travis-CI No Travis.
coveralls test coverage No coveralls.
            # Olympipe

![coverage](https://gitlab.com/gabraken/olympipe/badges/master/coverage.svg?job=tests)![status](https://gitlab.com/gabraken/olympipe/badges/master/pipeline.svg)

![Olympipe](https://gitlab.com/gabraken/olympipe/-/raw/master/Olympipe.png)

This project will make pipelines
easy to use to improve parallel computing using the basic multiprocessing module. This module uses type checking to ensure your data process validity from the start.

## Basic usage

Each pipeline starts from an interator as a source of packets (a list, tuple, or any complex iterator). This pipeline will then be extended by adding basic `.task(<function>)`. The pipeline process join the main process when using the `.wait_for_results()` or `.wait_for_completion()` functions.

```python

from olympipe import Pipeline

def times_2(x: int) -> int:
    return x * 2

p = Pipeline(range(10))

p1 = p.task(times_2) # Multiply each packet by 2
# or
p1 = p.task(lambda x: x * 2) # using a lambda function

res = p1.wait_for_result()

print(res) # [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

```

## Filtering

You can choose which packets to `.filter(<keep_function>)` by passing them a function returning True or False when applied to this packet.

```python

from olympipe import Pipeline

p = Pipeline(range(20))
p1 = p.filter(lambda x: x % 2 == 0) # Keep pair numbers
p2 = p1.batch(2) # Group in arrays of 2 elements

res = p2.wait_for_result()

print(res) # [[0, 2], [4, 6], [8, 10], [12, 14], [16, 18]]

```

## In line formalization

You can chain declarations to have a more readable pipeline.

```python

from olympipe import Pipeline

[res] = Pipeline(range(20)).filter(lambda x: x % 2 == 0).batch(2).wait_for_results()

print(res) # [[0, 2], [4, 6], [8, 10], [12, 14], [16, 18]]

```

## Debugging

Interpolate `.debug()` function anywhere in the pipe to print packets as they arrive in the pipe.

```python
from olympipe import Pipeline

p = Pipeline(range(20))
p1 = p.filter(lambda x: x % 2 == 0).debug() # Keep pair numbers
p2 = p1.batch(2).debug() # Group in arrays of 2 elements

p2.wait_for_completion()
```

## Real time processing (for sound, video...)

Use the `.temporal_batch(<seconds_float>)` pipe to aggregate packets received at this point each <seconds_float> seconds.

```python
import time
from olympipe import Pipeline

def delay(x: int) -> int:
    time.sleep(0.1)
    return x

p = Pipeline(range(20)).task(delay) # Wait 0.1 s for each queue element
p1 = p.filter(lambda x: x % 2 == 0) # Keep pair numbers
p2 = p1.temporal_batch(1.0) # Group in arrays of 2 elements

[res] = p2.wait_for_results()

print(res) # [[0, 2, 4, 6, 8], [10, 12, 14, 16, 18], []]
```

## Using classes in a pipeline

You can add a stateful class instance to a pipeline. The method used will be typecheked as well to ensure data coherence. You just have to use the `.class_task(<Class>, <Class.method>, ...)` method where Class.method is the actual method you will use to process each packet.

```python
item_count  = 5

class StockPile:
    def __init__(self, mul:int):
        self.mul = mul
        self.last = 0

    def pile(self, num: int) -> int:
        out = self.last
        self.last = num * self.mul
        return out


p1 = Pipeline(range(item_count))

p2 = p1.class_task(StockPile, StockPile.pile, [3])

[res] = p2.wait_for_results()

print(res) # [0, 0, 3, 6, 9]

```

This project is still an early version, feedback is very helpful.

            

Raw data

            {
    "_id": null,
    "home_page": "https://gitlab.com/gabraken/olympipe",
    "name": "olympipe",
    "maintainer": "",
    "docs_url": null,
    "requires_python": ">=3.8,<4.0",
    "maintainer_email": "",
    "keywords": "pipeline,multiprocessing",
    "author": "Gabriel Kasser",
    "author_email": "gabriel.kasser@gmail.com",
    "download_url": "https://files.pythonhosted.org/packages/07/71/aa16adac918f964e0e3b3702df8e6634fe9a178eedcbcf63b7635d6616a8/olympipe-1.4.5.tar.gz",
    "platform": null,
    "description": "# Olympipe\n\n![coverage](https://gitlab.com/gabraken/olympipe/badges/master/coverage.svg?job=tests)![status](https://gitlab.com/gabraken/olympipe/badges/master/pipeline.svg)\n\n![Olympipe](https://gitlab.com/gabraken/olympipe/-/raw/master/Olympipe.png)\n\nThis project will make pipelines\neasy to use to improve parallel computing using the basic multiprocessing module. This module uses type checking to ensure your data process validity from the start.\n\n## Basic usage\n\nEach pipeline starts from an interator as a source of packets (a list, tuple, or any complex iterator). This pipeline will then be extended by adding basic `.task(<function>)`. The pipeline process join the main process when using the `.wait_for_results()` or `.wait_for_completion()` functions.\n\n```python\n\nfrom olympipe import Pipeline\n\ndef times_2(x: int) -> int:\n    return x * 2\n\np = Pipeline(range(10))\n\np1 = p.task(times_2) # Multiply each packet by 2\n# or\np1 = p.task(lambda x: x * 2) # using a lambda function\n\nres = p1.wait_for_result()\n\nprint(res) # [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]\n\n```\n\n## Filtering\n\nYou can choose which packets to `.filter(<keep_function>)` by passing them a function returning True or False when applied to this packet.\n\n```python\n\nfrom olympipe import Pipeline\n\np = Pipeline(range(20))\np1 = p.filter(lambda x: x % 2 == 0) # Keep pair numbers\np2 = p1.batch(2) # Group in arrays of 2 elements\n\nres = p2.wait_for_result()\n\nprint(res) # [[0, 2], [4, 6], [8, 10], [12, 14], [16, 18]]\n\n```\n\n## In line formalization\n\nYou can chain declarations to have a more readable pipeline.\n\n```python\n\nfrom olympipe import Pipeline\n\n[res] = Pipeline(range(20)).filter(lambda x: x % 2 == 0).batch(2).wait_for_results()\n\nprint(res) # [[0, 2], [4, 6], [8, 10], [12, 14], [16, 18]]\n\n```\n\n## Debugging\n\nInterpolate `.debug()` function anywhere in the pipe to print packets as they arrive in the pipe.\n\n```python\nfrom olympipe import Pipeline\n\np = Pipeline(range(20))\np1 = p.filter(lambda x: x % 2 == 0).debug() # Keep pair numbers\np2 = p1.batch(2).debug() # Group in arrays of 2 elements\n\np2.wait_for_completion()\n```\n\n## Real time processing (for sound, video...)\n\nUse the `.temporal_batch(<seconds_float>)` pipe to aggregate packets received at this point each <seconds_float> seconds.\n\n```python\nimport time\nfrom olympipe import Pipeline\n\ndef delay(x: int) -> int:\n    time.sleep(0.1)\n    return x\n\np = Pipeline(range(20)).task(delay) # Wait 0.1 s for each queue element\np1 = p.filter(lambda x: x % 2 == 0) # Keep pair numbers\np2 = p1.temporal_batch(1.0) # Group in arrays of 2 elements\n\n[res] = p2.wait_for_results()\n\nprint(res) # [[0, 2, 4, 6, 8], [10, 12, 14, 16, 18], []]\n```\n\n## Using classes in a pipeline\n\nYou can add a stateful class instance to a pipeline. The method used will be typecheked as well to ensure data coherence. You just have to use the `.class_task(<Class>, <Class.method>, ...)` method where Class.method is the actual method you will use to process each packet.\n\n```python\nitem_count  = 5\n\nclass StockPile:\n    def __init__(self, mul:int):\n        self.mul = mul\n        self.last = 0\n\n    def pile(self, num: int) -> int:\n        out = self.last\n        self.last = num * self.mul\n        return out\n\n\np1 = Pipeline(range(item_count))\n\np2 = p1.class_task(StockPile, StockPile.pile, [3])\n\n[res] = p2.wait_for_results()\n\nprint(res) # [0, 0, 3, 6, 9]\n\n```\n\nThis project is still an early version, feedback is very helpful.\n",
    "bugtrack_url": null,
    "license": "MIT",
    "summary": "A powerful parallel pipelining tool",
    "version": "1.4.5",
    "project_urls": {
        "Homepage": "https://gitlab.com/gabraken/olympipe",
        "Repository": "https://gitlab.com/gabraken/olympipe"
    },
    "split_keywords": [
        "pipeline",
        "multiprocessing"
    ],
    "urls": [
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "ad237f390c403d9ea5222b829a0f28479ffe9f3e144d6c35a3529123ab66c34b",
                "md5": "ec4a5319038c5b4a498c461772997c06",
                "sha256": "e7623c856cacfff0b0b536364e02df2ad8661bff03276edaef0125e89f454d7f"
            },
            "downloads": -1,
            "filename": "olympipe-1.4.5-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "ec4a5319038c5b4a498c461772997c06",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": ">=3.8,<4.0",
            "size": 17785,
            "upload_time": "2023-12-10T13:10:36",
            "upload_time_iso_8601": "2023-12-10T13:10:36.789487Z",
            "url": "https://files.pythonhosted.org/packages/ad/23/7f390c403d9ea5222b829a0f28479ffe9f3e144d6c35a3529123ab66c34b/olympipe-1.4.5-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "0771aa16adac918f964e0e3b3702df8e6634fe9a178eedcbcf63b7635d6616a8",
                "md5": "1215080be30f743d1884ef04539d9710",
                "sha256": "8ae77f9499f942189256f11f7445fce3d976fcb576266a9ba6e562f002e1e34c"
            },
            "downloads": -1,
            "filename": "olympipe-1.4.5.tar.gz",
            "has_sig": false,
            "md5_digest": "1215080be30f743d1884ef04539d9710",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": ">=3.8,<4.0",
            "size": 12175,
            "upload_time": "2023-12-10T13:10:38",
            "upload_time_iso_8601": "2023-12-10T13:10:38.206707Z",
            "url": "https://files.pythonhosted.org/packages/07/71/aa16adac918f964e0e3b3702df8e6634fe9a178eedcbcf63b7635d6616a8/olympipe-1.4.5.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2023-12-10 13:10:38",
    "github": false,
    "gitlab": true,
    "bitbucket": false,
    "codeberg": false,
    "gitlab_user": "gabraken",
    "gitlab_project": "olympipe",
    "lcname": "olympipe"
}
        
Elapsed time: 0.16124s