airflow-pentaho-plugin

- Name: airflow-pentaho-plugin
- Version: 1.1.2
- Home page: https://github.com/damavis/airflow-pentaho-plugin
- Upload time: 2023-11-03 07:32:28
- Author: Damavis
- Requires Python: >=3
- License: Apache 2.0
- Requirements: No requirements were recorded.
# Pentaho Airflow plugin

[![Build Status](https://api.travis-ci.com/damavis/airflow-pentaho-plugin.svg?branch=master)](https://app.travis-ci.com/damavis/airflow-pentaho-plugin)
[![codecov](https://codecov.io/gh/damavis/airflow-pentaho-plugin/branch/master/graph/badge.svg)](https://codecov.io/gh/damavis/airflow-pentaho-plugin)
[![PyPI](https://img.shields.io/pypi/v/airflow-pentaho-plugin)](https://pypi.org/project/airflow-pentaho-plugin/)
[![PyPI - Downloads](https://img.shields.io/pypi/dm/airflow-pentaho-plugin)](https://pypi.org/project/airflow-pentaho-plugin/)

This plugin runs Jobs and Transformations through Carte servers.
It lets you orchestrate a large number of transformations and jobs,
taking care of the dependencies between them, even across different
instances. This is done by using `CarteJobOperator` and `CarteTransOperator`.

It can also run Pan (transformations) and Kitchen (jobs) in local mode,
both from a repository and from local XML files. For this approach, use
`KitchenOperator` and `PanOperator`.

## Requirements

1. A deployed Apache Airflow instance.
2. One or more working PDI CE installations.
3. A Carte server, required by the Carte operators.

## Setup

The same setup process must be performed on the webserver, the scheduler
and every worker that runs these tasks. If you want to deploy dedicated
workers for this kind of task, see
[Queues](https://airflow.apache.org/docs/stable/concepts.html#queues)
in the **Airflow** *Concepts* section; a sketch of such a worker is shown below.
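
As an illustrative sketch of that setup (assuming the CeleryExecutor and a queue
named `pdi`; both are example choices, not requirements of this plugin), a
dedicated worker can be started so that it only consumes PDI tasks:

```bash
# Start a Celery worker that only consumes tasks assigned to the "pdi" queue.
# Tasks are routed to it by passing queue="pdi" to the operators, as in the
# KitchenOperator and PanOperator examples below.
airflow celery worker --queues pdi

# On Airflow 1.10.x the equivalent command is:
# airflow worker --queues pdi
```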

### Pip package

First of all, install the package using the `pip install` command.

```bash
pip install airflow-pentaho-plugin
```

### Airflow connection

Then, a new connection needs to be added to the Airflow Connections. To do this,
go to the Airflow web UI, click `Admin -> Connections` in the top menu, and then
click the `Create` tab.

Use the HTTP connection type. Enter the **Conn Id** (this plugin uses `pdi_default`
by default), along with the username and the password for your Pentaho Repository.

At the bottom of the form, fill the **Extra** field with `pentaho_home`, the
path where your pdi-ce installation is located, and `rep`, the repository name
for this connection, using a JSON-formatted string like the following.

```json
{
    "pentaho_home": "/opt/pentaho",
    "rep": "Default"
}
```
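
If you prefer the command line to the web UI, the same connection can also be
created with the Airflow CLI. The following is only a sketch for Airflow 2.x:
the login, password and paths are placeholders that you should replace with
your own values.

```bash
# Create the default PDI connection used by this plugin (placeholder credentials).
airflow connections add 'pdi_default' \
    --conn-type 'http' \
    --conn-login 'my_repository_user' \
    --conn-password 'my_repository_password' \
    --conn-extra '{"pentaho_home": "/opt/pentaho", "rep": "Default"}'
```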

### Carte

In order to use `CarteJobOperator`, the connection must be set up differently. Fill
in `host` (including `http://` or `https://`) and `port` with the Carte hostname and
port, `username` and `password` with the PDI repository credentials, and `extra` as
follows.

```json
{
    "rep": "Default",
    "carte_username": "cluster",
    "carte_password": "cluster"
}
```
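
As a similar sketch (again for Airflow 2.x, with a placeholder hostname, port
and credentials), the Carte connection could be created from the CLI like this:

```bash
# Create a connection pointing at a Carte server (all values are placeholders).
airflow connections add 'pdi_default' \
    --conn-type 'http' \
    --conn-host 'http://carte.example.com' \
    --conn-port '8080' \
    --conn-login 'my_repository_user' \
    --conn-password 'my_repository_password' \
    --conn-extra '{"rep": "Default", "carte_username": "cluster", "carte_password": "cluster"}'
```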

## Usage

### CarteJobOperator

CarteJobOperator is responsible for running jobs on remote slave servers. Here
is an example of `CarteJobOperator` usage.

```python
# For versions before 2.0
# from airflow.operators.airflow_pentaho import CarteJobOperator

from airflow_pentaho.operators.carte import CarteJobOperator

# ... #

# Define the task using the CarteJobOperator
avg_spent = CarteJobOperator(
    conn_id='pdi_default',
    task_id="average_spent",
    job="/home/bi/average_spent",
    params={"date": "{{ ds }}"},  # Date in yyyy-mm-dd format
    dag=dag)

# ... #

some_task >> avg_spent >> another_task
```

### KitchenOperator

The Kitchen operator is responsible for running jobs locally. Let's suppose that
we have a *Job* saved at `/home/bi/average_spent` in our repository, with the
argument `date` as an input parameter. Let's define the task using the
`KitchenOperator`.

```python
# For versions before 2.0
# from airflow.operators.airflow_pentaho import KitchenOperator

from airflow_pentaho.operators.kettle import KitchenOperator

# ... #

# Define the task using the KitchenOperator
avg_spent = KitchenOperator(
    conn_id='pdi_default',
    queue="pdi",
    task_id="average_spent",
    directory="/home/bi",
    job="average_spent",
    params={"date": "{{ ds }}"},  # Date in yyyy-mm-dd format
    dag=dag)

# ... #

some_task >> avg_spent >> another_task
```

### CarteTransOperator

CarteTransOperator is responsible for running transformations on remote slave
servers. Here is an example of `CarteTransOperator` usage.

```python
# For versions before 2.0
# from airflow.operators.airflow_pentaho import CarteTransOperator

from airflow_pentaho.operators.carte import CarteTransOperator

# ... #

# Define the task using the CarteTransOperator
enrich_customers = CarteTransOperator(
    conn_id='pdi_default',
    task_id="enrich_customer_data",
    trans="/home/bi/enrich_customer_data",
    params={"date": "{{ ds }}"},  # Date in yyyy-mm-dd format
    dag=dag)

# ... #

some_task >> enrich_customers >> another_task
```

### PanOperator

The Pan operator is responsible for running transformations locally. Let's suppose
that we have one saved at `/home/bi/clean_somedata`. Let's define the task using the
`PanOperator`. In this case, the transformation receives a parameter that
determines the file to be cleaned.

```python
# For versions before 2.0
# from airflow.operators.airflow_pentaho import PanOperator

from airflow_pentaho.operators.kettle import PanOperator

# ... #

# Define the task using the PanOperator
clean_input = PanOperator(
    conn_id='pdi_default',
    queue="pdi",
    task_id="cleanup",
    directory="/home/bi",
    trans="clean_somedata",
    params={"file": "/tmp/input_data/{{ ds }}/sells.csv"},
    dag=dag)

# ... #

some_task >> clean_input >> another_task
```

For more information, please see `sample_dags/pdi_flow.py`.