pipelinehub


Name: pipelinehub
Version: 0.1.0
Home page: https://github.com/rahulxj100/pipelinehub
Summary: A flexible data pipeline library for custom data processing workflows
Upload time: 2025-09-16 16:24:49
Maintainer: None
Docs URL: None
Author: Rahul Paul
Requires Python: >=3.7
License: MIT
Keywords: pipeline, data, processing, workflow, etl
Requirements: No requirements were recorded.
Travis-CI: No Travis.
Coveralls test coverage: No coveralls.
# PipelineHub

[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)


A flexible Python library for creating custom data processing workflows with ease.

## ✨ Features

- 🔧 **Flexible**: Add any callable function as a processing step
- 🔗 **Chainable**: Fluent method chaining for clean, readable code
- 🐛 **Debuggable**: Verbose mode shows data flow between steps
- 🧪 **Testable**: Clear error handling with step identification
- 📦 **Lightweight**: Zero external dependencies
- 🎯 **Type-friendly**: Full type hints for better IDE support
- 🚀 **Performance**: Minimal overhead for maximum speed
- 🔄 **Reusable**: Create pipelines once, use with different datasets

## Installation
```bash
pip install pipelinehub
```

## 📖 Quick Start
```python
from pipelinehub import DataPipeline, normalize_data, square_numbers

# Create a pipeline with multiple steps
pipeline = DataPipeline()
pipeline.add_step(lambda x: [i for i in x if i > 0], "filter_positive")
pipeline.add_step(square_numbers, "square")
pipeline.add_step(normalize_data, "normalize")

# Execute with sample data
data = [-2, -1, 0, 1, 2, 3, 4, 5]
result = pipeline.execute(data, verbose=True)

print(result)
```
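
Two of the features listed above, reuse and step-aware error reporting, follow naturally from the Quick Start. The sketch below is illustrative only: it reuses the `pipeline` object built above on a second dataset, and since the exact exception type and message pipelinehub raises for a failing step aren't shown here, it catches a broad `Exception`.

```python
# Reuse the pipeline built in the Quick Start on a different dataset.
more_data = [10, -3, 7, 0, 2]
print(pipeline.execute(more_data))

# Deliberately broken step to show error handling.
bad_pipeline = DataPipeline()
bad_pipeline.add_step(lambda x: [i / 0 for i in x], "divide_by_zero")

try:
    bad_pipeline.execute([1, 2, 3])
except Exception as exc:  # exact exception type is an assumption
    print(f"Pipeline failed: {exc}")
```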
## 🔗 Method Chaining
Create pipelines fluently with method chaining:

```python
from pipelinehub import DataPipeline, add_constant

# Chain operations together
result = (DataPipeline()
          .add_step(lambda x: [i for i in x if i % 2 == 0], "filter_even")
          .add_step(add_constant(10), "add_10")  
          .add_step(lambda x: sorted(x, reverse=True), "sort_desc")
          .execute([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]))

print(result) 
```
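
In the chain above, `add_constant(10)` behaves like a factory: called once with a constant, it returns a step function that the pipeline then applies to the data. The library's own implementation isn't shown here, but you can write your own parameterized steps the same way; the `scale_by` helper below is a hypothetical example and assumes the `DataPipeline` import from the previous snippet.

```python
def scale_by(factor):
    """Return a step that multiplies every element by `factor` (hypothetical helper)."""
    def step(values):
        return [v * factor for v in values]
    return step

result = (DataPipeline()
          .add_step(scale_by(3), "triple")
          .add_step(lambda x: [v - 1 for v in x], "subtract_one")
          .execute([1, 2, 3]))

print(result)  # [2, 5, 8]
```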

## 📚 Comprehensive Examples

### Data Cleaning Pipeline
```python 
from pipelinehub import DataPipeline, outlier_removal, normalize_data, calculate_stats

# Create a data cleaning pipeline
cleaning_pipeline = (DataPipeline()
    .add_step(lambda x: [float(i) for i in x if i is not None], "convert_and_filter")
    .add_step(lambda x: outlier_removal(x, threshold=2.5), "remove_outliers") 
    .add_step(normalize_data, "normalize")
    .add_step(calculate_stats, "final_stats"))

# Process messy data
messy_data = [1, 2, 3, None, 100, 4, 5, 6, 7, 8, 9]
stats = cleaning_pipeline.execute(messy_data, verbose=True)
print(stats)
```
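
The bundled helpers can be swapped for any plain function that accepts the previous step's output. As a hedged illustration, the IQR-based filter below is a hypothetical drop-in replacement for `outlier_removal` (it is not part of pipelinehub) and reuses `messy_data` and `normalize_data` from the snippet above.

```python
def iqr_filter(values, k=1.5):
    """Rough IQR-based outlier filter (hypothetical helper, approximate quartiles)."""
    ordered = sorted(values)
    n = len(ordered)
    q1, q3 = ordered[n // 4], ordered[(3 * n) // 4]
    spread = q3 - q1
    low, high = q1 - k * spread, q3 + k * spread
    return [v for v in values if low <= v <= high]

alt_pipeline = (DataPipeline()
    .add_step(lambda x: [float(i) for i in x if i is not None], "convert_and_filter")
    .add_step(iqr_filter, "remove_outliers_iqr")
    .add_step(normalize_data, "normalize"))

print(alt_pipeline.execute(messy_data))
```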
### Text Processing Pipeline
```python
import re
from pipelinehub import DataPipeline

def clean_text(text):
    """Remove special characters and extra whitespace."""
    text = re.sub(r'[^a-zA-Z0-9\s]', '', text)
    return ' '.join(text.split())

def extract_keywords(words, min_length=4):
    """Extract words longer than min_length."""
    return [word for word in words if len(word) >= min_length]

# Build text processing pipeline
text_pipeline = (DataPipeline()
    .add_step(str.lower, "lowercase")
    .add_step(clean_text, "clean")
    .add_step(str.split, "tokenize") 
    .add_step(lambda words: extract_keywords(words, min_length=4), "extract_keywords")
    .add_step(lambda words: sorted(set(words)), "unique_and_sort"))

# Process text
text = "Hello World! This is a Sample Text for Processing... With special chars!!!"
keywords = text_pipeline.execute(text, verbose=True)
print(keywords)
```
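
Note that a step's output type doesn't have to match its input type: this pipeline starts with a single string, produces a list of tokens after `tokenize`, and ends with a sorted list of keywords. Each step only needs to accept whatever the previous step returned.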

## Pipeline Management
```python
pipeline = DataPipeline()
pipeline.add_step(lambda x: [i*2 for i in x], "double")
pipeline.add_step(lambda x: [i+1 for i in x], "add_one")

# Inspect pipeline
print(len(pipeline))  # 2
print(pipeline.get_steps())  # ['double', 'add_one']
print(pipeline)  # DataPipeline(2 steps: double, add_one)

# Remove steps
pipeline.remove_step(0)  # Remove first step
print(pipeline.get_steps())  # ['add_one']

# Clear all steps
pipeline.clear_steps()
print(len(pipeline))  # 0
```
## 🚀 Performance Tips

- Use built-in functions when possible; they're already optimized
- Avoid creating large intermediate data structures (a fused-step sketch follows the generator example below)
- Consider using generators for large datasets:
```python
def generator_step(data):
    """Use generator for memory efficiency."""
    for item in data:
        if item > 0:
            yield item * 2

pipeline = DataPipeline().add_step(lambda x: list(generator_step(x)), "process")
```
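
Along the same lines, adjacent element-wise steps can be fused into a single step so no intermediate list is built between them. This is a generic sketch rather than a pipelinehub-specific feature:

```python
# Two element-wise steps produce an intermediate list between them...
two_step = (DataPipeline()
    .add_step(lambda x: [i * 2 for i in x], "double")
    .add_step(lambda x: [i + 1 for i in x], "add_one"))

# ...while a single fused step does the same work in one pass.
fused = DataPipeline().add_step(lambda x: [i * 2 + 1 for i in x], "double_then_add_one")

data = list(range(5))
assert two_step.execute(data) == fused.execute(data)  # both give [1, 3, 5, 7, 9]
```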

## 🤝 Contributing
Contributions are welcome! Here's how to get started:

- Fork the repository
- Create a feature branch: `git checkout -b feature/amazing-feature`
- Make your changes and add tests
- Run tests: `pytest tests/`
- Commit your changes: `git commit -m 'Add amazing feature'`
- Push to branch: `git push origin feature/amazing-feature`
- Open a Pull Request

## 📄 License
This project is licensed under the MIT License - see the LICENSE file for details.
## 🙋‍♂️ Support

Discussions: GitHub Discussions

## 🎉 Acknowledgments

- Inspired by functional programming and the Unix pipes philosophy
- Built with ❤️ for the Python community
- Thanks to all contributors and users!



            
