[![image](https://img.shields.io/pypi/v/iter-pipes.svg)](https://pypi.python.org/pypi/iter-pipes)
[![image](https://img.shields.io/pypi/l/iter-pipes.svg)](https://pypi.python.org/pypi/iter-pipes)
[![image](https://img.shields.io/pypi/pyversions/iter-pipes.svg)](https://pypi.python.org/pypi/iter-pipes)
[![Code Coverage](https://img.shields.io/codecov/c/github/brightnetwork/iter-pipes)](https://app.codecov.io/gh/brightnetwork/iter-pipes)
[![Actions status](https://github.com/brightnetwork/iter-pipes/workflows/test/badge.svg)](https://github.com/brightnetwork/iter-pipes/actions)
## `iter_pipes`: Iterable Pipes
Functional pythonic pipelines for iterables.
```bash
pip install iter-pipes
```
### Examples
#### map / filter:
```python
import math

from iter_pipes import PipelineFactory

pipeline = (
    PipelineFactory[int]()
    .map(math.exp)
    .filter(lambda x: x > math.exp(2))
    .map(math.log)
    .map(str)
)

assert pipeline(range(5)).to_list() == ["3.0", "4.0"]
```
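
For comparison, here is a rough equivalent of the pipeline above that uses only the built-in `map` and `filter`, not the library. These built-ins are lazy, so nothing is computed until `list()` consumes the result.

```python
import math

# Same steps as the pipeline above, chained with the lazy built-ins.
values = map(math.exp, range(5))
values = filter(lambda x: x > math.exp(2), values)
values = map(math.log, values)
values = map(str, values)

# Consumption happens here; each item flows through all steps in turn.
result = list(values)
```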
#### Batch operations
```python
def get_user_names_from_db(user_ids: list[int]) -> list[str]:
    # typical batch operation:
    # - duration is roughly constant for a batch
    # - batch size has to be below a fixed threshold
    print("processing batch", user_ids)
    return [f"user_{user_id}" for user_id in user_ids]


pipeline = (
    PipelineFactory[int]()
    .batch(get_user_names_from_db, batch_size=3)
    .for_each(lambda user_name: print("Hello", user_name))
)

pipeline(range(5)).to_list()
# returns
# ["user_0", "user_1", "user_2", "user_3", "user_4"]
# prints
# processing batch [0, 1, 2]
# Hello user_0
# Hello user_1
# Hello user_2
# processing batch [3, 4]
# Hello user_3
# Hello user_4
```
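
The batching idea can be sketched in plain Python. The input is split into chunks with `itertools.islice` and the batch function runs once per chunk. The helper name `batch_apply` is hypothetical and this is not the library's implementation. It only shows why at most `batch_size` input items need to be held at a time.

```python
from collections.abc import Callable, Iterable, Iterator
from itertools import islice
from typing import TypeVar

T = TypeVar("T")
U = TypeVar("U")


def batch_apply(
    func: Callable[[list[T]], Iterable[U]],
    items: Iterable[T],
    batch_size: int,
) -> Iterator[U]:
    """Hypothetical helper: call `func` on consecutive chunks of `items`."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    iterator = iter(items)
    while True:
        # Pull at most `batch_size` items; only this chunk is held in memory.
        chunk = list(islice(iterator, batch_size))
        if not chunk:
            return
        # Re-emit the batch results one by one so later steps stay per-item.
        yield from func(chunk)


def get_user_names_from_db(user_ids: list[int]) -> list[str]:
    print("processing batch", user_ids)
    return [f"user_{user_id}" for user_id in user_ids]


names = list(batch_apply(get_user_names_from_db, range(5), batch_size=3))
```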
#### Storing state
A class with a `__call__` method provides an easy way to store state during processing.
```python
class CountUsers:
    def __init__(self):
        self._count = 0

    def __call__(self, item: str) -> str:
        self._count += 1
        return f"{item} (position {self._count})"


pipeline = PipelineFactory[int]().map(lambda x: f"user {x}").map(CountUsers())

pipeline.process(range(5)).to_list()
# returns
# ['user 0 (position 1)', 'user 1 (position 2)', 'user 2 (position 3)', 'user 3 (position 4)', 'user 4 (position 5)']
```
One could also use a closure:
```python
def count_users():
    count = 0

    def wrapper(item: str) -> str:
        nonlocal count
        count += 1
        return f"{item} (position {count})"

    return wrapper


pipeline = PipelineFactory[int]().map(lambda x: f"user {x}").map(count_users())

pipeline.process(range(5)).to_list()
# returns
# ['user 0 (position 1)', 'user 1 (position 2)', 'user 2 (position 3)', 'user 3 (position 4)', 'user 4 (position 5)']
```
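
In both versions the counter lives inside the callable. One instance therefore keeps counting across everything it is called on. The sketch below shows this with the built-in `map`, not the library. Reusing one `CountUsers` instance continues the count, while a fresh instance starts again at 1. The text above does not say whether a pipeline re-creates its steps between runs, so it is safest to assume it does not and to build a new instance per run.

```python
class CountUsers:
    def __init__(self) -> None:
        self._count = 0

    def __call__(self, item: str) -> str:
        self._count += 1
        return f"{item} (position {self._count})"


counter = CountUsers()
first_run = list(map(counter, ["user 0", "user 1"]))
# Same instance: the count continues from where the first run stopped.
second_run = list(map(counter, ["user 2"]))
# New instance: the count starts again at 1.
fresh_run = list(map(CountUsers(), ["user 2"]))
```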
#### Branches
![branch](https://github.com/brightnetwork/iter-pipes/assets/20539361/cddca673-1bf9-483b-874d-b33dfe6a88c8)
```python
pipeline = (
    PipelineFactory[int]()
    .branch(
        lambda x: x.filter(lambda x: x % 2 == 0).map(lambda x: x**2),
        lambda x: x.map(lambda x: -x),
    )
    .map(str)
)

expected = ["0", "0", "4", "-1", "-2", "16", "-3", "-4", "36", "-5", "-6", "-7"]
assert pipeline(range(8)).to_list() == expected
```
The order of items within each branch is preserved, but there is no guarantee about how the outputs of the branches are interleaved.
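
One way to picture a branch is to duplicate the input with `itertools.tee`, run each branch on its own copy, and interleave the results. The sketch below uses a simple round-robin merge. That merge is an assumption made for illustration and the helper `branch_round_robin` is hypothetical. It respects the per-branch ordering guarantee, but its interleaving differs from the `expected` list in the library example.

```python
from collections.abc import Callable, Iterable, Iterator
from itertools import tee
from typing import TypeVar

T = TypeVar("T")
U = TypeVar("U")


def branch_round_robin(
    items: Iterable[T],
    *branches: Callable[[Iterable[T]], Iterable[U]],
) -> Iterator[U]:
    """Hypothetical helper: feed every item to each branch, interleave outputs."""
    # tee gives each branch its own iterator over the same input; it buffers
    # items that one copy has already read and another has not.
    copies = tee(items, len(branches))
    outputs = [iter(branch(copy)) for branch, copy in zip(branches, copies)]
    done = object()  # sentinel marking an exhausted branch
    while outputs:
        still_running = []
        for output in outputs:
            item = next(output, done)
            if item is done:
                continue  # drop branches that have finished
            yield item
            still_running.append(output)
        outputs = still_running


def evens_squared(xs: Iterable[int]) -> Iterator[int]:
    return (x**2 for x in xs if x % 2 == 0)


def negated(xs: Iterable[int]) -> Iterator[int]:
    return (-x for x in xs)


merged = [str(x) for x in branch_round_robin(range(8), evens_squared, negated)]
```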

There is also `branch_off`, which discards the output of the branch:
![branch-off](https://github.com/brightnetwork/iter-pipes/assets/20539361/ba4950b4-3683-4f39-b614-b65120ae81f3)
```python
pipeline = (
    PipelineFactory[int]()
    .branch_off(
        lambda x: x.filter(lambda x: x % 2 == 0).map(lambda x: x**2),
    )
    .map(str)
)

expected = ["0", "0", "4", "16", "36"]
assert pipeline(range(8)).to_list() == expected
```
#### Pipe operator overload
```python
import iter_pipes.functional as itp

pipeline = (
    PipelineFactory[int]()
    | itp.map(math.exp)
    | itp.filter(lambda x: x > math.exp(2))  # type checker might complain
    | itp.map(math.log)
    | itp.map(str)
)

assert pipeline(range(6)).to_list() == ["3.0", "4.0", "5.0"]
```
Note that type inference for lambdas passed to the functional helpers (such as `itp.map`) is not as good as with the `Pipeline.XXX` methods. To work around this, either use the method-chaining style or pass fully typed functions instead of lambdas.
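
The `|` syntax relies on Python operator overloading: an object that defines `__or__` decides what `left | right` means. The toy `Pipe` class below is hypothetical and much simpler than the library's. It composes plain iterable-to-iterable functions, and the helpers `map_step` and `filter_step` stand in for `itp.map` and `itp.filter`. Because each step is typed loosely with `Any`, a type checker cannot infer the lambdas' parameter types. That loss is similar in spirit to the typing caveat above.

```python
from collections.abc import Callable, Iterable
from typing import Any

Step = Callable[[Iterable[Any]], Iterable[Any]]


class Pipe:
    """Hypothetical toy pipeline: a tuple of steps applied in order."""

    def __init__(self, steps: tuple[Step, ...] = ()) -> None:
        self._steps = steps

    def __or__(self, step: Step) -> "Pipe":
        # `pipe | step` returns a new Pipe with the step appended.
        return Pipe(self._steps + (step,))

    def __call__(self, items: Iterable[Any]) -> list[Any]:
        for step in self._steps:
            items = step(items)
        return list(items)


def map_step(func: Callable[[Any], Any]) -> Step:
    # Build a step that applies `func` lazily to every item.
    return lambda items: map(func, items)


def filter_step(predicate: Callable[[Any], bool]) -> Step:
    # Build a step that lazily keeps items matching `predicate`.
    return lambda items: filter(predicate, items)


toy = Pipe() | map_step(lambda x: x * 10) | filter_step(lambda x: x > 20) | map_step(str)
result = toy(range(5))
```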
#### Resumability
```python
from collections.abc import Iterable

pipeline = PipelineFactory[int]().branch(
    lambda x: x.filter(lambda x: x % 3 == 0).map(str),
    lambda x: x,
)

print(pipeline.process(range(12)).to_list())
# returns
# ['0', 0, '3', 1, 2, 3, '6', 4, 5, 6, '9', 7, 8, 9, 10, 11]
# note that between two items yielded by the first branch, the pipeline yields everything
# the second branch has produced in the meantime, so that the inflight buffer stays small.


def filter_out_everything(items: Iterable[int]) -> Iterable[int]:
    print("starting")
    for item in items:
        if False:
            yield item


pipeline = PipelineFactory[int]().branch(
    lambda x: x.pipe(filter_out_everything).map(str),
    lambda x: x,
    max_inflight=5,
)

print(pipeline.process(range(9)).to_list())
# returns
# [0, 1, 2, 3, 4, 5, 6, 7, 8]
# prints
# starting
# starting
# starting
```
### Motivations
The goal of the library is to provide a structure to work with [collection pipelines](https://martinfowler.com/articles/collection-pipeline/).

> Collection pipelines are a programming pattern where you organize some computation as a sequence of operations which compose by taking a collection as output of one operation and feeding it into the next.

In this library, each "operation" is called a "step". There are several kinds of steps:
- `map` steps operate on each item of the collection, one by one
- `filter` steps reduce the number of items in the collection, without changing their values
- `for_each` steps do some processing without affecting the following steps (they pass their input on unchanged)
- `batch` steps operate on batches of a fixed size, which is useful, for example, to batch database calls.
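
A `for_each` step, as described in the list above, can be modelled as a pass-through generator. It calls a function for its side effect and yields the original item unchanged. The name `for_each_step` is hypothetical, and this is a plain-Python illustration rather than the library's code.

```python
from collections.abc import Callable, Iterable, Iterator
from typing import TypeVar

T = TypeVar("T")


def for_each_step(func: Callable[[T], object], items: Iterable[T]) -> Iterator[T]:
    """Hypothetical helper: run `func` on each item, then pass the item on."""
    for item in items:
        func(item)  # side effect only; the return value is ignored
        yield item  # downstream steps see the unchanged item


seen: list[int] = []
passed_through = list(for_each_step(seen.append, range(3)))
```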

In addition, a pipeline can `branch`, which allows several sequences of steps to run after a single step.
Library goals:
- declarative, expressive syntax for the steps above
- memory efficiency:
  - pure Python, so it's not optimal at all
  - but what we care about is ensuring that the memory used by the pipeline does not scale with the number of items in the collection.
- performant:
  - pure Python, so the code itself is not particularly fast
  - but the library allows optimal use of the "slow" operations (mainly network calls) computed in the pipeline. This is what is meant by "performant".
- lightweight usage: existing functions can be used as steps without the need for a wrapper
- as good a typing experience as possible
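
The memory goal can be illustrated with plain generators. Each step pulls one item at a time, so a chain of lazy steps works even on an infinite input such as `itertools.count()`, as long as the consumer stops early. This is a standard-library sketch of the principle, not a measurement of the library.

```python
from itertools import count, islice

# An infinite source: calling list() on it directly would never finish.
numbers = count()
# Lazy steps: items flow through one at a time and nothing is accumulated.
squares = (n * n for n in numbers)
odd_squares = (s for s in squares if s % 2 == 1)
# Only as many items as needed for the first five results are ever computed.
first_five = list(islice(odd_squares, 5))
```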
### Documentation
Have a look at the [`docs`](./tests/docs/) section of the test suite for examples.
### Contributing
Please refer to the [`test`](./.github/workflows/test.yml) workflow. 100% test coverage is a start.
Raw data
{
"_id": null,
"home_page": "https://github.com/brightnetwork/iter-pipes",
"name": "iter-pipes",
"maintainer": null,
"docs_url": null,
"requires_python": "<4.0,>=3.10",
"maintainer_email": null,
"keywords": "iterable, pipes, collection",
"author": "brightnetwork",
"author_email": "dev@brightnetwork.co.uk",
"download_url": "https://files.pythonhosted.org/packages/fd/c3/4ad5746eef084ce37ef69a060094dd20920f0ebb2e49dcd932309dc307e0/iter_pipes-0.1.7.tar.gz",
"platform": null,
"description": "[![image](https://img.shields.io/pypi/v/iter-pipes.svg)](https://pypi.python.org/pypi/iter-pipes)\n[![image](https://img.shields.io/pypi/l/iter-pipes.svg)](https://pypi.python.org/pypi/iter-pipes)\n[![image](https://img.shields.io/pypi/pyversions/iter-pipes.svg)](https://pypi.python.org/pypi/iter-pipes)\n[![Code Coverage](https://img.shields.io/codecov/c/github/brightnetwork/iter-pipes)](https://app.codecov.io/gh/brightnetwork/iter-pipes)\n[![Actions status](https://github.com/brightnetwork/iter-pipes/workflows/test/badge.svg)](https://github.com/brightnetwork/iter-pipes/actions)\n\n## `iter_pipes`: Iterable Pipes\n\nFunctional pythonic pipelines for iterables.\n\n\n```bash\npip install iter-pipes\n```\n\n### Examples\n\n#### map / filter:\n\n```python\nimport math\n\nfrom iter_pipes import PipelineFactory\n\npipeline = (\n PipelineFactory[int]()\n .map(math.exp)\n .filter(lambda x: x > math.exp(2))\n .map(math.log)\n .map(str)\n)\n\nassert pipeline(range(5)).to_list() == [\"3.0\", \"4.0\"]\n```\n\n#### Batch operations\n\n```python\ndef get_user_names_from_db(user_ids: list[int]) -> list[str]:\n # typical batch operation:\n # - duration is roughly constant for a batch\n # - batch size has to be below a fixed threshold\n print(\"processing batch\", user_ids)\n return [f\"user_{user_id}\" for user_id in user_ids]\n\n\npipeline = (\n PipelineFactory[int]()\n .batch(get_user_names_from_db, batch_size=3)\n .for_each(lambda user_name: print(\"Hello \", user_name))\n)\n\npipeline(range(5)).to_list()\n# returns\n# [\"user_0\", \"user_1\", \"user_2\", \"user_3\", \"user_4\"]\n# prints\n# processing batch [0, 1, 2]\n# Hello user_0\n# Hello user_1\n# Hello user_2\n# processing batch [3, 4]\n# Hello user_3\n# Hello user_4\n```\n\n\n#### Storing state\n\nClass with a `__call__` method provide a easy way to store a state during the processing.\n\n```python\nclass CountUsers:\n def __init__(self):\n self._count = 0\n\n def __call__(self, item: str) -> str:\n 
self._count += 1\n return f\"{item} (position {self._count})\"\n\n\npipeline = PipelineFactory[int]().map(lambda x: f\"user {x}\").map(CountUsers())\n\npipeline.process(range(5)).to_list()\n# return\n# ['user 0 (position 1)', 'user 1 (position 2)', 'user 2 (position 3)', 'user 3 (position 4)', 'user 4 (position 5)']\n```\n\nOne could also use a closure:\n\n```python\ndef count_users():\n count = 0\n\n def wrapper(item: str) -> str:\n nonlocal count\n count += 1\n return f\"{item} (position {count})\"\n\n return wrapper\n\n\npipeline = PipelineFactory[int]().map(lambda x: f\"user {x}\").map(count_users())\n\npipeline.process(range(5)).to_list()\n# return\n# ['user 0 (position 1)', 'user 1 (position 2)', 'user 2 (position 3)', 'user 3 (position 4)', 'user 4 (position 5)']\n```\n\n#### Branches\n\n![branch](https://github.com/brightnetwork/iter-pipes/assets/20539361/cddca673-1bf9-483b-874d-b33dfe6a88c8)\n\n```python\npipeline = (\n PipelineFactory[int]()\n .branch(\n lambda x: x.filter(lambda x: x % 2 == 0).map(lambda x: x**2),\n lambda x: x.map(lambda x: -x),\n )\n .map(str)\n)\n\nexpected = [\"0\", \"0\", \"4\", \"-1\", \"-2\", \"16\", \"-3\", \"-4\", \"36\", \"-5\", \"-6\", \"-7\"]\nassert pipeline(range(8)).to_list() == expected\n```\n\nEach \"branch\" order will be preserved, but there is not guarantee in term of how the two are merged.\n\nThere is also `branch_off` which discard the output of the branch:\n\n![branch-off](https://github.com/brightnetwork/iter-pipes/assets/20539361/ba4950b4-3683-4f39-b614-b65120ae81f3)\n\n\n```python\npipeline = (\n PipelineFactory[int]()\n .branch_off(\n lambda x: x.filter(lambda x: x % 2 == 0).map(lambda x: x**2),\n )\n .map(str)\n)\n\nexpected = [\"0\", \"0\", \"4\", \"16\", \"36\"]\nassert pipeline(range(8)).to_list() == expected\n```\n\n#### Pipe operator overload\n\n```python\nimport iter_pipes.functional as itp\n\npipeline = (\n PipelineFactory[int]()\n | itp.map(math.exp)\n | itp.filter(lambda x: x > math.exp(2)) # type 
checker might complain\n | itp.map(math.log)\n | itp.map(str)\n)\n\nassert pipeline(range(6)).to_list() == [\"3.0\", \"4.0\", \"5.0\"]\n```\n\nnote that typing of lambda function inside functional map is not as good as the one from the `Pipeline.XXX` methods. To work around this, one should either use the non functional style, either use fully typed function instead of lambda.\n\n\n#### Resumability\n\n```python\npipeline = PipelineFactory[int]().branch(\n lambda x: x.filter(lambda x: x % 3 == 0).map(str),\n lambda x: x,\n)\n\nprint(pipeline.process(range(12)).to_list())\n# return\n# ['0', 0, '3', 1, 2, 3, '6', 4, 5, 6, '9', 7, 8, 9, 10, 11]\n# note that between each yield from the first branch, the pipeline will yield everything\n# from the second branch so that we don't store too many messages in the inflight buffer.\n\n\ndef filter_out_everything(items: Iterable[int]) -> Iterable[int]:\n print(\"starting\")\n for item in items:\n if False:\n yield item\n\n\npipeline = PipelineFactory[int]().branch(\n lambda x: x.pipe(filter_out_everything).map(str),\n lambda x: x,\n max_inflight=5,\n)\n\nprint(pipeline.process(range(9)).to_list())\n# return\n# [0, 1, 2, 3, 4, 5, 6, 7, 8]\n# print\n# starting\n# starting\n# starting\n```\n\n### Motivations\n\nGoal of the library is to provide a structure to work with [collection pipelines](https://martinfowler.com/articles/collection-pipeline/).\n\n> Collection pipelines are a programming pattern where you organize some computation as a sequence of operations which compose by taking a collection as output of one operation and feeding it into the next. \n\nIn this library, each \"operation\" is called a \"step\". 
We differentiate different subtype for steps:\n- `map` steps will operate on each item of the collection, one by one\n- `filter` steps will reduce the number of item in the collection, without changing their values\n- `for_each` steps will do some processing, but without impacting the following steps (they won't change the input)\n- `batch` steps will operate by batch of a fixed size - can be useful for example to batch database calls.\n\nIn addition to that, we also define pipeline `branch`, which allow to run several steps after a single one.\n\nLibrary goal:\n- declarative, expressive syntax for the steps above\n- memory efficiency:\n - pure python, so it's not optimal at all\n - but what we care about is ensuring that the memory used by the pipeline does not scale with the number of items in the collection.\n- performant:\n - pure python, so the code itself is not really performant\n - but the library allow for an optimal usage of the \"slow\" operations (network calls mainly) that are computed in the pipeline. This is what is meant by \"performant\"\n- lightweight usage, as in existing function can be used as a step without the need for a wrapper\n- provide as good of a type experience as possible\n\n\n\n### Documentation\n\nHave a look at the [`docs`](./tests/docs/) part of the test suites for examples.\n\n### Contributing\n\nPlease refer to the [`test`](./.github/workflows/test.yml) actions. 100% test coverage is a start.\n\n",
"bugtrack_url": null,
"license": "MIT",
"summary": "Functional pythonic pipelines for iterables.",
"version": "0.1.7",
"project_urls": {
"Documentation": "https://github.com/brightnetwork/iter-pipes",
"Homepage": "https://github.com/brightnetwork/iter-pipes",
"Repository": "https://github.com/brightnetwork/iter-pipes"
},
"split_keywords": [
"iterable",
" pipes",
" collection"
],
"urls": [
{
"comment_text": "",
"digests": {
"blake2b_256": "f3952f1ee206caa74f28e92839c7423ea7542171d9632b0ecdf041c43fc1cf31",
"md5": "29c16d44e559ec47f3b751e1baa2e8f3",
"sha256": "5bab9eb2722980b7920a2c9f9ee76898448394f8434861d58a72f87a6fa333dd"
},
"downloads": -1,
"filename": "iter_pipes-0.1.7-py3-none-any.whl",
"has_sig": false,
"md5_digest": "29c16d44e559ec47f3b751e1baa2e8f3",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": "<4.0,>=3.10",
"size": 7634,
"upload_time": "2024-04-16T07:45:16",
"upload_time_iso_8601": "2024-04-16T07:45:16.083477Z",
"url": "https://files.pythonhosted.org/packages/f3/95/2f1ee206caa74f28e92839c7423ea7542171d9632b0ecdf041c43fc1cf31/iter_pipes-0.1.7-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": "",
"digests": {
"blake2b_256": "fdc34ad5746eef084ce37ef69a060094dd20920f0ebb2e49dcd932309dc307e0",
"md5": "e3ecadebc5a926ebec7daa6b27e3c634",
"sha256": "ccd5db3ff4d22e5a7e9dd4d5f81fb1150d55cab85ee587bb83a353c2b1e626ea"
},
"downloads": -1,
"filename": "iter_pipes-0.1.7.tar.gz",
"has_sig": false,
"md5_digest": "e3ecadebc5a926ebec7daa6b27e3c634",
"packagetype": "sdist",
"python_version": "source",
"requires_python": "<4.0,>=3.10",
"size": 6835,
"upload_time": "2024-04-16T07:45:17",
"upload_time_iso_8601": "2024-04-16T07:45:17.067723Z",
"url": "https://files.pythonhosted.org/packages/fd/c3/4ad5746eef084ce37ef69a060094dd20920f0ebb2e49dcd932309dc307e0/iter_pipes-0.1.7.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2024-04-16 07:45:17",
"github": true,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"github_user": "brightnetwork",
"github_project": "iter-pipes",
"travis_ci": false,
"coveralls": false,
"github_actions": true,
"lcname": "iter-pipes"
}