airflow-ha

Name: airflow-ha
Version: 0.1.2
Summary: High Availability (HA) DAG Utility
Homepage: https://github.com/airflow-laminar/airflow-ha
Upload time: 2024-08-29 15:49:48
Requires-Python: >=3.9
License: Apache-2.0
Keywords: airflow, config, high-availability, scheduler

# airflow-ha

High Availability (HA) DAG Utility

[![Build Status](https://github.com/airflow-laminar/airflow-ha/actions/workflows/build.yml/badge.svg?branch=main&event=push)](https://github.com/airflow-laminar/airflow-ha/actions/workflows/build.yml)
[![codecov](https://codecov.io/gh/airflow-laminar/airflow-ha/branch/main/graph/badge.svg)](https://codecov.io/gh/airflow-laminar/airflow-ha)
[![License](https://img.shields.io/github/license/airflow-laminar/airflow-ha)](https://github.com/airflow-laminar/airflow-ha)
[![PyPI](https://img.shields.io/pypi/v/airflow-ha.svg)](https://pypi.python.org/pypi/airflow-ha)

## Overview

This library provides an operator called `HighAvailabilityOperator`, which inherits from `PythonSensor` and runs a user-provided `python_callable`.
The return value can trigger the following actions:

| Return              | Behavior                                            | Current DAG Run End State |
| :-----              | :-----                                              | :------------------------ |
| `(PASS, RETRIGGER)` | Retrigger the same DAG to run again                 | `pass`                    |
| `(PASS, STOP)`      | Finish the DAG run; wait for the next scheduled run | `pass`                    |
| `(FAIL, RETRIGGER)` | Retrigger the same DAG to run again                 | `fail`                    |
| `(FAIL, STOP)`      | Finish the DAG run; wait for the next scheduled run | `fail`                    |
| `(*, CONTINUE)`     | Continue running the sensor                         | N/A                       |

Note: if the sensor times out, the behavior matches `(Result.PASS, Action.RETRIGGER)`.
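
Concretely, the `python_callable` just returns one of these `(Result, Action)` pairs each time the sensor pokes. A minimal sketch, assuming `Result` and `Action` are imported from `airflow_ha` as used throughout this README (the minute-of-hour condition is a toy placeholder for a real check):

```python
from datetime import datetime

from airflow_ha import Action, Result


def check(**kwargs):
    # Toy condition purely for illustration -- replace with whatever you actually monitor.
    minute = datetime.now().minute
    if minute >= 55:
        return Result.FAIL, Action.STOP        # fail this run; wait for the next scheduled run
    if minute % 10 == 0:
        return Result.PASS, Action.RETRIGGER   # succeed and retrigger the same DAG
    return Result.PASS, Action.CONTINUE        # keep poking the sensor
```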

### Example - Always On

Consider the following DAG:

```python
from datetime import datetime, timedelta
from random import choice

from airflow import DAG
from airflow.operators.python import PythonOperator

from airflow_ha import Action, HighAvailabilityOperator, Result

with DAG(
    dag_id="test-high-availability",
    description="Test HA Operator",
    schedule=timedelta(days=1),
    start_date=datetime(2024, 1, 1),
    catchup=False,
):
    ha = HighAvailabilityOperator(
        task_id="ha",
        timeout=30,
        poke_interval=5,
        python_callable=lambda **kwargs: choice(
            (
                (Result.PASS, Action.CONTINUE),
                (Result.PASS, Action.RETRIGGER),
                (Result.PASS, Action.STOP),
                (Result.FAIL, Action.CONTINUE),
                (Result.FAIL, Action.RETRIGGER),
                (Result.FAIL, Action.STOP),
            )
        ),
    )
    
    pre = PythonOperator(task_id="pre", python_callable=lambda **kwargs: "test")
    pre >> ha
    
    retrigger_fail = PythonOperator(task_id="retrigger_fail", python_callable=lambda **kwargs: "test")
    ha.retrigger_fail >> retrigger_fail

    stop_fail = PythonOperator(task_id="stop_fail", python_callable=lambda **kwargs: "test")
    ha.stop_fail >> stop_fail
    
    retrigger_pass = PythonOperator(task_id="retrigger_pass", python_callable=lambda **kwargs: "test")
    ha.retrigger_pass >> retrigger_pass

    stop_pass = PythonOperator(task_id="stop_pass", python_callable=lambda **kwargs: "test")
    ha.stop_pass >> stop_pass
```

This produces a DAG with the following topology:

<img src="https://raw.githubusercontent.com/airflow-laminar/airflow-ha/main/docs/src/top.png" />

This DAG behaves as follows.
If the check returns `CONTINUE`, the sensor keeps running.
If the check returns `RETRIGGER`, or the sensor times out, the DAG re-triggers itself and the current run finishes.
If the check returns `STOP`, the current run finishes and the DAG does not retrigger itself.
If the check returns `PASS`, the current DAG run ends in a successful state.
If the check returns `FAIL`, the current DAG run ends in a failed state.

This allows one to build "always-on" DAGs without individual long-blocking tasks.

This library is used to build [airflow-supervisor](https://github.com/airflow-laminar/airflow-supervisor), which uses [supervisor](http://supervisord.org) as a process-monitor while checking and restarting jobs via `airflow-ha`.
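
For example, an always-on monitoring DAG might look like the following sketch. The `launch_worker` and `worker_is_alive` helpers are hypothetical stand-ins for your own process management; the operator and enums are used the same way as in the example above.

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator

from airflow_ha import Action, HighAvailabilityOperator, Result


def launch_worker():
    # Hypothetical: start (or restart) the process you want to keep alive.
    return "started"


def worker_is_alive():
    # Hypothetical liveness probe -- replace with a PID check, HTTP ping, etc.
    return True


def _monitor(**kwargs):
    if worker_is_alive():
        return Result.PASS, Action.CONTINUE   # healthy: keep poking
    return Result.FAIL, Action.RETRIGGER      # dead: fail this run and retrigger the DAG


with DAG(
    dag_id="always-on-worker",
    description="Keep a worker process running",
    schedule=timedelta(days=1),
    start_date=datetime(2024, 1, 1),
    catchup=False,
):
    start = PythonOperator(task_id="start-worker", python_callable=lambda **kwargs: launch_worker())
    # If the sensor times out, the DAG retriggers itself (see the note above),
    # so the worker is restarted and monitoring resumes in a fresh run.
    watch = HighAvailabilityOperator(task_id="watch", timeout=60 * 60, poke_interval=60, python_callable=_monitor)
    start >> watch
```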

### Example - Recursive

You can also use this library to build recursive DAGs ("cyclic DAGs", oxymoronic as that may sound).

The following code builds a DAG that re-triggers itself with a decrementing counter, starting at 3:

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator

from airflow_ha import Action, HighAvailabilityOperator, Result

with DAG(
    dag_id="test-ha-counter",
    description="Test HA Countdown",
    schedule=timedelta(days=1),
    start_date=datetime(2024, 1, 1),
    catchup=False,
):
    
    def _get_count(**kwargs):
        # The default is 3
        return kwargs['dag_run'].conf.get('counter', 3) - 1

    get_count = PythonOperator(task_id="get-count", python_callable=_get_count)

    def _keep_counting(**kwargs):
        count = kwargs["task_instance"].xcom_pull(key="return_value", task_ids="get-count")
        return (Result.PASS, Action.RETRIGGER) if count > 0 else (Result.PASS, Action.STOP) if count == 0 else (Result.FAIL, Action.STOP)

    keep_counting = HighAvailabilityOperator(
        task_id="ha",
        timeout=30,
        poke_interval=5,
        python_callable=_keep_counting,
        pass_trigger_kwargs={"conf": '''{"counter": {{ ti.xcom_pull(key="return_value", task_ids="get-count") }}}'''},
    )

    get_count >> keep_counting
```
<img src="https://raw.githubusercontent.com/airflow-laminar/airflow-ha/main/docs/src/rec.png" />

## License

This software is licensed under the Apache 2.0 license. See the [LICENSE](LICENSE) file for details.

            
