# airflow-mcd
Monte Carlo's Airflow provider.
## Installation
This package requires Python 3.7 or greater and is compatible with Airflow 1.10.14 or greater.
You can install and update using pip. For instance:
```
pip install -U airflow-mcd
```
This package can be added like any other Python dependency to Airflow (e.g. via `requirements.txt`).
## Basic usage
### Callbacks
Sends a webhook back to Monte Carlo upon an event in Airflow. [Detailed examples and documentation here](https://docs.getmontecarlo.com/docs/airflow-incidents-dags-and-tasks). Callbacks can be set at the DAG or task level.
To import: `from airflow_mcd.callbacks import mcd_callbacks`
#### Broad Callbacks
If you don't have existing callbacks, these provide all-in-one callback bundles:

- `dag_callbacks`
- `task_callbacks`

Examples:
```
dag = DAG(
    'dag_name',
    **mcd_callbacks.dag_callbacks,
)

task = BashOperator(
    task_id='task_name',
    bash_command='command',
    dag=dag,
    **mcd_callbacks.task_callbacks,
)
```
#### Explicit Callbacks
| Callback Type | Description | DAG | Task |
| :-------------------- |:-----------------------------------------------------| :------------------------- |:----------------------------|
| `on_success_callback` | Invoked when the DAG/task succeeds | `mcd_dag_success_callback` | `mcd_task_success_callback` |
| `on_failure_callback` | Invoked when the DAG/task fails | `mcd_dag_failure_callback` | `mcd_task_failure_callback` |
| `sla_miss_callback`   | Invoked when task(s) in a DAG miss their defined SLA | `mcd_sla_miss_callback`    | N/A                         |
| `on_retry_callback`   | Invoked when the task is up for retry                | N/A                        | `mcd_task_retry_callback`   |
| `on_execute_callback` | Invoked right before the task begins executing       | N/A                        | `mcd_task_execute_callback` |
Examples:
```
dag = DAG(
    'dag_name',
    on_success_callback=mcd_callbacks.mcd_dag_success_callback,
    on_failure_callback=mcd_callbacks.mcd_dag_failure_callback,
    sla_miss_callback=mcd_callbacks.mcd_sla_miss_callback,
)

task = BashOperator(
    task_id='task_name',
    bash_command='command',
    dag=dag,
    on_success_callback=mcd_callbacks.mcd_task_success_callback,
    on_failure_callback=mcd_callbacks.mcd_task_failure_callback,
    on_execute_callback=mcd_callbacks.mcd_task_execute_callback,
    on_retry_callback=mcd_callbacks.mcd_task_retry_callback,
)
```
### Hooks:
- **SessionHook**
Creates a [pycarlo](https://pypi.org/project/pycarlo/) compatible session. This is useful
for creating your own operator built on top of our Python SDK.
This hook expects an Airflow HTTP connection with the Monte Carlo API id as the "login" and the API token as the
"password".
Alternatively, you could define both the Monte Carlo API id and token in "extra" with the following format:
```
{
  "mcd_id": "<ID>",
  "mcd_token": "<TOKEN>"
}
```
See [here](https://docs.getmontecarlo.com/docs/creating-an-api-token) for details on how to generate a token.
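For reference, credentials in that shape can be read with the standard library alone. This is an illustrative sketch, not part of the package; `parse_mcd_extra` is a hypothetical helper:

```python
import json

# Hypothetical helper: extract the Monte Carlo API credentials from a
# connection's "extra" JSON blob. Key names come from the README above.
def parse_mcd_extra(extra):
    fields = json.loads(extra)
    return fields["mcd_id"], fields["mcd_token"]

mcd_id, mcd_token = parse_mcd_extra('{"mcd_id": "<ID>", "mcd_token": "<TOKEN>"}')
```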
### Operators:
- **BaseMcdOperator**
This operator can be extended to build your own operator using our [SDK](https://pypi.org/project/pycarlo/) or any other dependencies. This is useful if you want to implement your own custom logic (e.g. creating custom lineage after a task completes).
- **SimpleCircuitBreakerOperator**
This operator can be used to execute a circuit breaker compatible rule (custom SQL monitor) to run integrity tests before allowing any downstream tasks to execute. If the rule condition is in breach, the operator raises an `AirflowFailException` on Airflow versions newer than 1.10.11 (preferred, as the task fails without retrying) and an `AirflowException` on older versions. For instance:
```
from datetime import datetime, timedelta

from airflow import DAG

try:
    from airflow.operators.bash import BashOperator
except ImportError:
    # For Airflow versions older than 2.0.0, where the old import path
    # (deprecated in 2.0.0) is still required.
    from airflow.operators.bash_operator import BashOperator

from airflow_mcd.operators import SimpleCircuitBreakerOperator

mcd_connection_id = 'mcd_default_session'

with DAG('sample-dag', start_date=datetime(2022, 2, 8), catchup=False, schedule_interval=timedelta(1)) as dag:
    task1 = BashOperator(
        task_id='example_elt_job_1',
        bash_command='echo I am transforming a very important table!',
    )
    breaker = SimpleCircuitBreakerOperator(
        task_id='example_circuit_breaker',
        mcd_session_conn_id=mcd_connection_id,
        rule_uuid='<RULE_UUID>',
    )
    task2 = BashOperator(
        task_id='example_elt_job_2',
        bash_command='echo I am building a very important dashboard from the table created in task1!',
        trigger_rule='none_failed',
    )

    task1 >> breaker >> task2
```
This operator expects the following parameters:
- `mcd_session_conn_id`: A SessionHook compatible connection.
- `rule_uuid`: UUID of the rule (custom SQL monitor) to execute.
The following parameters can also be passed:
- `timeout_in_minutes` [default=5]: Polling timeout in minutes. Note that the Data Collector Lambda has a max timeout of
  15 minutes when executing a query. Queries that take longer to execute are not supported, so we recommend
  filtering down the query output to improve performance (e.g. adding a `LIMIT` or tightening the `WHERE` clause).
  If you expect a query to take the full 15 minutes, we recommend padding the timeout to 20 minutes.
- `fail_open` [default=True]: Prevents errors or timeouts during rule execution from stopping your pipeline.
  When set to `True`, any issue encountered raises an `AirflowSkipException` instead of failing the task.
  In that case, we recommend setting the
  [trigger_rule](https://airflow.apache.org/docs/apache-airflow/stable/concepts/dags.html#trigger-rules)
  param to `none_failed` for any downstream tasks.
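The `fail_open` semantics can be illustrated in plain Python. This is a hypothetical sketch of the behavior described above, not the operator's implementation; `SkipSignal` stands in for Airflow's `AirflowSkipException`:

```python
# Illustrative sketch of fail_open semantics -- not airflow_mcd internals.
class SkipSignal(Exception):
    """Stand-in for airflow.exceptions.AirflowSkipException."""

def run_rule_with_fail_open(execute_rule, fail_open=True):
    """Run a circuit-breaker rule; convert errors to skips when fail_open."""
    try:
        in_breach = execute_rule()
    except Exception:
        if fail_open:
            # Any error or timeout becomes a skip instead of a failure,
            # so downstream tasks with trigger_rule='none_failed' still run.
            raise SkipSignal("Rule errored; skipping instead of failing.")
        raise
    if in_breach:
        raise RuntimeError("Circuit breaker tripped: rule in breach.")
    return "downstream may proceed"
```

This is why `none_failed` is the recommended trigger rule downstream: a skipped breaker task should not block the rest of the pipeline.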
- **dbt Operators**
The following suite of Airflow operators can be used to execute dbt commands. They include our [dbt Core](https://docs.getmontecarlo.com/docs/dbt-core) integration (via our [Python SDK](https://pypi.org/project/pycarlo/)), to automatically send dbt artifacts to Monte Carlo.
- `DbtBuildOperator`
- `DbtRunOperator`
- `DbtSeedOperator`
- `DbtSnapshotOperator`
- `DbtTestOperator`
Example of usage:
```
from airflow_mcd.operators.dbt import DbtRunOperator

dbt_run = DbtRunOperator(
    task_id='run-model',          # Airflow task id
    project_name='some_project',  # name of project to associate dbt results with
    job_name='some_job',          # name of job to associate dbt results with
    models='some_model',          # dbt model selector
    mc_conn_id='monte_carlo',     # id of the Monte Carlo API connection configured in Airflow
)
```
Many more operator options are available. See the base `DbtOperator` for a comprehensive list.
***Advanced Configuration***
To reduce repetitive configuration of the dbt operators, you can define a `DefaultConfigProvider` that applies
configuration to every Monte Carlo dbt operator.
Example of usage:
```
from airflow_mcd.operators.dbt import DbtConfig, DefaultConfigProvider

class DefaultConfig(DefaultConfigProvider):
    """
    This default configuration is applied to all Monte Carlo dbt operators.
    Any property defined here can be overridden with arguments provided to an operator.
    """
    def config(self) -> DbtConfig:
        return DbtConfig(
            mc_conn_id='monte_carlo',
            env={
                'foo': 'bar',
            },
        )
```
The location of this class should be provided in an environment variable:
```
AIRFLOW_MCD_DBT_CONFIG_PROVIDER=configs.dbt.DefaultConfig
```
If you are using AWS Managed Apache Airflow (MWAA), the location of this class should be defined in a configuration
option in your Airflow environment:
```
mc.airflow_mcd_dbt_config_provider=configs.dbt.DefaultConfig
```
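In both cases the value is a dotted Python path to the provider class. A sketch of how such a path could be resolved (the actual loading logic lives inside airflow-mcd; `load_provider` is a hypothetical helper, demonstrated with a stdlib class since `configs.dbt` is project-specific):

```python
import importlib

# Hypothetical helper: resolve a dotted path like "configs.dbt.DefaultConfig"
# into the class it names.
def load_provider(dotted_path):
    module_path, _, class_name = dotted_path.rpartition(".")
    module = importlib.import_module(module_path)
    return getattr(module, class_name)

# Demonstrated with a standard-library class rather than configs.dbt:
OrderedDict = load_provider("collections.OrderedDict")
```

This means the module containing your provider class must be importable from the Airflow workers (e.g. on the `PYTHONPATH` or in your DAGs folder).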
## Tests and releases
Locally, `make test` will run all tests. See [README-dev.md](README-dev.md) for additional details on development. When
ready for a review, create a PR against `main`.

When ready to release, create a new GitHub release with a tag using semantic versioning (e.g. `v0.42.0`), and CircleCI will
test and publish to PyPI. Note that an existing version will not be deployed.
## License
Apache 2.0 - See the [LICENSE](http://www.apache.org/licenses/LICENSE-2.0) for more information.