iter-pipes

- Name: iter-pipes
- Version: 0.1.7
- Home page: https://github.com/brightnetwork/iter-pipes
- Summary: Functional pythonic pipelines for iterables.
- Upload time: 2024-04-16 07:45:17
- Author: brightnetwork
- Requires Python: `<4.0,>=3.10`
- License: MIT
- Keywords: iterable, pipes, collection

[![image](https://img.shields.io/pypi/v/iter-pipes.svg)](https://pypi.python.org/pypi/iter-pipes)
[![image](https://img.shields.io/pypi/l/iter-pipes.svg)](https://pypi.python.org/pypi/iter-pipes)
[![image](https://img.shields.io/pypi/pyversions/iter-pipes.svg)](https://pypi.python.org/pypi/iter-pipes)
[![Code Coverage](https://img.shields.io/codecov/c/github/brightnetwork/iter-pipes)](https://app.codecov.io/gh/brightnetwork/iter-pipes)
[![Actions status](https://github.com/brightnetwork/iter-pipes/workflows/test/badge.svg)](https://github.com/brightnetwork/iter-pipes/actions)

## `iter_pipes`: Iterable Pipes

Functional pythonic pipelines for iterables.


```bash
pip install iter-pipes
```

### Examples

#### map / filter

```python
import math

from iter_pipes import PipelineFactory

pipeline = (
    PipelineFactory[int]()
    .map(math.exp)
    .filter(lambda x: x > math.exp(2))
    .map(math.log)
    .map(str)
)

assert pipeline(range(5)).to_list() == ["3.0", "4.0"]
```

#### Batch operations

```python
def get_user_names_from_db(user_ids: list[int]) -> list[str]:
    # typical batch operation:
    #   - duration is roughly constant for a batch
    #   - batch size has to be below a fixed threshold
    print("processing batch", user_ids)
    return [f"user_{user_id}" for user_id in user_ids]


pipeline = (
    PipelineFactory[int]()
    .batch(get_user_names_from_db, batch_size=3)
    .for_each(lambda user_name: print("Hello ", user_name))
)

pipeline(range(5)).to_list()
# returns
#   ["user_0", "user_1", "user_2", "user_3", "user_4"]
# prints
#   processing batch [0, 1, 2]
#   Hello  user_0
#   Hello  user_1
#   Hello  user_2
#   processing batch [3, 4]
#   Hello  user_3
#   Hello  user_4
```


#### Storing state

A class with a `__call__` method provides an easy way to store state during processing.

```python
class CountUsers:
    def __init__(self):
        self._count = 0

    def __call__(self, item: str) -> str:
        self._count += 1
        return f"{item} (position {self._count})"


pipeline = PipelineFactory[int]().map(lambda x: f"user {x}").map(CountUsers())

pipeline.process(range(5)).to_list()
# returns
#    ['user 0 (position 1)', 'user 1 (position 2)', 'user 2 (position 3)', 'user 3 (position 4)', 'user 4 (position 5)']
```

One could also use a closure:

```python
def count_users():
    count = 0

    def wrapper(item: str) -> str:
        nonlocal count
        count += 1
        return f"{item} (position {count})"

    return wrapper


pipeline = PipelineFactory[int]().map(lambda x: f"user {x}").map(count_users())

pipeline.process(range(5)).to_list()
# returns
#    ['user 0 (position 1)', 'user 1 (position 2)', 'user 2 (position 3)', 'user 3 (position 4)', 'user 4 (position 5)']
```

#### Branches

![branch](https://github.com/brightnetwork/iter-pipes/assets/20539361/cddca673-1bf9-483b-874d-b33dfe6a88c8)

```python
pipeline = (
    PipelineFactory[int]()
    .branch(
        lambda x: x.filter(lambda x: x % 2 == 0).map(lambda x: x**2),
        lambda x: x.map(lambda x: -x),
    )
    .map(str)
)

expected = ["0", "0", "4", "-1", "-2", "16", "-3", "-4", "36", "-5", "-6", "-7"]
assert pipeline(range(8)).to_list() == expected
```

The order within each branch is preserved, but there is no guarantee about how the outputs of the two branches are interleaved when they are merged.
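
To illustrate that ordering guarantee, here is a minimal standard-library sketch of a branch followed by a merge. It is not the library's implementation: `branch_sketch` is a hypothetical helper, and the round-robin merge is only one possible policy, chosen here because it is easy to follow.

```python
from collections.abc import Callable, Iterable, Iterator
from itertools import tee
from typing import Any


def branch_sketch(
    items: Iterable[Any],
    *branches: Callable[[Iterator[Any]], Iterable[Any]],
) -> Iterator[Any]:
    """Hypothetical helper: feed a copy of `items` to each branch, merge round-robin."""
    copies = tee(items, len(branches))
    active = [iter(branch(copy)) for branch, copy in zip(branches, copies)]
    while active:
        # Take at most one item from each branch that still has output.
        for branch_output in list(active):
            try:
                yield next(branch_output)
            except StopIteration:
                active.remove(branch_output)


merged = list(
    branch_sketch(
        range(4),
        lambda xs: (x**2 for x in xs if x % 2 == 0),  # yields 0, 4
        lambda xs: (-x for x in xs),  # yields 0, -1, -2, -3
    )
)
assert merged == [0, 0, 4, -1, -2, -3]
```

Within `merged`, the items of each branch appear in their original relative order, while the way the two branches interleave depends entirely on the merge policy.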

There is also `branch_off`, which discards the output of the branch:

![branch-off](https://github.com/brightnetwork/iter-pipes/assets/20539361/ba4950b4-3683-4f39-b614-b65120ae81f3)


```python
pipeline = (
    PipelineFactory[int]()
    .branch_off(
        lambda x: x.filter(lambda x: x % 2 == 0).map(lambda x: x**2),
    )
    .map(str)
)

expected = ["0", "1", "2", "3", "4", "5", "6", "7"]
assert pipeline(range(8)).to_list() == expected
```

#### Pipe operator overload

```python
import iter_pipes.functional as itp

pipeline = (
    PipelineFactory[int]()
    | itp.map(math.exp)
    | itp.filter(lambda x: x > math.exp(2))  # type checker might complain
    | itp.map(math.log)
    | itp.map(str)
)

assert pipeline(range(6)).to_list() == ["3.0", "4.0", "5.0"]
```

Note that type inference for lambda functions passed to the functional `map` is not as good as with the `Pipeline.XXX` methods. To work around this, either use the non-functional style or use fully typed functions instead of lambdas.


#### Resumability

```python
from collections.abc import Iterable

pipeline = PipelineFactory[int]().branch(
    lambda x: x.filter(lambda x: x % 3 == 0).map(str),
    lambda x: x,
)

print(pipeline.process(range(12)).to_list())
# returns
#    ['0', 0, '3', 1, 2, 3, '6', 4, 5, 6, '9', 7, 8, 9, 10, 11]
# Note that between each yield from the first branch, the pipeline yields everything
# from the second branch, so that we don't store too many messages in the inflight buffer.


def filter_out_everything(items: Iterable[int]) -> Iterable[int]:
    print("starting")
    for item in items:
        if False:
            yield item


pipeline = PipelineFactory[int]().branch(
    lambda x: x.pipe(filter_out_everything).map(str),
    lambda x: x,
    max_inflight=5,
)

print(pipeline.process(range(9)).to_list())
# returns
#    [0, 1, 2, 3, 4, 5, 6, 7, 8]
# prints
#    starting
#    starting
#    starting
```

### Motivations

The goal of the library is to provide a structure for working with [collection pipelines](https://martinfowler.com/articles/collection-pipeline/).

> Collection pipelines are a programming pattern where you organize some computation as a sequence of operations which compose by taking a collection as output of one operation and feeding it into the next. 

In this library, each "operation" is called a "step". We distinguish several kinds of steps:
- `map` steps operate on each item of the collection, one by one
- `filter` steps reduce the number of items in the collection, without changing their values
- `for_each` steps do some processing without affecting the following steps (they pass their input through unchanged)
- `batch` steps operate on batches of a fixed maximum size, which is useful, for example, to batch database calls

In addition, we also define a pipeline `branch`, which allows several steps to run after a single one.
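
As a rough mental model of the four step kinds, each can be written as a plain generator function. The sketch below uses only the standard library; the names `map_step`, `filter_step`, `for_each_step`, `batch_step` and `fake_lookup` are hypothetical and are not part of `iter_pipes`, whose internals may differ.

```python
from collections.abc import Callable, Iterable, Iterator
from itertools import islice
from typing import TypeVar

T = TypeVar("T")
U = TypeVar("U")


def map_step(items: Iterable[T], fn: Callable[[T], U]) -> Iterator[U]:
    for item in items:
        yield fn(item)


def filter_step(items: Iterable[T], keep: Callable[[T], bool]) -> Iterator[T]:
    for item in items:
        if keep(item):
            yield item


def for_each_step(items: Iterable[T], fn: Callable[[T], object]) -> Iterator[T]:
    for item in items:
        fn(item)  # side effect only; the return value is ignored
        yield item


def batch_step(
    items: Iterable[T], fn: Callable[[list[T]], Iterable[U]], batch_size: int
) -> Iterator[U]:
    iterator = iter(items)
    # Pull up to `batch_size` items at a time; the last batch may be smaller.
    while chunk := list(islice(iterator, batch_size)):
        yield from fn(chunk)


seen_batches: list[list[int]] = []
greeted: list[str] = []


def fake_lookup(ids: list[int]) -> list[str]:
    # Stand-in for a database call that handles a whole batch at once.
    seen_batches.append(ids)
    return [f"user_{i}" for i in ids]


names = list(for_each_step(batch_step(range(5), fake_lookup, 3), greeted.append))
assert seen_batches == [[0, 1, 2], [3, 4]]
assert names == greeted == ["user_0", "user_1", "user_2", "user_3", "user_4"]
```

Because every step is a generator, nothing runs until the final `list(...)` pulls items through the chain.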

Library goals:
- declarative, expressive syntax for the steps above
- memory efficiency:
    - the library is pure Python, so raw memory usage is not optimal
    - what we care about is ensuring that the memory used by the pipeline does not scale with the number of items in the collection
- performance:
    - the library is pure Python, so the code itself is not especially fast
    - but the library allows for optimal usage of the "slow" operations (mainly network calls) that are computed in the pipeline; this is what is meant by "performant"
- lightweight usage: an existing function can be used as a step without the need for a wrapper
- as good a typing experience as possible
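
The memory goal above is the usual benefit of lazy iteration. The following plain-generator sketch (standard library only, not the library's code; `squares` and `only_even` are hypothetical names) runs a two-step pipeline over an endless source, which only works if items are processed one at a time rather than collected first.

```python
from collections.abc import Iterable, Iterator
from itertools import count, islice


def squares(items: Iterable[int]) -> Iterator[int]:
    for item in items:
        yield item * item


def only_even(items: Iterable[int]) -> Iterator[int]:
    for item in items:
        if item % 2 == 0:
            yield item


# `count()` is an endless source: building the pipeline computes nothing yet.
lazy_pipeline = only_even(squares(count()))

# Items are pulled one at a time, so only the three requested results are held.
first_three = list(islice(lazy_pipeline, 3))
assert first_three == [0, 4, 16]
```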



### Documentation

See the [`docs`](./tests/docs/) part of the test suite for examples.

### Contributing

Please refer to the [`test`](./.github/workflows/test.yml) actions. 100% test coverage is a start.


            
