dbnd-airflow


Name: dbnd-airflow
Version: 1.0.27.2
Home page: https://github.com/databand-ai/dbnd
Summary: Machine Learning Orchestration
Upload time: 2024-10-31 16:10:30
Maintainer: Evgeny Shulman
Docs URL: None
Author: Evgeny Shulman
Requires Python: None
License: None
Keywords: orchestration, data, machinelearning
Requirements: No requirements were recorded.
Travis-CI: No Travis.
Coveralls test coverage: No coveralls.

# Dbnd Airflow Operator

This plugin provides an explicit, declarative way of passing messages between two Airflow operators.

This plugin was inspired by [AIP-31](https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-31%3A+Airflow+functional+DAG+definition).
Essentially, this plugin connects dbnd's implementation of tasks and pipelines to Airflow operators.

This implementation uses XCom communication and XCom templates to transfer these messages.
The plugin is fully functional, although some edge cases will only be supported once AIP-31 is implemented.

Fully tested on Airflow 1.10.x.
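
The XCom-template idea can be shown without an Airflow installation. Below is a minimal, self-contained simulation. It assumes a task result can be represented by a placeholder object whose string form is an Airflow-style Jinja expression calling `ti.xcom_pull`. At run time, that expression is replaced by the value the upstream task pushed.

`XComRef`, `render` and the in-memory `pushed` dictionary are hypothetical names for illustration only; they are not part of dbnd's API. The regex substitution stands in for the Jinja rendering that Airflow actually performs.

```python
import re
from typing import Any, Dict, Tuple


class XComRef:
    """Hypothetical placeholder for a value that exists only after an upstream task has run."""

    def __init__(self, task_id: str, key: str = "return_value") -> None:
        self.task_id = task_id
        self.key = key

    def __str__(self) -> str:
        # Rendered as an Airflow-style XCom template string
        return "{{ ti.xcom_pull(task_ids='%s', key='%s') }}" % (self.task_id, self.key)


# Matches exactly the template produced by XComRef.__str__
_XCOM_TEMPLATE = re.compile(r"\{\{ ti\.xcom_pull\(task_ids='([^']+)', key='([^']+)'\) \}\}")


def render(value: str, xcom_store: Dict[Tuple[str, str], Any]) -> str:
    # Replace each XCom template with the value stored under (task_id, key).
    # This stands in for the Jinja rendering Airflow does before a task runs.
    return _XCOM_TEMPLATE.sub(
        lambda m: str(xcom_store[(m.group(1), m.group(2))]), value
    )


# Usage: the downstream argument is only a template string until it is rendered
arg = str(XComRef("my_task"))
pushed = {("my_task", "return_value"): "success"}
print(arg)                  # {{ ti.xcom_pull(task_ids='my_task', key='return_value') }}
print(render(arg, pushed))  # success
```

Because the placeholder is a plain string until rendering, it can go wherever Airflow applies templates. That is presumably how the code example in the next section can hand `t3` to a `PythonOperator` through `op_kwargs`.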

# Code Example

Here is an example of how task results are passed between operators in a DAG:

```python
import logging
from datetime import datetime, timedelta
from typing import Tuple

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
from dbnd import task

# Define arguments that we will pass to our DAG
default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": days_ago(2),
    "retries": 1,
    "retry_delay": timedelta(seconds=10),
}


@task
def my_task(p_int=3, p_str="check", p_int_with_default=0) -> str:
    logging.info("I am running")
    return "success"


@task
def my_multiple_outputs(p_str="some_string") -> Tuple[int, str]:
    # Two return values, unpacked into t2 and t3 inside the DAG below
    return (1, p_str + "_extra_postfix")


def some_python_function(input_path, output_path):
    # A plain Python function (no @task), executed by a regular PythonOperator
    logging.info("I am running")
    # Read the input inside a context manager so the file handle is closed
    with open(input_path, "r") as input_file:
        input_value = input_file.read()
    with open(output_path, "w") as output_file:
        output_file.write(input_value)
        output_file.write("\n\n")
        output_file.write(datetime.now().strftime("%Y-%m-%dT%H:%M:%S"))
    return "success"


# Define DAG context
with DAG(dag_id="dbnd_operators", default_args=default_args) as dag_operators:
    # t1, t2 and t3 are the results of tasks created by calling @task functions
    # All tasks and operators created under this DAG context will be collected as a part of this DAG
    t1 = my_task(2)
    t2, t3 = my_multiple_outputs(t1)
    python_op = PythonOperator(
        task_id="some_python_function",
        python_callable=some_python_function,
        op_kwargs={"input_path": t3, "output_path": "/tmp/output.txt"},
    )
    # t3.op is the operator used to execute my_multiple_outputs.
    # This call makes the some_python_function operator depend on t3's operator.
    python_op.set_upstream(t3.op)
```

As you can see, messages are passed explicitly between all three tasks:

-   `t1`, the result of `my_task`, is passed to the next task, `my_multiple_outputs`.
-   `t2` and `t3` are the two results of `my_multiple_outputs`, matching its `Tuple[int, str]` return annotation.
-   `some_python_function` is a plain Python function wrapped with a standard `PythonOperator`, and it receives `t3` as its `input_path` argument through `op_kwargs`.
-   The new `PythonOperator` is explicitly set downstream of the operator that produces `t3` (`t3.op`).
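
The unpacking `t2, t3 = my_multiple_outputs(t1)` and the later `t3.op` lookup rely on ordinary Python mechanics. Any iterable object can be unpacked, and each resulting handle can remember which operator produces it.

The sketch below is a hypothetical model of that bookkeeping, not dbnd's implementation. `FakeOperator`, `OutputRef`, `MultiOutput` and the `result_<i>` key names are invented for illustration.

```python
from dataclasses import dataclass, field
from typing import Iterator, List


@dataclass(eq=False)
class FakeOperator:
    # Hypothetical stand-in for the Airflow operator that executes a task
    task_id: str
    upstream: List["FakeOperator"] = field(default_factory=list)

    def set_upstream(self, other: "FakeOperator") -> None:
        # Mirrors the idea of Airflow's set_upstream: record a dependency edge
        self.upstream.append(other)


@dataclass
class OutputRef:
    # Handle to one output of a task, pointing back at its producing operator
    op: FakeOperator
    key: str


class MultiOutput:
    """Hypothetical result of a task that returns several outputs."""

    def __init__(self, op: FakeOperator, n_outputs: int) -> None:
        self.op = op
        self.n_outputs = n_outputs

    def __iter__(self) -> Iterator[OutputRef]:
        # Tuple unpacking calls __iter__, so each output gets its own handle,
        # and all handles share the same producing operator
        for i in range(self.n_outputs):
            yield OutputRef(self.op, f"result_{i}")


producer = FakeOperator("my_multiple_outputs")
t2, t3 = MultiOutput(producer, 2)  # two outputs, as in Tuple[int, str]
consumer = FakeOperator("some_python_function")
consumer.set_upstream(t3.op)       # mirrors python_op.set_upstream(t3.op)
print(t3.key, [op.task_id for op in consumer.upstream])  # result_1 ['my_multiple_outputs']
```

Keeping the producing operator on every output handle is what lets a dependency be declared from the data (`t3.op`) rather than by naming the upstream task again.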

> Note: If you call a function decorated with `@task` outside a DAG context, and without running it through the dbnd
> library, it executes as a normal Python function.

Passing arguments between tasks this way improves the developer experience and supports pipeline execution for many use
cases. It does not break existing DAGs.

# Using dbnd_config

Let's revisit the example, this time changing the `default_args` defined at the very top:

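```python
from datetime import timedelta

from airflow.utils.dates import days_ago

default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": days_ago(2),
    "retries": 1,
    "retry_delay": timedelta(seconds=10),
    # Override my_task's p_int_with_default parameter (default 0) with 4
    "dbnd_config": {"my_task.p_int_with_default": 4},
}
```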

This adds a new key-value pair, `dbnd_config`, to the default arguments.

`dbnd_config` is expected to be a dictionary of configuration settings that are passed to your tasks. In this example,
the key `my_task.p_int_with_default` overrides the integer parameter `p_int_with_default` of `my_task`, changing it from
its default value `0` to `4`.
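
The override in this example can be modeled as a lookup of `<task_name>.<parameter_name>` keys against the function's declared defaults. The sketch below makes two assumptions:

-   The key format is the one taken from the example above.
-   Explicitly passed arguments take precedence over `dbnd_config`, which in turn takes precedence over defaults.

dbnd's real precedence rules may differ, so treat this only as an illustration. `resolve_params` is a hypothetical helper, not a dbnd function.

```python
import inspect
from typing import Any, Callable, Dict


def resolve_params(
    func: Callable[..., Any], dbnd_config: Dict[str, Any], *args: Any, **kwargs: Any
) -> Dict[str, Any]:
    sig = inspect.signature(func)
    # 1. Start from the defaults declared in the function signature
    params = {
        name: p.default
        for name, p in sig.parameters.items()
        if p.default is not inspect.Parameter.empty
    }
    # 2. Apply "<task_name>.<parameter_name>" overrides (assumed key format)
    prefix = func.__name__ + "."
    for key, value in dbnd_config.items():
        if key.startswith(prefix):
            name = key[len(prefix):]
            if name not in sig.parameters:
                raise KeyError(f"{func.__name__} has no parameter {name!r}")
            params[name] = value
    # 3. Arguments passed explicitly at the call site win (assumed precedence)
    params.update(sig.bind_partial(*args, **kwargs).arguments)
    return params


def my_task(p_int=3, p_str="check", p_int_with_default=0) -> str:
    return "success"


print(resolve_params(my_task, {"my_task.p_int_with_default": 4}, 2))
# {'p_int': 2, 'p_str': 'check', 'p_int_with_default': 4}
```

Keys for other tasks are ignored. A key naming a parameter the task does not have raises `KeyError`, so typos in the configuration surface early.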

For more ways to change configuration settings, see our [documentation](https://dbnd.readme.io/).

            

Raw data

{
    "_id": null,
    "home_page": "https://github.com/databand-ai/dbnd",
    "name": "dbnd-airflow",
    "maintainer": "Evgeny Shulman",
    "docs_url": null,
    "requires_python": null,
    "maintainer_email": "evgeny.shulman@databand.ai",
    "keywords": "orchestration, data, machinelearning",
    "author": "Evgeny Shulman",
    "author_email": "evgeny.shulman@databand.ai",
    "download_url": "https://files.pythonhosted.org/packages/d7/f8/f91647a4c0fe51d30bf62c5aef7999c05cd0942f7a7806a15c8dc22b6ec5/dbnd_airflow-1.0.27.2.tar.gz",
    "platform": "any",
    "bugtrack_url": null,
    "license": null,
    "summary": "Machine Learning Orchestration",
    "version": "1.0.27.2",
    "project_urls": {
        "Bug-Tracker": "https://github.com/databand-ai/dbnd/issues",
        "Documentation": "https://dbnd.readme.io/",
        "Homepage": "https://github.com/databand-ai/dbnd",
        "Source-Code": "https://github.com/databand-ai/dbnd"
    },
    "split_keywords": [
        "orchestration",
        " data",
        " machinelearning"
    ],
    "urls": [
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "e9386968460c629d7cdd294a5a2d73a4725f635ab179263449ce8ad79d7879f7",
                "md5": "3e7343ee5132bf2cd87d3c843562eb5f",
                "sha256": "8a270862941825c6e62278442eff74ed17642e42cfe751940cb2ac9de68911a0"
            },
            "downloads": -1,
            "filename": "dbnd_airflow-1.0.27.2-py2.py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "3e7343ee5132bf2cd87d3c843562eb5f",
            "packagetype": "bdist_wheel",
            "python_version": "py2.py3",
            "requires_python": null,
            "size": 48285,
            "upload_time": "2024-10-31T16:09:45",
            "upload_time_iso_8601": "2024-10-31T16:09:45.317445Z",
            "url": "https://files.pythonhosted.org/packages/e9/38/6968460c629d7cdd294a5a2d73a4725f635ab179263449ce8ad79d7879f7/dbnd_airflow-1.0.27.2-py2.py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "d7f8f91647a4c0fe51d30bf62c5aef7999c05cd0942f7a7806a15c8dc22b6ec5",
                "md5": "666c1ed8f87e41635c543e6dc0a7ceae",
                "sha256": "bcd60bbc5f3e07b626d2f61f2b92755fa889ecf6f69340072b03cb00d1813962"
            },
            "downloads": -1,
            "filename": "dbnd_airflow-1.0.27.2.tar.gz",
            "has_sig": false,
            "md5_digest": "666c1ed8f87e41635c543e6dc0a7ceae",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": null,
            "size": 40214,
            "upload_time": "2024-10-31T16:10:30",
            "upload_time_iso_8601": "2024-10-31T16:10:30.213729Z",
            "url": "https://files.pythonhosted.org/packages/d7/f8/f91647a4c0fe51d30bf62c5aef7999c05cd0942f7a7806a15c8dc22b6ec5/dbnd_airflow-1.0.27.2.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2024-10-31 16:10:30",
    "github": true,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "github_user": "databand-ai",
    "github_project": "dbnd",
    "travis_ci": false,
    "coveralls": false,
    "github_actions": false,
    "tox": true,
    "lcname": "dbnd-airflow"
}