dbt-airflow

- Name: dbt-airflow
- Version: 2.10.0
- Home page: https://github.com/gmyrianthous/dbt-airflow
- Summary: A Python package that creates fine-grained Airflow tasks for dbt
- Upload time: 2023-12-22 09:31:39
- Author: Giorgos Myrianthous
- Requires Python: >=3.7.2,<4
- License: MIT
            ![deploy](https://github.com/gmyrianthous/dbt-airflow/actions/workflows/deploy.yml/badge.svg?branch=main)
![docs](https://github.com/gmyrianthous/dbt-airflow/actions/workflows/docs.yml/badge.svg?branch=main)
![main](https://github.com/gmyrianthous/dbt-airflow/actions/workflows/main.yml/badge.svg?branch=main)
![validate_pull_request](https://github.com/gmyrianthous/dbt-airflow/actions/workflows/validate_pull_request.yml/badge.svg?branch=main)
[![Downloads](https://static.pepy.tech/badge/dbt-airflow)](https://pepy.tech/project/dbt-airflow)

# dbt-airflow
A Python package that helps Data and Analytics engineers render dbt projects in Apache Airflow DAGs, such that
models, seeds, snapshots and tests are each represented by an individual Airflow Task.

`dbt` is a command-line tool that enables data teams to build, maintain and test data models in a scalable fashion. The
biggest challenge, though, is how to embed `dbt` in modern data workflows and infrastructure. The dbt CLI is indeed a powerful
tool, but used as is, it creates silos in the way an organisation manages its data. Every contributor is able to
run `dbt` commands from their local machine (or even a host machine), but how do you know whether a model run by another
contributor has failed or succeeded? How can you enable shared visibility over data models within the team?

One way to host dbt projects and orchestrate dbt tasks is via Apache Airflow. In its simplest form, an Airflow DAG
that builds and tests data models consists of two tasks: one that executes the `dbt run` command, followed by an
Airflow task that executes `dbt test`.

<img style="display: block; margin: 0 auto" src="docs/blob/dbt_run_test_dag.png" alt="test">
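
As a minimal sketch of this naive two-task approach (the project path and the use of Airflow's `BashOperator` are assumptions made purely for illustration), such a DAG could look like this:

```python3
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

# Hypothetical location of the dbt project inside the Airflow environment.
DBT_PROJECT_DIR = '/opt/airflow/dbt_project'

with DAG(
    dag_id='dbt_run_and_test',
    start_date=datetime(2021, 1, 1),
    catchup=False,
) as dag:
    dbt_run = BashOperator(
        task_id='dbt_run',
        bash_command=f'dbt run --project-dir {DBT_PROJECT_DIR}',
    )
    dbt_test = BashOperator(
        task_id='dbt_test',
        bash_command=f'dbt test --project-dir {DBT_PROJECT_DIR}',
    )

    # Build all models first, then run all tests.
    dbt_run >> dbt_test
```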

But what happens when model builds or tests fail? Should we re-run the whole dbt project (which could involve hundreds of
different models and/or tests) just to run the single model we've just fixed? This isn't good practice,
since re-running the whole project is time-consuming and expensive.

A potential solution to this problem is to create individual Airflow tasks for every model, seed, snapshot and test
within the dbt project. If we were to do this manually, it would take a huge amount of effort and would be
prone to errors. It would also defeat the purpose of dbt, which, among other features, automates model
dependency management.

`dbt-airflow` is a package that builds a layer in between Apache Airflow and dbt, enabling teams to automatically
render their dbt projects at a granular level so that they have full control over individual dbt resources. Every
dbt model, seed, snapshot or test gets its own Airflow Task, so you can perform any action at the task level.

Here's how the popular Jaffle Shop dbt project is rendered in Apache Airflow via `dbt-airflow`:

<img style="display: block; margin: 0 auto" src="docs/blob/dbt_jaffle_shop_dag.png" alt="test">


### Features
- Render a `dbt` project as a `TaskGroup` consisting of Airflow Tasks that correspond to dbt models, seeds, snapshots
and tests
- Every `model`, `seed` and `snapshot` resource that has at least one test will also have a corresponding
test task as a downstream task
- Add tasks before or after the whole dbt project
- Introduce extra tasks within the dbt project tasks and specify any downstream or upstream dependencies
- Create sub-`TaskGroup`s of dbt Airflow tasks based on your project's folder structure (see the illustrative layout below)
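
For illustration only (this layout is a hypothetical example, not taken from the package docs), a dbt project organised like this:

```
models/
├── staging/
│   ├── stg_customers.sql
│   └── stg_orders.sql
└── marts/
    ├── int_customers_per_store.sql
    └── int_revenue_by_date.sql
```

would be rendered with nested `TaskGroup`s mirroring the `staging/` and `marts/` folders.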

## How does it work
The library essentially builds on top of the metadata generated by `dbt-core` and stored in
the `target/manifest.json` file in your dbt project directory. This means that you first need to compile your project (or run
any other dbt command that creates the `manifest` file) before creating your Airflow DAG. In other words, the `dbt-airflow`
package expects that you have already compiled your dbt project, so that an up-to-date manifest file can be used
to render the individual tasks.
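
For example, a typical deployment step (a sketch only; the exact command and flags depend on how your project is set up) would regenerate the manifest before the DAG is parsed:

```bash
# Regenerate target/manifest.json so that dbt-airflow reads an up-to-date manifest.
# `dbt compile` is one of the dbt commands that (re)creates the manifest file;
# the paths below reuse the example project layout shown later in this README.
cd /opt/airflow/example_dbt_project
dbt compile --profiles-dir profiles --target dev
```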

---

# Installation

The package is available on PyPI and can be installed through `pip`:
```bash
pip install dbt-airflow
```

`dbt` needs to connect to your target environment (database, warehouse, etc.) and to do so it makes use of
different adapters, each dedicated to a different technology (such as Postgres or BigQuery). Therefore, before running
`dbt-airflow` you also need to ensure that the required adapter(s) are installed in your environment.
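
For example, if your warehouse is Postgres (an assumption made purely for illustration), you could install the package alongside the corresponding adapter:

```bash
# dbt-postgres is used here only as an example adapter; install the adapter
# that matches your own warehouse instead.
pip install dbt-airflow dbt-postgres
```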

For the full list of available adapters please refer to the official 
[dbt documentation](https://docs.getdbt.com/docs/available-adapters). 

---
# Usage

### Building an Airflow DAG using `dbt-airflow`

```python3
from datetime import datetime, timedelta
from pathlib import Path

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.empty import EmptyOperator

from dbt_airflow.core.config import DbtAirflowConfig, DbtProjectConfig, DbtProfileConfig
from dbt_airflow.core.task_group import DbtTaskGroup
from dbt_airflow.core.task import ExtraTask
from dbt_airflow.operators.execution import ExecutionOperator


with DAG(
    dag_id='test_dag',
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=['example'],
    default_args={
        'owner': 'airflow',
        'retries': 1,
        'retry_delay': timedelta(minutes=2),
    },
) as dag:
    extra_tasks = [
        ExtraTask(
            task_id='test_task',
            operator=PythonOperator,
            operator_args={
                'python_callable': lambda: print('Hello world'),
            },
            upstream_task_ids={
                'model.example_dbt_project.int_customers_per_store',
                'model.example_dbt_project.int_revenue_by_date',
            },
        ),
        ExtraTask(
            task_id='another_test_task',
            operator=PythonOperator,
            operator_args={
                'python_callable': lambda: print('Hello world 2!'),
            },
            upstream_task_ids={
                'test.example_dbt_project.int_customers_per_store',
            },
            downstream_task_ids={
                'snapshot.example_dbt_project.int_customers_per_store_snapshot',
            },
        ),
        ExtraTask(
            task_id='test_task_3',
            operator=PythonOperator,
            operator_args={
                'python_callable': lambda: print('Hello world 3!'),
            },
            downstream_task_ids={
                'snapshot.example_dbt_project.int_customers_per_store_snapshot',
            },
            upstream_task_ids={
                'model.example_dbt_project.int_revenue_by_date',
            },
        ),
    ]

    t1 = EmptyOperator(task_id='dummy_1')
    t2 = EmptyOperator(task_id='dummy_2')

    tg = DbtTaskGroup(
        group_id='dbt-company',
        dbt_project_config=DbtProjectConfig(
            project_path=Path('/opt/airflow/example_dbt_project/'),
            manifest_path=Path('/opt/airflow/example_dbt_project/target/manifest.json'),
        ),
        dbt_profile_config=DbtProfileConfig(
            profiles_path=Path('/opt/airflow/example_dbt_project/profiles'),
            target='dev',
        ),
        dbt_airflow_config=DbtAirflowConfig(
            extra_tasks=extra_tasks,
            execution_operator=ExecutionOperator.BASH,
            test_tasks_operator_kwargs={'retries': 0},
        ),
    )

    t1 >> tg >> t2

```
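
In the example above, `t1 >> tg >> t2` places the whole rendered dbt project between the two `EmptyOperator` tasks: `dummy_1` runs first, then all of the generated dbt tasks (respecting the dependencies from the manifest), and finally `dummy_2`. The `upstream_task_ids` and `downstream_task_ids` of each `ExtraTask` reference dbt resources by their manifest node names (e.g. `model.example_dbt_project.int_revenue_by_date`), which is how the extra tasks are wired into the generated task graph.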


            
