datapipelab

Name: datapipelab
Version: 0.3.7
Summary: A data pipeline library with connectors, sources, processors, and sinks.
Upload time: 2025-07-17 18:52:57
Requirements: No requirements were recorded.
# Datapipelab

## Overview
`datapipelab` is a lightweight, flexible data pipeline framework designed for building and orchestrating complex data workflows. It supports a modular node-based architecture, allowing users to plug in source, processor, and sink nodes using technologies such as Apache Spark, Google BigQuery, Hive, Delta Lake, and Microsoft Teams.


## Installation
Clone the repository and install any required dependencies:
```bash
pip install -r requirements.txt
```

Or, if integrating as a module:
```bash
pip install datapipelab
```


## Usage Guide

To run a pipeline, you typically follow these steps:

1. **Define your pipeline configuration** using a Python list or a JSON config.
2. **Instantiate and execute the engine**.

Example:

```python
from datapipelab.engine import Engine

config = [
    {
        "type": "source",
        "format": "hive",
        "name": "load_customer_accounts",
        "options": {
            "query": "SELECT customer_id, enrollment_date FROM customer_account"
        }
    },
    {
        "type": "processor",
        "format": "spark",
        "name": "aggregate_active_users",
        "options": {
            "parents": ["load_customer_accounts"],
            "query": """
                SELECT 
                    YEAR(enrollment_date) AS enrollment_year, 
                    COUNT(*) AS active_user_count
                FROM load_customer_accounts
                GROUP BY enrollment_year
            """
        }
    },
    {
        "type": "sink",
        "format": "hive",
        "name": "store_active_user_report",
        "options": {
            "parents": ["aggregate_active_users"],
            "table": "report.active_user_summary"
        }
    },
    {
        "type": "sink",
        "format": "teams_notification",
        "name": "notify_report_ready",
        "options": {
            "parents": ["store_active_user_report"],
            "webhook_url": "{{{WEBHOOK_URL}}}",
            "message": "Active user report has been updated in Hive."
        }
    }
]

params = {"WEBHOOK_URL": "https://outlook.office.com/webhook/..."}
engine = Engine(config, spark, params)
engine.running_travelers()
```
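
In this example, `spark` refers to an existing `SparkSession` provided by the surrounding environment, and the `{{{WEBHOOK_URL}}}` placeholder in the configuration corresponds to the `WEBHOOK_URL` entry in the `params` dictionary passed to the `Engine`.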


## Pipeline Configuration

Pipelines are defined using structured configuration objects or files that specify:

* Nodes (source, processor, sink)
* Dependencies and execution order via `parents`
* Parameters for each node, e.g., SQL queries, table names, paths
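
As a minimal sketch (assuming the `Engine` API from the usage example above, and a hypothetical `pipeline_config.json` file containing the node list), a configuration stored as JSON can be loaded and executed like this:

```python
import json

from pyspark.sql import SparkSession

from datapipelab.engine import Engine

# Load the node definitions from a JSON file (hypothetical file name).
with open("pipeline_config.json") as f:
    config = json.load(f)

# Parameters referenced by placeholders such as {{{WEBHOOK_URL}}} in the config.
params = {"WEBHOOK_URL": "https://outlook.office.com/webhook/..."}

# Obtain (or create) a SparkSession to hand to the engine.
spark = SparkSession.builder.getOrCreate()

engine = Engine(config, spark, params)
engine.running_travelers()
```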

## Available Node Types

### Source Nodes

* **`spark_node`**
  * Executes a Spark SQL query to read data into the pipeline.
  * Example:

    ```json
    {
      "name": "node_name",
      "type": "source",
      "format": "spark",
      "source": "spark",
      "options": {
        "query": "SELECT * FROM database_name.table_name"
      }
    }
    ```
    

* **`hive_node`**
  * Reads data from a Hive table.
  * Example:

    ```json
    {
      "name": "node_name",
      "type": "source",
      "format": "hive",
      "source": "hive",
      "options": {
        "query": "SELECT * FROM database_name.table_name"
      }
    }
    ```

### Processor Nodes

* **`bigquery_api_node`**
  * Executes a query via the BigQuery API.
  * Example:

    ```json
    {
      "name": "node_name",
      "type": "processor",
      "format": "bigquery_api",
      "options": {
            "credentials_path": "creadentials.json",
            "return_as_spark_df": false,
            "return_as_python_list": false,
            "return_as_is": true,
            "project_name": "project_name",
            "query": "select * from `project_name.dataset_name.table_name`"
      }
    }
    ```
    - `return_as_python_list` and `return_as_is` are optional.
    - `query` can be any valid BigQuery SQL statement, including SELECT, DDL, DML, scripting, control flow, stored procedure calls, and temporary table usage.


* **`gcp_bucket_api_node`**
  * Deletes a bucket or a directory in a GCP bucket.
  * Example:

    ```json
    {
      "name": "node_name",
      "type": "processor",
      "format": "gcp_bucket_api",
      "options": {
            "credentials_path": "creadentials.json",
            "project_name": "project_name",
            "bucket_name": "bucket_name",
            "subdirectory": "path/to/subdirectory"
      }
    }
    ```
    - `subdirectory` is optional and can be used to specify a subdirectory within the bucket.


* **`bigquery_spark_node`**
  * Reads data from BigQuery using the Spark BigQuery connector.
  * Example:

    ```json
    {
      "name": "node_name",
      "type": "processor",
      "format": "bigquery_spark",
      "options": {
            "parent_project": "parent_project_name",
            "materialization_dataset": "materialization_dataset_name",
            "query": "select * from `project_name.dataset_name.table_name`"
      }
    }
    ```
    - `query` does not support DDL, DML, scripting, control flow, stored procedure calls, or temporary table usage; only SELECT statements are supported.


* **`shell_node`**
  * Executes a shell command or script.
  * Example:

    ```json
    {
      "name": "node_name",
      "type": "processor",
      "format": "shell",
      "options": {
        "query": "echo 'Hello, World!'"
      }
    }
    ```
    

* **`custom_node`**
  * Runs custom logic written by the user.
  * Example:

    ```json
    {
      "name": "node_name",
      "type": "processor",
      "format": "custom",
      "options": {
        "module_name": "CustomModuleName"
        "module_path": "path/to/custom_module",
        "class_name": "CustomNodeClassName",
        "optional_param": "value"
      }
    }
    ```
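    - A minimal sketch of what such a user-written class might look like is shown under "Extending the Framework" below.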


### Sink Nodes

* **`hive_node`**
  * Writes output to a Hive table.
  * Example:

    ```json
    {
      "name": "node_name",
      "type": "sink",
      "format": "hive",
      "type": "spark",
      "options": {
        "parents": ["parent_node_name"],
        "database": "database_name",
        "table": "table_name"
      }
    }
    ```
    

* **`spark_node`**
  * Writes output to a Hive table.
  * Example:

    ```json
    {
      "name": "node_name",
      "type": "sink",
      "format": "spark",
      "type": "spark",
      "options": {
        "parents": ["parent_node_name"],
        "database": "database_name",
        "table": "table_name"
      }
    }
    ```
    

* **`teams_notification_node`**

  * Sends a message to a Microsoft Teams channel.
  * Example:

    ```json
    {
      "type": "sink",
      "format": "teams_notification",
      "name": "notify_report_ready",
      "options": {
         "parents": ["store_active_user_report"],
         "webhook_url": "{{{WEBHOOK_URL}}}",
         "message": "Active user report has been updated in Hive."
      }
    }
    ```


## Extending the Framework

To create a custom node (an illustrative sketch follows these steps):

1. Subclass `TNode` from `app/node/tnode.py`
2. Implement the required methods (`run`, `validate`, etc.)
3. Register your node in the pipeline factory or configuration
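
The following is only an illustrative sketch: the import path, constructor signature, and method names are assumptions based on the steps above and on the `custom` node options (`module_path`, `class_name`), not the library's documented interface; check `app/node/tnode.py` for the real base class.

```python
# Hypothetical custom node. The import path, constructor arguments, and
# exact method names are assumptions; consult app/node/tnode.py for the
# actual TNode interface required by your version of datapipelab.
from datapipelab.app.node.tnode import TNode


class CustomNodeClassName(TNode):
    def __init__(self, spark, tnode_config):
        super().__init__(spark, tnode_config)
        # Node-specific settings come from the "options" block of the config.
        self.optional_param = tnode_config.get("options", {}).get("optional_param")

    def validate(self):
        # Fail fast if a required option is missing.
        if self.optional_param is None:
            raise ValueError("optional_param must be provided")

    def run(self):
        # Implement the node's logic here and return its output
        # (for example, a Spark DataFrame) for downstream nodes.
        ...
```

The `module_path`, `module_name`, and `class_name` options of the `custom` processor node point the engine at a class like this one.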

## Logging and Monitoring

Logging is centralized in `logger.py`. Logs are categorized by node and execution stage to assist with debugging and auditing.

## Troubleshooting

---

For more advanced examples or integration guides, refer to the `examples/` folder or reach out to the maintainers.

            
