# PipelinePy - Flexible Data Transformation Library
PipelinePy is a flexible library designed to streamline the process of data transformation within an application using a declarative approach. By providing an easy-to-use interface for setting up sequences of data transformations, it allows users to define _what_ should happen to their data without needing to manage _how_ the data is processed explicitly. Each transformation in the pipeline can be tailored with contextual logic, supporting complex data handling strategies in a clear and concise manner.
## Key Features
- **Declarative Data Transformations**: Specify your data processing logic declaratively, making your code more readable and maintainable.
- **Context-Aware Operations**: Leverage a shared context to dynamically adjust the behavior of transformations throughout the data pipeline.
- **Pre and Post Hooks**: Execute custom logic before or after transformations to extend functionality without modifying the core processing stages.
- **Flexible Configuration**: Configure and extend pipelines to suit various data sources and processing requirements.
## Installation
Install PipelinePy via pip:
```bash
pip install pieplinepy
```
## Getting Started
Below is a comprehensive example showcasing how to set up and use the PipelinePy library:
### Import Library Components
```python
from pipelinepy import Pipeline
from pipelinepy.transformations import Lambda, Transformation
```
### Define Transformations
Transformation functions manipulate data. Here's how you can define some common transformations:
```python
def Add(value):
    return Lambda(lambda data, value=value: [x + value for x in data])

def SubOne():
    return Lambda(lambda data: [x - 1 for x in data])

def Multiply(value):
    return Lambda(lambda data, value: [x * value for x in data], value)

def Print(prefix=""):
    return Lambda(lambda data, prefix: [print(prefix, data), data][1], prefix)
```
### Custom Transformation Classes
For more complex logic, you can define transformation classes:
```python
class Power(Transformation):
    def __init__(self, exponent):
        self.exponent = exponent

    def apply(self, data, context=None):
        return [x ** self.exponent for x in data]

class Square(Transformation):
    def apply(self, data, context=None):
        return [x * x for x in data]

class Filter(Transformation):
    def __init__(self, condition):
        self.condition = condition

    def apply(self, data, context=None):
        return list(filter(self.condition, data))

class SortBy(Transformation):
    def __init__(self, key_function, reverse=False):
        self.key_function = key_function
        self.reverse = reverse

    def apply(self, data, context=None):
        return sorted(data, key=self.key_function, reverse=self.reverse)
```
### Initialize Data
You can start your pipeline with predefined or dynamically generated data:
```python
class InitializeData(Transformation):
    def apply(self, data, context=None):
        return [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
```
### Example Usage
Here is how you can set up and run a pipeline with multiple stages:
```python
def pre_hook(data, context):
    # Hook signatures are assumed here; adapt them to your pipeline's needs.
    print("Before stage:", data)

def post_hook(data, context):
    print("After stage:", data)

def Take(count):
    # Keeps the first `count` elements, following the Lambda pattern above.
    return Lambda(lambda data, count: data[:count], count)

def main():
    context = {'increment': 1}
    pipeline = Pipeline(context)
    pipeline.add_pre_hook(pre_hook)
    pipeline.add_post_hook(post_hook)
    pipeline\
        .stage(InitializeData, description="Initial data")\
        .stage(Add, 2, description="Add 2")\
        .stage(Power, 2, description="Raise to power 2")\
        .stage(Square, description="Square")\
        .stage(Print, prefix="Before SubOne")\
        .stage(lambda data: [x - 1 for x in data])\
        .stage(Multiply, 2)\
        .stage(Filter, lambda x: x > 200, description="Keep values above 200")\
        .stage(SortBy, lambda x: x, reverse=True, description="Sort descending")\
        .stage(Take, 2)\
        .stage(Print, prefix="Final Output")\
        .run()

if __name__ == "__main__":
    main()
```
## Advanced Configuration
PipelinePy supports various configurations to tailor the behavior of data transformations according to specific needs. Here are some advanced configurations you might consider:
- **Context Customization**: You can pass a context object that carries runtime-specific data throughout the pipeline execution. This can be useful for conditionally altering the behavior of transformations based on external factors.
- **Dynamic Data Sources**: The pipeline can dynamically source its initial data from external APIs or databases at runtime, allowing for highly adaptive data processing workflows.
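As a sketch of context customization, a transformation can read tuning values from the shared context at run time. The class below follows the same `apply(data, context)` shape as the `Transformation` subclasses shown earlier (written standalone here for illustration; in real use it would subclass `Transformation`, and the `'increment'` key mirrors the context from the Getting Started example):

```python
class Increment:
    """Adds context['increment'] to every element.

    Standalone sketch of a context-aware transformation; in a real
    pipeline this would subclass pipelinepy's Transformation.
    """

    def apply(self, data, context=None):
        # Fall back to a step of 1 when no context (or key) is provided.
        step = (context or {}).get('increment', 1)
        return [x + step for x in data]
```

Because the step size comes from the context rather than the constructor, the same stage can behave differently across runs without being redefined.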
### Example: Configuring a Dynamic Data Source
```python
class DynamicData(Transformation):
    def apply(self, data, context=None):
        # Assume fetch_data is a function that retrieves data based on some criteria
        return fetch_data(context.get('data_source'))
```
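The `fetch_data` helper above is not part of PipelinePy; it stands in for whatever API client or database query your application uses. A hypothetical stand-in for local experimentation might look like:

```python
# Hypothetical in-memory stand-in for fetch_data: maps a data-source
# key (taken from the pipeline context) to a list of rows.
_SOURCES = {
    'numbers': [1, 2, 3],
    'squares': [1, 4, 9],
}

def fetch_data(source_key):
    # In practice this would call an external API or run a query;
    # unknown keys yield an empty dataset here.
    return _SOURCES.get(source_key, [])
```

With this stub in place, a context of `{'data_source': 'numbers'}` would make `DynamicData` start the pipeline from `[1, 2, 3]`.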
## Contributing
We welcome contributions from the community! Here are some ways you can contribute:
- **Submit Bug Reports and Feature Requests**: Use the Issues section of our GitHub repository to report problems or suggest new features.
- **Submit Pull Requests**: If you've developed a fix or an enhancement, submit a pull request with your changes. Please ensure your code adheres to the existing style and includes tests covering new or changed functionality.
### Pull Request Process
1. Fork the repository and create your branch from `main`.
2. If you've added code, update the documentation as necessary.
3. Ensure your code adheres to the existing style guidelines.
4. Issue the pull request.