- Name: airflow-config
- Version: 0.2.1
- Summary: Airflow utilities for configuration of many DAGs and DAG environments
- Upload time: 2024-12-17 01:23:13
- Requires Python: >=3.9
- License: Apache-2.0
- Keywords: airflow, config, scheduler

# airflow-config

[Apache Airflow](https://airflow.apache.org) utilities for configuration of many DAGs and DAG environments

[![Build Status](https://github.com/airflow-laminar/airflow-config/actions/workflows/build.yml/badge.svg?branch=main&event=push)](https://github.com/airflow-laminar/airflow-config/actions/workflows/build.yml)
[![codecov](https://codecov.io/gh/airflow-laminar/airflow-config/branch/main/graph/badge.svg)](https://codecov.io/gh/airflow-laminar/airflow-config)
[![License](https://img.shields.io/github/license/airflow-laminar/airflow-config)](https://github.com/airflow-laminar/airflow-config)
[![PyPI](https://img.shields.io/pypi/v/airflow-config.svg)](https://pypi.python.org/pypi/airflow-config)

## Overview

This library allows for `YAML`-driven configuration of Airflow, including DAGs, Operators, and declaratively defined DAGs (à la [dag-factory](https://github.com/astronomer/dag-factory)). It is built with [Pydantic](https://pydantic.dev), [Hydra](https://hydra.cc), and [OmegaConf](https://omegaconf.readthedocs.io/).

Consider the following basic DAG:

```python
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

with DAG(
    dag_id="test-dag",
    default_args={
        "depends_on_past": False,
        "email": ["my.email@myemail.com"],
        "email_on_failure": False,
        "email_on_retry": False,
        "retries": 0,
    },
    description="test that dag is working properly",
    schedule=timedelta(minutes=1),
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=["utility", "test"],
):
    BashOperator(
        task_id="test-task",
        bash_command="echo 'test'",
    )
```

We can already see many options that we might want to drive centrally via config, perhaps based on some notion of environment (e.g. `dev`, `prod`, etc.):

- `"email": ["my.email@myemail.com"]`
- `"email_on_failure": False`
- `"email_on_retry": False`
- `"retries": 0`
- `schedule=timedelta(minutes=1)`
- `tags=["utility", "test"]`

If we want to change these in our DAG, we need to modify code. With hundreds of DAGs, this can quickly get out of hand: Airflow DAGs are Python code, so every edit risks introducing a syntax error, a stray trailing comma, or a similar mistake.

Now consider the alternative, config-driven approach:

`config/dev.yaml`

```yaml
# @package _global_
_target_: airflow_config.Configuration
default_args:
  _target_: airflow_config.TaskArgs
  owner: test
  email: [myemail@myemail.com]
  email_on_failure: false
  email_on_retry: false
  retries: 0
  depends_on_past: false
default_dag_args:
  _target_: airflow_config.DagArgs
  schedule: "01:00"
  start_date: "2024-01-01"
  catchup: false
  tags: ["utility", "test"]
```

```python
from datetime import timedelta

from airflow.operators.bash import BashOperator
from airflow_config import DAG, load_config

config = load_config(config_name="dev")

with DAG(
    dag_id="test-dag",
    description="test that dag is working properly",
    schedule=timedelta(minutes=1),
    config=config
):
    BashOperator(
        task_id="test-task",
        bash_command="echo 'test'",
    )
```

This has a number of benefits:

- Make changes without code changes, with static type validation
- Make changes across any number of DAGs without having to copy-paste
- Organize collections of DAGs into groups, e.g. by environment (`dev`, `prod`, etc.)
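
One way to realize the grouping by environment is to choose the config name from an environment variable when the DAG file is parsed. The sketch below is a minimal illustration and not part of `airflow-config`: the variable name `AIRFLOW_ENV`, the helper `select_config_name`, and the set of known environments are all assumptions.

```python
import os

# Hypothetical environment names; each would map to config/<name>.yaml
KNOWN_ENVIRONMENTS = ("dev", "uat", "prod")


def select_config_name(environ=None, default="dev"):
    """Return the config name for the current environment.

    Reads the (assumed) AIRFLOW_ENV variable and falls back to `default`.
    Unknown names raise early instead of silently loading the wrong config.
    """
    environ = os.environ if environ is None else environ
    name = environ.get("AIRFLOW_ENV", default).strip().lower()
    if name not in KNOWN_ENVIRONMENTS:
        raise ValueError(
            f"unknown environment {name!r}; expected one of {KNOWN_ENVIRONMENTS}"
        )
    return name


# Usage sketch (requires airflow-config to be installed):
# config = load_config(config_name=select_config_name())
```

Failing loudly on an unrecognized name is a deliberate choice here: a typo in the variable should not cause production DAGs to pick up `dev` settings.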

## Features

- Configure DAGs from a central config file or...
- from multiple env-specific config files (e.g. `dev`, `uat`, `prod`)
- Specialize DAGs by `dag_id` from a single file (e.g. set each DAG's `schedule` from a single shared file)
- Generate entire DAGs declaratively, like [astronomer/dag-factory](https://github.com/astronomer/dag-factory)
- Configure other extensions like [airflow-priority](https://github.com/airflow-laminar/airflow-priority), [airflow-supervisor](https://github.com/airflow-laminar/airflow-supervisor)
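
Specialization by `dag_id` can be pictured as layering per-DAG overrides on top of shared defaults. The following sketch uses plain dictionaries and a hypothetical helper, `resolve_dag_args`; it illustrates the idea only and is not the library's implementation.

```python
from copy import deepcopy


def resolve_dag_args(defaults, per_dag, dag_id):
    """Merge shared defaults with the overrides registered for `dag_id`.

    Nested dicts (such as `default_args`) are merged key by key; any other
    value in the override replaces the default outright.
    """
    resolved = deepcopy(defaults)
    for key, value in per_dag.get(dag_id, {}).items():
        if isinstance(value, dict) and isinstance(resolved.get(key), dict):
            resolved[key] = {**resolved[key], **value}
        else:
            resolved[key] = deepcopy(value)
    return resolved


defaults = {
    "default_args": {"owner": "test", "retries": 0},
    "schedule": "01:00",
    "catchup": False,
}
per_dag = {
    "example_dag": {
        "default_args": {"owner": "custom_owner"},
        "schedule": "0 3 * * *",
    },
}

# A DAG with overrides gets its own owner and schedule but keeps `retries`
example = resolve_dag_args(defaults, per_dag, "example_dag")
# A DAG without an entry simply receives the shared defaults
other = resolve_dag_args(defaults, per_dag, "some_other_dag")
```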


## Configuration

```python
class Configuration(BaseModel):
    # default task args
    # https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator
    default_task_args: TaskArgs

    # default dag args
    # https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/dag/index.html#airflow.models.dag.DAG
    default_dag_args: DagArgs

    # string (dag id) to Dag mapping
    dags: Optional[Dict[str, Dag]]

    # string (dag id) to Task mapping
    tasks: Optional[Dict[str, Task]]

    # used for extensions to inject arbitrary configuration.
    # See e.g.: https://github.com/airflow-laminar/airflow-supervisor?tab=readme-ov-file#example-dag-airflow-config
    extensions: Optional[Dict[str, BaseModel]]
```

### Examples - Load defaults from config


```yaml
# config/test.yaml
# @package _global_
_target_: airflow_config.Configuration
default_args:
  _target_: airflow_config.DefaultTaskArgs
  owner: test
```

```python
from airflow_config import load_config, DAG, create_dag

conf = load_config("config", "test")
d = create_dag("config", "test")
# or d = DAG(dag_id="test-dag", config=conf)
assert conf.default_args.owner == "test"
```
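
The `_target_` keys in the YAML follow Hydra's instantiation convention: the value is a dotted import path, and the remaining keys are passed to that target as keyword arguments. The sketch below mimics the convention with the standard library only, so it runs without Hydra or Airflow. `instantiate_target` is a hypothetical helper, and `datetime.timedelta` stands in for a class such as `airflow_config.TaskArgs`. Hydra's own `hydra.utils.instantiate` is more general; for example, it also handles nested configs.

```python
import importlib


def instantiate_target(node):
    """Build an object from a dict that carries a `_target_` dotted path.

    All other keys are forwarded to the target as keyword arguments.
    """
    kwargs = dict(node)
    module_path, _, attr = kwargs.pop("_target_").rpartition(".")
    target = getattr(importlib.import_module(module_path), attr)
    return target(**kwargs)


# `datetime.timedelta` is a stand-in target so the example is self-contained
delta = instantiate_target({"_target_": "datetime.timedelta", "hours": 1})
```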

### Examples - Load more defaults from config

```yaml
# config/test.yaml
# @package _global_
_target_: airflow_config.Configuration
default_args:
  _target_: airflow_config.DefaultTaskArgs
  owner: test
  email: [myemail@myemail.com]
  email_on_failure: false
  email_on_retry: false
  retries: 0
  depends_on_past: false
default_dag_args:
  _target_: airflow_config.DagArgs
  schedule: "01:10"
  start_date: "2024-01-01"
  catchup: false
  tags: ["utility", "test"]
```

```python
from datetime import datetime

from airflow_config import load_config, DAG, create_dag

conf = load_config("config", "test")
d = create_dag("config", "test")
# or d = DAG(dag_id="test-dag", config=conf)
assert conf.default_args.owner == "test"
assert conf.default_args.email == ["myemail@myemail.com"]
assert conf.default_args.email_on_failure is False
assert conf.default_args.email_on_retry is False
assert conf.default_args.retries == 0
assert conf.default_args.depends_on_past is False
assert conf.default_dag_args.start_date == datetime(2024, 1, 1)
assert conf.default_dag_args.catchup is False
assert conf.default_dag_args.tags == ["utility", "test"]
```

### Examples - Specialize individual DAGs

```yaml
# config/test.yaml
# @package _global_
_target_: airflow_config.Configuration
default_args:
  _target_: airflow_config.TaskArgs
  owner: test
  email: [myemail@myemail.com]
  email_on_failure: false
  email_on_retry: false
  retries: 0
  depends_on_past: false

default_dag_args:
  _target_: airflow_config.DagArgs
  schedule: "01:00"
  start_date: "2024-01-01"
  catchup: false
  tags: ["utility", "test"]

dags:
  example_dag:
    default_args:
      owner: "custom_owner"
    description: "this is an example dag"
    schedule: "0 3 * * *"

  example_dag2:
    default_args:
      owner: "custom_owner2"
    schedule: "0 4 * * *"
```

```python
from datetime import timedelta

from airflow.timetables.interval import DeltaDataIntervalTimetable
from airflow_config import load_config, DAG, create_dag

conf = load_config("config", "test")
d = create_dag("config", "test")
# or d = DAG(dag_id="test-dag", config=conf)
assert d.default_args["owner"] == "test"
assert d.default_args["email"] == ["myemail@myemail.com"]
assert d.default_args["email_on_failure"] is False
assert d.default_args["email_on_retry"] is False
assert d.default_args["retries"] == 0
assert d.default_args["depends_on_past"] is False
assert d.schedule_interval == timedelta(seconds=3600)
assert isinstance(d.timetable, DeltaDataIntervalTimetable)
assert isinstance(d.timetable._delta, timedelta)
assert d.start_date.year == 2024
assert d.start_date.month == 1
assert d.start_date.day == 1
assert d.catchup is False
assert d.tags == ["utility", "test"]

# specialized by dag_id from shared config file
d = DAG(dag_id="example_dag", config=conf)
assert d.default_args["owner"] == "custom_owner"
assert d.default_args["email"] == ["myemail@myemail.com"]
assert d.schedule_interval == "0 3 * * *"

# specialized by dag_id from shared config file
d = DAG(dag_id="example_dag2", config=conf)
assert d.default_args["owner"] == "custom_owner2"
assert d.default_args["email"] == ["myemail@myemail.com"]
assert d.schedule_interval == "0 4 * * *"
```
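
The assertions above show two kinds of `schedule` values: `"01:00"` ends up as a one-hour `timedelta`, while cron expressions such as `"0 3 * * *"` are kept as strings. A rough sketch of that distinction follows, assuming an `HH:MM` form and using a hypothetical helper, `interpret_schedule`. The actual conversion inside the library may differ and may accept more formats.

```python
import re
from datetime import timedelta

# Matches strings like "01:00" or "1:30" (hours:minutes)
_HH_MM = re.compile(r"^(\d{1,2}):(\d{2})$")


def interpret_schedule(value):
    """Return a timedelta for "HH:MM" strings, otherwise the value unchanged."""
    match = _HH_MM.match(value)
    if match is None:
        return value  # e.g. a cron expression, passed through as-is
    hours, minutes = (int(part) for part in match.groups())
    return timedelta(hours=hours, minutes=minutes)


hourly = interpret_schedule("01:00")
cron = interpret_schedule("0 3 * * *")
```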

### Examples - DAG Factory

## Integrations

Configuration can be extended with arbitrary content under the `extensions` key. Support is built in for [`airflow-priority`](https://github.com/airflow-laminar/airflow-priority), and the mechanism can be extended to any arbitrary Pydantic model, as seen in the [README of `airflow-supervisor`](https://github.com/airflow-laminar/airflow-supervisor).

## License

This software is licensed under the Apache 2.0 license. See the [LICENSE](LICENSE) file for details.

            
