pipeweave

- Name: pipeweave
- Version: 0.2.1
- Home page: https://github.com/taylorgtyler/pipeweave
- Summary: A flexible Python data pipeline library using finite state machines for custom data processing workflows
- Upload time: 2024-11-10 17:53:42
- Author: Taylor Tyler
- Requires Python: <4.0,>=3.12
- License: MIT
- Keywords: pipeline, data, fsm, workflow

# Pipeweave

A flexible Python data pipeline library that makes it easy to construct and run custom data pipelines using a finite state machine approach.

## Project Goal

I have tried several popular Python data pipeline libraries and found them all somewhat hard to use for custom use cases. The goal of this project is to create a pipeline library that avoids some of the common pitfalls and lets users easily construct pipelines from custom functions and run them using a finite state machine.

## Features

- 🚀 Simple, intuitive API for creating data pipelines
- 🔄 Built-in state management using finite state machines
- 📦 Easy integration of custom functions
- 💾 Pluggable storage backends (SQLite included; custom backends via `StorageBackend`)
- 🔍 Pipeline status tracking and monitoring
- ⚡ Dependency-aware execution order

## Installation

```bash
pip install pipeweave
```

## Quick Start

Here's a simple example that demonstrates how to create and run a pipeline:

```python
from pipeweave.core import Pipeline, create_step

# Create a pipeline
pipeline = Pipeline(name="data_transformer")

# Define processing functions
def clean_data(data):
    return [x.strip().lower() for x in data]

def filter_empty(data):
    return [x for x in data if x]

# Create steps
clean_step = create_step(
    name="clean_data",
    description="Clean the data",
    function=clean_data,
    inputs=["raw_data"],
    outputs=["cleaned_data"],
)

filter_step = create_step(
    name="filter_empty",
    description="Filter out empty strings",
    function=filter_empty,
    inputs=["cleaned_data"],
    outputs=["filtered_data"],
    dependencies={"clean_data"},
)

# Add steps to the pipeline
pipeline.add_step(clean_step)
pipeline.add_step(filter_step)

# Run the pipeline
data = [" Hello ", "World ", "", " Python "]
results = pipeline.run(data)

print(results)
```
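To make the Quick Start's data flow concrete, here is the same pair of functions applied by hand in dependency order. This is plain Python, not pipeweave: it shows the values the `clean_data` and `filter_empty` steps should produce for this input, but it makes no assumption about how `pipeline.run` packages them in `results` (for example, whether they are keyed by output name).

```python
# Plain-Python trace of the two Quick Start functions, applied in
# dependency order (clean_data first, then filter_empty). No pipeweave here.

def clean_data(data):
    return [x.strip().lower() for x in data]

def filter_empty(data):
    return [x for x in data if x]

data = [" Hello ", "World ", "", " Python "]
cleaned = clean_data(data)        # ['hello', 'world', '', 'python']
filtered = filter_empty(cleaned)  # ['hello', 'world', 'python']
print(filtered)
```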


## Core Concepts

### Steps

A Step is the basic building block of a pipeline. Each step:
- Has a unique name
- Contains a processing function
- Defines its inputs and outputs
- Can specify dependencies on other steps
- Maintains its own state (IDLE, RUNNING, COMPLETED, ERROR)
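The state list above can be read as a small finite state machine. The sketch below is a hypothetical illustration, not pipeweave's implementation: `MiniStep` and the `TRANSITIONS` table are invented for this example, and the assumption that a finished step must be reset before it runs again may not match the library. It does reuse the `state` and `error` attribute names that appear in the Error Handling example.

```python
from enum import Enum, auto


class State(Enum):
    """The four step states listed above (values are arbitrary here)."""
    IDLE = auto()
    RUNNING = auto()
    COMPLETED = auto()
    ERROR = auto()


# Assumed transition table: IDLE -> RUNNING -> COMPLETED or ERROR,
# and a finished step can be reset to IDLE before it runs again.
TRANSITIONS = {
    State.IDLE: {State.RUNNING},
    State.RUNNING: {State.COMPLETED, State.ERROR},
    State.COMPLETED: {State.IDLE},
    State.ERROR: {State.IDLE},
}


class MiniStep:
    """Hypothetical stand-in for a step; not pipeweave's own Step class."""

    def __init__(self, name, function):
        self.name = name
        self.function = function
        self.state = State.IDLE
        self.error = None

    def _transition(self, new_state):
        # Reject any move that is not in the transition table.
        if new_state not in TRANSITIONS[self.state]:
            raise RuntimeError(
                f"{self.name}: illegal transition "
                f"{self.state.name} -> {new_state.name}"
            )
        self.state = new_state

    def reset(self):
        self._transition(State.IDLE)
        self.error = None

    def execute(self, value):
        self._transition(State.RUNNING)
        try:
            result = self.function(value)
        except Exception as exc:
            # Record the failure on the step, then re-raise for the caller.
            self.error = exc
            self._transition(State.ERROR)
            raise
        self._transition(State.COMPLETED)
        return result


step = MiniStep("double", lambda x: x * 2)
print(step.execute(21), step.state.name)  # 42 COMPLETED
```

Encoding the allowed moves as data keeps the state machine explicit: an illegal move (such as re-running a COMPLETED step without `reset()`) raises immediately instead of silently corrupting the step's status.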


### Stages

A Stage is a collection of steps that can be executed together. Each stage:
- Has a unique name and description
- Contains multiple steps, which are individual processing units
- Maintains its own state (IDLE, RUNNING, COMPLETED, ERROR)
- Can specify dependencies on other stages, ensuring that it only runs when all its dependencies have been completed

Stages organize complex pipelines by grouping related steps, which makes the pipeline code easier to read and maintain.
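The rule that a stage runs only after all of its dependencies have completed is a topological ordering over the stage graph. A minimal sketch using the standard library's `graphlib.TopologicalSorter`, assuming hypothetical stage names (`extract`, `transform`, `validate`, `load`); pipeweave's own scheduler is not documented here and may work differently:

```python
from graphlib import TopologicalSorter

# Hypothetical stages, each mapped to the stages it depends on.
stage_dependencies = {
    "extract": set(),
    "transform": {"extract"},
    "validate": {"extract"},
    "load": {"transform", "validate"},
}

sorter = TopologicalSorter(stage_dependencies)
sorter.prepare()  # raises graphlib.CycleError if the dependencies form a cycle
completed = []
while sorter.is_active():
    # get_ready() returns every stage whose dependencies are all done.
    for stage in sorted(sorter.get_ready()):
        completed.append(stage)  # a real runner would execute the stage here
        sorter.done(stage)

print(completed)  # ['extract', 'transform', 'validate', 'load']
```

Because `get_ready()` hands back every stage whose dependencies are satisfied, `transform` and `validate` become runnable at the same time; a runner could execute them in either order or in parallel.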


### Pipeline

A Pipeline is a collection of steps (optionally grouped into stages) that:
- Manages the execution order based on dependencies
- Handles data flow between steps
- Tracks overall execution state
- Can be saved and loaded using storage backends
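One way to picture how a pipeline handles data flow between steps is a shared dictionary keyed by the names given in `inputs` and `outputs`. The sketch below assumes that model; `StepSpec` and `run_in_order` are hypothetical helpers whose fields mirror `create_step`'s keyword names, and it expects steps to be passed in an order that already satisfies their dependencies:

```python
from dataclasses import dataclass, field
from typing import Any, Callable


@dataclass
class StepSpec:
    """Hypothetical step description mirroring create_step's keyword names."""
    name: str
    function: Callable[..., Any]
    inputs: list[str]
    outputs: list[str]
    dependencies: set[str] = field(default_factory=set)


def run_in_order(steps: list[StepSpec], initial: dict[str, Any]) -> dict[str, Any]:
    """Run pre-ordered steps, passing data through a shared name -> value dict."""
    context = dict(initial)
    done: set[str] = set()
    for step in steps:
        missing = step.dependencies - done
        if missing:
            raise RuntimeError(f"{step.name} is waiting on {sorted(missing)}")
        # Look up each named input in the shared context.
        args = [context[name] for name in step.inputs]
        result = step.function(*args)
        # Single-output steps store the result directly; multi-output steps
        # are assumed to return a tuple in the same order as `outputs`.
        values = (result,) if len(step.outputs) == 1 else result
        context.update(zip(step.outputs, values))
        done.add(step.name)
    return context


steps = [
    StepSpec("clean_data", lambda d: [x.strip().lower() for x in d],
             ["raw_data"], ["cleaned_data"]),
    StepSpec("filter_empty", lambda d: [x for x in d if x],
             ["cleaned_data"], ["filtered_data"], {"clean_data"}),
]
context = run_in_order(steps, {"raw_data": [" Hello ", "World ", "", " Python "]})
print(context["filtered_data"])  # ['hello', 'world', 'python']
```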

### Storage Backends

Pipeweave supports different storage backends for persisting pipelines:
- SQLite (included)
- Custom backends can be implemented by subclassing the `StorageBackend` base class
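A custom backend needs, at minimum, a way to save a pipeline and load it back by name. The sketch below assumes the interface consists of the `save_pipeline` and `load_pipeline` methods used with `SQLiteStorage` in Advanced Usage; `StorageBackendSketch`, `InMemoryStorage`, and `DemoPipeline` are hypothetical, so check pipeweave's actual `StorageBackend` class for its abstract methods before subclassing it:

```python
import copy
from abc import ABC, abstractmethod
from dataclasses import dataclass, field


class StorageBackendSketch(ABC):
    """Hypothetical interface; pipeweave's StorageBackend may define more."""

    @abstractmethod
    def save_pipeline(self, pipeline) -> None: ...

    @abstractmethod
    def load_pipeline(self, name: str): ...


class InMemoryStorage(StorageBackendSketch):
    """Keeps deep copies of pipelines in a dict keyed by pipeline name."""

    def __init__(self) -> None:
        self._pipelines: dict[str, object] = {}

    def save_pipeline(self, pipeline) -> None:
        # Copy on save so later edits to the live pipeline are not persisted.
        self._pipelines[pipeline.name] = copy.deepcopy(pipeline)

    def load_pipeline(self, name: str):
        try:
            # Copy on load so callers cannot mutate the stored snapshot.
            return copy.deepcopy(self._pipelines[name])
        except KeyError:
            raise KeyError(f"no pipeline named {name!r}") from None


@dataclass
class DemoPipeline:
    """Hypothetical stand-in for a Pipeline: just a name and step names."""
    name: str
    steps: list[str] = field(default_factory=list)


storage = InMemoryStorage()
storage.save_pipeline(DemoPipeline("data_transformer", ["example_step"]))
loaded = storage.load_pipeline("data_transformer")
print(loaded)  # DemoPipeline(name='data_transformer', steps=['example_step'])
```

Copying on both save and load keeps the stored snapshot independent of the live object, which roughly matches what a database-backed store provides.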

## Advanced Usage

### Using Storage Backends
```python
from pipeweave.core import Pipeline, create_step
from pipeweave.storage import SQLiteStorage

# Create a pipeline
pipeline = Pipeline(name="data_transformer")

# Add steps
step = create_step(
    name="example_step",
    description="Example step",
    function=lambda x: x * 2,
    inputs=["input"],
    outputs=["output"],
)
pipeline.add_step(step)

# Initialize Storage
storage = SQLiteStorage("pipelines.db")

# Save Pipeline
storage.save_pipeline(pipeline)

# Load Pipeline
loaded_pipeline = storage.load_pipeline("data_transformer")
```

### Error Handling
```python
from pipeweave.core import Pipeline, create_step
from pipeweave.step import State

# Create pipeline with a step that will fail
def will_fail(x):
    raise ValueError("Example error")

error_step = create_step(
    name="error_step",
    description="This step will fail",
    function=will_fail,
    inputs=["data"],
    outputs=["result"],
)

pipeline = Pipeline(name="error_example")
pipeline.add_step(error_step)

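data = [1, 2, 3]  # sample input; will_fail raises regardless of its value

try:
    results = pipeline.run(data)
except Exception:
    # The run failed: inspect each step's state to find the failing step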
    for step in pipeline.steps.values():
        if step.state == State.ERROR:
            print(f"Step {step.name} failed: {step.error}")
```

### Using Stages
```python
from pipeweave.core import Pipeline, create_step, create_stage

# Create a pipeline
pipeline = Pipeline(name="data_transformer")

# Define step functions
def double_number(x):
    return x * 2

def add_one(x):
    return x + 1

# Create steps
step_double = create_step(
    name="double",
    description="Double the input",
    function=double_number,
    inputs=["number"],
    outputs=["result"],
)

step_add_one = create_step(
    name="add_one",
    description="Add one to the input",
    function=add_one,
    inputs=["result"],
    outputs=["final"],
    dependencies={"double"},
)

# Create a stage
processing_stage = create_stage(
    name="processing_stage",
    description="Process the data",
    steps=[step_double, step_add_one],
)

# Add stage to pipeline
pipeline.add_stage(processing_stage)

# Run the pipeline
results = pipeline.run(5)
print(results)
```
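For input `5`, the stage computes `number → result → final` as `5 * 2 = 10`, then `10 + 1 = 11`. The plain-Python sketch below reproduces that chain with `functools.reduce`; it does not call pipeweave, and whether `pipeline.run(5)` returns `11` directly or a mapping keyed by output name is not shown in this README.

```python
from functools import reduce


def double_number(x):
    return x * 2


def add_one(x):
    return x + 1


# Feed each step's output into the next step, in the order the stage lists them.
stage_steps = [double_number, add_one]
final = reduce(lambda value, fn: fn(value), stage_steps, 5)
print(final)  # 11
```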

## Contributing

Contributions are welcome! This is a new project, so please feel free to open issues and suggest improvements.

For major changes, please open an issue first to discuss what you would like to change.

1. Fork the repository
2. Create your feature branch (`git checkout -b feature/AmazingFeature`)
3. Commit your changes (`git commit -m 'Add some AmazingFeature'`)
4. Push to the branch (`git push origin feature/AmazingFeature`)
5. Open a Pull Request

## License

This project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details.

## Project Status

Pipeweave is currently in alpha. While it's functional and tested, the API may change as we gather user feedback and add new features.

## Roadmap

- [x] Add a stages feature
- [ ] Add a more robust state machine implementation
- [ ] Add more storage backends
- [ ] Add more detailed monitoring and logging
- [ ] Add more testing and CI/CD pipeline
            

        