| Name | datapipelab |
| Version | 0.3.7 |
| home_page | None |
| Summary | A data pipeline library with connectors, sources, processors, and sinks. |
| upload_time | 2025-07-17 18:52:57 |
| maintainer | None |
| docs_url | None |
| author | None |
| requires_python | None |
| license | None |
| keywords | None |
| VCS | None |
| bugtrack_url | None |
| requirements | No requirements were recorded. |
| Travis-CI | No Travis. |
| coveralls test coverage | No coveralls. |
# Datapipelab
## Overview
`datapipelab` is a lightweight, flexible data pipeline framework designed for building and orchestrating complex data workflows. It supports a modular node-based architecture, allowing users to plug in source, processor, and sink nodes using technologies such as Apache Spark, Google BigQuery, Hive, Delta Lake, and Microsoft Teams.
## Installation
Clone the repository and install any required dependencies:
```bash
pip install -r requirements.txt
```
Or, to use it as a dependency, install it from PyPI:
```bash
pip install datapipelab
```
## Usage Guide
To run a pipeline, you typically follow these steps:
1. **Define your pipeline configuration** using a Python list or a JSON config.
2. **Instantiate and execute the engine**.
Example:
```python
from datapipelab.engine import Engine
config = [
    {
        "type": "source",
        "format": "hive",
        "name": "load_customer_accounts",
        "options": {
            "query": "SELECT customer_id, enrollment_date FROM customer_account"
        }
    },
    {
        "type": "processor",
        "format": "spark",
        "name": "aggregate_active_users",
        "options": {
            "parents": ["load_customer_accounts"],
            "query": """
                SELECT
                    YEAR(enrollment_date) AS enrollment_year,
                    COUNT(*) AS active_user_count
                FROM load_customer_accounts
                GROUP BY enrollment_year
            """
        }
    },
    {
        "type": "sink",
        "format": "hive",
        "name": "store_active_user_report",
        "options": {
            "parents": ["aggregate_active_users"],
            "table": "report.active_user_summary"
        }
    },
    {
        "type": "sink",
        "format": "teams_notification",
        "name": "notify_report_ready",
        "options": {
            "parents": ["store_active_user_report"],
            "webhook_url": "{{{WEBHOOK_URL}}}",
            "message": "Active user report has been updated in Hive."
        }
    }
]

params = {"WEBHOOK_URL": "https://outlook.office.com/webhook/..."}
engine = Engine(config, spark, params)  # `spark` is an active SparkSession
engine.running_travelers()
```
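The `{{{WEBHOOK_URL}}}` placeholder in the sink config is filled in from the `params` dictionary passed to `Engine`. As a rough illustration of this style of substitution (a sketch only, not the library's actual implementation), a triple-brace placeholder can be resolved like this:

```python
import json

def substitute_params(config, params):
    """Illustrative sketch: replace {{{KEY}}} placeholders with values from params.

    datapipelab performs its own substitution internally when Engine is given
    a params dictionary; this function only demonstrates the idea.
    """
    text = json.dumps(config)
    for key, value in params.items():
        text = text.replace("{{{" + key + "}}}", value)
    return json.loads(text)

resolved = substitute_params(
    [{"options": {"webhook_url": "{{{WEBHOOK_URL}}}"}}],
    {"WEBHOOK_URL": "https://outlook.office.com/webhook/..."},
)
print(resolved[0]["options"]["webhook_url"])
```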
## Pipeline Configuration
Pipelines are defined using structured configuration objects or files that specify:
* Nodes (source, processor, sink)
* Dependencies and execution order via `parents`
* Parameters for each node, e.g., SQL queries, table names, paths
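For a JSON config, one straightforward approach is to load the file with the standard library and pass the resulting list to the engine. The file name below is a hypothetical example; the `Engine` call mirrors the Usage Guide above.

```python
import json

from datapipelab.engine import Engine

# Hypothetical config file containing a JSON list of node definitions,
# identical in shape to the Python list shown in the Usage Guide.
with open("pipeline_config.json") as f:
    config = json.load(f)

params = {"WEBHOOK_URL": "https://outlook.office.com/webhook/..."}
engine = Engine(config, spark, params)  # assumes an active SparkSession `spark`
engine.running_travelers()
```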
## Available Node Types
### Source Nodes
* **`spark_node`**
* Executes a Spark SQL query to read data into the pipeline.
* Example:
```json
{
  "name": "node_name",
  "type": "source",
  "format": "spark",
  "source": "spark",
  "options": {
    "query": "SELECT * FROM database_name.table_name"
  }
}
```
* **`hive_node`**
* Reads data from a Hive table.
* Example:
```json
{
  "name": "node_name",
  "type": "source",
  "format": "hive",
  "source": "hive",
  "options": {
    "query": "SELECT * FROM database_name.table_name"
  }
}
```
### Processor Nodes
* **`bigquery_api_node`**
* Executes a query via the BigQuery API.
* Example:
```json
{
  "name": "node_name",
  "type": "processor",
  "format": "bigquery_api",
  "options": {
    "credentials_path": "credentials.json",
    "return_as_spark_df": false,
    "return_as_python_list": false,
    "return_as_is": true,
    "project_name": "project_name",
    "query": "select * from `project_name.dataset_name.table_name`"
  }
}
```
- `return_as_python_list` and `return_as_is` are optional.
- `query` can be any valid BigQuery SQL statement, including SELECT, DDL, DML, scripting, control flow, stored procedure calls, and temporary table usage.
* **`gcp_bucket_api_node`**
* Deletes a GCP bucket or a directory within a bucket.
* Example:
```json
{
  "name": "node_name",
  "type": "processor",
  "format": "gcp_bucket_api",
  "options": {
    "credentials_path": "credentials.json",
    "project_name": "project_name",
    "bucket_name": "bucket_name",
    "subdirectory": "path/to/subdirectory"
  }
}
```
- `subdirectory` is optional and specifies a subdirectory within the bucket.
* **`bigquery_spark_node`**
* Reads data from BigQuery using the Spark BigQuery connector.
* Example:
```json
{
  "name": "node_name",
  "type": "processor",
  "format": "bigquery_spark",
  "options": {
    "parent_project": "parent_project_name",
    "materialization_dataset": "materialization_dataset_name",
    "query": "select * from `project_name.dataset_name.table_name`"
  }
}
```
- `query` supports only SELECT statements; DDL, DML, scripting, control flow, stored procedure calls, and temporary table usage are not supported.
* **`shell_node`**
* Executes a shell command or script.
* Example:
```json
{
  "name": "node_name",
  "type": "processor",
  "format": "shell",
  "options": {
    "query": "echo 'Hello, World!'"
  }
}
```
* **`custom_node`**
* Runs custom logic provided by the user.
* Example:
```json
{
  "name": "node_name",
  "type": "processor",
  "format": "custom",
  "options": {
    "module_name": "CustomModuleName",
    "module_path": "path/to/custom_module",
    "class_name": "CustomNodeClassName",
    "optional_param": "value"
  }
}
```
### Sink Nodes
* **`hive_node`**
* Writes output to a Hive table.
* Example:
```json
{
  "name": "node_name",
  "type": "sink",
  "format": "hive",
  "options": {
    "parents": ["parent_node_name"],
    "database": "database_name",
    "table": "table_name"
  }
}
```
* **`spark_node`**
* Writes output to a Hive table.
* Example:
```json
{
  "name": "node_name",
  "type": "sink",
  "format": "spark",
  "options": {
    "parents": ["parent_node_name"],
    "database": "database_name",
    "table": "table_name"
  }
}
```
* **`teams_notification_node`**
* Sends a message to a Microsoft Teams channel.
* Example:
```json
{
  "type": "sink",
  "format": "teams_notification",
  "name": "notify_report_ready",
  "options": {
    "parents": ["store_active_user_report"],
    "webhook_url": "{{{WEBHOOK_URL}}}",
    "message": "Active user report has been updated in Hive."
  }
}
```
## Extending the Framework
To create a custom node:
1. Subclass `TNode` from `app/node/tnode.py`
2. Implement the required methods (`run`, `validate`, etc.)
3. Register your node in the pipeline factory or configuration
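As a rough sketch of steps 1 and 2 (the import path, constructor signature, and parent-data access below are assumptions for illustration; see `app/node/tnode.py` for the actual `TNode` interface):

```python
# Hypothetical sketch only; the real TNode interface may differ.
from pyspark.sql import functions as F

from datapipelab.app.node.tnode import TNode  # assumed import path


class UppercaseColumnNode(TNode):
    """Toy processor node that upper-cases one column of its parent's output."""

    def __init__(self, spark, tnode_config):  # constructor signature is assumed
        super().__init__(spark, tnode_config)
        self.column = tnode_config["options"].get("column", "name")

    def validate(self):
        # Fail fast if the required option is missing or empty.
        assert self.column, "'column' option must be set"

    def run(self):
        df = self.parent_df  # assumed attribute holding the parent node's DataFrame
        return df.withColumn(self.column, F.upper(F.col(self.column)))
```

Such a class would then be wired into a pipeline through a `custom` processor node, using the `module_path`, `module_name`, and `class_name` options shown in the `custom_node` example above.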
## Logging and Monitoring
Logging is centralized in `logger.py`. Logs are categorized by node and execution stage to assist with debugging and auditing.
## Troubleshooting
---
For more advanced examples or integration guides, refer to the `examples/` folder or reach out to the maintainers.