<!-- markdownlint-disable-next-line MD041 -->
<p align="center">
<img
src="https://raw.githubusercontent.com/dbt-labs/dbt/ec7dee39f793aa4f7dd3dae37282cc87664813e4/etc/dbt-logo-full.svg"
alt="dbt logo" width="500"/>
</p>
<p align="center">
<a href="https://pypi.org/project/dbt-athena-community/">
<img src="https://badge.fury.io/py/dbt-athena-community.svg" />
</a>
<a target="_blank" href="https://pypi.org/project/dbt-athena-community/" style="background:none">
<img src="https://img.shields.io/pypi/pyversions/dbt-athena-community">
</a>
<a href="https://pycqa.github.io/isort/">
<img src="https://img.shields.io/badge/%20imports-isort-%231674b1?style=flat&labelColor=ef8336" />
</a>
<a href="https://github.com/psf/black"><img src="https://img.shields.io/badge/code%20style-black-000000.svg" /></a>
<a href="https://github.com/python/mypy"><img src="https://www.mypy-lang.org/static/mypy_badge.svg" /></a>
<a href="https://pepy.tech/project/dbt-athena-community">
<img src="https://static.pepy.tech/badge/dbt-athena-community/month" />
</a>
</p>
<!-- TOC -->
- [Features](#features)
  - [Quick start](#quick-start)
    - [Installation](#installation)
    - [Prerequisites](#prerequisites)
    - [Credentials](#credentials)
    - [Configuring your profile](#configuring-your-profile)
    - [Additional information](#additional-information)
  - [Models](#models)
    - [Table configuration](#table-configuration)
    - [Table location](#table-location)
    - [Incremental models](#incremental-models)
    - [On schema change](#on-schema-change)
    - [Iceberg](#iceberg)
    - [Highly available table (HA)](#highly-available-table-ha)
      - [HA known issues](#ha-known-issues)
    - [Update glue data catalog](#update-glue-data-catalog)
  - [Snapshots](#snapshots)
    - [Timestamp strategy](#timestamp-strategy)
    - [Check strategy](#check-strategy)
    - [Hard-deletes](#hard-deletes)
    - [Working example](#working-example)
    - [Snapshots known issues](#snapshots-known-issues)
  - [AWS Lake Formation integration](#aws-lake-formation-integration)
  - [Python models](#python-models)
  - [Contracts](#contracts)
  - [Contributing](#contributing)
  - [Contributors ✨](#contributors-)
<!-- TOC -->
# Features
- Supports dbt version `1.7.*`
- Support for Python
- Supports [seeds][seeds]
- Correctly detects views and their columns
- Supports [table materialization][table]
- [Iceberg tables][athena-iceberg] are supported **only with Athena Engine v3** and **a unique table location**
  (see [Table location](#table-location))
- Hive tables are supported by both Athena engines
- Supports [incremental models][incremental]
  - On Iceberg tables:
    - Supports the use of `unique_key` only with the `merge` strategy
    - Supports the `append` strategy
  - On Hive tables:
    - Supports two incremental update strategies: `insert_overwrite` and `append`
    - Does **not** support the use of `unique_key`
- Supports [snapshots][snapshots]
- Supports [Python models][python-models]

[seeds]: https://docs.getdbt.com/docs/building-a-dbt-project/seeds
[incremental]: https://docs.getdbt.com/docs/build/incremental-models
[table]: https://docs.getdbt.com/docs/build/materializations#table
[python-models]: https://docs.getdbt.com/docs/build/python-models#configuring-python-models
[athena-iceberg]: https://docs.aws.amazon.com/athena/latest/ug/querying-iceberg.html
[snapshots]: https://docs.getdbt.com/docs/build/snapshots
## Quick start
### Installation
- `pip install dbt-athena-community`
- Or `pip install git+https://github.com/dbt-athena/dbt-athena.git`
### Prerequisites
To start, you will need an S3 bucket (for instance `my-bucket`) and an Athena database:
```sql
CREATE DATABASE IF NOT EXISTS analytics_dev
COMMENT 'Analytics models generated by dbt (development)'
LOCATION 's3://my-bucket/'
WITH DBPROPERTIES ('creator'='Foo Bar', 'email'='foo@bar.com');
```
Notes:
- Take note of your AWS region code (e.g. `us-west-2` or `eu-west-2`).
- You can also use [AWS Glue](https://docs.aws.amazon.com/athena/latest/ug/glue-athena.html) to create and manage Athena
databases.
### Credentials
Credentials can be passed directly to the adapter, or they can
be [determined automatically](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html) based
on `aws cli`/`boto3` conventions.
You can either:
- Configure `aws_access_key_id` and `aws_secret_access_key`
- Configure `aws_profile_name` to match a profile defined in your AWS credentials file

Check out the dbt [profile configuration](#configuring-your-profile) below for details.
### Configuring your profile
A dbt profile can be configured to run against AWS Athena using the following configuration:
| Option | Description | Required? | Example |
|-----------------------|------------------------------------------------------------------------------------------|-----------|--------------------------------------------|
| s3_staging_dir | S3 location to store Athena query results and metadata | Required | `s3://bucket/dbt/` |
| s3_data_dir | Prefix for storing tables, if different from the connection's `s3_staging_dir` | Optional | `s3://bucket2/dbt/` |
| s3_data_naming | How to generate table paths in `s3_data_dir` | Optional | `schema_table_unique` |
| s3_tmp_table_dir | Prefix for storing temporary tables, if different from the connection's `s3_data_dir` | Optional | `s3://bucket3/dbt/` |
| region_name | AWS region of your Athena instance | Required | `eu-west-1` |
| schema | Specify the schema (Athena database) to build models into (lowercase **only**) | Required | `dbt` |
| database | Specify the database (Data catalog) to build models into (lowercase **only**) | Required | `awsdatacatalog` |
| poll_interval | Interval in seconds to use for polling the status of query results in Athena | Optional | `5` |
| debug_query_state | Whether to log debug messages with the Athena query state | Optional | `false` |
| aws_access_key_id | Access key ID of the user performing requests | Optional | `AKIAIOSFODNN7EXAMPLE` |
| aws_secret_access_key | Secret access key of the user performing requests | Optional | `wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY` |
| aws_profile_name | Profile to use from your AWS shared credentials file | Optional | `my-profile` |
| work_group | Identifier of Athena workgroup | Optional | `my-custom-workgroup` |
| skip_workgroup_check | Whether the workgroup check (an additional AWS call) can be skipped | Optional | `true` |
| num_retries | Number of times to retry a failing query | Optional | `3` |
| num_boto3_retries | Number of times to retry boto3 requests (e.g. deleting S3 files for materialized tables) | Optional | `5` |
| num_iceberg_retries | Number of times to retry Iceberg commit queries to fix ICEBERG_COMMIT_ERROR | Optional | `3` |
| spark_work_group | Identifier of Athena Spark workgroup for running Python models | Optional | `my-spark-workgroup` |
| seed_s3_upload_args | Dictionary containing boto3 ExtraArgs when uploading to S3 | Optional | `{"ACL": "bucket-owner-full-control"}` |
| lf_tags_database | Default LF tags for a new database, if it is created by dbt | Optional | `tag_key: tag_value` |
**Example profiles.yml entry:**
```yaml
athena:
  target: dev
  outputs:
    dev:
      type: athena
      s3_staging_dir: s3://athena-query-results/dbt/
      s3_data_dir: s3://your_s3_bucket/dbt/
      s3_data_naming: schema_table
      s3_tmp_table_dir: s3://your_s3_bucket/temp/
      region_name: eu-west-1
      schema: dbt
      database: awsdatacatalog
      threads: 4
      aws_profile_name: my-profile
      work_group: my-workgroup
      spark_work_group: my-spark-workgroup
      seed_s3_upload_args:
        ACL: bucket-owner-full-control
```
### Additional information
- `threads` is supported
- `database` and `catalog` can be used interchangeably
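
As a quick pre-flight check, the required options from the profile table and the lowercase rule for `schema` and `database` can be verified against a plain dictionary holding one `outputs` entry. The sketch below is a hypothetical helper written for illustration, not part of dbt-athena; it treats `catalog` as an alias of `database` as described above, and it assumes `s3_staging_dir` should be an `s3://` URI.

```python
from typing import Any, Dict, List

# Options marked "Required" in the profile table above.
REQUIRED_OPTIONS = ("s3_staging_dir", "region_name", "schema", "database")


def validate_athena_output(output: Dict[str, Any]) -> List[str]:
    """Return problems found in one `outputs.<target>` mapping (hypothetical helper)."""
    cfg = dict(output)
    # `catalog` and `database` can be used interchangeably.
    if "database" not in cfg and "catalog" in cfg:
        cfg["database"] = cfg["catalog"]

    problems = [f"missing required option: {key}" for key in REQUIRED_OPTIONS if not cfg.get(key)]

    # `schema` and `database` must be lowercase.
    for key in ("schema", "database"):
        value = cfg.get(key)
        if isinstance(value, str) and value != value.lower():
            problems.append(f"{key} must be lowercase: {value!r}")

    # Assumption: the staging directory is given as an s3:// URI.
    staging = cfg.get("s3_staging_dir")
    if staging and not str(staging).startswith("s3://"):
        problems.append(f"s3_staging_dir must be an s3:// URI: {staging!r}")
    return problems
```

An empty list means the entry passed these particular checks; it says nothing about whether the bucket, region, or credentials actually work.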
## Models
### Table configuration
- `external_location` (`default=none`)
  - If set, the full S3 path to which the table will be saved
  - Works only with incremental models
  - Does not work with Hive tables with `ha` set to true
- `partitioned_by` (`default=none`)
  - A list of columns by which the table will be partitioned
  - Limited to creation of 100 partitions (*currently*)
- `bucketed_by` (`default=none`)
  - A list of columns to bucket data by, ignored if using Iceberg
- `bucket_count` (`default=none`)
  - The number of buckets for bucketing your data, ignored if using Iceberg
- `table_type` (`default='hive'`)
  - The type of table
  - Supports `hive` or `iceberg`
- `ha` (`default=false`)
  - Whether the table should be built using the high-availability method. This option is only available for Hive
    tables, since Iceberg tables are highly available by default (see the section [below](#highly-available-table-ha))
- `format` (`default='parquet'`)
  - The data format for the table
  - Supports `ORC`, `PARQUET`, `AVRO`, `JSON`, `TEXTFILE`
- `write_compression` (`default=none`)
  - The compression type to use for any storage format that allows compression to be specified. To see which options
    are available, check out [CREATE TABLE AS][create-table-as]
- `field_delimiter` (`default=none`)
  - Custom field delimiter, for when format is set to `TEXTFILE`
- `table_properties`: table properties to add to the table, valid for Iceberg only
- `native_drop`: relation drop operations will be performed with SQL, not direct Glue API calls. No S3 calls will be
  made to manage data in S3. Data in S3 will only be cleaned up for Iceberg
  tables ([see AWS docs](https://docs.aws.amazon.com/athena/latest/ug/querying-iceberg-managing-tables.html)). Note that
  Iceberg DROP TABLE operations may time out if they take longer than 60 seconds.
- `seed_by_insert` (`default=false`)
  - By default, seed data is uploaded to S3. This flag creates seeds using an SQL insert statement instead
  - Large seed files cannot use `seed_by_insert`, as the SQL insert statement would
    exceed [the Athena limit of 262144 bytes](https://docs.aws.amazon.com/athena/latest/ug/service-limits.html)
- `force_batch` (`default=false`)
  - Skip creating the table as CTAS and run the operation directly in batch insert mode
  - This is particularly useful when the standard table creation process fails due to partition limitations,
    allowing you to work with temporary tables and persist the dataset more efficiently
- `unique_tmp_table_suffix` (`default=false`)
  - For incremental models using the insert overwrite strategy on Hive tables
  - Replaces the `__dbt_tmp` suffix used for the temporary table name with a unique UUID
  - Useful if you want to run multiple dbt builds that insert into the same table in parallel
- `temp_schema` (`default=none`)
  - For incremental models, defines a schema to hold the temporary tables created during incremental model runs
  - The schema will be created in the model's target database if it does not exist
- `lf_tags_config` (`default=none`)
  - [AWS Lake Formation](#aws-lake-formation-integration) tags to associate with the table and columns
  - `enabled` (`default=False`): whether LF tags management is enabled for a model
  - `tags`: dictionary with tags and their values to assign to the model
  - `tags_columns`: dictionary with a tag key, value and list of columns they must be assigned to
- `lf_inherited_tags` (`default=none`)
  - List of Lake Formation tag keys that are intended to be inherited from the database level and thus shouldn't be
    removed during association of those defined in `lf_tags_config`
    - By default, `lf_tags_config` is exhaustive: it first removes any pre-existing tags from tables and columns
      before associating the ones currently defined for a given model
    - This breaks tag inheritance, as inherited tags appear on tables and columns just like those associated directly
```sql
{{
config(
materialized='incremental',
incremental_strategy='append',
on_schema_change='append_new_columns',
table_type='iceberg',
schema='test_schema',
lf_tags_config={
'enabled': true,
'tags': {
'tag1': 'value1',
'tag2': 'value2'
},
'tags_columns': {
'tag1': {
'value1': ['column1', 'column2'],
'value2': ['column3', 'column4']
}
},
'inherited_tags': ['tag1', 'tag2']
}
)
}}
```
- Format for `dbt_project.yml`:

```yaml
+lf_tags_config:
  enabled: true
  tags:
    tag1: value1
    tag2: value2
  tags_columns:
    tag1:
      value1: [ column1, column2 ]
  inherited_tags: [ tag1, tag2 ]
```
- `lf_grants` (`default=none`)
  - Lake Formation grants config for data_cell filters
  - Format:

```python
lf_grants={
    'data_cell_filters': {
        'enabled': True | False,
        'filters': {
            'filter_name': {
                'row_filter': '<filter_condition>',
                'principals': ['principal_arn1', 'principal_arn2']
            }
        }
    }
}
```
> Notes:
>
> - The `tags` and `tags_columns` settings of `lf_tags_config` only attach LF tags to the corresponding resources.
>   We recommend managing LF tag permissions outside dbt, for example with
>   [terraform](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/lakeformation_permissions) or
>   [aws cdk](https://docs.aws.amazon.com/cdk/api/v1/docs/aws-lakeformation-readme.html).
> - `data_cell_filters` management can't be automated outside dbt, because a filter can't be attached to a table
>   that doesn't exist yet. Once you `enable` this config, dbt will set all filters and their permissions during every
>   dbt run. This keeps the row-level security configuration up to date after every dbt run and applies any changes
>   that occurred: dropping, creating and updating filters and their permissions.
> - Any tags listed in `lf_inherited_tags` should be strictly inherited from the database level and never overridden at
>   the table and column level
> - Currently `dbt-athena` does not differentiate between an inherited tag association and an override of the same tag
>   that it made previously
>   - e.g. If an inherited tag is overridden by an `lf_tags_config` value in one dbt run, and that override is removed
>     prior to a subsequent run, the prior override will linger and will no longer be encoded anywhere (neither in,
>     e.g., Terraform, where the inherited value is configured, nor in the dbt project, where the override previously
>     existed but is now gone)

[create-table-as]: https://docs.aws.amazon.com/athena/latest/ug/create-table-as.html#ctas-table-properties
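
Two of the limits above lend themselves to a quick check before a run: the 262144-byte query limit that rules out `seed_by_insert` for large seeds, and the 100-partition limit that may call for `force_batch`. The following is a simplified sketch; `build_seed_insert` and the other helpers are hypothetical and only approximate the SQL the adapter generates, so treat the byte count as an estimate rather than an exact prediction.

```python
from typing import Any, Dict, Iterable, Sequence

ATHENA_MAX_QUERY_BYTES = 262144  # limit quoted for `seed_by_insert`
CTAS_MAX_PARTITIONS = 100        # limit quoted for `partitioned_by`


def sql_literal(value: Any) -> str:
    """Render a Python value as a SQL literal (simplified, hypothetical helper)."""
    if value is None:
        return "NULL"
    if isinstance(value, bool):
        return "true" if value else "false"
    if isinstance(value, (int, float)):
        return str(value)
    return "'" + str(value).replace("'", "''") + "'"  # escape single quotes


def build_seed_insert(table: str, columns: Sequence[str], rows: Iterable[Sequence[Any]]) -> str:
    """Build one multi-row INSERT, an approximation of what seed_by_insert sends."""
    values = ",\n".join("(" + ", ".join(sql_literal(v) for v in row) + ")" for row in rows)
    return f"INSERT INTO {table} ({', '.join(columns)}) VALUES\n{values}"


def fits_athena_query_limit(sql: str) -> bool:
    # The limit is in bytes, so measure the UTF-8 encoding, not the character count.
    return len(sql.encode("utf-8")) <= ATHENA_MAX_QUERY_BYTES


def exceeds_ctas_partition_limit(rows: Iterable[Dict[str, Any]], partitioned_by: Sequence[str]) -> bool:
    """True if the rows would create more partitions than a single CTAS allows."""
    partitions = {tuple(row[col] for col in partitioned_by) for row in rows}
    return len(partitions) > CTAS_MAX_PARTITIONS
```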
### Table location
The location a table is saved to is determined by:
1. If `external_location` is defined, that value is used
2. If `s3_data_dir` is defined, the path is determined by that and `s3_data_naming`
3. If `s3_data_dir` is not defined, data is stored under `s3_staging_dir/tables/`

Here are all the options available for `s3_data_naming`:
- `unique`: `{s3_data_dir}/{uuid4()}/`
- `table`: `{s3_data_dir}/{table}/`
- `table_unique`: `{s3_data_dir}/{table}/{uuid4()}/`
- `schema_table`: `{s3_data_dir}/{schema}/{table}/`
- `schema_table_unique`: `{s3_data_dir}/{schema}/{table}/{uuid4()}/`

You can set `s3_data_naming` globally in the target profile, override it in the model config, or set it for groups
of models in `dbt_project.yml`.
> Note: when using a workgroup with a default output location configured, `s3_data_naming` and any configured buckets
> are ignored and the location configured in the workgroup is used.
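
The resolution order and naming schemes above can be summarized as a small function. This is a sketch, not the adapter's implementation: `resolve_table_location` is a hypothetical helper, it assumes `s3_data_naming` is also applied under `s3_staging_dir/tables/` when `s3_data_dir` is not set, and it does not model the workgroup output-location override described in the note.

```python
import uuid
from typing import Optional

NAMING_TEMPLATES = {
    "unique": "{base}/{uid}/",
    "table": "{base}/{table}/",
    "table_unique": "{base}/{table}/{uid}/",
    "schema_table": "{base}/{schema}/{table}/",
    "schema_table_unique": "{base}/{schema}/{table}/{uid}/",
}


def resolve_table_location(
    schema: str,
    table: str,
    s3_staging_dir: str,
    s3_data_dir: Optional[str],
    s3_data_naming: str,
    external_location: Optional[str] = None,
    uid: Optional[str] = None,
) -> str:
    # 1. An explicit external_location always wins.
    if external_location:
        return external_location
    # 2. s3_data_dir is the base prefix when set;
    # 3. otherwise fall back to s3_staging_dir/tables (naming assumed to apply there too).
    if s3_data_dir:
        base = s3_data_dir.rstrip("/")
    else:
        base = s3_staging_dir.rstrip("/") + "/tables"
    template = NAMING_TEMPLATES[s3_data_naming]  # KeyError for an unknown naming value
    return template.format(base=base, schema=schema, table=table, uid=uid or str(uuid.uuid4()))
```

For example, with `s3_data_dir: s3://bucket/dbt/` and `schema_table`, the table `orders` in schema `dbt` resolves to `s3://bucket/dbt/dbt/orders/`.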
### Incremental models
Support for [incremental models](https://docs.getdbt.com/docs/build/incremental-models).
These strategies are supported:
- `insert_overwrite` (default): The insert overwrite strategy deletes the overlapping partitions from the destination
table, and then inserts the new records from the source. This strategy depends on the `partitioned_by` keyword! If no
partitions are defined, dbt will fall back to the `append` strategy.
- `append`: Insert new records without updating, deleting or overwriting any existing data. This may produce duplicate
data, which is acceptable for, e.g., log or historical data.
- `merge`: Conditionally updates, deletes, or inserts rows into an Iceberg table. Used in combination with `unique_key`.
Only available when using Iceberg.
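
To make the difference between `insert_overwrite` and `append` concrete, here is an in-memory Python analogue operating on lists of row dictionaries. It only illustrates the semantics described above (the adapter does this with SQL against S3-backed tables), and the function names are hypothetical.

```python
from typing import Any, Dict, List, Sequence

Row = Dict[str, Any]


def append(target: List[Row], new_rows: List[Row]) -> List[Row]:
    # Keep everything that is already there; duplicates are possible.
    return target + new_rows


def insert_overwrite(target: List[Row], new_rows: List[Row], partitioned_by: Sequence[str]) -> List[Row]:
    if not partitioned_by:
        # Without partitions there is nothing to overwrite: fall back to append.
        return append(target, new_rows)

    def partition_of(row: Row) -> tuple:
        return tuple(row[col] for col in partitioned_by)

    touched = {partition_of(row) for row in new_rows}
    # Drop every existing row whose partition is being rewritten, then insert the new rows.
    kept = [row for row in target if partition_of(row) not in touched]
    return kept + new_rows
```

Rows in partitions that the new batch does not touch survive unchanged, which is why this strategy only makes sense with `partitioned_by` set.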
### On schema change
`on_schema_change` is an option to reflect changes of schema in incremental models.
The following options are supported:
- `ignore` (default)
- `fail`
- `append_new_columns`
- `sync_all_columns`
For details, please refer
to [dbt docs](https://docs.getdbt.com/docs/build/incremental-models#what-if-the-columns-of-my-incremental-model-change).
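
The four options differ only in what they do when the source and target column sets diverge. A minimal sketch, assuming columns are compared by name only (dbt's `sync_all_columns` also handles data type changes, which this ignores); `plan_schema_change` is a hypothetical helper.

```python
from typing import List, Sequence, Tuple


def plan_schema_change(
    on_schema_change: str, source_cols: Sequence[str], target_cols: Sequence[str]
) -> Tuple[List[str], List[str]]:
    """Return (columns_to_add, columns_to_drop) for the target table."""
    added = [c for c in source_cols if c not in target_cols]
    removed = [c for c in target_cols if c not in source_cols]
    if on_schema_change == "ignore":
        return [], []
    if on_schema_change == "fail":
        if added or removed:
            raise ValueError(f"schema changed: added={added}, removed={removed}")
        return [], []
    if on_schema_change == "append_new_columns":
        return added, []  # never drops columns
    if on_schema_change == "sync_all_columns":
        return added, removed
    raise ValueError(f"unsupported on_schema_change value: {on_schema_change!r}")
```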
### Iceberg
The adapter supports table materialization for Iceberg.
To get started just add this as your model:
```sql
{{ config(
materialized='table',
table_type='iceberg',
format='parquet',
partitioned_by=['bucket(user_id, 5)'],
table_properties={
'optimize_rewrite_delete_file_threshold': '2'
}
) }}
select 'A' as user_id,
'pi' as name,
'active' as status,
17.89 as cost,
1 as quantity,
100000000 as quantity_big,
current_date as my_date
```
Iceberg supports bucketing as hidden partitions, therefore use the `partitioned_by` config to add specific bucketing
conditions.
Iceberg supports the following data formats: `PARQUET`, `AVRO` and `ORC`.

Iceberg tables can also be built incrementally. Two strategies are supported:
- `append`: New records are appended to the table; this can lead to duplicates.
- `merge`: Performs an upsert (and optional delete), where new records are added and existing records are updated.
  Only available with Athena engine version 3.
  - `unique_key` **(required)**: columns that define a unique record in the source and target tables.
  - `incremental_predicates` (optional): SQL conditions that enable custom join clauses in the merge statement. This
    can be useful for improving performance via predicate pushdown on the target table.
  - `delete_condition` (optional): SQL condition used to identify records that should be deleted.
  - `update_condition` (optional): SQL condition used to identify records that should be updated.
  - `insert_condition` (optional): SQL condition used to identify records that should be inserted.
  - `incremental_predicates`, `delete_condition`, `update_condition` and `insert_condition` can include any column of
    the incremental table (`src`) or the final table (`target`).
    Column names must be prefixed by either `src` or `target` to prevent a `Column is ambiguous` error.

`delete_condition` example:
```sql
{{ config(
materialized='incremental',
table_type='iceberg',
incremental_strategy='merge',
unique_key='user_id',
incremental_predicates=["src.quantity > 1", "target.my_date >= now() - interval '4' year"],
delete_condition="src.status != 'active' and target.my_date < now() - interval '2' year",
format='parquet'
) }}
select 'A' as user_id,
'pi' as name,
'active' as status,
17.89 as cost,
1 as quantity,
100000000 as quantity_big,
current_date as my_date
```
`update_condition` example:
```sql
{{ config(
materialized='incremental',
table_type='iceberg',
incremental_strategy='merge',
unique_key=['id'],
update_condition='target.id > 1',
schema='sandbox'
)
}}
{% if is_incremental() %}
select * from (
values
(1, 'v1-updated')
, (2, 'v2-updated')
) as t (id, value)
{% else %}
select * from (
values
(-1, 'v-1')
, (0, 'v0')
, (1, 'v1')
, (2, 'v2')
) as t (id, value)
{% endif %}
```
`insert_condition` example:
```sql
{{ config(
materialized='incremental',
table_type='iceberg',
incremental_strategy='merge',
unique_key=['id'],
insert_condition='target.status != 0',
schema='sandbox'
)
}}
select * from (
values
(1, 0)
, (2, 1)
) as t (id, status)
```
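
The conditions above can be read as a conventional MERGE: matched rows are deleted when `delete_condition` holds, otherwise updated (subject to `update_condition`), and unmatched rows are inserted (subject to `insert_condition`). The following in-memory Python sketch illustrates that reading. It is not the adapter's generated SQL: it assumes `unique_key` is a single column that is unique in both tables, ignores `incremental_predicates`, and passes `None` as the target row for unmatched rows, mirroring the NULL target columns in SQL.

```python
from typing import Any, Callable, Dict, List, Optional

Row = Dict[str, Any]
Condition = Callable[[Row, Optional[Row]], bool]  # called as condition(src, target)


def merge(
    target: List[Row],
    source: List[Row],
    unique_key: str,
    delete_condition: Optional[Condition] = None,
    update_condition: Optional[Condition] = None,
    insert_condition: Optional[Condition] = None,
) -> List[Row]:
    merged = {row[unique_key]: dict(row) for row in target}
    for src in source:
        key = src[unique_key]
        tgt = merged.get(key)
        if tgt is not None:
            # Matched and delete_condition holds: delete the target row.
            if delete_condition is not None and delete_condition(src, tgt):
                del merged[key]
            # Matched (and update_condition holds, if given): update the target row.
            elif update_condition is None or update_condition(src, tgt):
                merged[key] = dict(src)
        # Not matched (and insert_condition holds, if given): insert; target is None here.
        elif insert_condition is None or insert_condition(src, None):
            merged[key] = dict(src)
    return list(merged.values())
```

Applied to the data of the `update_condition` example (target ids -1 to 2, source ids 1 and 2, condition `target.id > 1`), this sketch updates only the row with `id = 2`.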
### Highly available table (HA)
The current implementation of the table materialization can lead to downtime, as the target table is
dropped and re-created. For a less destructive behavior, you can use the `ha` config on your `table`
materialized models. It leverages the table versions feature of the Glue Data Catalog, creating a temp
table and swapping the target table to the location of the temp table. This materialization is only
available for `table_type=hive` and requires using unique locations. For Iceberg, high availability
is the default.
```sql
{{ config(
materialized='table',
ha=true,
format='parquet',
table_type='hive',
partitioned_by=['status'],
s3_data_naming='table_unique'
) }}
select 'a' as user_id,
'pi' as user_name,
'active' as status
union all
select 'b' as user_id,
'sh' as user_name,
'disabled' as status
```
By default, the materialization keeps the last 4 table versions; you can change this by setting `versions_to_keep`.
#### HA known issues
- When swapping from a table with partitions to a table without (and the other way around), there could be a short
  downtime.
  - If high performance is needed, consider bucketing instead of partitions
- By default, Glue "duplicates" the versions internally, so the last two versions of a table point to the same location
- It's recommended to set `versions_to_keep` >= 4, as this will avoid having the older location removed
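
Why the version count matters can be illustrated with a small model of version clean-up. The sketch assumes, as described above, that consecutive Glue versions share an S3 location, and that an old location may be deleted only when no retained version still points to it; `locations_safe_to_delete` is hypothetical and does not reflect how the adapter is implemented.

```python
from typing import List, Set, Tuple


def locations_safe_to_delete(versions: List[Tuple[int, str]], versions_to_keep: int) -> Set[str]:
    """versions: (version_id, s3_location) pairs; returns locations no retained version uses."""
    newest_first = sorted(versions, key=lambda v: v[0], reverse=True)
    kept, dropped = newest_first[:versions_to_keep], newest_first[versions_to_keep:]
    kept_locations = {location for _, location in kept}
    return {location for _, location in dropped if location not in kept_locations}
```

With six versions spread over three locations (two versions each), keeping 4 versions preserves both the current and the previous location, while keeping 2 leaves only the current one.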
### Update glue data catalog
Optionally persist resource descriptions as column and relation comments to the glue data catalog, and meta as
[glue table properties](https://docs.aws.amazon.com/glue/latest/dg/tables-described.html#table-properties)
and [column parameters](https://docs.aws.amazon.com/glue/latest/webapi/API_Column.html).
By default, documentation persistence is disabled, but it can be enabled for specific resources or
groups of resources as needed.
For example:
```yaml
models:
  - name: test_deduplicate
    description: another value
    config:
      persist_docs:
        relation: true
        columns: true
      meta:
        test: value
    columns:
      - name: id
        meta:
          primary_key: true
```
See [persist docs](https://docs.getdbt.com/reference/resource-configs/persist_docs) for more details.
## Snapshots
The adapter supports snapshot materialization. It supports both the timestamp and check strategies. To create a
snapshot, create a snapshot file in the snapshots directory. If the directory does not exist, create one.
### Timestamp strategy
To use the timestamp strategy refer to
the [dbt docs](https://docs.getdbt.com/docs/build/snapshots#timestamp-strategy-recommended)
### Check strategy
To use the check strategy refer to the [dbt docs](https://docs.getdbt.com/docs/build/snapshots#check-strategy)
### Hard-deletes
The materialization also supports invalidating hard deletes. Check
the [docs](https://docs.getdbt.com/docs/build/snapshots#hard-deletes-opt-in) to understand usage.
### Working example
seed file - employment_indicators_november_2022_csv_tables.csv
```csv
Series_reference,Period,Data_value,Suppressed
MEIM.S1WA,1999.04,80267,
MEIM.S1WA,1999.05,70803,
MEIM.S1WA,1999.06,65792,
MEIM.S1WA,1999.07,66194,
MEIM.S1WA,1999.08,67259,
MEIM.S1WA,1999.09,69691,
MEIM.S1WA,1999.1,72475,
MEIM.S1WA,1999.11,79263,
MEIM.S1WA,1999.12,86540,
MEIM.S1WA,2000.01,82552,
MEIM.S1WA,2000.02,81709,
MEIM.S1WA,2000.03,84126,
MEIM.S1WA,2000.04,77089,
MEIM.S1WA,2000.05,73811,
MEIM.S1WA,2000.06,70070,
MEIM.S1WA,2000.07,69873,
MEIM.S1WA,2000.08,71468,
MEIM.S1WA,2000.09,72462,
MEIM.S1WA,2000.1,74897,
```
model.sql
```sql
{{ config(
materialized='table'
) }}
select row_number() over() as id
, *
, cast(from_unixtime(to_unixtime(now())) as timestamp(6)) as refresh_timestamp
from {{ ref('employment_indicators_november_2022_csv_tables') }}
```
timestamp strategy - model_snapshot_1
```sql
{% snapshot model_snapshot_1 %}
{{
config(
strategy='timestamp',
updated_at='refresh_timestamp',
unique_key='id'
)
}}
select *
from {{ ref('model') }} {% endsnapshot %}
```
invalidate hard deletes - model_snapshot_2
```sql
{% snapshot model_snapshot_2 %}
{{
config
(
unique_key='id',
strategy='timestamp',
updated_at='refresh_timestamp',
invalidate_hard_deletes=True,
)
}}
select *
from {{ ref('model') }} {% endsnapshot %}
```
check strategy - model_snapshot_3
```sql
{% snapshot model_snapshot_3 %}
{{
config
(
unique_key='id',
strategy='check',
check_cols=['series_reference','data_value']
)
}}
select *
from {{ ref('model') }} {% endsnapshot %}
```
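
The bookkeeping behind these snapshots follows the usual type-2 slowly changing dimension pattern: a changed row closes its current version by setting `dbt_valid_to` and opens a new one. The Python sketch below illustrates the timestamp and check strategies and `invalidate_hard_deletes` on in-memory rows. It is a simplification: a single `now` value is used for both `dbt_valid_from` and `dbt_valid_to` (dbt's timestamp strategy takes these from the `updated_at` column), and the other metadata columns dbt maintains are omitted. All function names are hypothetical.

```python
from typing import Any, Callable, Dict, List, Sequence

Row = Dict[str, Any]


def timestamp_changed(updated_at: str) -> Callable[[Row, Row], bool]:
    # Timestamp strategy: a newer updated_at value means the row changed.
    return lambda old, new: new[updated_at] > old[updated_at]


def check_changed(check_cols: Sequence[str]) -> Callable[[Row, Row], bool]:
    # Check strategy: any difference in the listed columns means the row changed.
    return lambda old, new: any(old[c] != new[c] for c in check_cols)


def apply_snapshot(
    snapshot: List[Row],
    source: List[Row],
    unique_key: str,
    changed: Callable[[Row, Row], bool],
    now: Any,
    invalidate_hard_deletes: bool = False,
) -> List[Row]:
    out = [dict(r) for r in snapshot]  # work on copies, never mutate the input
    current = {r[unique_key]: r for r in out if r["dbt_valid_to"] is None}
    seen = set()
    for row in source:
        key = row[unique_key]
        seen.add(key)
        prev = current.get(key)
        if prev is None or changed(prev, row):
            if prev is not None:
                prev["dbt_valid_to"] = now  # close the previous version
            out.append({**row, "dbt_valid_from": now, "dbt_valid_to": None})
    if invalidate_hard_deletes:
        for key, prev in current.items():
            if key not in seen:
                prev["dbt_valid_to"] = now  # the row disappeared from the source
    return out
```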
### Snapshots known issues
- Incremental Iceberg models: `sync_all_columns` on schema change can't remove columns used for partitioning.
  From a dbt perspective, the only way to do this is a full refresh of the incremental model.
- Table, schema and database names should only be lowercase
- In order to avoid potential conflicts, make sure [`dbt-athena-adapter`](https://github.com/Tomme/dbt-athena) is not
  installed in the target environment.
  See <https://github.com/dbt-athena/dbt-athena/issues/103> for more details.
- Snapshots do not support dropping columns from the source table. If you drop a column, make sure to drop the column
  from the snapshot as well. Another workaround is to NULL the column in the snapshot definition to preserve history
## AWS Lake Formation integration
The adapter implements AWS Lake Formation tags management in the following way:
- You can enable or disable LF tags management via [config](#table-configuration) (disabled by default)
- Once you enable the feature, LF tags will be updated on every dbt run
  - First, all LF tags for columns are removed to avoid inheritance issues
  - Then, all redundant LF tags are removed from tables and actual tags from table configs are applied
  - Finally, LF tags for columns are applied
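
The update sequence above can be expressed as an ordered plan of tag operations. The following sketch is a hypothetical illustration: `plan_lf_tag_sync` and its operation tuples are invented for clarity and are not Lake Formation API calls, and tag keys listed as inherited (see `lf_inherited_tags` in the table configuration) are assumed to be left untouched.

```python
from typing import Dict, Iterable, List, Tuple

Op = Tuple[str, ...]


def plan_lf_tag_sync(
    existing_table_tags: Dict[str, str],
    existing_column_tags: Dict[str, Dict[str, str]],
    desired_tags: Dict[str, str],
    desired_tags_columns: Dict[str, Dict[str, List[str]]],
    inherited_tags: Iterable[str] = (),
) -> List[Op]:
    inherited = set(inherited_tags)
    ops: List[Op] = []
    # 1. Remove all column-level tags (except inherited ones).
    for column, tags in sorted(existing_column_tags.items()):
        for key in sorted(tags):
            if key not in inherited:
                ops.append(("remove_column_tag", column, key))
    # 2. Remove table-level tags that are no longer configured.
    for key in sorted(existing_table_tags):
        if key not in desired_tags and key not in inherited:
            ops.append(("remove_table_tag", key))
    # 3. Apply the table-level tags from `tags`.
    for key, value in sorted(desired_tags.items()):
        ops.append(("add_table_tag", key, value))
    # 4. Apply the column-level tags from `tags_columns` ({key: {value: [columns]}}).
    for key, by_value in sorted(desired_tags_columns.items()):
        for value, columns in sorted(by_value.items()):
            for column in columns:
                ops.append(("add_column_tag", column, key, value))
    return ops
```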
It's important to understand the following points:
- dbt does not manage LF tags for databases
- dbt does not manage Lake Formation permissions

That's why you should handle these yourself, either manually or with an automation tool such as Terraform or the AWS CDK.
You may find the following links useful to manage that:
<!-- markdownlint-disable -->
* [terraform aws_lakeformation_permissions](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/lakeformation_permissions)
* [terraform aws_lakeformation_resource_lf_tags](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/lakeformation_resource_lf_tags)
<!-- markdownlint-restore -->
## Python models
The adapter supports Python models using [`spark`](https://docs.aws.amazon.com/athena/latest/ug/notebooks-spark.html).
### Setup
- A Spark-enabled workgroup created in Athena
- Spark execution role granted access to Athena, Glue and S3
- The Spark workgroup is added to the `~/.dbt/profiles.yml` file and the profile to be used
is referenced in `dbt_project.yml`
### Spark-specific table configuration
- `timeout` (`default=43200`)
  - Timeout in seconds for each Python model execution. Defaults to 12 hours (43200 seconds).
- `spark_encryption` (`default=false`)
  - If this flag is set to true, encrypts data in transit between Spark nodes and also encrypts data at rest stored
    locally by Spark.
- `spark_cross_account_catalog` (`default=false`)
  - When using the Spark Athena workgroup, queries can only be made against catalogs located on the same
    AWS account by default. However, sometimes you want to query another catalog located on an external AWS
    account. Setting this additional Spark properties parameter to true will enable querying external catalogs.
    You can use the syntax `external_catalog_id/database.table` to access the external table on the external
    catalog (e.g. `999999999999/mydatabase.cloudfront_logs`, where 999999999999 is the external catalog ID)
- `spark_requester_pays` (`default=false`)
  - When an Amazon S3 bucket is configured as requester pays, the account of the user running the query is charged for
    data access and data transfer fees associated with the query.
  - If this flag is set to true, requester pays S3 buckets are enabled in Athena for Spark.
### Spark notes
- A session is created for each unique engine configuration defined in the models that are part of the invocation.
- A session's idle timeout is set to 10 minutes. Within the timeout period, if there is a new calculation
(Spark Python model) ready for execution and the engine configuration matches, the process will reuse the same session.
- The number of Python models running at a time depends on the `threads`. The number of sessions created for the
entire run depends on the number of unique engine configurations and the availability of sessions to maintain
thread concurrency.
- For Iceberg tables, it is recommended to use the `table_properties` configuration to set the `format_version` to 2.
  This maintains compatibility between Iceberg tables created by Trino and those created by Spark.
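
Session reuse can be pictured as a pool keyed by engine configuration with a 10-minute idle timeout. The sketch below is a hypothetical model of the behavior described above, not the adapter's session manager: `SessionPool` and the `session-N` identifiers are invented, and a fake clock can be injected for testing.

```python
import time
from typing import Callable, Dict, List, Tuple

IDLE_TIMEOUT_SECONDS = 10 * 60  # idle timeout quoted above


class SessionPool:
    """Hypothetical model of session reuse keyed by engine configuration."""

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._clock = clock
        self._idle: Dict[Tuple, List[Tuple[str, float]]] = {}
        self._created = 0

    @staticmethod
    def _key(engine_config: Dict[str, int]) -> Tuple:
        # Models share sessions only if their engine configurations are identical.
        return tuple(sorted(engine_config.items()))

    def acquire(self, engine_config: Dict[str, int]) -> str:
        now = self._clock()
        idle = self._idle.setdefault(self._key(engine_config), [])
        # Forget sessions that have been idle for longer than the timeout.
        idle[:] = [(sid, since) for sid, since in idle if now - since < IDLE_TIMEOUT_SECONDS]
        if idle:
            return idle.pop()[0]  # reuse an idle session with a matching config
        self._created += 1
        return f"session-{self._created}"

    def release(self, engine_config: Dict[str, int], session_id: str) -> None:
        self._idle.setdefault(self._key(engine_config), []).append((session_id, self._clock()))
```

Because a busy session is never handed out twice, the number of sessions grows with concurrency (`threads`) as well as with the number of distinct engine configurations.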
### Example models
#### Simple pandas model
```python
import pandas as pd


def model(dbt, session):
    # Materialize the returned DataFrame as a table
    dbt.config(materialized="table")

    model_df = pd.DataFrame({"A": [1, 2, 3, 4]})

    return model_df
```
#### Simple spark
```python
def model(dbt, spark_session):
    dbt.config(materialized="table")

    # Build a one-column Spark DataFrame from a list of tuples
    data = [(1,), (2,), (3,), (4,)]
    df = spark_session.createDataFrame(data, ["A"])

    return df
```
#### Spark incremental
```python
def model(dbt, spark_session):
    dbt.config(materialized="incremental")
    df = dbt.ref("model")

    if dbt.is_incremental:
        # Only keep rows at or after the latest run_date already in the target table
        max_from_this = (
            f"select max(run_date) from {dbt.this.schema}.{dbt.this.identifier}"
        )
        df = df.filter(df.run_date >= spark_session.sql(max_from_this).collect()[0][0])

    return df
```
#### Config spark model
```python
def model(dbt, spark_session):
    dbt.config(
        materialized="table",
        engine_config={
            "CoordinatorDpuSize": 1,
            "MaxConcurrentDpus": 3,
            "DefaultExecutorDpuSize": 1
        },
        spark_encryption=True,
        spark_cross_account_catalog=True,
        spark_requester_pays=True,
        polling_interval=15,
        timeout=120,
    )

    data = [(1,), (2,), (3,), (4,)]
    df = spark_session.createDataFrame(data, ["A"])

    return df
```
#### Create pySpark udf using imported external python files
```python
def model(dbt, spark_session):
    dbt.config(
        materialized="incremental",
        incremental_strategy="merge",
        unique_key="num",
    )

    # Ship the external Python files to the Spark executors
    sc = spark_session.sparkContext
    sc.addPyFile("s3://athena-dbt/test/file1.py")
    sc.addPyFile("s3://athena-dbt/test/file2.py")

    def func(iterator):
        # Imported inside the UDF so the module is resolved on the executors
        from file2 import transform

        return [transform(i) for i in iterator]

    from pyspark.sql.functions import udf
    from pyspark.sql.functions import col

    udf_with_import = udf(func)

    data = [(1, "a"), (2, "b"), (3, "c")]
    cols = ["num", "alpha"]
    df = spark_session.createDataFrame(data, cols)

    return df.withColumn("udf_test_col", udf_with_import(col("alpha")))
```
### Known issues in Python models
- Python models cannot
[reference Athena SQL views](https://docs.aws.amazon.com/athena/latest/ug/notebooks-spark.html).
- Third-party Python libraries can be used, but they must be [included in the pre-installed list][pre-installed list]
or [imported manually][imported manually].
- Python models can only reference or write to tables with names meeting the
regular expression: `^[0-9a-zA-Z_]+$`. Dashes and special characters are not
supported by Spark, even though Athena supports them.
- Incremental models do not fully utilize Spark capabilities. They depend partially on existing SQL-based logic which
runs on Trino.
- Snapshot materializations are not supported.
- Spark can only reference tables within the same catalog.
- For tables created outside of dbt, be sure to populate the location field, or dbt will throw an error
  when trying to create the table.

[pre-installed list]: https://docs.aws.amazon.com/athena/latest/ug/notebooks-spark-preinstalled-python-libraries.html
[imported manually]: https://docs.aws.amazon.com/athena/latest/ug/notebooks-import-files-libraries.html
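
Before pointing a Python model at existing tables, names can be checked against the pattern above. A minimal sketch; `is_spark_safe_name` and `unsafe_names` are hypothetical helpers, and the schema and table names are assumed to be checked separately (a dotted `schema.table` string would fail the pattern).

```python
import re
from typing import Iterable, List

# Pattern quoted above for table names that Python models can reference or write to.
SPARK_SAFE_NAME = re.compile(r"^[0-9a-zA-Z_]+$")


def is_spark_safe_name(name: str) -> bool:
    return SPARK_SAFE_NAME.fullmatch(name) is not None


def unsafe_names(names: Iterable[str]) -> List[str]:
    # Return the names that a Python model could not use.
    return [name for name in names if not is_spark_safe_name(name)]
```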
## Contracts
The adapter partly supports contract definitions:
- `data_type` is supported but needs to be adjusted for complex types. Types must be specified
  entirely (for instance `array<int>`) even though they won't be checked. Indeed, as dbt recommends, only the broader
  type (array, map, int, varchar) is compared. The complete definition is used to check that the data types
  defined in Athena are valid (pre-flight check).
- Constraints are not supported, since Athena has no concept of constraints.
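
The "broader type" comparison can be illustrated by reducing a full type definition to its base name before comparing. This is a simplified sketch, not the adapter's implementation; `broad_type` and `contract_types_match` are hypothetical, and only `<...>` and `(...)` parameters are stripped.

```python
import re


def broad_type(data_type: str) -> str:
    """Reduce a full type such as 'array<int>' or 'decimal(10,2)' to its base name."""
    return re.split(r"[<(]", data_type.strip().lower(), maxsplit=1)[0].strip()


def contract_types_match(declared: str, actual: str) -> bool:
    # Only the broader type is compared; element types and precision are ignored.
    return broad_type(declared) == broad_type(actual)
```

Under this reading, `array<int>` and `array<bigint>` match, while `int` and `varchar` do not.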
## Contributing
See [CONTRIBUTING](CONTRIBUTING.md) for more information on how to contribute to this project.
## Contributors ✨
Thanks go to these wonderful people ([emoji key](https://allcontributors.org/docs/en/emoji-key)):
<a href="https://github.com/dbt-athena/dbt-athena/graphs/contributors">
<img src="https://contrib.rocks/image?repo=dbt-athena/dbt-athena" />
</a>
Contributions of any kind welcome!
[Snapshots known issues](#snapshots-known-issues)\n - [AWS Lake Formation integration](#aws-lake-formation-integration)\n - [Python models](#python-models)\n - [Contracts](#contracts)\n - [Contributing](#contributing)\n - [Contributors \u2728](#contributors-)\n<!-- TOC -->\n\n# Features\n\n- Supports dbt version `1.7.*`\n- Support for Python\n- Supports [seeds][seeds]\n- Correctly detects views and their columns\n- Supports [table materialization][table]\n - [Iceberg tables][athena-iceberg] are supported **only with Athena Engine v3** and **a unique table location**\n (see table location section below)\n - Hive tables are supported by both Athena engines\n- Supports [incremental models][incremental]\n - On Iceberg tables:\n - Supports the use of `unique_key` only with the `merge` strategy\n - Supports the `append` strategy\n - On Hive tables:\n - Supports two incremental update strategies: `insert_overwrite` and `append`\n - Does **not** support the use of `unique_key`\n- Supports [snapshots][snapshots]\n- Supports [Python models][python-models]\n\n[seeds]: https://docs.getdbt.com/docs/building-a-dbt-project/seeds\n\n[incremental]: https://docs.getdbt.com/docs/build/incremental-models\n\n[table]: https://docs.getdbt.com/docs/build/materializations#table\n\n[python-models]: https://docs.getdbt.com/docs/build/python-models#configuring-python-models\n\n[athena-iceberg]: https://docs.aws.amazon.com/athena/latest/ug/querying-iceberg.html\n\n[snapshots]: https://docs.getdbt.com/docs/build/snapshots\n\n## Quick start\n\n### Installation\n\n- `pip install dbt-athena-community`\n- Or `pip install git+https://github.com/dbt-athena/dbt-athena.git`\n\n### Prerequisites\n\nTo start, you will need an S3 bucket, for instance `my-bucket` and an Athena database:\n\n```sql\nCREATE DATABASE IF NOT EXISTS analytics_dev\nCOMMENT 'Analytics models generated by dbt (development)'\nLOCATION 's3://my-bucket/'\nWITH DBPROPERTIES ('creator'='Foo Bar', 
'email'='foo@bar.com');\n```\n\nNotes:\n\n- Take note of your AWS region code (e.g. `us-west-2` or `eu-west-2`, etc.).\n- You can also use [AWS Glue](https://docs.aws.amazon.com/athena/latest/ug/glue-athena.html) to create and manage Athena\n databases.\n\n### Credentials\n\nCredentials can be passed directly to the adapter, or they can\nbe [determined automatically](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html) based\non `aws cli`/`boto3` conventions.\nYou can either:\n\n- Configure `aws_access_key_id` and `aws_secret_access_key`\n- Configure `aws_profile_name` to match a profile defined in your AWS credentials file.\n Checkout dbt profile configuration below for details.\n\n### Configuring your profile\n\nA dbt profile can be configured to run against AWS Athena using the following configuration:\n\n| Option | Description | Required? | Example |\n|-----------------------|------------------------------------------------------------------------------------------|-----------|--------------------------------------------|\n| s3_staging_dir | S3 location to store Athena query results and metadata | Required | `s3://bucket/dbt/` |\n| s3_data_dir | Prefix for storing tables, if different from the connection's `s3_staging_dir` | Optional | `s3://bucket2/dbt/` |\n| s3_data_naming | How to generate table paths in `s3_data_dir` | Optional | `schema_table_unique` |\n| s3_tmp_table_dir | Prefix for storing temporary tables, if different from the connection's `s3_data_dir` | Optional | `s3://bucket3/dbt/` |\n| region_name | AWS region of your Athena instance | Required | `eu-west-1` |\n| schema | Specify the schema (Athena database) to build models into (lowercase **only**) | Required | `dbt` |\n| database | Specify the database (Data catalog) to build models into (lowercase **only**) | Required | `awsdatacatalog` |\n| poll_interval | Interval in seconds to use for polling the status of query results in Athena | Optional | `5` |\n| 
debug_query_state | Flag if debug message with Athena query state is needed | Optional | `false` |\n| aws_access_key_id | Access key ID of the user performing requests | Optional | `AKIAIOSFODNN7EXAMPLE` |\n| aws_secret_access_key | Secret access key of the user performing requests | Optional | `wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY` |\n| aws_profile_name | Profile to use from your AWS shared credentials file | Optional | `my-profile` |\n| work_group | Identifier of Athena workgroup | Optional | `my-custom-workgroup` |\n| skip_workgroup_check | Indicates if the WorkGroup check (additional AWS call) can be skipped | Optional | `true` |\n| num_retries | Number of times to retry a failing query | Optional | `3` |\n| num_boto3_retries | Number of times to retry boto3 requests (e.g. deleting S3 files for materialized tables) | Optional | `5` |\n| num_iceberg_retries | Number of times to retry iceberg commit queries to fix ICEBERG_COMMIT_ERROR | Optional | `3` |\n| spark_work_group | Identifier of Athena Spark workgroup for running Python models | Optional | `my-spark-workgroup` |\n| seed_s3_upload_args | Dictionary containing boto3 ExtraArgs when uploading to S3 | Optional | `{\"ACL\": \"bucket-owner-full-control\"}` |\n| lf_tags_database | Default LF tags for new database if it's created by dbt | Optional | `tag_key: tag_value` |\n\n**Example profiles.yml entry:**\n\n```yaml\nathena:\n target: dev\n outputs:\n dev:\n type: athena\n s3_staging_dir: s3://athena-query-results/dbt/\n s3_data_dir: s3://your_s3_bucket/dbt/\n s3_data_naming: schema_table\n s3_tmp_table_dir: s3://your_s3_bucket/temp/\n region_name: eu-west-1\n schema: dbt\n database: awsdatacatalog\n threads: 4\n aws_profile_name: my-profile\n work_group: my-workgroup\n spark_work_group: my-spark-workgroup\n seed_s3_upload_args:\n ACL: bucket-owner-full-control\n```\n\n### Additional information\n\n- `threads` is supported\n- `database` and `catalog` can be used interchangeably\n\n## Models\n\n### Table 
configuration\n\n- `external_location` (`default=none`)\n - If set, the full S3 path to which the table will be saved\n - Works only with incremental models\n - Does not work with Hive table with `ha` set to true\n- `partitioned_by` (`default=none`)\n - An array list of columns by which the table will be partitioned\n - Limited to creation of 100 partitions (*currently*)\n- `bucketed_by` (`default=none`)\n - An array list of columns to bucket data, ignored if using Iceberg\n- `bucket_count` (`default=none`)\n - The number of buckets for bucketing your data, ignored if using Iceberg\n- `table_type` (`default='hive'`)\n - The type of table\n - Supports `hive` or `iceberg`\n- `ha` (`default=false`)\n - If the table should be built using the high-availability method. This option is only available for Hive tables\n since it is by default for Iceberg tables (see the section [below](#highly-available-table-ha))\n- `format` (`default='parquet'`)\n - The data format for the table\n - Supports `ORC`, `PARQUET`, `AVRO`, `JSON`, `TEXTFILE`\n- `write_compression` (`default=none`)\n - The compression type to use for any storage format that allows compression to be specified. To see which options are\n available, check out [CREATE TABLE AS][create-table-as]\n- `field_delimiter` (`default=none`)\n - Custom field delimiter, for when format is set to `TEXTFILE`\n- `table_properties`: table properties to add to the table, valid for Iceberg only\n- `native_drop`: Relation drop operations will be performed with SQL, not direct Glue API calls. No S3 calls will be\n made to manage data in S3. Data in S3 will only be cleared up for Iceberg\n tables [see AWS docs](https://docs.aws.amazon.com/athena/latest/ug/querying-iceberg-managing-tables.html). Note that\n Iceberg DROP TABLE operations may timeout if they take longer than 60 seconds.\n- `seed_by_insert` (`default=false`)\n - Default behaviour uploads seed data to S3. 
This flag will create seeds using an SQL insert statement\n - Large seed files cannot use `seed_by_insert`, as the SQL insert statement would\n exceed [the Athena limit of 262144 bytes](https://docs.aws.amazon.com/athena/latest/ug/service-limits.html)\n- `force_batch` (`default=false`)\n - Skip creating the table as CTAS and run the operation directly in batch insert mode\n - This is particularly useful when the standard table creation process fails due to partition limitations,\n allowing you to work with temporary tables and persist the dataset more efficiently\n- `unique_tmp_table_suffix` (`default=false`)\n - For incremental models using insert overwrite strategy on hive table\n - Replace the __dbt_tmp suffix used as temporary table name suffix by a unique uuid\n - Useful if you are looking to run multiple dbt build inserting in the same table in parallel\n- `temp_schema` (`default=none`)\n - For incremental models, it allows to define a schema to hold temporary create statements\n used in incremental model runs\n - Schema will be created in the model target database if does not exist\n- `lf_tags_config` (`default=none`)\n - [AWS Lake Formation](#aws-lake-formation-integration) tags to associate with the table and columns\n - `enabled` (`default=False`) whether LF tags management is enabled for a model\n - `tags` dictionary with tags and their values to assign for the model\n - `tags_columns` dictionary with a tag key, value and list of columns they must be assigned to\n - `lf_inherited_tags` (`default=none`)\n - List of Lake Formation tag keys that are intended to be inherited from the database level and thus shouldn't be\n removed during association of those defined in `lf_tags_config`\n - i.e., the default behavior of `lf_tags_config` is to be exhaustive and first remove any pre-existing tags from\n tables and columns before associating the ones currently defined for a given model\n - This breaks tag inheritance as inherited tags appear on tables and columns 
like those associated directly\n\n```sql\n{{\n config(\n materialized='incremental',\n incremental_strategy='append',\n on_schema_change='append_new_columns',\n table_type='iceberg',\n schema='test_schema',\n lf_tags_config={\n 'enabled': true,\n 'tags': {\n 'tag1': 'value1',\n 'tag2': 'value2'\n },\n 'tags_columns': {\n 'tag1': {\n 'value1': ['column1', 'column2'],\n 'value2': ['column3', 'column4']\n }\n },\n 'inherited_tags': ['tag1', 'tag2']\n }\n )\n}}\n```\n\n- Format for `dbt_project.yml`:\n\n```yaml\n +lf_tags_config:\n enabled: true\n tags:\n tag1: value1\n tag2: value2\n tags_columns:\n tag1:\n value1: [ column1, column2 ]\n inherited_tags: [ tag1, tag2 ]\n```\n\n- `lf_grants` (`default=none`)\n - Lake Formation grants config for data_cell filters\n - Format:\n\n ```python\n lf_grants={\n 'data_cell_filters': {\n 'enabled': True | False,\n 'filters': {\n 'filter_name': {\n 'row_filter': '<filter_condition>',\n 'principals': ['principal_arn1', 'principal_arn2']\n }\n }\n }\n }\n ```\n\n> Notes:\n>\n> - `lf_tags` and `lf_tags_columns` configs support only attaching lf tags to corresponding resources.\n> We recommend managing LF Tags permissions somewhere outside dbt. For example, you may use\n> [terraform](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/lakeformation_permissions) or\n> [aws cdk](https://docs.aws.amazon.com/cdk/api/v1/docs/aws-lakeformation-readme.html) for such purpose.\n> - `data_cell_filters` management can't be automated outside dbt because the filter can't be attached to the table\n> which doesn't exist. Once you `enable` this config, dbt will set all filters and their permissions during every\n> dbt run. 
Such approach keeps the actual state of row level security configuration actual after every dbt run and\n> apply changes if they occur: drop, create, update filters and their permissions.\n> - Any tags listed in `lf_inherited_tags` should be strictly inherited from the database level and never overridden at\n the table and column level\n> - Currently `dbt-athena` does not differentiate between an inherited tag association and an override of same it made\n> previously\n> - e.g. If an inherited tag is overridden by an `lf_tags_config` value in one DBT run, and that override is removed\n prior to a subsequent run, the prior override will linger and no longer be encoded anywhere (in e.g. Terraform\n where the inherited value is configured nor in the DBT project where the override previously existed but now is\n gone)\n\n[create-table-as]: https://docs.aws.amazon.com/athena/latest/ug/create-table-as.html#ctas-table-properties\n\n### Table location\n\nThe location a table is saved to is determined by:\n\n1. If `external_location` is defined, that value is used\n2. If `s3_data_dir` is defined, the path is determined by that and `s3_data_naming`\n3. 
If `s3_data_dir` is not defined, data is stored under `s3_staging_dir/tables/`\n\nHere all the options available for `s3_data_naming`:\n\n- `unique`: `{s3_data_dir}/{uuid4()}/`\n- `table`: `{s3_data_dir}/{table}/`\n- `table_unique`: `{s3_data_dir}/{table}/{uuid4()}/`\n- `schema_table`: `{s3_data_dir}/{schema}/{table}/`\n- `s3_data_naming=schema_table_unique`: `{s3_data_dir}/{schema}/{table}/{uuid4()}/`\n\nIt's possible to set the `s3_data_naming` globally in the target profile, or overwrite the value in the table config,\nor setting up the value for groups of model in dbt_project.yml.\n\n> Note: when using a workgroup with a default output location configured, `s3_data_naming` and any configured buckets\n> are ignored and the location configured in the workgroup is used.\n\n### Incremental models\n\nSupport for [incremental models](https://docs.getdbt.com/docs/build/incremental-models).\n\nThese strategies are supported:\n\n- `insert_overwrite` (default): The insert overwrite strategy deletes the overlapping partitions from the destination\n table, and then inserts the new records from the source. This strategy depends on the `partitioned_by` keyword! If no\n partitions are defined, dbt will fall back to the `append` strategy.\n- `append`: Insert new records without updating, deleting or overwriting any existing data. There might be duplicate\n data (e.g. great for log or historical data).\n- `merge`: Conditionally updates, deletes, or inserts rows into an Iceberg table. 
Used in combination with `unique_key`.\n Only available when using Iceberg.\n\n### On schema change\n\n`on_schema_change` is an option to reflect changes of schema in incremental models.\nThe following options are supported:\n\n- `ignore` (default)\n- `fail`\n- `append_new_columns`\n- `sync_all_columns`\n\nFor details, please refer\nto [dbt docs](https://docs.getdbt.com/docs/build/incremental-models#what-if-the-columns-of-my-incremental-model-change).\n\n### Iceberg\n\nThe adapter supports table materialization for Iceberg.\n\nTo get started just add this as your model:\n\n```sql\n{{ config(\n materialized='table',\n table_type='iceberg',\n format='parquet',\n partitioned_by=['bucket(user_id, 5)'],\n table_properties={\n 'optimize_rewrite_delete_file_threshold': '2'\n }\n) }}\n\nselect 'A' as user_id,\n 'pi' as name,\n 'active' as status,\n 17.89 as cost,\n 1 as quantity,\n 100000000 as quantity_big,\n current_date as my_date\n```\n\nIceberg supports bucketing as hidden partitions, therefore use the `partitioned_by` config to add specific bucketing\nconditions.\n\nIceberg supports several table formats for data : `PARQUET`, `AVRO` and `ORC`.\n\nIt is possible to use Iceberg in an incremental fashion, specifically two strategies are supported:\n\n- `append`: New records are appended to the table, this can lead to duplicates.\n- `merge`: Performs an upsert (and optional delete), where new records are added and existing records are updated. Only\n available with Athena engine version 3.\n - `unique_key` **(required)**: columns that define a unique record in the source and target tables.\n - `incremental_predicates` (optional): SQL conditions that enable custom join clauses in the merge statement. 
This can\n be useful for improving performance via predicate pushdown on the target table.\n - `delete_condition` (optional): SQL condition used to identify records that should be deleted.\n - `update_condition` (optional): SQL condition used to identify records that should be updated.\n - `insert_condition` (optional): SQL condition used to identify records that should be inserted.\n - `incremental_predicates`, `delete_condition`, `update_condition` and `insert_condition` can include any column of\n the incremental table (`src`) or the final table (`target`).\n Column names must be prefixed by either `src` or `target` to prevent a `Column is ambiguous` error.\n\n`delete_condition` example:\n\n```sql\n{{ config(\n materialized='incremental',\n table_type='iceberg',\n incremental_strategy='merge',\n unique_key='user_id',\n incremental_predicates=[\"src.quantity > 1\", \"target.my_date >= now() - interval '4' year\"],\n delete_condition=\"src.status != 'active' and target.my_date < now() - interval '2' year\",\n format='parquet'\n) }}\n\nselect 'A' as user_id,\n 'pi' as name,\n 'active' as status,\n 17.89 as cost,\n 1 as quantity,\n 100000000 as quantity_big,\n current_date as my_date\n```\n\n`update_condition` example:\n\n```sql\n{{ config(\n materialized='incremental',\n incremental_strategy='merge',\n unique_key=['id'],\n update_condition='target.id > 1',\n schema='sandbox'\n )\n}}\n\n{% if is_incremental() %}\n\nselect * from (\n values\n (1, 'v1-updated')\n , (2, 'v2-updated')\n) as t (id, value)\n\n{% else %}\n\nselect * from (\n values\n (-1, 'v-1')\n , (0, 'v0')\n , (1, 'v1')\n , (2, 'v2')\n) as t (id, value)\n\n{% endif %}\n```\n\n`insert_condition` example:\n\n```sql\n{{ config(\n materialized='incremental',\n incremental_strategy='merge',\n unique_key=['id'],\n insert_condition='target.status != 0',\n schema='sandbox'\n )\n}}\n\nselect * from (\n values\n (1, 0)\n , (2, 1)\n) as t (id, status)\n\n```\n\n### Highly available table (HA)\n\nThe current 
implementation of the table materialization can lead to downtime, as the target table is\ndropped and re-created. To have the less destructive behavior it's possible to use the `ha` config on\nyour `table` materialized models. It leverages the table versions feature of glue catalog, creating\na temp table and swapping the target table to the location of the temp table. This materialization\nis only available for `table_type=hive` and requires using unique locations. For iceberg, high\navailability is the default.\n\n```sql\n{{ config(\n materialized='table',\n ha=true,\n format='parquet',\n table_type='hive',\n partitioned_by=['status'],\n s3_data_naming='table_unique'\n) }}\n\nselect 'a' as user_id,\n 'pi' as user_name,\n 'active' as status\nunion all\nselect 'b' as user_id,\n 'sh' as user_name,\n 'disabled' as status\n```\n\nBy default, the materialization keeps the last 4 table versions, you can change it by setting `versions_to_keep`.\n\n#### HA known issues\n\n- When swapping from a table with partitions to a table without (and the other way around), there could be a little\n downtime.\n If high performances is needed consider bucketing instead of partitions\n- By default, Glue \"duplicates\" the versions internally, so the last two versions of a table point to the same location\n- It's recommended to set `versions_to_keep` >= 4, as this will avoid having the older location removed\n\n### Update glue data catalog\n\nOptionally persist resource descriptions as column and relation comments to the glue data catalog, and meta as\n[glue table properties](https://docs.aws.amazon.com/glue/latest/dg/tables-described.html#table-properties)\nand [column parameters](https://docs.aws.amazon.com/glue/latest/webapi/API_Column.html).\nBy default, documentation persistence is disabled, but it can be enabled for specific resources or\ngroups of resources as needed.\n\nFor example:\n\n```yaml\nmodels:\n - name: test_deduplicate\n description: another value\n config:\n 
persist_docs:\n relation: true\n columns: true\n meta:\n test: value\n columns:\n - name: id\n meta:\n primary_key: true\n```\n\nSee [persist docs](https://docs.getdbt.com/reference/resource-configs/persist_docs) for more details.\n\n## Snapshots\n\nThe adapter supports snapshot materialization. It supports both timestamp and check strategy. To create a snapshot\ncreate a snapshot file in the snapshots directory. If the directory does not exist create one.\n\n### Timestamp strategy\n\nTo use the timestamp strategy refer to\nthe [dbt docs](https://docs.getdbt.com/docs/build/snapshots#timestamp-strategy-recommended)\n\n### Check strategy\n\nTo use the check strategy refer to the [dbt docs](https://docs.getdbt.com/docs/build/snapshots#check-strategy)\n\n### Hard-deletes\n\nThe materialization also supports invalidating hard deletes. Check\nthe [docs](https://docs.getdbt.com/docs/build/snapshots#hard-deletes-opt-in) to understand usage.\n\n### Working example\n\nseed file - employent_indicators_november_2022_csv_tables.csv\n\n```csv\nSeries_reference,Period,Data_value,Suppressed\nMEIM.S1WA,1999.04,80267,\nMEIM.S1WA,1999.05,70803,\nMEIM.S1WA,1999.06,65792,\nMEIM.S1WA,1999.07,66194,\nMEIM.S1WA,1999.08,67259,\nMEIM.S1WA,1999.09,69691,\nMEIM.S1WA,1999.1,72475,\nMEIM.S1WA,1999.11,79263,\nMEIM.S1WA,1999.12,86540,\nMEIM.S1WA,2000.01,82552,\nMEIM.S1WA,2000.02,81709,\nMEIM.S1WA,2000.03,84126,\nMEIM.S1WA,2000.04,77089,\nMEIM.S1WA,2000.05,73811,\nMEIM.S1WA,2000.06,70070,\nMEIM.S1WA,2000.07,69873,\nMEIM.S1WA,2000.08,71468,\nMEIM.S1WA,2000.09,72462,\nMEIM.S1WA,2000.1,74897,\n```\n\nmodel.sql\n\n```sql\n{{ config(\n materialized='table'\n) }}\n\nselect row_number() over() as id\n , *\n , cast(from_unixtime(to_unixtime(now())) as timestamp(6)) as refresh_timestamp\nfrom {{ ref('employment_indicators_november_2022_csv_tables') }}\n```\n\ntimestamp strategy - model_snapshot_1\n\n```sql\n{% snapshot model_snapshot_1 %}\n\n{{\n config(\n strategy='timestamp',\n 
updated_at='refresh_timestamp',\n unique_key='id'\n )\n}}\n\nselect *\nfrom {{ ref('model') }} {% endsnapshot %}\n```\n\ninvalidate hard deletes - model_snapshot_2\n\n```sql\n{% snapshot model_snapshot_2 %}\n\n{{\n config\n (\n unique_key='id',\n strategy='timestamp',\n updated_at='refresh_timestamp',\n invalidate_hard_deletes=True,\n )\n}}\nselect *\nfrom {{ ref('model') }} {% endsnapshot %}\n```\n\ncheck strategy - model_snapshot_3\n\n```sql\n{% snapshot model_snapshot_3 %}\n\n{{\n config\n (\n unique_key='id',\n strategy='check',\n check_cols=['series_reference','data_value']\n )\n}}\nselect *\nfrom {{ ref('model') }} {% endsnapshot %}\n```\n\n### Snapshots known issues\n\n- Incremental Iceberg models - Sync all columns on schema change can't remove columns used for partitioning.\n The only way, from a dbt perspective, is to do a full-refresh of the incremental model.\n\n- Tables, schemas and database names should only be lowercase\n\n- In order to avoid potential conflicts, make sure [`dbt-athena-adapter`](https://github.com/Tomme/dbt-athena) is not\n installed in the target environment.\n See <https://github.com/dbt-athena/dbt-athena/issues/103> for more details.\n\n- Snapshot does not support dropping columns from the source table. If you drop a column make sure to drop the column\n from the snapshot as well. 
Another workaround is to NULL the column in the snapshot definition to preserve history\n\n## AWS Lake Formation integration\n\nThe adapter implements AWS Lake Formation tags management in the following way:\n\n- You can enable or disable lf-tags management via [config](#table-configuration) (disabled by default)\n- Once you enable the feature, lf-tags will be updated on every dbt run\n- First, all lf-tags for columns are removed to avoid inheritance issues\n- Then, all redundant lf-tags are removed from tables and actual tags from table configs are applied\n- Finally, lf-tags for columns are applied\n\nIt's important to understand the following points:\n\n- dbt does not manage lf-tags for databases\n- dbt does not manage Lake Formation permissions\n\nThat's why you should handle this by yourself manually or using an automation tool like terraform, AWS CDK etc.\nYou may find the following links useful to manage that:\n\n<!-- markdownlint-disable -->\n* [terraform aws_lakeformation_permissions](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/lakeformation_permissions)\n* [terraform aws_lakeformation_resource_lf_tags](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/lakeformation_resource_lf_tags)\n<!-- markdownlint-restore -->\n\n## Python models\n\nThe adapter supports Python models using [`spark`](https://docs.aws.amazon.com/athena/latest/ug/notebooks-spark.html).\n\n### Setup\n\n- A Spark-enabled workgroup created in Athena\n- Spark execution role granted access to Athena, Glue and S3\n- The Spark workgroup is added to the `~/.dbt/profiles.yml` file and the profile to be used\n is referenced in `dbt_project.yml`\n\n### Spark-specific table configuration\n\n- `timeout` (`default=43200`)\n - Time out in seconds for each Python model execution. 
Defaults to 12 hours/43200 seconds.\n- `spark_encryption` (`default=false`)\n - If this flag is set to true, encrypts data in transit between Spark nodes and also encrypts data at rest stored\n locally by Spark.\n- `spark_cross_account_catalog` (`default=false`)\n - When using the Spark Athena workgroup, queries can only be made against catalogs located on the same\n AWS account by default. However, sometimes you want to query another catalog located on an external AWS\n account. Setting this additional Spark properties parameter to true will enable querying external catalogs.\n You can use the syntax `external_catalog_id/database.table` to access the external table on the external\n catalog (ex: `999999999999/mydatabase.cloudfront_logs` where 999999999999 is the external catalog ID)\n- `spark_requester_pays` (`default=false`)\n - When an Amazon S3 bucket is configured as requester pays, the account of the user running the query is charged for\n data access and data transfer fees associated with the query.\n - If this flag is set to true, requester pays S3 buckets are enabled in Athena for Spark.\n\n### Spark notes\n\n- A session is created for each unique engine configuration defined in the models that are part of the invocation.\n- A session's idle timeout is set to 10 minutes. Within the timeout period, if there is a new calculation\n (Spark Python model) ready for execution and the engine configuration matches, the process will reuse the same session.\n- The number of Python models running at a time depends on the `threads`. 
The number of sessions created for the\n entire run depends on the number of unique engine configurations and the availability of sessions to maintain\n thread concurrency.\n- For Iceberg tables, it is recommended to use `table_properties` configuration to set the `format_version` to 2.\n This is to maintain compatibility between Iceberg tables created by Trino with those created by Spark.\n\n### Example models\n\n#### Simple pandas model\n\n```python\nimport pandas as pd\n\n\ndef model(dbt, session):\n dbt.config(materialized=\"table\")\n\n model_df = pd.DataFrame({\"A\": [1, 2, 3, 4]})\n\n return model_df\n```\n\n#### Simple spark\n\n```python\ndef model(dbt, spark_session):\n dbt.config(materialized=\"table\")\n\n data = [(1,), (2,), (3,), (4,)]\n\n df = spark_session.createDataFrame(data, [\"A\"])\n\n return df\n```\n\n#### Spark incremental\n\n```python\ndef model(dbt, spark_session):\n dbt.config(materialized=\"incremental\")\n df = dbt.ref(\"model\")\n\n if dbt.is_incremental:\n max_from_this = (\n f\"select max(run_date) from {dbt.this.schema}.{dbt.this.identifier}\"\n )\n df = df.filter(df.run_date >= spark_session.sql(max_from_this).collect()[0][0])\n\n return df\n```\n\n#### Config spark model\n\n```python\ndef model(dbt, spark_session):\n dbt.config(\n materialized=\"table\",\n engine_config={\n \"CoordinatorDpuSize\": 1,\n \"MaxConcurrentDpus\": 3,\n \"DefaultExecutorDpuSize\": 1\n },\n spark_encryption=True,\n spark_cross_account_catalog=True,\n spark_requester_pays=True\n polling_interval=15,\n timeout=120,\n )\n\n data = [(1,), (2,), (3,), (4,)]\n\n df = spark_session.createDataFrame(data, [\"A\"])\n\n return df\n```\n\n#### Create pySpark udf using imported external python files\n\n```python\ndef model(dbt, spark_session):\n dbt.config(\n materialized=\"incremental\",\n incremental_strategy=\"merge\",\n unique_key=\"num\",\n )\n sc = spark_session.sparkContext\n sc.addPyFile(\"s3://athena-dbt/test/file1.py\")\n 
sc.addPyFile(\"s3://athena-dbt/test/file2.py\")\n\n def func(iterator):\n from file2 import transform\n\n return [transform(i) for i in iterator]\n\n from pyspark.sql.functions import udf\n from pyspark.sql.functions import col\n\n udf_with_import = udf(func)\n\n data = [(1, \"a\"), (2, \"b\"), (3, \"c\")]\n cols = [\"num\", \"alpha\"]\n df = spark_session.createDataFrame(data, cols)\n\n return df.withColumn(\"udf_test_col\", udf_with_import(col(\"alpha\")))\n```\n\n### Known issues in Python models\n\n- Python models cannot\n [reference Athena SQL views](https://docs.aws.amazon.com/athena/latest/ug/notebooks-spark.html).\n- Third-party Python libraries can be used, but they must be [included in the pre-installed list][pre-installed list]\n or [imported manually][imported manually].\n- Python models can only reference or write to tables with names meeting the\n regular expression: `^[0-9a-zA-Z_]+$`. Dashes and special characters are not\n supported by Spark, even though Athena supports them.\n- Incremental models do not fully utilize Spark capabilities. They depend partially on existing SQL-based logic which\n runs on Trino.\n- Snapshot materializations are not supported.\n- Spark can only reference tables within the same catalog.\n- For tables created outside of the dbt tool, be sure to populate the location field or dbt will throw an error\nwhen trying to create the table.\n\n[pre-installed list]: https://docs.aws.amazon.com/athena/latest/ug/notebooks-spark-preinstalled-python-libraries.html\n[imported manually]: https://docs.aws.amazon.com/athena/latest/ug/notebooks-import-files-libraries.html\n\n## Contracts\n\nThe adapter partly supports contract definitions:\n\n- `data_type` is supported but needs to be adjusted for complex types. Types must be specified\n entirely (for instance `array<int>`) even though they won't be checked. Indeed, as dbt recommends, we only compare\n the broader type (array, map, int, varchar). 
The complete definition is used to check that the data types\n defined in Athena are valid (pre-flight check).\n- The adapter does not support constraints, since Athena has no concept of constraints.\n\n## Contributing\n\nSee [CONTRIBUTING](CONTRIBUTING.md) for more information on how to contribute to this project.\n\n## Contributors \u2728\n\nThanks go to these wonderful people ([emoji key](https://allcontributors.org/docs/en/emoji-key)):\n\n<a href=\"https://github.com/dbt-athena/dbt-athena/graphs/contributors\">\n <img src=\"https://contrib.rocks/image?repo=dbt-athena/dbt-athena\" />\n</a>\n\nContributions of any kind are welcome!\n",
"bugtrack_url": null,
"license": null,
"summary": "The athena adapter plugin for dbt (data build tool)",
"version": "1.9.0",
"project_urls": {
"Documentation": "https://docs.getdbt.com",
"Homepage": "https://github.com/dbt-labs/dbt-athena/dbt-athena",
"Issues": "https://github.com/dbt-labs/dbt-athena/issues",
"Repository": "https://github.com/dbt-labs/dbt-athena.git#subdirectory=dbt-athena"
},
"split_keywords": [
"adapter",
" adapters",
" athena",
" database",
" dbt",
" dbt cloud",
" dbt core",
" dbt labs",
" dbt-core",
" elt"
],
"urls": [
{
"comment_text": "",
"digests": {
"blake2b_256": "3ef18c97de65314f39b799b5553607c97cc69adf4e31c901e9ed84ef1dd7ff67",
"md5": "9773881fa71fbe75a8da0c6a1c0c7882",
"sha256": "7ab06a6bc408fbffad2f68db984126ff07766323c07124f918ed8fb584d309fa"
},
"downloads": -1,
"filename": "dbt_athena-1.9.0-py3-none-any.whl",
"has_sig": false,
"md5_digest": "9773881fa71fbe75a8da0c6a1c0c7882",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": ">=3.9.0",
"size": 80667,
"upload_time": "2024-12-10T23:19:34",
"upload_time_iso_8601": "2024-12-10T23:19:34.685650Z",
"url": "https://files.pythonhosted.org/packages/3e/f1/8c97de65314f39b799b5553607c97cc69adf4e31c901e9ed84ef1dd7ff67/dbt_athena-1.9.0-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": "",
"digests": {
"blake2b_256": "5b3ca99f605d655bdcc751dcd2cbceff80c4ae3c95094551fcef2bfc35c695e6",
"md5": "88b223df0bacd583141d2f7b765abd03",
"sha256": "10998920f2e567bcd2bea602d639d47450de0c9e5a2425f3473d9a89e496c68e"
},
"downloads": -1,
"filename": "dbt_athena-1.9.0.tar.gz",
"has_sig": false,
"md5_digest": "88b223df0bacd583141d2f7b765abd03",
"packagetype": "sdist",
"python_version": "source",
"requires_python": ">=3.9.0",
"size": 71776,
"upload_time": "2024-12-10T23:19:37",
"upload_time_iso_8601": "2024-12-10T23:19:37.399727Z",
"url": "https://files.pythonhosted.org/packages/5b/3c/a99f605d655bdcc751dcd2cbceff80c4ae3c95094551fcef2bfc35c695e6/dbt_athena-1.9.0.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2024-12-10 23:19:37",
"github": true,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"github_user": "dbt-labs",
"github_project": "dbt-athena",
"travis_ci": false,
"coveralls": false,
"github_actions": true,
"lcname": "dbt-athena"
}