# airflow-dbt
This is a collection of [Airflow](https://airflow.apache.org/) operators to provide easy integration with [dbt](https://www.getdbt.com).
```py
from airflow import DAG
from airflow_dbt.operators.dbt_operator import (
    DbtSeedOperator,
    DbtSnapshotOperator,
    DbtRunOperator,
    DbtTestOperator,
    DbtCleanOperator,
)
from airflow.utils.dates import days_ago

default_args = {
    'dir': '/srv/app/dbt',
    'start_date': days_ago(0)
}

with DAG(dag_id='dbt', default_args=default_args, schedule_interval='@daily') as dag:

    dbt_seed = DbtSeedOperator(
        task_id='dbt_seed',
    )

    dbt_snapshot = DbtSnapshotOperator(
        task_id='dbt_snapshot',
    )

    dbt_run = DbtRunOperator(
        task_id='dbt_run',
    )

    dbt_test = DbtTestOperator(
        task_id='dbt_test',
        retries=0,  # Failing tests would fail the task, and we don't want Airflow to try again
    )

    dbt_clean = DbtCleanOperator(
        task_id='dbt_clean',
    )

    dbt_seed >> dbt_snapshot >> dbt_run >> dbt_test >> dbt_clean
```
## Installation
Install from PyPI:
```sh
pip install airflow-dbt-winwin
```
The operators will also need access to the `dbt` CLI, which should either be on your `PATH` or can be pointed to with the `dbt_bin` argument on each operator.
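For example, if `dbt` lives in a virtual environment rather than on the `PATH`, you can point each operator at the executable directly (the path below is illustrative, not a requirement):

```python
from airflow_dbt.operators.dbt_operator import DbtRunOperator

dbt_run = DbtRunOperator(
    task_id='dbt_run',
    dbt_bin='/srv/app/.venv/bin/dbt',  # illustrative path to a dbt executable
)
```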
## Usage
There are seven operators currently implemented:

* `DbtDocsGenerateOperator`
  * Calls [`dbt docs generate`](https://docs.getdbt.com/reference/commands/cmd-docs)
* `DbtDepsOperator`
  * Calls [`dbt deps`](https://docs.getdbt.com/docs/deps)
* `DbtSeedOperator`
  * Calls [`dbt seed`](https://docs.getdbt.com/docs/seed)
* `DbtSnapshotOperator`
  * Calls [`dbt snapshot`](https://docs.getdbt.com/docs/snapshot)
* `DbtRunOperator`
  * Calls [`dbt run`](https://docs.getdbt.com/docs/run)
* `DbtTestOperator`
  * Calls [`dbt test`](https://docs.getdbt.com/docs/test)
* `DbtCleanOperator`
  * Calls [`dbt clean`](https://docs.getdbt.com/docs/clean)
Each of the above operators accepts the following arguments (a combined example follows the list):

* `env`
  * If set as a dict, the given environment variables are passed to the `dbt` process when it runs
* `profiles_dir`
  * If set, passed as the `--profiles-dir` argument to the `dbt` command
* `target`
  * If set, passed as the `--target` argument to the `dbt` command
* `dir`
  * The directory to run the `dbt` command in
* `full_refresh`
  * If set to `True`, passes `--full-refresh`
* `vars`
  * If set, passed as the `--vars` argument to the `dbt` command. Should be set as a Python dictionary, as it will be passed to the `dbt` command as YAML
* `models`
  * If set, passed as the `--models` argument to the `dbt` command
* `exclude`
  * If set, passed as the `--exclude` argument to the `dbt` command
* `select`
  * If set, passed as the `--select` argument to the `dbt` command
* `selector`
  * If set, passed as the `--selector` argument to the `dbt` command
* `dbt_bin`
  * The path to the `dbt` CLI. Defaults to `dbt`, so assumes it's on your `PATH`
* `verbose`
  * If set to `True`, the operator logs verbosely to the Airflow logs
* `warn_error`
  * If set to `True`, passes the `--warn-error` argument to the `dbt` command, treating warnings as errors
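For illustration, here is a sketch combining several of these arguments; the selectors and variable values are placeholders, not part of any real project:

```python
from airflow_dbt.operators.dbt_operator import DbtRunOperator

dbt_run = DbtRunOperator(
    task_id='dbt_run_finance',
    dir='/srv/app/dbt',
    profiles_dir='/srv/app/dbt',
    target='prod',
    models='marts.finance',          # placeholder model selector
    exclude='tag:deprecated',        # placeholder exclusion
    vars={'run_date': '{{ ds }}'},   # dict is rendered to YAML for --vars
    full_refresh=True,               # adds --full-refresh
)
```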
Typically you will want to use the `DbtRunOperator`, followed by the `DbtTestOperator`, as shown earlier.
You can also use the hook directly. Typically this is useful when you need to combine the `dbt` command with another task in the same operator, for example running `dbt docs generate` and uploading the docs to somewhere they can be served from.
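A minimal sketch of direct hook usage, assuming the package's `DbtCliHook` (in `airflow_dbt.hooks.dbt_hook`) and a hypothetical `upload_docs` helper for the upload step:

```python
from airflow.operators.python import PythonOperator
from airflow_dbt.hooks.dbt_hook import DbtCliHook


def generate_and_upload_docs():
    # Run `dbt docs generate` through the hook...
    DbtCliHook(dir='/srv/app/dbt').run_cli('docs', 'generate')
    # ...then ship the generated site with your own upload logic
    upload_docs('/srv/app/dbt/target')  # hypothetical helper


dbt_docs = PythonOperator(
    task_id='dbt_docs',
    python_callable=generate_and_upload_docs,
)
```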
## Building Locally
To install from the repository, it's recommended to first create a virtual environment:
```bash
python3 -m venv .venv
source .venv/bin/activate
```
Install using `pip`:
```bash
pip install .
```
## Testing
To run tests locally, first create a virtual environment (see the [Building Locally](https://github.com/gocardless/airflow-dbt#building-locally) section).
Install dependencies:
```bash
pip install . pytest
```
Run the tests:
```bash
pytest tests/
```
## Code style
This project uses [flake8](https://flake8.pycqa.org/en/latest/).
To check your code, first create a virtual environment (see the [Building Locally](https://github.com/gocardless/airflow-dbt#building-locally) section):
```bash
pip install flake8
flake8 airflow_dbt/ tests/ setup.py
```
## Package management
If you use dbt's package manager, you should include all dependencies before deploying your dbt project.
For Docker users, packages specified in `packages.yml` should be included as part of your Docker image by calling `dbt deps` in your `Dockerfile`.
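Alternatively, if you prefer to resolve packages at runtime, a `DbtDepsOperator` task can run ahead of the rest of the pipeline; a minimal sketch:

```python
from airflow_dbt.operators.dbt_operator import DbtDepsOperator, DbtRunOperator

# Install packages from packages.yml before running the project
dbt_deps = DbtDepsOperator(task_id='dbt_deps')
dbt_run = DbtRunOperator(task_id='dbt_run')

dbt_deps >> dbt_run
```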
## Amazon Managed Workflows for Apache Airflow (MWAA)
If you use MWAA, you just need to update the `requirements.txt` file and add `airflow-dbt-winwin` and `dbt` to it.
Then you can have your dbt code inside a folder `{DBT_FOLDER}` in the dags folder on S3 and configure the dbt task like below:
```python
dbt_run = DbtRunOperator(
    task_id='dbt_run',
    dbt_bin='/usr/local/airflow/.local/bin/dbt',
    profiles_dir='/usr/local/airflow/dags/{DBT_FOLDER}/',
    dir='/usr/local/airflow/dags/{DBT_FOLDER}/'
)
```
## Templating and parsing environment variables
If you would like to run dbt using a custom profile definition template with environment-specific variables, for example a `profiles.yml` using Jinja:
```yaml
<profile_name>:
  outputs:
    <source>:
      database: "{{ env_var('DBT_ENV_SECRET_DATABASE') }}"
      password: "{{ env_var('DBT_ENV_SECRET_PASSWORD') }}"
      schema: "{{ env_var('DBT_ENV_SECRET_SCHEMA') }}"
      threads: "{{ env_var('DBT_THREADS') }}"
      type: <type>
      user: "{{ env_var('USER_NAME') }}_{{ env_var('ENV_NAME') }}"
  target: <source>
```
You can pass the environment variables via the `env` kwarg:
```python
import os
...

dbt_run = DbtRunOperator(
    task_id='dbt_run',
    env={
        'DBT_ENV_SECRET_DATABASE': '<DATABASE>',
        'DBT_ENV_SECRET_PASSWORD': '<PASSWORD>',
        'DBT_ENV_SECRET_SCHEMA': '<SCHEMA>',
        'USER_NAME': '<USER_NAME>',
        'DBT_THREADS': os.getenv('<DBT_THREADS_ENV_VARIABLE_NAME>'),
        'ENV_NAME': os.getenv('ENV_NAME')
    }
)
```
## License & Contributing
* This is available as open source under the terms of the [MIT License](http://opensource.org/licenses/MIT).
* Bug reports and pull requests are welcome on GitHub at https://github.com/gocardless/airflow-dbt.
GoCardless ♥ open source. If you do too, come [join us](https://gocardless.com/about/jobs).