# Astronomer SnowPatrol Plugin
The Astronomer SnowPatrol Plugin is an Airflow plugin designed to enhance your Snowflake data operations within Airflow.
This plugin installs a cluster policy in your Airflow environment that adds query tags to all Snowflake SQL queries issued by Airflow.
The plugin adds the following Snowflake query tags to any Snowflake-related Airflow operator:
- `dag_id`
- `task_id`
- `run_id`
- `logical_date`
- `started`
- `operator`
Once the plugin is installed, tags are automatically sent to Snowflake, allowing you to query the QUERY_HISTORY table and
identify all queries run by a given Airflow DAG or task.
Snowflake costs can then be attributed to specific DAGs and tasks.
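For example, a tagged query shows up in QUERY_HISTORY with a JSON `query_tag` along these lines (the values here are illustrative; exact timestamp formats may differ):

```json
{
  "dag_id": "my_snowflake_dag",
  "task_id": "snowflake_query",
  "run_id": "scheduled__2022-01-01T00:00:00+00:00",
  "logical_date": "2022-01-01T00:00:00+00:00",
  "started": "2022-01-01T00:00:05+00:00",
  "operator": "SnowflakeOperator"
}
```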
## Features
- **Query Tagging**: Automatically adds query tags to Snowflake operators within Airflow DAGs.
- **Enhanced Monitoring**: Enables better tracking and monitoring of Snowflake queries executed through Airflow.
- **Cluster Policy**: Easily integrate with your existing Airflow clusters and workflows. No changes needed to your
existing DAGs.
**NOTE**: Query tags are added to every operator inheriting from `BaseSQLOperator`.
If a third-party tool issues Snowflake queries without going through such an operator, query tags will not be added automatically.
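For context, a cluster policy is a function Airflow calls for every task as DAGs are parsed, which is why no DAG changes are required. The following is a simplified, hypothetical sketch of the kind of policy the plugin registers; it is not the plugin's actual source, and it only sets the parse-time tags (`dag_id`, `task_id`, `operator`), whereas the real plugin also records runtime values such as `run_id`, `logical_date`, and `started`:

```python
# airflow_local_settings.py -- hypothetical sketch, NOT the plugin's source.
import json

from airflow.models.baseoperator import BaseOperator


def task_policy(task: BaseOperator) -> None:
    """Attach a Snowflake QUERY_TAG to every SQL operator at parse time."""
    try:
        from airflow.providers.common.sql.operators.sql import BaseSQLOperator
    except ImportError:
        return  # common-sql provider not installed; nothing to tag

    if not isinstance(task, BaseSQLOperator):
        return

    # Parse-time values only; runtime tags (run_id, logical_date, started)
    # need execute-time machinery and are omitted from this sketch.
    tag = json.dumps(
        {
            "dag_id": task.dag_id,
            "task_id": task.task_id,
            "operator": type(task).__name__,
        }
    )

    # SnowflakeHook forwards session_parameters to the Snowflake session,
    # so the tag is applied to every statement the task executes.
    hook_params = dict(getattr(task, "hook_params", None) or {})
    session_parameters = dict(hook_params.get("session_parameters") or {})
    session_parameters["QUERY_TAG"] = tag
    hook_params["session_parameters"] = session_parameters
    task.hook_params = hook_params
```

Because the policy runs inside Airflow itself, existing DAG files stay untouched.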
## Installation
You can install the Astronomer SnowPatrol Plugin via pip:
```bash
pip install astronomer-snowpatrol-plugin
```
## Usage
You can use the `SnowflakeOperator` or any other operator derived from `BaseSQLOperator` in your Airflow DAGs as usual.
The plugin automatically adds the query tags listed above.
See the following Airflow documentation pages for the supported SQL operators:

- [Snowflake Operators](https://airflow.apache.org/docs/apache-airflow-providers-snowflake/stable/operators/snowflake.html)
- [Common SQL Operators](https://airflow.apache.org/docs/apache-airflow-providers-common-sql/stable/_api/airflow/providers/common/sql/operators/sql/index.html)
### Example
Given the following DAG, query tags are added at runtime every time the `SnowflakeOperator` runs.
```python
from datetime import datetime

from airflow import DAG
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2022, 1, 1),
}

dag = DAG(
    'my_snowflake_dag',
    default_args=default_args,
    description='A simple DAG to demonstrate SnowflakeOperator',
    schedule_interval='@once',
)

with dag:
    task = SnowflakeOperator(
        task_id='snowflake_query',
        sql="SELECT * FROM my_table",
        snowflake_conn_id="snowflake_default",
        warehouse="my_warehouse",
        database="my_database",
        schema="my_schema",
    )
```
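Once the DAG has run, you can spot-check that the tags arrived, for example with the `INFORMATION_SCHEMA.QUERY_HISTORY` table function (the `dag_id` filter below assumes the example DAG above):

```sql
-- Spot-check recent queries for the example DAG's tag.
SELECT query_text, query_tag
FROM TABLE(INFORMATION_SCHEMA.QUERY_HISTORY(RESULT_LIMIT => 100))
WHERE TRY_PARSE_JSON(query_tag):dag_id::varchar = 'my_snowflake_dag';
```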
## Tracking Snowflake Costs for all Airflow DAGs
You can use the following SQL query to get a better understanding of your Airflow-related Snowflake costs:
```sql
// To know your effective credit cost, go to the `Admin` menu on the left and click on `Cost Management`.
// Copy the value from `Compute price/credit`.
SET SNOWFLAKE_CREDIT_COST=1.88;
// How many days of history to include
SET NUMBER_OF_DAYS=30;

WITH warehouse_sizes AS (
    SELECT 'X-Small' AS warehouse_size, 1 AS credits_per_hour UNION ALL
    SELECT 'Small' AS warehouse_size, 2 AS credits_per_hour UNION ALL
    SELECT 'Medium' AS warehouse_size, 4 AS credits_per_hour UNION ALL
    SELECT 'Large' AS warehouse_size, 8 AS credits_per_hour UNION ALL
    SELECT 'X-Large' AS warehouse_size, 16 AS credits_per_hour UNION ALL
    SELECT '2X-Large' AS warehouse_size, 32 AS credits_per_hour UNION ALL
    SELECT '3X-Large' AS warehouse_size, 64 AS credits_per_hour UNION ALL
    SELECT '4X-Large' AS warehouse_size, 128 AS credits_per_hour
), query_history AS (
    SELECT
        qh.query_id,
        qh.query_text,
        qh.database_name,
        qh.schema_name,
        qh.warehouse_name,
        qh.warehouse_size,
        qh.warehouse_type,
        qh.user_name,
        qh.role_name,
        DATE(qh.start_time) AS execution_date,
        qh.error_code,
        qh.execution_status,
        qh.execution_time/(1000) AS execution_time_sec,
        qh.total_elapsed_time/(1000) AS total_elapsed_time_sec,
        qh.rows_deleted,
        qh.rows_inserted,
        qh.rows_produced,
        qh.rows_unloaded,
        qh.rows_updated,
        TRY_PARSE_JSON(qh.query_tag):dag_id::varchar AS airflow_dag_id,
        TRY_PARSE_JSON(qh.query_tag):task_id::varchar AS airflow_task_id,
        TRY_PARSE_JSON(qh.query_tag):run_id::varchar AS airflow_run_id,
        TRY_TO_TIMESTAMP(TRY_PARSE_JSON(qh.query_tag):logical_date::varchar) AS airflow_logical_date,
        TRY_TO_TIMESTAMP(TRY_PARSE_JSON(qh.query_tag):started::varchar) AS airflow_started,
        TRY_PARSE_JSON(qh.query_tag):operator::varchar AS airflow_operator,
        qh.execution_time/(1000*60*60)*wh.credits_per_hour AS credit_cost,
        credit_cost * $SNOWFLAKE_CREDIT_COST AS dollar_cost
    FROM snowflake.account_usage.query_history AS qh
    INNER JOIN warehouse_sizes AS wh
        ON qh.warehouse_size=wh.warehouse_size
    WHERE qh.start_time >= DATEADD(DAY, -($NUMBER_OF_DAYS), CURRENT_DATE())
    AND qh.warehouse_id > 0
)
SELECT query_text,
    warehouse_name,
    warehouse_size,
    warehouse_type,
    MAX(airflow_dag_id) AS airflow_dag_id,
    MAX(airflow_task_id) AS airflow_task_id,
    MAX(airflow_run_id) AS airflow_run_id,
    MAX(airflow_logical_date) AS airflow_logical_date,
    MAX(airflow_started) AS airflow_started,
    MAX(airflow_operator) AS airflow_operator,
    COUNT(query_id) AS execution_count,
    MIN(execution_date) AS first_execution_date,
    MAX(execution_date) AS last_execution_date,
    SUM(dollar_cost) AS total_dollar_cost
FROM query_history
GROUP BY query_text,
    warehouse_name,
    warehouse_size,
    warehouse_type
ORDER BY total_dollar_cost DESC
```
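If you only need a quick per-DAG rollup rather than per-query costs, a smaller aggregation along these lines works too (a sketch against the same `ACCOUNT_USAGE` view, using only the tags the plugin sets):

```sql
-- Minimal per-DAG rollup: query count and total execution time over 30 days.
SELECT
    TRY_PARSE_JSON(query_tag):dag_id::varchar AS airflow_dag_id,
    COUNT(*) AS query_count,
    SUM(execution_time) / 1000 AS total_execution_time_sec
FROM snowflake.account_usage.query_history
WHERE start_time >= DATEADD(DAY, -30, CURRENT_DATE())
    AND TRY_PARSE_JSON(query_tag):dag_id IS NOT NULL
GROUP BY 1
ORDER BY total_execution_time_sec DESC;
```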
## Support
For any questions, issues, or feature requests related to the Astronomer SnowPatrol Plugin,
please [open an issue](https://github.com/astronomer/astronomer-snowpatrol-plugin/issues) on the GitHub repository.
## Feedback
Give us your feedback, comments, and ideas at https://github.com/astronomer/snowpatrol/discussions
## Contributing
Contributions to the Astronomer SnowPatrol Plugin are welcome! If you would like to contribute, please fork the
repository, make your changes, and submit a pull request.
## License
`astronomer-snowpatrol-plugin` is distributed under the terms of the [Apache 2](LICENSE.txt) license.