| Field | Value |
|---|---|
| Name | ddeutil-workflow |
| Version | 0.0.4 |
| Summary | Data Developer & Engineer Workflow Utility Objects |
| Upload time | 2024-06-11 05:13:11 |
| Requires Python | >=3.9.13 |
| License | MIT |
| Keywords | data, workflow, utility, pipeline |
| Requirements | No requirements were recorded. |

# Data Utility: _Workflow_
[![test](https://github.com/ddeutils/ddeutil-workflow/actions/workflows/tests.yml/badge.svg?branch=main)](https://github.com/ddeutils/ddeutil-workflow/actions/workflows/tests.yml)
[![python support version](https://img.shields.io/pypi/pyversions/ddeutil-workflow)](https://pypi.org/project/ddeutil-workflow/)
[![size](https://img.shields.io/github/languages/code-size/ddeutils/ddeutil-workflow)](https://github.com/ddeutils/ddeutil-workflow)

**Table of Contents**:

- [Installation](#installation)
- [Getting Started](#getting-started)
  - [Connection](#connection)
  - [Dataset](#dataset)
  - [Schedule](#schedule)
- [Examples](#examples)
  - [Python](#python)
  - [Tasks (EL)](#tasks-extract--load)
  - [Tasks (T)](#tasks-transform)

The **Utility Workflow** objects make it easy to build simple, metadata-driven
pipelines that can run **ETL, T, EL, or ELT** jobs from a `.yaml` file.

I think we should not create a separate pipeline for every use-case when we can
write one dynamic pipeline and change only its input parameters per use-case.
This way an organization can manage many pipelines with metadata alone. This
approach is called **Metadata Driven**.
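
As a rough illustration of the metadata-driven idea, the sketch below keeps a single query template and renders it once per use-case parameter set. The `${{ params.<name> }}` placeholder syntax is taken from the examples in this README, but the `render` helper and its regex are hypothetical; this is not how `ddeutil-workflow` itself resolves templates.

```python
import re
from typing import Any

# Hypothetical: matches placeholders such as ${{ params.run-date }} (hyphens allowed in keys).
PLACEHOLDER = re.compile(r"\$\{\{\s*params\.([\w-]+)\s*\}\}")


def render(template: str, params: dict[str, Any]) -> str:
    """Replace every ${{ params.<key> }} placeholder with str(params[key])."""
    return PLACEHOLDER.sub(lambda m: str(params[m.group(1)]), template)


# One pipeline definition (here just a query template) ...
QUERY = "select * from ${{ params.table }} where update_date = '${{ params.run-date }}'"

# ... reused for many use-cases by changing only the metadata.
use_cases = [
    {"table": "customer", "run-date": "2024-01-01"},
    {"table": "sales", "run-date": "2024-01-02"},
]

for params in use_cases:
    print(render(QUERY, params))
```

A missing key raises `KeyError`, which is usually preferable to silently running a pipeline with an unrendered placeholder.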

Next, we need monitoring tools to manage the logs returned from pipeline runs,
because the logs alone do not show which use-case a data pipeline run belongs to.

> [!NOTE]
> _Disclaimer_: The dynamic statements are inspired by GitHub Actions `.yml` files
> and by the config files of several data orchestration frameworks I have used
> in my experience as a Data Engineer.

## Installation
```shell
pip install ddeutil-workflow
```
This project needs the `ddeutil-io` and `ddeutil-model` extension namespace packages.

## Getting Started
As a first step, create the connections and datasets for the input and output
data that you want to use in your workflow pipeline. Some of these components
are similar to those in **Airflow** because I like its concepts.

The main feature of this project is the `Pipeline` object, which can call any
registered function. The pipeline can handle everything you want to do: it
passes parameters in and catches each step's output so the next step can reuse it.
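
A minimal sketch of that hand-off, assuming a plain dictionary context shaped like the `params` and `stages.<id>.outputs` references used in the YAML examples in this README. The `Stage` dataclass and `run_pipeline` function are hypothetical names for illustration, not the `Pipeline` API.

```python
from dataclasses import dataclass
from typing import Any, Callable

# Hypothetical: a stage receives the whole context and returns a dict of outputs.
StageFunc = Callable[[dict[str, Any]], dict[str, Any]]


@dataclass
class Stage:
    id: str
    func: StageFunc


def run_pipeline(stages: list[Stage], params: dict[str, Any]) -> dict[str, Any]:
    """Run stages in order, storing each stage's outputs under context['stages'][id]."""
    context: dict[str, Any] = {"params": params, "stages": {}}
    for stage in stages:
        outputs = stage.func(context)
        context["stages"][stage.id] = {"outputs": outputs}
    return context


stages = [
    Stage("extract", lambda ctx: {"rows": [1, 2, 3]}),
    # The second stage re-uses the output caught from the first one.
    Stage("count", lambda ctx: {"total": len(ctx["stages"]["extract"]["outputs"]["rows"])}),
]

result = run_pipeline(stages, params={"run-date": "2024-01-01"})
print(result["stages"]["count"]["outputs"])  # {'total': 3}
```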
> [!IMPORTANT]
> In the future, I will move the connection and dataset features out of the core
> and into dynamic registries, because they require a lot of vendor code and
> dependencies to maintain. (I do not have time to maintain these features.)

### Connection
A connection gives the worker access to an external system so that it can act on it.

```yaml
conn_postgres_data:
  type: conn.Postgres
  url: 'postgres://username:${ENV_PASS}@hostname:port/database?echo=True&time_out=10'
```
```python
from ddeutil.workflow.conn import Conn
conn = Conn.from_loader(name='conn_postgres_data', externals={})
assert conn.ping()
```
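
The `url` value mixes a secret placeholder (`${ENV_PASS}`) with query-string options. The sketch below shows how such a value could be resolved and split with the standard library, assuming the password comes from an `ENV_PASS` environment variable and using `5432` in place of the `port` placeholder. Whether `Conn` resolves and parses the URL this way is an assumption.

```python
import os
from string import Template
from urllib.parse import parse_qs, urlsplit

# Assumption: 5432 stands in for the `port` placeholder from the YAML above.
RAW_URL = "postgres://username:${ENV_PASS}@hostname:5432/database?echo=True&time_out=10"


def resolve_url(raw: str) -> str:
    """Substitute ${VAR} placeholders from the environment; raise KeyError if one is missing."""
    return Template(raw).substitute(os.environ)


os.environ.setdefault("ENV_PASS", "s3cret")  # demo value only

parts = urlsplit(resolve_url(RAW_URL))
print(parts.scheme, parts.username, parts.hostname, parts.port, parts.path.lstrip("/"))
print(parse_qs(parts.query))
```

Passwords containing URL-reserved characters such as `@` or `/` would need percent-encoding before substitution.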
### Dataset
A dataset defines an object, such as a table, that lives on a connection. This
feature is implemented in `/vendors` because there are many tools that can interact
with the different data systems in a data stack.

```yaml
ds_postgres_customer_tbl:
  type: dataset.PostgresTbl
  conn: 'conn_postgres_data'
  features:
    id: serial primary key
    name: varchar( 100 ) not null
```
```python
from ddeutil.workflow.vendors.pg import PostgresTbl
dataset = PostgresTbl.from_loader(name='ds_postgres_customer_tbl', externals={})
assert dataset.exists()
```
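
The `features` mapping reads as column name to column definition. One hypothetical way to turn it into DDL is sketched below, only to make that mapping concrete; the `create_table_sql` helper and the `public.customer` table name are assumptions, and `PostgresTbl` may build its statement differently.

```python
def create_table_sql(table: str, features: dict[str, str]) -> str:
    """Hypothetical: build a CREATE TABLE statement from a {column: definition} mapping."""
    columns = ",\n".join(f"    {name} {definition}" for name, definition in features.items())
    return f"CREATE TABLE IF NOT EXISTS {table} (\n{columns}\n)"


features = {
    "id": "serial primary key",
    "name": "varchar( 100 ) not null",
}
print(create_table_sql("public.customer", features))
```

Column names and definitions are inserted verbatim without quoting or validation, so this approach only suits trusted config files.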
### Schedule
```yaml
schd_for_node:
  type: schedule.Schedule
  cron: "*/5 * * * *"
```
```python
from ddeutil.workflow.schedule import Schedule

scdl = Schedule.from_loader(name='schd_for_node', externals={})
assert '*/5 * * * *' == str(scdl.cronjob)

# Each access to `.next` advances to the following run time.
cron_iterate = scdl.generate('2022-01-01 00:00:00')
assert '2022-01-01 00:05:00' == f"{cron_iterate.next:%Y-%m-%d %H:%M:%S}"
assert '2022-01-01 00:10:00' == f"{cron_iterate.next:%Y-%m-%d %H:%M:%S}"
assert '2022-01-01 00:15:00' == f"{cron_iterate.next:%Y-%m-%d %H:%M:%S}"
assert '2022-01-01 00:20:00' == f"{cron_iterate.next:%Y-%m-%d %H:%M:%S}"
assert '2022-01-01 00:25:00' == f"{cron_iterate.next:%Y-%m-%d %H:%M:%S}"
```
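
To see where the expected timestamps above come from: `*/5 * * * *` fires at every minute divisible by 5, and the iterator starts strictly after the given start time. The sketch below reproduces that sequence for the `*/N * * * *` case only, using the standard library; it is not a cron parser and not the library's own scheduler.

```python
from datetime import datetime, timedelta
from typing import Iterator


def every_n_minutes(start: datetime, step: int) -> Iterator[datetime]:
    """Yield times strictly after `start` whose minute is a multiple of `step` (cron '*/step * * * *')."""
    current = start.replace(second=0, microsecond=0) + timedelta(minutes=1)
    while True:
        if current.minute % step == 0:
            yield current
        current += timedelta(minutes=1)


runs = every_n_minutes(datetime(2022, 1, 1, 0, 0, 0), step=5)
print([f"{next(runs):%Y-%m-%d %H:%M:%S}" for _ in range(5)])
```

Stepping one minute at a time keeps the logic obviously correct; a real scheduler would jump directly to the next matching field value.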
## Examples
This section shows example workflow files for common Data Engineering use-cases.

### Python
A pipeline lists the work that a worker should do; each job in it is a collection of stages.

```yaml
run_py_local:
  type: ddeutil.workflow.pipe.Pipeline
  params:
    author-run:
      type: str
    run-date:
      type: datetime
  jobs:
    first-job:
      stages:
        - name: Printing Information
          id: define-func
          run: |
            x = '${{ params.author-run }}'
            print(f'Hello {x}')

            def echo(name: str):
                print(f'Hello {name}')

        - name: Run Sequence and use var from Above
          vars:
            x: ${{ params.author-run }}
          run: |
            print(f'Receive x from above with {x}')
            # Change x value
            x: int = 1

        - name: Call Function
          vars:
            echo: ${{ stages.define-func.outputs.echo }}
          run: |
            echo('Caller')
```
```python
from ddeutil.workflow.pipeline import Pipeline
pipe = Pipeline.from_loader(name='run_py_local', externals={})
pipe.execute(params={'author-run': 'Local Workflow', 'run-date': '2024-01-01'})
```
```shell
> Hello Local Workflow
> Receive x from above with Local Workflow
> Hello Caller
```
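
The three stages above share state in two ways: `vars` injects values into a stage, and a stage's outputs (here the `echo` function defined in `define-func`) can be referenced by later stages. A minimal sketch of that idea using Python's built-in `exec`, assuming a stage's outputs are simply the names its code defines; the `run_stage` helper is hypothetical, and the real `Pipeline` may collect outputs differently. Running code with `exec` is only acceptable for trusted pipeline files.

```python
from typing import Any


def run_stage(code: str, variables: dict[str, Any]) -> dict[str, Any]:
    """Execute `code` with `variables` in scope; return the names it defines,
    excluding the injected variables and dunder names such as __builtins__."""
    namespace: dict[str, Any] = dict(variables)
    exec(code, namespace)  # trusted code only
    return {
        key: value
        for key, value in namespace.items()
        if key not in variables and not key.startswith("__")
    }


define_func = """
x = 'Local Workflow'
print(f'Hello {x}')

def echo(name: str):
    print(f'Hello {name}')
"""

outputs = run_stage(define_func, {})                     # prints: Hello Local Workflow
run_stage("echo('Caller')", {"echo": outputs["echo"]})  # prints: Hello Caller
```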
### Tasks (Extract & Load)
```yaml
pipe_el_pg_to_lake:
  type: ddeutil.workflow.pipe.Pipeline
  params:
    run-date:
      type: datetime
    author-email:
      type: str
  jobs:
    extract-load:
      stages:
        - name: "Extract Load from Postgres to Lake"
          id: extract-load
          task: tasks/postgres-to-delta@polars
          with:
            source:
              conn: conn_postgres_url
              query: |
                select * from ${{ params.name }}
                where update_date = '${{ params.datetime }}'
            sink:
              conn: conn_az_lake
              endpoint: "/${{ params.name }}"
```
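
The `task` value appears to follow a `<path>/<name>@<tag>` pattern, where the tag (`polars`, `odbc`) selects one implementation of the named task. That reading is an assumption based only on the two `task` values in this README. A sketch of resolving such a reference against a registry, where `TASK_REGISTRY`, `parse_task`, and the stub function are all hypothetical:

```python
from typing import Any, Callable, NamedTuple


class TaskRef(NamedTuple):
    path: str
    name: str
    tag: str


def parse_task(ref: str) -> TaskRef:
    """Split 'tasks/postgres-to-delta@polars' into path, name and tag."""
    location, sep, tag = ref.partition("@")
    if not sep or "/" not in location:
        raise ValueError(f"Unexpected task reference: {ref!r}")
    path, _, name = location.rpartition("/")
    return TaskRef(path, name, tag)


def postgres_to_delta_polars(source: dict[str, Any], sink: dict[str, Any]) -> dict[str, Any]:
    """Hypothetical stub standing in for a real extract-load implementation."""
    return {"source": source["conn"], "sink": sink["conn"]}


# Hypothetical registry keyed by (name, tag).
TASK_REGISTRY: dict[tuple[str, str], Callable[..., dict[str, Any]]] = {
    ("postgres-to-delta", "polars"): postgres_to_delta_polars,
}

ref = parse_task("tasks/postgres-to-delta@polars")
task = TASK_REGISTRY[(ref.name, ref.tag)]
print(ref, task(source={"conn": "conn_postgres_url"}, sink={"conn": "conn_az_lake"}))
```

Keying the registry by name and tag lets several engines implement the same logical task side by side.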
### Tasks (Transform)
```yaml
pipe_hook_mssql_proc:
  type: ddeutil.workflow.pipe.Pipeline
  params:
    run_date: datetime
    sp_name: str
    source_name: str
    target_name: str
  jobs:
    transform:
      stages:
        - name: "Transform Data in MS SQL Server"
          id: transform
          task: tasks/mssql-proc@odbc
          with:
            exec: ${{ params.sp_name }}
            params:
              run_mode: "T"
              run_date: ${{ params.run_date }}
              source: ${{ params.source_name }}
              target: ${{ params.target_name }}
```
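
For the transform stage, the `with` block names a stored procedure (`exec`) and its arguments (`params`). Below is a hypothetical helper that turns that block into a parameterized T-SQL `EXEC` statement with `?` markers, the placeholder style used by ODBC drivers such as `pyodbc`. The helper name and sample values are assumptions, and no connection is opened here.

```python
from typing import Any


def build_exec(proc: str, params: dict[str, Any]) -> tuple[str, tuple[Any, ...]]:
    """Hypothetical: return an EXEC statement with named ? placeholders and the matching values."""
    assignments = ", ".join(f"@{name} = ?" for name in params)
    return f"EXEC {proc} {assignments}", tuple(params.values())


sql, values = build_exec(
    "dbo.sp_transform_customer",  # hypothetical procedure name
    {"run_mode": "T", "run_date": "2024-01-01", "source": "stg_customer", "target": "dim_customer"},
)
print(sql)     # EXEC dbo.sp_transform_customer @run_mode = ?, @run_date = ?, @source = ?, @target = ?
print(values)  # ('T', '2024-01-01', 'stg_customer', 'dim_customer')
```

With a `pyodbc` cursor, this pair could be passed as `cursor.execute(sql, values)`. Only the values are bound as parameters; the procedure and argument names are inserted as text and should come from trusted config.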
## License
This project is licensed under the terms of the [MIT license](LICENSE).

Raw data
{
"_id": null,
"home_page": null,
"name": "ddeutil-workflow",
"maintainer": null,
"docs_url": null,
"requires_python": ">=3.9.13",
"maintainer_email": null,
"keywords": "data, workflow, utility, pipeline",
"author": null,
"author_email": "ddeutils <korawich.anu@gmail.com>",
"download_url": "https://files.pythonhosted.org/packages/5b/2f/804f1198c45e2c73d592d4e60408adc45b97d497638ab95467addb44b36d/ddeutil_workflow-0.0.4.tar.gz",
"platform": null,
"bugtrack_url": null,
"license": "MIT",
"summary": "Data Developer & Engineer Workflow Utility Objects",
"version": "0.0.4",
"project_urls": {
"Homepage": "https://github.com/ddeutils/ddeutil-workflow/",
"Source Code": "https://github.com/ddeutils/ddeutil-workflow/"
},
"split_keywords": [
"data",
" workflow",
" utility",
" pipeline"
],
"urls": [
{
"comment_text": "",
"digests": {
"blake2b_256": "2d6cde1e073f6f7da5cc0538f10ed3da5ef9c896eee36970f78d2e6a10ee4da7",
"md5": "b4aeaf4abec685868f86729572c111be",
"sha256": "c2446d7bfa006ba5662b5c419caeebd233c3ae6e63c7d8b5adb07452f605fb81"
},
"downloads": -1,
"filename": "ddeutil_workflow-0.0.4-py3-none-any.whl",
"has_sig": false,
"md5_digest": "b4aeaf4abec685868f86729572c111be",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": ">=3.9.13",
"size": 39432,
"upload_time": "2024-06-11T05:13:09",
"upload_time_iso_8601": "2024-06-11T05:13:09.423855Z",
"url": "https://files.pythonhosted.org/packages/2d/6c/de1e073f6f7da5cc0538f10ed3da5ef9c896eee36970f78d2e6a10ee4da7/ddeutil_workflow-0.0.4-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": "",
"digests": {
"blake2b_256": "5b2f804f1198c45e2c73d592d4e60408adc45b97d497638ab95467addb44b36d",
"md5": "a2d1be7ed423519257f62314500f3162",
"sha256": "a80691e062223a7f27a9c08b8a031928f67c93eecbd2738372e6a3df61c10b0d"
},
"downloads": -1,
"filename": "ddeutil_workflow-0.0.4.tar.gz",
"has_sig": false,
"md5_digest": "a2d1be7ed423519257f62314500f3162",
"packagetype": "sdist",
"python_version": "source",
"requires_python": ">=3.9.13",
"size": 39142,
"upload_time": "2024-06-11T05:13:11",
"upload_time_iso_8601": "2024-06-11T05:13:11.360829Z",
"url": "https://files.pythonhosted.org/packages/5b/2f/804f1198c45e2c73d592d4e60408adc45b97d497638ab95467addb44b36d/ddeutil_workflow-0.0.4.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2024-06-11 05:13:11",
"github": true,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"github_user": "ddeutils",
"github_project": "ddeutil-workflow",
"travis_ci": false,
"coveralls": false,
"github_actions": true,
"lcname": "ddeutil-workflow"
}