airflow-dbt-cta

Name: airflow-dbt-cta
Version: 0.0.10
Home page: https://github.com/communitytechalliance/airflow-dbt
Summary: Apache Airflow integration for dbt (forked from https://github.com/gocardless/airflow-dbt)
Upload time: 2024-01-29 22:07:10
Author: GoCardless
License: MIT
# airflow-dbt

**NOTE: this repository was forked from https://github.com/gocardless/airflow-dbt in order to release an updated version to PyPI.**

This is a collection of [Airflow](https://airflow.apache.org/) operators to provide easy integration with [dbt](https://www.getdbt.com).

```py
from airflow import DAG
from airflow_dbt_cta.operators.dbt_operator import (
    DbtSeedOperator,
    DbtSnapshotOperator,
    DbtRunOperator,
    DbtTestOperator,
    DbtCleanOperator,
    DbtBuildOperator,
)
from airflow.utils.dates import days_ago

default_args = {
  'dir': '/srv/app/dbt',
  'start_date': days_ago(0)
}

with DAG(dag_id='dbt', default_args=default_args, schedule_interval='@daily') as dag:

  dbt_seed = DbtSeedOperator(
    task_id='dbt_seed',
  )

  dbt_snapshot = DbtSnapshotOperator(
    task_id='dbt_snapshot',
  )

  dbt_run = DbtRunOperator(
    task_id='dbt_run',
  )

  dbt_build = DbtBuildOperator(
    task_id='dbt_build',
  )

  dbt_test = DbtTestOperator(
    task_id='dbt_test',
    retries=0,  # Failing tests would fail the task, and we don't want Airflow to try again
  )

  dbt_clean = DbtCleanOperator(
    task_id='dbt_clean',
  )

  dbt_seed >> dbt_snapshot >> dbt_run >> dbt_build >> dbt_test >> dbt_clean
```

## Installation

Install from PyPI:

```sh
pip install airflow-dbt-cta
```

The operators also need access to the `dbt` CLI, which should either be on your `PATH` or be pointed to with the `dbt_bin` argument on each operator.
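
For example, if `dbt` lives in a dedicated virtual environment rather than on the `PATH`, you can point each operator at the binary directly (the path below is a hypothetical placeholder):

```python
from airflow_dbt_cta.operators.dbt_operator import DbtRunOperator

dbt_run = DbtRunOperator(
    task_id='dbt_run',
    # Hypothetical location of a dbt binary inside a dedicated virtualenv;
    # adjust to wherever dbt is installed in your environment.
    dbt_bin='/opt/dbt-venv/bin/dbt',
)
```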

## Usage

There are eight operators currently implemented:

* `DbtDocsGenerateOperator`
  * Calls [`dbt docs generate`](https://docs.getdbt.com/reference/commands/cmd-docs)
* `DbtDepsOperator`
  * Calls [`dbt deps`](https://docs.getdbt.com/docs/deps)
* `DbtSeedOperator`
  * Calls [`dbt seed`](https://docs.getdbt.com/docs/seed)
* `DbtSnapshotOperator`
  * Calls [`dbt snapshot`](https://docs.getdbt.com/docs/snapshot)
* `DbtRunOperator`
  * Calls [`dbt run`](https://docs.getdbt.com/docs/run)
* `DbtTestOperator`
  * Calls [`dbt test`](https://docs.getdbt.com/docs/test)
* `DbtCleanOperator`
  * Calls [`dbt clean`](https://docs.getdbt.com/docs/clean)
* `DbtBuildOperator`
  * Calls [`dbt build`](https://docs.getdbt.com/reference/commands/build)


Each of the above operators accepts the following arguments (a combined example follows the list):

* `env`
  * If set as a dict, passes the given environment variables to the `dbt` command
* `profiles_dir`
  * If set, passed as the `--profiles-dir` argument to the `dbt` command
* `target`
  * If set, passed as the `--target` argument to the `dbt` command
* `dir`
  * The directory to run the `dbt` command in
* `full_refresh`
  * If set to `True`, passes `--full-refresh`
* `vars`
  * If set, passed as the `--vars` argument to the `dbt` command. Should be set as a Python dictionary; it will be passed to the `dbt` command as YAML
* `models`
  * If set, passed as the `--models` argument to the `dbt` command
* `exclude`
  * If set, passed as the `--exclude` argument to the `dbt` command
* `select`
  * If set, passed as the `--select` argument to the `dbt` command
* `selector`
  * If set, passed as the `--selector` argument to the `dbt` command
* `dbt_bin`
  * The `dbt` CLI. Defaults to `dbt`, so assumes it's on your `PATH`
* `verbose`
  * If set to `True`, the operator logs verbosely to the Airflow logs
* `warn_error`
  * If set to `True`, passes the `--warn-error` argument to the `dbt` command, treating warnings as errors
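
For example, a minimal sketch combining several of these arguments on a single run task (the paths and selections are placeholders, not part of the package):

```python
from airflow_dbt_cta.operators.dbt_operator import DbtRunOperator

dbt_run = DbtRunOperator(
    task_id='dbt_run_marts',
    dir='/srv/app/dbt',            # run dbt from the project directory
    profiles_dir='/srv/app/dbt',   # passed as --profiles-dir
    target='prod',                 # passed as --target
    select='marts',                # passed as --select
    full_refresh=True,             # adds --full-refresh
    vars={'start_date': '2024-01-01'},  # passed to --vars as YAML
    warn_error=True,               # adds --warn-error
)
```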

Typically you will want to use the `DbtRunOperator`, followed by the `DbtTestOperator`, as shown earlier.

You can also use the hook directly. Typically this is useful when you need to combine the `dbt` command with another task in the same operator, for example running `dbt docs generate` and uploading the docs to somewhere they can be served from.
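
A minimal sketch of direct hook usage, assuming this fork keeps the upstream `DbtCliHook` and its `run_cli` method under `airflow_dbt_cta.hooks.dbt_hook` (verify the import path against the installed package; the upload helper is hypothetical):

```python
from airflow.operators.python import PythonOperator
from airflow_dbt_cta.hooks.dbt_hook import DbtCliHook  # assumed path, mirroring upstream

def generate_and_publish_docs():
    # Run `dbt docs generate` through the hook...
    DbtCliHook(dir='/srv/app/dbt').run_cli('docs', 'generate')
    # ...then ship the generated target/ directory wherever the docs are served from.
    upload_docs_somewhere('/srv/app/dbt/target')  # hypothetical helper, not part of this package

dbt_docs = PythonOperator(
    task_id='dbt_docs',
    python_callable=generate_and_publish_docs,
)
```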

## Building Locally

To install from the repository, first create a virtual environment (recommended):
```bash
python3 -m venv .venv

source .venv/bin/activate
```

Install using `pip`:
```bash
pip install .
```

## Testing

To run tests locally, first create a virtual environment (see the [Building Locally](https://github.com/gocardless/airflow-dbt#building-locally) section).

Install dependencies:
```bash
pip install . pytest
```

Run the tests:
```bash
pytest tests/
```

## Code style

This project uses [flake8](https://flake8.pycqa.org/en/latest/).

To check your code, first create a virtual environment (see the [Building Locally](https://github.com/gocardless/airflow-dbt#building-locally) section), then run:
```bash
pip install flake8
flake8 airflow_dbt/ tests/ setup.py
```

## Package management

If you use dbt's package manager, you should install all package dependencies before deploying your dbt project.

For Docker users, packages specified in `packages.yml` should be included as part of your Docker image by calling `dbt deps` in your `Dockerfile`.
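
Alternatively, you can install packages at runtime with the `DbtDepsOperator` listed above, for example (the project directory is a placeholder):

```python
from airflow_dbt_cta.operators.dbt_operator import DbtDepsOperator, DbtRunOperator

# Install the packages declared in packages.yml before running the project.
dbt_deps = DbtDepsOperator(
    task_id='dbt_deps',
    dir='/srv/app/dbt',  # placeholder project directory
)

dbt_run = DbtRunOperator(
    task_id='dbt_run',
    dir='/srv/app/dbt',
)

dbt_deps >> dbt_run
```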

## Amazon Managed Workflows for Apache Airflow (MWAA)

If you use MWAA, you just need to add `airflow-dbt-cta` and `dbt` to the environment's `requirements.txt` file.
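
A sample `requirements.txt` might look like the following (the entries are illustrative; recent dbt releases ship as `dbt-core` plus a warehouse adapter):

```text
airflow-dbt-cta
dbt-core
dbt-postgres  # illustrative adapter; pick the one for your warehouse
```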

Then you can keep your dbt code inside a folder `{DBT_FOLDER}` in the dags folder on S3 and configure the dbt task as shown below:

```python
dbt_run = DbtRunOperator(
  task_id='dbt_run',
  dbt_bin='/usr/local/airflow/.local/bin/dbt',
  profiles_dir='/usr/local/airflow/dags/{DBT_FOLDER}/',
  dir='/usr/local/airflow/dags/{DBT_FOLDER}/'
)
```

## Templating and parsing environment variables

If you would like to run dbt with a custom profile definition template that uses environment-specific variables, for example a `profiles.yml` that uses Jinja:
```yaml
<profile_name>:
  outputs:
    <source>:
      database: "{{ env_var('DBT_ENV_SECRET_DATABASE') }}"
      password: "{{ env_var('DBT_ENV_SECRET_PASSWORD') }}"
      schema: "{{ env_var('DBT_ENV_SECRET_SCHEMA') }}"
      threads: "{{ env_var('DBT_THREADS') }}"
      type: <type>
      user: "{{ env_var('USER_NAME') }}_{{ env_var('ENV_NAME') }}"
  target: <source>
```

You can pass the environment variables via the `env` kwarg:

```python
import os
...

dbt_run = DbtRunOperator(
  task_id='dbt_run',
  env={
    'DBT_ENV_SECRET_DATABASE': '<DATABASE>',
    'DBT_ENV_SECRET_PASSWORD': '<PASSWORD>',
    'DBT_ENV_SECRET_SCHEMA': '<SCHEMA>',
    'USER_NAME': '<USER_NAME>',
    'DBT_THREADS': os.getenv('<DBT_THREADS_ENV_VARIABLE_NAME>'),
    'ENV_NAME': os.getenv('ENV_NAME')
  }
)
```

## License & Contributing

* This is available as open source under the terms of the [MIT License](http://opensource.org/licenses/MIT).
* Bug reports and pull requests are welcome on GitHub at https://github.com/gocardless/airflow-dbt.

GoCardless ♥ open source. If you do too, come [join us](https://gocardless.com/about/jobs).

            
