async-pipeline

Name: async-pipeline
Version: 0.1.2
Home page: https://github.com/RockeyDon/async_pipeline
Summary: Async pipeline with functional methods
Author: Rockey Don
Requires Python: >=3.7, <3.11
Keywords: async, pipeline, functional
Upload time: 2023-05-07 09:29:46
# async_pipeline
Complex async data pipelines with functional methods.

## Features
`async_pipeline` aims to simplify the creation of asynchronous pipelines.

Main features are:

* Simplicity. 
  Allows defining not only linear pipelines, 
  but also complex pipelines with cross structures.
* Fast. 
  Each pipeline stage runs as a coroutine, 
  so a slow async function does not block the other stages.
* Easy-to-read. 
  Consecutive linear stages can be connected with the pipeline operator, 
  making definitions more readable.
* Easy-to-use. 
  Provides functional-style methods.

## Quick Start
### Install
```shell
python setup.py install
```

### Simple linear pipeline
In this example, each element is multiplied by 2, and then only the results greater than 5 are kept. 
The function in each stage can be synchronous or asynchronous.
```python
import asyncio
from async_pipeline.pipe import Pipe

async def double(x):
    await asyncio.sleep(0.01)
    return x * 2
    
pipe = Pipe()  # create a pipe
stage1 = pipe.data(range(5))  # define data stage
stage2 = pipe.map(func=double, src=stage1)  # define map stage, async func
stage3 = pipe.filter(func=lambda x: x > 5, src=stage2)  # define filter stage, sync func
stage4 = pipe.result(src=stage3)  # define result stage, store result in stage4.result
pipe.run()  # run pipe
print(stage4.result)  # [6, 8]
```

Use the pipe operator to abbreviate:
```python
pipe = Pipe()
stage = range(5) | pipe.map(func=double) | pipe.filter(func=lambda x: x > 5) | pipe.result()
pipe.run()
```

### Complex non-linear pipeline
This pipeline can be described in the following diagram:
```mermaid
  graph LR;
      inp((data))--'>0'-->n1((21));
      inp((data))--'<0'-->n2((22));
      inp((data))--'==0'-->n3((drop));
      n1((21))-->n4((31));
      n1((21))-->n5((32));
      n4((31))-->n6((4));
      n5((32))-->n6((4));
      n2((22))-->n7((5));
      n6((4))-->n7((5));
      n7((5))-->n8((end));
```
```python
from async_pipeline.pipe import Pipe

pipe = Pipe()
stage1 = [-1, 0, 1] | pipe.partition({
    'p': {
        'match': "elem > 0",
        'handle': pipe.map(func=lambda x: str(x) + '_21') 
                  | pipe.multiply(
                      pipe.map(func=lambda x, y: str(x) + y, func_kw={'y': '_31'}),
                      pipe.map(func=lambda x: str(x) + '_32')
                  )
    },
    'n': {
        'match': "elem < 0",
        'handle': pipe.map(func=lambda x: str(x) + '_22')
    }
})
stage2 = pipe.concat(
                pipe.concat(stage1['p'][0], stage1['p'][1]) | pipe.map(func=lambda x: str(x) + '_4'),
                stage1['n']
          ) \
          | pipe.map(func=lambda x: str(x) + '_5') \
          | pipe.result()
pipe.run()
print(sorted(stage2.result))  # ['-1_22_5', '1_21_31_4_5', '1_21_32_4_5']
```

## Supported Python Versions
* 3.7
* 3.8
* 3.9
* 3.10

## Unittest
```shell
pip install -r requirements/test.txt
python -m unittest
```

## API Reference


### class `Pipe`
All methods are called through a pipe instance. 

#### `pipe = Pipe()`

Create a *pipe*. 

Different pipes do not affect each other.

#### `pipe.run()`

Execute the stages in *pipe* and let the data start flowing. 

Before this, all stage functions are merely recorded as coroutines. 
Each pipe can be executed only once.

#### `await pipe.async_run()`

The asynchronous version of `run`.
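A minimal sketch of driving a pipe from existing async code, reusing the quick-start API:

```python
import asyncio
from async_pipeline.pipe import Pipe

async def main():
    pipe = Pipe()
    stage = range(5) | pipe.map(func=lambda x: x * 2) | pipe.result()
    await pipe.async_run()  # run the pipe inside an already-running event loop
    print(stage.result)  # [0, 2, 4, 6, 8]

asyncio.run(main())
```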

#### `pipe.cancel()`

Cancel the coroutines in the pipeline that have not yet executed.
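A minimal sketch, assuming `cancel` may be called on a pipe whose stages have been defined but not yet run:

```python
pipe = Pipe()
stage = range(5) | pipe.map(func=lambda x: x * 2) | pipe.result()
pipe.cancel()  # discard the recorded coroutines instead of running them
```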

### class `Stage`
Each section of the pipeline, i.e. each process, is called a *stage*.

Stages are always associated with a pipeline, 
so stage instances are created through the *pipe* interface.

#### `pipe.data(src)`

Create a *Data Stage*. Place it at the beginning.

`src` is the data source; it needs to be iterable or async iterable. 

This method is optional because the data source can be placed directly on the left of `|`, 
though static code checkers may then warn that "Class 'xx' does not define `__or__`".

```python
stage = pipe.data(range(10))
```

#### `pipe.result(src=None, output=None)`

Create a *Result Stage*. Place it at the end.

`src` is the previous stage.
When using the pipeline operator, the left side of `|` is the previous stage, so `src` is not specified in that case.

`output` receives the pipeline output; it can be a list or any object with an `append` method. The default is `self.result`.

```python
# 1
previous = pipe.data(range(10))
end = pipe.result(src=previous)
# 2
end = previous | pipe.result()
# 3
result = []
end = previous | pipe.result(output=result)
```

#### `pipe.map(func, src=None, func_kw=None)`

Create a *Map Stage*; it executes `result = func(elem, **func_kw)` for each element passed in, and outputs the *result*.

`func` is a function object, either sync or async. 

`func_kw` holds keyword arguments for `func` other than the element, like a *kwargs* dict.

`src` is the previous stage, *None* when using `|`.

```python
# 1
stage = pipe.map(func=lambda x: x + 1, src=previous)
# 2
stage = previous | pipe.map(func=some_func)
# 3
stage = previous | pipe.map(func=lambda x, y: x + y, func_kw={'y': 1})
```

#### `pipe.filter(func, src=None, func_kw=None)`

Create a *Filter Stage*; it computes `flag = func(elem, **func_kw)` for each element passed in, and outputs the *element* when the flag is True.

`func` is a function object, either sync or async. 

`func_kw` holds keyword arguments for `func` other than the element, like a *kwargs* dict.

`src` is the previous stage, *None* when using `|`.

```python
# 1
stage = pipe.filter(func=lambda x: x > 0, src=previous)
# 2
stage = previous | pipe.filter(func=some_bool_func)
# 3
stage = previous | pipe.filter(func=lambda x, y: x > y, func_kw={'y': 0})
```

#### `pipe.flatten(src=None)`

Create a *Flatten Stage*; it flattens each element passed in and outputs the results.

`src` is the previous stage whose output is a two-level list, or an iterable or async iterable two-level list. *None* when using `|`.

```python
# 1
stage = pipe.flatten(src=[[1], [2, 3]])
# 2
stage = previous | pipe.flatten()
```

#### `pipe.concat(*src)`

Create a *Concat Stage*; it merges the outputs of multiple stages. 

`*src` is multiple previous stages, or iterable or async iterable lists.

```python
# 1
stage = pipe.concat([1, 2], [3, 4])
# 2
stage = pipe.concat(
    data | pipe.map(func=lambda x: x), 
    data | pipe.filter(func=lambda x: x > 0), 
)
```

#### `pipe.partition(cases, src=None)`

Create a *Partition Stage*; it routes elements to branches by condition.

`cases` can be a key-value dictionary. The key is the branch alias, and the value is the matching logic, an expression with the single parameter *elem*. 

Elements are evaluated in dict order and output to the first branch that matches. Elements that do not match any branch are dropped.

The next stage's `src` is `partition[alias]`.

```python
stage1 = range(-5, 6) | pipe.partition(cases={'p': 'elem > 0', 'n': 'elem < 0'})
stage21 = pipe.map(src=stage1['p'], func=lambda x: x + 1)
stage22 = pipe.map(src=stage1['n'], func=lambda x: x - 2)
end1 = pipe.result(src=stage21)  # end1.result [2, 3, 4, 5, 6]
end2 = pipe.result(src=stage22)  # end2.result [-7, -6, -5, -4, -3]
```

`cases` can also be a nested dictionary that defines both the matching logic and the subsequent processing. 

`handle` is the subsequent processing; its `src` is the elements matched by `match`.
```python
stage = range(-5, 6) | pipe.partition({
    'p': {
        'match': 'elem > 0',
        'handle': pipe.map(func=lambda x: x + 1)
    },
    'n': {
        'match': 'elem < 0',
        'handle': pipe.map(func=lambda x: x - 2)
    }
})
end1 = pipe.result(src=stage['p'])  # end1.result [2, 3, 4, 5, 6]
end2 = pipe.result(src=stage['n'])  # end2.result [-7, -6, -5, -4, -3]
```

`src` is the previous stage, *None* when using `|`.

#### `pipe.multiply(*targets, src=None)`

Create a *Multiply Stage*; it copies each element and outputs one copy to each branch in `targets`. 

`targets` are the subsequent pipelines.

`src` is the previous stage, *None* when using `|`.

```python
# 1
previous = pipe.data(range(10))
stage = pipe.multiply(
    pipe.map(func=lambda x: x),
    pipe.map(func=lambda x: 2 * x),
    src=previous
)
# 2
stage = pipe.data(range(10)) | pipe.multiply(
    pipe.map(func=lambda x: x),
    pipe.map(func=lambda x: 2 * x),
)
```

#### `pipe.each(func, src=None, func_kw=None)`

Create an *Each Stage*.

Similar to `map`, but elements are dropped after computation.
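For instance, `each` can consume elements purely for side effects (a minimal sketch mirroring the `map` examples above):

```python
pipe = Pipe()
stage = pipe.data(range(3)) | pipe.each(func=print)  # prints each element; passes nothing downstream
pipe.run()
```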

#### `pipe.peek(func, src=None, func_kw=None)`

Create a *Peek Stage*.

Similar to `map`, but still outputs the elements after computation.

Elements may be changed if they are mutable.
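For instance, `peek` can log elements while passing them through (a minimal sketch mirroring the `map` examples above):

```python
pipe = Pipe()
stage = pipe.data(range(3)) | pipe.peek(func=print) | pipe.result()
pipe.run()
print(stage.result)  # [0, 1, 2]; each element was also printed by peek
```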

#### `pipe.filter_not(func, src=None, func_kw=None)`

Create a *FilterNot Stage*.

Similar to `filter`, but the logic is reversed.
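For instance (a minimal sketch mirroring the `filter` examples above):

```python
pipe = Pipe()
stage = pipe.data(range(-2, 3)) | pipe.filter_not(func=lambda x: x > 0) | pipe.result()
pipe.run()
print(stage.result)  # [-2, -1, 0]
```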

#### `pipe.distinct(func=hash, src=None, func_kw=None)`

Create a *Distinct Stage*.

Similar to `filter`; outputs the elements after de-duplication. 

`func` computes the key used to decide whether an element is a duplicate; the default is the built-in `hash`.

*Note: in Python, `hash(-1) == hash(-2)`.*
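For instance (a minimal sketch; it assumes the first occurrence of each element is kept):

```python
pipe = Pipe()
stage = pipe.data([1, 2, 2, 3, 1]) | pipe.distinct() | pipe.result()
pipe.run()
print(stage.result)  # [1, 2, 3]
```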

#### `pipe.duplicate(func=hash, src=None, func_kw=None)`

Create a *Duplicate Stage*.

Similar to `filter`; outputs only the duplicate elements. 

`func` computes the key used to decide whether an element is a duplicate; the default is the built-in `hash`.

*Note: in Python, `hash(-1) == hash(-2)`.*
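For instance (a minimal sketch; it assumes each repeated occurrence is emitted):

```python
pipe = Pipe()
stage = pipe.data([1, 2, 2, 3, 1]) | pipe.duplicate() | pipe.result()
pipe.run()
print(stage.result)  # [2, 1]
```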

#### `pipe.limit(num, src=None)`

Create a *Limit Stage*.

Similar to `filter`, but only outputs the first `num` elements.

```python
# 1
previous = pipe.data(range(10))
stage = pipe.limit(num=5, src=previous)
# 2
stage = pipe.data(range(10)) | pipe.limit(num=5)
```

            
