thecodecrate-pipeline


Namethecodecrate-pipeline JSON
Version 1.23.0 PyPI version JSON
download
home_pageNone
SummaryThis package provides a pipeline pattern implementation
upload_time2024-12-09 22:38:40
maintainerNone
docs_urlNone
authorNone
requires_python>=3.13
licenseThe MIT License (MIT) Copyright (c) 2024 TheCodeCrate <loureiro.rg@gmail.com> Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
keywords library pipeline python python-pipeline
VCS
bugtrack_url
requirements No requirements were recorded.
Travis-CI No Travis.
coveralls test coverage No coveralls.
            # TheCodeCrate's Pipeline

This package provides a pipeline pattern implementation.

The implementation is inspired by the excellent [PHP League Pipeline](https://github.com/thephpleague/pipeline) package.

## Installation

```bash
pip install thecodecrate-pipeline
```

## Pipeline Pattern

The pipeline pattern allows you to easily compose sequential stages by chaining stages.

In this particular implementation, the interface consists of two parts:

- `StageInterface`
- `PipelineInterface`

A pipeline consists of zero, one, or multiple stages. A pipeline can process a payload. During the processing, the payload will be passed to the first stage. From that moment on, the resulting value is passed on from stage to stage.

In the simplest form, the execution chain can be represented as a for loop:

```python
result = payload

for stage in stages:
    result = stage(result)

return result
```

Effectively, this is the same as:

```python
result = stage3(stage2(stage1(payload)))
```

## Immutability

Pipelines are implemented as immutable stage chains. When you pipe a new stage, a new pipeline will be created with the added stage. This makes pipelines easy to reuse and minimizes side-effects.

## Usage

Operations in a pipeline, stages, can be anything that satisfies the `Callable` type hint. So functions and anything that's callable is acceptable.

```python
pipeline = Pipeline().pipe(lambda payload: payload * 10)

# Returns 100
await pipeline.process(10)
```

## Class-Based Stages

Class-based stages are also possible. The `StageInterface[InputType, OutputType]` interface can be implemented, which ensures you have the correct method signature for the `__call__` method.

```python
class TimesTwoStage(StageInterface[int, int]):
    async def __call__(self, payload: int) -> int:
        return payload * 2

class AddOneStage(StageInterface[int, int]):
    async def __call__(self, payload: int) -> int:
        return payload + 1

pipeline = (
    Pipeline[int, int]()
    .pipe(TimesTwoStage())
    .pipe(AddOneStage())
)

# Returns 21
await pipeline.process(10)
```

## Reusable Pipelines

Because the `PipelineInterface` is an extension of the `StageInterface`, pipelines can be reused as stages. This creates a highly composable model to create complex execution patterns while keeping the cognitive load low.

For example, if we'd want to compose a pipeline to process API calls, we'd create something along these lines:

```python
process_api_request = (
    Pipeline()
    .pipe(ExecuteHttpRequest())
    .pipe(ParseJsonResponse())
)

pipeline = (
    Pipeline()
    .pipe(ConvertToPsr7Request())
    .pipe(process_api_request)
    .pipe(ConvertToResponseDto())
)

await pipeline.process(DeleteBlogPost(post_id))
```

## Type Hinting

You can specify the input and output types for pipelines and stages using type variables `T_in` and `T_out`. This allows you to handle varying types between stages, enhancing type safety and code clarity.

The `T_out` type variable is optional and defaults to `T_in`. Similarly, `T_in` is also optional and defaults to `Any`.

```python
from typing import Any

pipeline = Pipeline[int]().pipe(lambda payload: payload * 2)

# Returns 20
await pipeline.process(10)
```

You can also handle varying types between stages:

```python
pipeline = Pipeline[int, str]().pipe(lambda payload: f"Number: {payload}")

# Returns "Number: 10"
await pipeline.process(10)
```

This flexibility allows you to build pipelines that transform data types between stages seamlessly.

## Custom Processors

You can create your own processors to customize how the pipeline processes stages. This allows you to implement different execution strategies, such as handling exceptions, processing resources, or implementing middleware patterns.

For example, you can define a custom processor:

```python
class MyCustomProcessor(Processor[T_in, T_out]):
    async def process(
        self,
        payload: T_in,
        stages: StageInstanceCollection,
    ) -> T_out:
        # Custom processing logic
        for stage in stages:
            payload = await stage(payload)
        return payload
```

And use it in your pipeline:

```python
pipeline = Pipeline[int, int](processor=MyCustomProcessor()).pipe(lambda x: x * 2)
```

## Declarative Stages

Instead of using `pipe` to add stages at runtime, you can define stages declaratively by specifying them as class-level attributes. This makes pipelines easier to set up and reuse with predefined stages.

```python
class MyPipeline(Pipeline[int, int]):
    stages = [
        TimesTwoStage(),
        TimesThreeStage(),
    ]

# Process the payload through the pipeline with the declared stages
result = await MyPipeline().process(5)

# Returns 30
print(result)
```

In this example, `MyPipeline` declares its stages directly in the class definition, making the pipeline setup more readable and maintainable.

## Declarative Processor

You can also specify the processor in a declarative way by setting the `processor_class` attribute in your pipeline class.

```python
class MyPipeline(Pipeline[T_in, T_out]):
    processor_class = MyCustomProcessor
```

This allows you to customize the processing behavior of your pipeline while keeping the definition clean and declarative.

## Processing Streams

The pipeline can also process streams in real-time, allowing you to handle asynchronous iterators and process data as it becomes available.

```python
from typing import AsyncIterator
import asyncio

async def input_stream() -> AsyncIterator[int]:
    for i in range(5):
        yield i

async def stage1(stream: AsyncIterator[int]) -> AsyncIterator[int]:
    async for item in stream:
        yield item * 2
        await asyncio.sleep(1)  # Simulate processing delay

async def stage2(stream: AsyncIterator[int]) -> AsyncIterator[str]:
    async for item in stream:
        yield f"Number: {item}"


async def main():
    pipeline = (
        Pipeline[AsyncIterator[int], AsyncIterator[str]]()
        .pipe(stage1)
        .pipe(stage2)
    )

    stream = await pipeline.process(input_stream())

    async for result in stream:
        print(result)

# Run the async main function
await main()
```

This allows you to process data in a streaming fashion, where each stage can yield results that are immediately consumed by the next stage.

## Pipeline Factory

Because pipelines themselves are immutable, pipeline factory is introduced to facilitate distributed composition of a pipeline.

The `PipelineFactory[InputType, OutputType]` collects stages and allows you to create a pipeline at any given time.

```python
pipeline_factory = PipelineFactory().with_stages([LogicalStage(), AddOneStage()])

# Additional stages can be added later
pipeline_factory.add_stage(LastStage()).with_processor(MyCustomProcessor())

# Build the pipeline
pipeline = pipeline_factory.build()
```

## Exception Handling

This package is completely transparent when dealing with exceptions. In no case will this package catch an exception or silence an error. Exceptions should be dealt with on a per-case basis, either inside a _stage_ or at the time the pipeline processes a payload.

```python
pipeline = Pipeline().pipe(lambda payload: payload / 0)

try:
    await pipeline.process(10)
except ZeroDivisionError as e:
    # Handle the exception.
    pass
```

            

Raw data

            {
    "_id": null,
    "home_page": null,
    "name": "thecodecrate-pipeline",
    "maintainer": null,
    "docs_url": null,
    "requires_python": ">=3.13",
    "maintainer_email": null,
    "keywords": "library, pipeline, python, python-pipeline",
    "author": null,
    "author_email": "TheCodeCrate <loureiro.rg@gmail.com>",
    "download_url": "https://files.pythonhosted.org/packages/8c/ad/efb7758aa0df75d357887337609cce3a1abcc00d31a69ad2871d92ed6b5e/thecodecrate_pipeline-1.23.0.tar.gz",
    "platform": null,
    "description": "# TheCodeCrate's Pipeline\n\nThis package provides a pipeline pattern implementation.\n\nThe implementation is inspired by the excellent [PHP League Pipeline](https://github.com/thephpleague/pipeline) package.\n\n## Installation\n\n```bash\npip install thecodecrate-pipeline\n```\n\n## Pipeline Pattern\n\nThe pipeline pattern allows you to easily compose sequential stages by chaining stages.\n\nIn this particular implementation, the interface consists of two parts:\n\n- `StageInterface`\n- `PipelineInterface`\n\nA pipeline consists of zero, one, or multiple stages. A pipeline can process a payload. During the processing, the payload will be passed to the first stage. From that moment on, the resulting value is passed on from stage to stage.\n\nIn the simplest form, the execution chain can be represented as a for loop:\n\n```python\nresult = payload\n\nfor stage in stages:\n    result = stage(result)\n\nreturn result\n```\n\nEffectively, this is the same as:\n\n```python\nresult = stage3(stage2(stage1(payload)))\n```\n\n## Immutability\n\nPipelines are implemented as immutable stage chains. When you pipe a new stage, a new pipeline will be created with the added stage. This makes pipelines easy to reuse and minimizes side-effects.\n\n## Usage\n\nOperations in a pipeline, stages, can be anything that satisfies the `Callable` type hint. So functions and anything that's callable is acceptable.\n\n```python\npipeline = Pipeline().pipe(lambda payload: payload * 10)\n\n# Returns 100\nawait pipeline.process(10)\n```\n\n## Class-Based Stages\n\nClass-based stages are also possible. The `StageInterface[InputType, OutputType]` interface can be implemented, which ensures you have the correct method signature for the `__call__` method.\n\n```python\nclass TimesTwoStage(StageInterface[int, int]):\n    async def __call__(self, payload: int) -> int:\n        return payload * 2\n\nclass AddOneStage(StageInterface[int, int]):\n    async def __call__(self, payload: int) -> int:\n        return payload + 1\n\npipeline = (\n    Pipeline[int, int]()\n    .pipe(TimesTwoStage())\n    .pipe(AddOneStage())\n)\n\n# Returns 21\nawait pipeline.process(10)\n```\n\n## Reusable Pipelines\n\nBecause the `PipelineInterface` is an extension of the `StageInterface`, pipelines can be reused as stages. This creates a highly composable model to create complex execution patterns while keeping the cognitive load low.\n\nFor example, if we'd want to compose a pipeline to process API calls, we'd create something along these lines:\n\n```python\nprocess_api_request = (\n    Pipeline()\n    .pipe(ExecuteHttpRequest())\n    .pipe(ParseJsonResponse())\n)\n\npipeline = (\n    Pipeline()\n    .pipe(ConvertToPsr7Request())\n    .pipe(process_api_request)\n    .pipe(ConvertToResponseDto())\n)\n\nawait pipeline.process(DeleteBlogPost(post_id))\n```\n\n## Type Hinting\n\nYou can specify the input and output types for pipelines and stages using type variables `T_in` and `T_out`. This allows you to handle varying types between stages, enhancing type safety and code clarity.\n\nThe `T_out` type variable is optional and defaults to `T_in`. Similarly, `T_in` is also optional and defaults to `Any`.\n\n```python\nfrom typing import Any\n\npipeline = Pipeline[int]().pipe(lambda payload: payload * 2)\n\n# Returns 20\nawait pipeline.process(10)\n```\n\nYou can also handle varying types between stages:\n\n```python\npipeline = Pipeline[int, str]().pipe(lambda payload: f\"Number: {payload}\")\n\n# Returns \"Number: 10\"\nawait pipeline.process(10)\n```\n\nThis flexibility allows you to build pipelines that transform data types between stages seamlessly.\n\n## Custom Processors\n\nYou can create your own processors to customize how the pipeline processes stages. This allows you to implement different execution strategies, such as handling exceptions, processing resources, or implementing middleware patterns.\n\nFor example, you can define a custom processor:\n\n```python\nclass MyCustomProcessor(Processor[T_in, T_out]):\n    async def process(\n        self,\n        payload: T_in,\n        stages: StageInstanceCollection,\n    ) -> T_out:\n        # Custom processing logic\n        for stage in stages:\n            payload = await stage(payload)\n        return payload\n```\n\nAnd use it in your pipeline:\n\n```python\npipeline = Pipeline[int, int](processor=MyCustomProcessor()).pipe(lambda x: x * 2)\n```\n\n## Declarative Stages\n\nInstead of using `pipe` to add stages at runtime, you can define stages declaratively by specifying them as class-level attributes. This makes pipelines easier to set up and reuse with predefined stages.\n\n```python\nclass MyPipeline(Pipeline[int, int]):\n    stages = [\n        TimesTwoStage(),\n        TimesThreeStage(),\n    ]\n\n# Process the payload through the pipeline with the declared stages\nresult = await MyPipeline().process(5)\n\n# Returns 30\nprint(result)\n```\n\nIn this example, `MyPipeline` declares its stages directly in the class definition, making the pipeline setup more readable and maintainable.\n\n## Declarative Processor\n\nYou can also specify the processor in a declarative way by setting the `processor_class` attribute in your pipeline class.\n\n```python\nclass MyPipeline(Pipeline[T_in, T_out]):\n    processor_class = MyCustomProcessor\n```\n\nThis allows you to customize the processing behavior of your pipeline while keeping the definition clean and declarative.\n\n## Processing Streams\n\nThe pipeline can also process streams in real-time, allowing you to handle asynchronous iterators and process data as it becomes available.\n\n```python\nfrom typing import AsyncIterator\nimport asyncio\n\nasync def input_stream() -> AsyncIterator[int]:\n    for i in range(5):\n        yield i\n\nasync def stage1(stream: AsyncIterator[int]) -> AsyncIterator[int]:\n    async for item in stream:\n        yield item * 2\n        await asyncio.sleep(1)  # Simulate processing delay\n\nasync def stage2(stream: AsyncIterator[int]) -> AsyncIterator[str]:\n    async for item in stream:\n        yield f\"Number: {item}\"\n\n\nasync def main():\n    pipeline = (\n        Pipeline[AsyncIterator[int], AsyncIterator[str]]()\n        .pipe(stage1)\n        .pipe(stage2)\n    )\n\n    stream = await pipeline.process(input_stream())\n\n    async for result in stream:\n        print(result)\n\n# Run the async main function\nawait main()\n```\n\nThis allows you to process data in a streaming fashion, where each stage can yield results that are immediately consumed by the next stage.\n\n## Pipeline Factory\n\nBecause pipelines themselves are immutable, pipeline factory is introduced to facilitate distributed composition of a pipeline.\n\nThe `PipelineFactory[InputType, OutputType]` collects stages and allows you to create a pipeline at any given time.\n\n```python\npipeline_factory = PipelineFactory().with_stages([LogicalStage(), AddOneStage()])\n\n# Additional stages can be added later\npipeline_factory.add_stage(LastStage()).with_processor(MyCustomProcessor())\n\n# Build the pipeline\npipeline = pipeline_factory.build()\n```\n\n## Exception Handling\n\nThis package is completely transparent when dealing with exceptions. In no case will this package catch an exception or silence an error. Exceptions should be dealt with on a per-case basis, either inside a _stage_ or at the time the pipeline processes a payload.\n\n```python\npipeline = Pipeline().pipe(lambda payload: payload / 0)\n\ntry:\n    await pipeline.process(10)\nexcept ZeroDivisionError as e:\n    # Handle the exception.\n    pass\n```\n",
    "bugtrack_url": null,
    "license": "The MIT License (MIT)  Copyright (c) 2024 TheCodeCrate <loureiro.rg@gmail.com>  Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the \"Software\"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:  The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.  THE SOFTWARE IS PROVIDED \"AS IS\", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.",
    "summary": "This package provides a pipeline pattern implementation",
    "version": "1.23.0",
    "project_urls": {
        "documentation": "https://github.com/thecodecrate/python-pipeline",
        "repository": "https://github.com/thecodecrate/python-pipeline"
    },
    "split_keywords": [
        "library",
        " pipeline",
        " python",
        " python-pipeline"
    ],
    "urls": [
        {
            "comment_text": null,
            "digests": {
                "blake2b_256": "4f3875652640109f8c452fae4dc61aa37883a9c0ad992f71b76c851e6289d8f2",
                "md5": "d79f4fc100a95c28c2b755f75d5e9322",
                "sha256": "fea0c358c8842e2a7021305a4a91300eb31847b2c8e10cf412110b74bfb1732f"
            },
            "downloads": -1,
            "filename": "thecodecrate_pipeline-1.23.0-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "d79f4fc100a95c28c2b755f75d5e9322",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": ">=3.13",
            "size": 31106,
            "upload_time": "2024-12-09T22:38:38",
            "upload_time_iso_8601": "2024-12-09T22:38:38.984662Z",
            "url": "https://files.pythonhosted.org/packages/4f/38/75652640109f8c452fae4dc61aa37883a9c0ad992f71b76c851e6289d8f2/thecodecrate_pipeline-1.23.0-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": null,
            "digests": {
                "blake2b_256": "8cadefb7758aa0df75d357887337609cce3a1abcc00d31a69ad2871d92ed6b5e",
                "md5": "fe9dca653b99d1090d0d35f2439755bb",
                "sha256": "399363b517ae8e65d4b3b1f204013d616cb7a580abeb6e70b550e1f01174314a"
            },
            "downloads": -1,
            "filename": "thecodecrate_pipeline-1.23.0.tar.gz",
            "has_sig": false,
            "md5_digest": "fe9dca653b99d1090d0d35f2439755bb",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": ">=3.13",
            "size": 37257,
            "upload_time": "2024-12-09T22:38:40",
            "upload_time_iso_8601": "2024-12-09T22:38:40.717471Z",
            "url": "https://files.pythonhosted.org/packages/8c/ad/efb7758aa0df75d357887337609cce3a1abcc00d31a69ad2871d92ed6b5e/thecodecrate_pipeline-1.23.0.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2024-12-09 22:38:40",
    "github": true,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "github_user": "thecodecrate",
    "github_project": "python-pipeline",
    "travis_ci": false,
    "coveralls": false,
    "github_actions": true,
    "lcname": "thecodecrate-pipeline"
}
        
Elapsed time: 0.41239s