bacalhau-airflow

Name: bacalhau-airflow
Version: 1.2.1
Home page: https://github.com/filecoin-project/bacalhau/tree/main/integration/airflow
Summary: An Apache Airflow provider for Bacalhau.
Upload time: 2024-01-30 21:49:49
Author: Enrico Rotundo
Requires Python: >=3.8
License: Apache Software License 2.0
Keywords: bacalhau, airflow, provider

# Apache Airflow Provider for Bacalhau

This is `bacalhau-airflow`, a Python package that integrates [Bacalhau](https://github.com/bacalhau-project/bacalhau) with [Apache Airflow](https://github.com/apache/airflow).
The benefit is twofold.
First, thanks to this package you can now write complex pipelines for Bacalhau.
For instance, jobs can communicate their outputs' CIDs to downstream jobs, which can use those as inputs.
Second, Apache Airflow provides a solid solution for reliably orchestrating your DAGs.


## Features

- Create Airflow tasks that run on Bacalhau (via a custom operator!)
- Support for sharded jobs: output shards can be passed downstream (via [XComs](https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/xcoms.html))
- Coming soon...
    - Lineage (see [OpenLineage proof-of-concept integration here](https://github.com/enricorotundo/bacalhau-airflow-provider))
    - Various working code examples
    - Hosting instructions

## Requirements

- Python 3.8+
- [`bacalhau-sdk` 0.1.6](https://pypi.org/project/bacalhau-sdk/)
- `apache-airflow` 2.3+

## Installation

The integration automatically registers itself with Airflow 2.3+, as long as it is installed in the same Python environment as the Airflow worker (see the verification sketch after the install steps below).

### From PyPI

```console
pip install bacalhau-airflow
```

### From source

Clone the public repository:

```shell
git clone https://github.com/bacalhau-project/bacalhau/
```

Once you have a copy of the source, you can install it with:

```shell
cd integration/airflow/
pip install .
```
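
To double-check that Airflow picked the provider up, you can ask Airflow which providers it discovered. This is just a sanity-check sketch, assuming the package is installed in the same environment as Airflow; the exact registered name is not guaranteed here.

```python
# Sanity-check sketch: print the provider packages Airflow discovered at runtime.
# If bacalhau-airflow registered itself, an entry for it should appear in this list.
from airflow.providers_manager import ProvidersManager

for package_name in ProvidersManager().providers:
    print(package_name)
```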

## Worked example

### Setup


> In a production environment you may want to follow the [official Airflow instructions](https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/production-deployment.html) or pick one of the suggested [hosted solutions](https://airflow.apache.org/ecosystem/#airflow-as-a-service).

If you're just curious and want to give it a try on your *local machine*, please follow the steps below.

First, install and initialize Airflow:

```shell
pip install apache-airflow
export AIRFLOW_HOME=~/airflow
airflow db init
```

Then, we need to point Airflow to the absolute path of the folder where your pipelines live.
To do that, edit the `dags_folder` field in the `${AIRFLOW_HOME}/airflow.cfg` file.
In this example I'm going to use the `hello_world.py` DAG shipped with this repository;
for the sake of completeness, the next section will walk you through the actual code.

My config file looks like the following:

```ini
[core]
dags_folder = /Users/enricorotundo/bacalhau/integration/airflow/example_dags
...
```

Optionally, to reduce clutter in the Airflow UI, you can disable loading of the default example DAGs by setting `load_examples` to `False` in the same file.
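
To confirm the configuration edits took effect, you can ask Airflow for the values it actually resolved. A quick sanity check, not required for the walkthrough:

```python
# Sanity-check sketch: print the dags_folder and load_examples values Airflow
# resolved from airflow.cfg (or from any AIRFLOW__CORE__* environment overrides).
from airflow.configuration import conf

print(conf.get("core", "dags_folder"))
print(conf.getboolean("core", "load_examples"))
```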

Finally, we can launch Airflow locally:

```shell
airflow standalone
```

### Example DAG: chaining jobs

While Airflow's pinwheel is warming up in the background, let's take a look at the breakdown of `hello_world.py` below.

> In brief, the first task of this DAG prints "Hello World" to stdout, then automatically pipes its output into the subsequent task as an input file. The second task simply prints out the content of its input file.

All you need to import from this package is the `BacalhauSubmitJobOperator`.
It allows you to submit a job spec made up of the usual fields such as engine, image, etc.

```python
from datetime import datetime
from airflow import DAG
from bacalhau_airflow.operators import BacalhauSubmitJobOperator
```

This operator supports chaining multiple jobs without the need to pass any CID along manually; in this regard, a special note goes to the `input_volumes` parameter (see `task_2` below).
Every time the operator runs a task, it stores a comma-separated string of the output shard CIDs in an internal key-value store under the `cids` key.
Thus, downstream tasks can read those CIDs back in via the `input_volumes` parameter.
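
As an aside, the same XCom value can also be read from an ordinary Python task. The sketch below is illustrative only (the `log_cids` task and the `task_1` task id are assumptions, not part of the shipped example):

```python
# Illustrative sketch: a TaskFlow task that pulls the comma-separated CID string
# pushed by an upstream BacalhauSubmitJobOperator under the "cids" key.
from airflow.decorators import task
from airflow.operators.python import get_current_context

@task
def log_cids():
    ti = get_current_context()["ti"]
    cids = ti.xcom_pull(task_ids="task_1", key="cids")
    print(f"Output CIDs produced by task_1: {cids}")
```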

All you need to do is (1) use the [XComs syntax](https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/xcoms.html) (in curly brackets) to specify the "sender" task IDs and the `cids` key (e.g. `{{ task_instance.xcom_pull(task_ids='task_1', key='cids') }}`), and (2) append a target mount point separated by a colon (e.g. `:/task_1_output`).

Lastly, we define the task dependencies simply with `task_1 >> task_2`.
To learn more about Airflow's DAG syntax, please check out [this page](https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/dags.html#task-dependencies).

```python
with DAG("bacalhau-helloworld-dag", start_date=datetime(2023, 3, 1)) as dag:
    task_1 = BacalhauSubmitJobOperator(
        task_id="task_1",
        api_version="V1beta1",
        job_spec=dict(
            engine="Docker",
            publisher="IPFS",
            docker=dict(
                image="ubuntu",
                entrypoint=["echo", "Hello World"],
            ),
            deal=dict(concurrency=1),
        ),
    )

    task_2 = BacalhauSubmitJobOperator(
        task_id="task_2",
        api_version="V1beta1",
        input_volumes=[
            "{{ task_instance.xcom_pull(task_ids='task_1', key='cids') }}:/task_1_output",
        ],
        job_spec=dict(
            engine="Docker",
            publisher="IPFS",
            docker=dict(
                image="ubuntu",
                entrypoint=["cat", "/task_1_output/stdout"],
            ),
            deal=dict(concurrency=1),
        ),
    )

    task_1 >> task_2
```
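
The chain could be extended the same way with further tasks. The snippet below is a hypothetical addition, not part of the shipped example: it would go inside the same `with DAG(...)` block, and `task_3` with its `wc` entrypoint is purely illustrative.

```python
    # Hypothetical extension of the example DAG: count the bytes in task_2's stdout,
    # mounting task_2's output CIDs exactly as task_2 mounts task_1's.
    task_3 = BacalhauSubmitJobOperator(
        task_id="task_3",
        api_version="V1beta1",
        input_volumes=[
            "{{ task_instance.xcom_pull(task_ids='task_2', key='cids') }}:/task_2_output",
        ],
        job_spec=dict(
            engine="Docker",
            publisher="IPFS",
            docker=dict(
                image="ubuntu",
                entrypoint=["wc", "-c", "/task_2_output/stdout"],
            ),
            deal=dict(concurrency=1),
        ),
    )

    task_2 >> task_3
```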

### Run it

Now that we understand what the example DAG is supposed to do, let's run it!
Head over to http://0.0.0.0:8080, where the Airflow UI is being served.
The screenshot below shows that our hello world DAG has been loaded correctly.

![](docs/_static/airflow_01.png)

When you inspect a DAG, Airflow renders a graph depicting its color-coded topology (see image below).
For active (i.e. running) pipelines, this is useful for overseeing the status of each task.

To trigger the DAG, enable the toggle shown below.

![](docs/_static/airflow_02.png)

When all tasks have completed, we want to fetch the output of our pipeline.
To do so, we need to retrieve the job ID of the last task.
Click on a green box in the `task_2` row and then open the XCom tab.

![](docs/_static/airflow_03.png)

Here we find the `bacalhau_job_id`.
Select that value and copy it to your clipboard.

![](docs/_static/airflow_04.png)

Lastly, we can use the Bacalhau CLI's `get` command to fetch the output data as follows:

```console
$ bacalhau get 8fdab73b-00fd-4d13-941c-8ba002f8178d
Fetching results of job '8fdab73b-00fd-4d13-941c-8ba002f8178d'...
...
Results for job '8fdab73b-00fd-4d13-941c-8ba002f8178d' have been written to...
/tmp/dag-example/job-8fdab73b

$ cat /tmp/dag-example/job-8fdab73b/combined_results/stdout
Hello World
```

That's all folks :rainbow:.

## Development

Install the development dependencies:

```console
pip install -r dev-requirements.txt
```

### Unit tests

Run the test suite with:

```shell
tox
```

You can also skip `tox` and run `pytest` directly in your own dev environment.

            
