# dbt Flink Adapter
[![Python Version](https://img.shields.io/badge/python-3.8-blue.svg)](https://github.com/getindata/dbt-flink-adapter)
[![License](https://img.shields.io/badge/license-Apache%202.0-blue.svg)](https://opensource.org/licenses/Apache-2.0)
[![SemVer](https://img.shields.io/badge/semver-2.0.0-green)](https://semver.org/)
[![PyPI version](https://badge.fury.io/py/dbt-flink-adapter.svg)](https://badge.fury.io/py/dbt-flink-adapter)
[![Downloads](https://pepy.tech/badge/dbt-flink-adapter)](https://pepy.tech/badge/dbt-flink-adapter)
This is an MVP of the dbt Flink adapter. It allows materializing dbt models as streaming pipelines and batch jobs on a Flink cluster.
Check out our [blog post about dbt-flink-adapter with a tutorial](https://getindata.com/blog/dbt-run-real-time-analytics-on-apache-flink-announcing-the-dbt-flink-adapter/).
## Prerequisites
* Flink 1.16+ with Flink SQL Gateway
* Python 3.8+ with pip
* (Optional) venv
## Setup
This adapter connects to the Flink SQL Gateway, which is not started by Flink by default.
Please refer to [flink-doc/starting-the-sql-gateway](https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/table/sql-gateway/overview/#starting-the-sql-gateway)
for how to start the SQL Gateway on your cluster.
For testing and development purposes you can use `envs/flink-1.16/docker-compose.yml` to start a single-node Flink cluster with the SQL Gateway.
```shell
$ cd envs/flink-1.16
$ docker compose up
```
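Once the containers are up, you can do a quick sanity check that the SQL Gateway is reachable. This assumes the gateway's REST endpoint is exposed on the default port 8083 used elsewhere in this README:
```shell
$ curl http://localhost:8083/v1/info
```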
### Install `dbt-flink-adapter`
Create a virtual environment and install `dbt-flink-adapter` from [PyPI/dbt-flink-adapter](https://pypi.org/project/dbt-flink-adapter/) with `pip`:
```shell
$ python3 -m venv ~/.virtualenvs/dbt-example1
$ source ~/.virtualenvs/dbt-example1/bin/activate
$ pip3 install dbt-flink-adapter
$ dbt --version
...
Plugins:
- flink: x.y.z
```
### Create and initialize dbt project
Navigate to the directory in which you want to create your project. If you are using a Flink cluster with the SQL Gateway started
from the docker-compose.yml file in this repo, you can leave all values at their defaults.
```shell
$ dbt init
Enter a name for your project (letters, digits, underscore): example1
Which database would you like to use?
[1] flink
Enter a number: 1
host (Flink SQL Gateway host) [localhost]:
port [8083]:
session_name [test_session]:
database (Flink catalog) [default_catalog]:
schema (Flink database) [default_database]:
threads (1 or more) [1]:
$ cd example1
```
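The answers are stored in your dbt profile. A minimal sketch of the resulting `~/.dbt/profiles.yml` entry, assuming the default answers shown above (the exact layout may vary between dbt versions):
```yaml
example1:
  target: dev
  outputs:
    dev:
      type: flink
      host: localhost              # Flink SQL Gateway host
      port: 8083                   # Flink SQL Gateway port
      session_name: test_session
      database: default_catalog    # Flink catalog
      schema: default_database     # Flink database
      threads: 1
```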
## Creating and running dbt model
For how to create and run dbt models, please refer to the [dbt-docs](https://docs.getdbt.com/docs/build/projects).
This README focuses on the things that are specific to this adapter.
```shell
dbt run
```
### Source
In the typical use case, dbt connects to a database engine that is already wired to its underlying persistence layer and runs its ETL processes there.
Flink, however, is only a processing engine, so we need to define the connectivity to external persistence ourselves.
To do so, we define sources in our dbt project.
#### Connector properties
dbt-flink-adapter reads the `config/connector_properties` key and uses its contents as the connector properties of the created table.
#### Type
Flink supports sources in both batch and streaming mode; use `type` to select which execution mode will be used when the source is created.
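For example, a bounded source could be declared with `type: batch`. The table name, connector, and properties below are illustrative only:
```yaml
sources:
  - name: my_source
    tables:
      - name: historical_clicks      # hypothetical bounded table
        config:
          type: batch
          connector_properties:
            connector: 'filesystem'  # any bounded Flink connector works here
            path: '/data/clicks'
            format: 'csv'
```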
#### Column type
`column_type` currently accepts the following values; refer to [flink-doc/create-table](https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/table/sql/create/#create-table):
- physical (default)
- metadata
- computed
#### Watermark
To provide a watermark, pass a `column` reference and a `strategy` under the `watermark` key in the config.
Please refer to the Flink documentation for the possible watermark strategies: [flink-doc/watermark](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/create/#watermark)
#### Example
```yaml
sources:
  - name: my_source
    tables:
      - name: clickstream
        config:
          type: streaming
          connector_properties:
            connector: 'kafka'
            properties.bootstrap.servers: 'kafka:29092'
            topic: 'clickstream'
          watermark:
            column: event_timestamp
            strategy: event_timestamp
        columns:
          - name: id
            data_type: BIGINT
          - name: id2
            column_type: computed
            expression: id + 1
          - name: event_timestamp
            data_type: TIMESTAMP(3)
          - name: ts2
            column_type: metadata
            data_type: TIMESTAMP(3)
            expression: timestamp
```
The SQL passed to Flink will look like this:
```sql
CREATE TABLE IF NOT EXISTS my_source (
    `id` BIGINT,
    `id2` AS id + 1,
    `event_timestamp` TIMESTAMP(3),
    `ts2` TIMESTAMP(3) METADATA FROM 'timestamp',
    WATERMARK FOR event_timestamp AS event_timestamp
) WITH (
    'connector' = 'kafka',
    'properties.bootstrap.servers' = 'kafka:29092',
    'topic' = 'clickstream'
)
```
### Model
This adapter currently supports two types of materialization: *table* and *view*. Because in Flink a table has to be
associated with a connector, `type` and `connector_properties` have to be provided, similar to defining sources.
#### Example
`models.yml`
```yaml
models:
  - name: my_model
    config:
      type: streaming
      connector_properties:
        connector: 'kafka'
        properties.bootstrap.servers: 'kafka:29092'
        topic: 'some-output'
```
`my_model.sql`
```sql
select *
from {{ source('my_source', 'clickstream') }}
where event = 'some-event'
```
### Seed
dbt-flink-adapter can use Flink to insert seed data through any connector Flink supports.
As with sources and models, you have to provide the connector configuration.
#### Example
`seeds.yml`
```yaml
seeds:
  - name: clickstream
    config:
      connector_properties:
        connector: 'kafka'
        properties.bootstrap.servers: 'kafka:29092'
        topic: 'clickstream'
```
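The seed data itself is a regular dbt seed, i.e. a CSV file in your project's seed directory. A hypothetical `seeds/clickstream.csv` matching the configuration above could look like:
```csv
id,event,event_timestamp
1,some-event,2023-01-01 10:00:00
2,other-event,2023-01-01 10:00:05
```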
### Tests
dbt also allows executing assertions in the form of tests that validate whether input data or model output contains abnormal values.
Every generic test is a select statement that is considered passed when it returns no rows.

The problem is how to define such a thing for a streaming pipeline. It is not possible to state that the entire stream contains no such entries,
since a stream is by definition infinite. What we can do, however, is run the test for a specific amount of time; if no
abnormal values appear within that time, the test is considered passed.

To facilitate this, dbt-flink-adapter supports the `fetch_timeout_ms` and `mode` directives inside the test's SQL query.
```sql
select /** fetch_timeout_ms(5000) mode('streaming') */
    *
from {{ ref('my_model') }}
where
    event <> 'some-event'
```
In this example we are telling dbt-flink-adapter to fetch results for 5 seconds in streaming mode.
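Tests are then executed with the standard dbt command, in the same session as the preceding `dbt run`:
```shell
$ dbt test
```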
### dbt_project.yml
You can extract common configuration of your models and sources into `dbt_project.yml`; see [dbt-docs/general-configuration](https://docs.getdbt.com/reference/model-configs#general-configurations).
If you define the same key in `dbt_project.yml` and in your model or source, dbt will always override the entire key value.
If you wish to extract only some keys from under `connector_properties`, you can specify them under `default_connector_properties`,
which will be merged with `connector_properties`.
#### Example
`dbt_project.yml`
```yaml
models:
  example1:
    +materialized: table
    +type: streaming
    +default_connector_properties:
      connector: 'kafka'
      properties.bootstrap.servers: 'kafka:29092'

sources:
  example1:
    +type: streaming
    +default_connector_properties:
      connector: 'kafka'
      properties.bootstrap.servers: 'kafka:29092'

seeds:
  example1:
    +default_connector_properties:
      connector: 'kafka'
      properties.bootstrap.servers: 'kafka:29092'
```
`models.yml`
```yaml
models:
  - name: my_model
    config:
      connector_properties:
        topic: 'some-output'
```
`sources.yml`
```yaml
sources:
  - name: my_source
    tables:
      - name: clickstream
        config:
          connector_properties:
            topic: 'clickstream'
          watermark:
            column: event_timestamp
            strategy: event_timestamp
        columns:
          - name: event_timestamp
            data_type: TIMESTAMP(3)
```
`seeds.yml`
```yaml
seeds:
  - name: clickstream
    config:
      connector_properties:
        topic: 'clickstream'
```
## Sessions
Our interaction with the Flink cluster happens within sessions: any table or view created in one session will not be visible in another session.
By default a session is only valid for 10 minutes. Because of that, if you run `dbt test` more than 10 minutes after `dbt run`,
it will fail, and in the Flink logs you will find that it cannot find your tables. Currently, the only way around this is to rerun the entire model.

The session handle is stored in the `~/.dbt/flink-session.yml` file; if you want to force a new session, simply delete that file.
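For example, to force a fresh session before re-running your project:
```shell
$ rm ~/.dbt/flink-session.yml
$ dbt run
```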
## Releasing
To release a new version, first execute the [prepare-release](https://github.com/getindata/dbt-flink-adapter/actions/workflows/prepare-release.yml) action.
Please keep in mind that the major and minor version have to be exactly the same as the major and minor version of dbt-core.

This action will create a release branch with the bumped version and a changelog prepared for release. It will also open a Pull Request to the main branch; if everything looks good, merge it.

Next, execute the [publish](https://github.com/getindata/dbt-flink-adapter/actions/workflows/publish.yml) action on the branch that was just created by prepare-release.