dbt-flink-adapter

- **Name:** dbt-flink-adapter
- **Version:** 1.3.9
- **Home page:** https://github.com/getindata/dbt-flink-adapter
- **Summary:** The Flink adapter plugin for dbt
- **Author:** GetInData
- **Requires Python:** >=3.7
- **Uploaded:** 2024-03-06 12:33:50
# dbt Flink Adapter

[![Python Version](https://img.shields.io/badge/python-3.8-blue.svg)](https://github.com/getindata/dbt-flink-adapter)
[![License](https://img.shields.io/badge/license-Apache%202.0-blue.svg)](https://opensource.org/licenses/Apache-2.0)
[![SemVer](https://img.shields.io/badge/semver-2.0.0-green)](https://semver.org/)
[![PyPI version](https://badge.fury.io/py/dbt-flink-adapter.svg)](https://badge.fury.io/py/dbt-flink-adapter)
[![Downloads](https://pepy.tech/badge/dbt-flink-adapter)](https://pepy.tech/badge/dbt-flink-adapter)

This is an MVP of the dbt Flink adapter. It allows materializing dbt models as streaming pipelines and batch jobs on a Flink cluster.

Check out our [blog post about dbt-flink-adapter with a tutorial](https://getindata.com/blog/dbt-run-real-time-analytics-on-apache-flink-announcing-the-dbt-flink-adapter/).

## Prerequisites

* Flink 1.16+ with Flink SQL Gateway
* Python 3.8+ with pip
* (Optionally) venv

## Setup

This adapter connects to the Flink SQL Gateway, which is not started in Flink by default.
Please refer to [flink-doc/starting-the-sql-gateway](https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/table/sql-gateway/overview/#starting-the-sql-gateway)
for how to start the SQL Gateway in your cluster.

For testing and development purposes you can use `envs/flink-1.16/docker-compose.yml` to start a single-node Flink cluster with SQL Gateway.

```shell
$ cd envs/flink-1.16
$ docker compose up
```

### Install `dbt-flink-adapter`

Create a virtual environment and install `dbt-flink-adapter` from [PyPI/dbt-flink-adapter](https://pypi.org/project/dbt-flink-adapter/) with `pip`:

```shell
$ python3 -m venv ~/.virtualenvs/dbt-example1
$ source ~/.virtualenvs/dbt-example1/bin/activate
$ pip3 install dbt-flink-adapter
$ dbt --version
...
Plugins:
  - flink: x.y.z
```

### Create and initialize dbt project

Navigate to the directory in which you want to create your project. If you are using Flink with the SQL Gateway started
from the docker-compose.yml file in this repo, you can leave all values at their defaults.

```shell
$ dbt init
Enter a name for your project (letters, digits, underscore): example1
Which database would you like to use?
[1] flink

Enter a number: 1
host (Flink SQL Gateway host) [localhost]:
port [8083]:
session_name [test_session]:
database (Flink catalog) [default_catalog]:
schema (Flink database) [default_database]:
threads (1 or more) [1]:

$ cd example1
```

## Creating and running dbt model

For how to create and run dbt models, please refer to the [dbt docs](https://docs.getdbt.com/docs/build/projects).
This README focuses on things that are specific to this adapter.

```shell
dbt run
```

### Source

In a typical use case, dbt connects to and runs its ETL processes on a database engine that already has a connection to the underlying persistence layer.
Flink, however, is only a processing engine, so we need to define connectivity to external persistence ourselves.
To do so, we have to define sources in our dbt model.

#### Connector properties

dbt-flink-adapter reads the `config/connector_properties` key and uses it as the connector properties.

#### Type

Flink supports sources in batch and streaming mode; use `type` to select which execution environment will be used during source creation.
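
For example, a source can be declared as a bounded (batch) input instead of a stream. The `filesystem` connector and its `path`/`format` options shown here are standard Flink connector properties, but this particular combination is a hypothetical alternative to the streaming Kafka setup used elsewhere in this README:

```yaml
# hypothetical batch source reading a bounded CSV dataset
config:
  type: batch
  connector_properties:
    connector: 'filesystem'
    path: '/data/clickstream'
    format: 'csv'
```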

#### Column type

`column_type` currently accepts the following values; refer to [flink-doc/create-table](https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/table/sql/create/#create-table):
- physical (default)
- metadata
- computed

#### Watermark

To provide a watermark, pass a `column` and `strategy` reference under the `watermark` key in the config.

Please refer to the Flink documentation for possible watermark strategies: [flink-doc/watermark](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/create/#watermark)
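
For instance, a bounded out-of-orderness watermark can be expressed by putting the full strategy expression under `strategy` (this sketch assumes the adapter passes the expression through verbatim, as the example below suggests):

```yaml
# hypothetical: tolerate events arriving up to 5 seconds late
watermark:
  column: event_timestamp
  strategy: event_timestamp - INTERVAL '5' SECOND
```

This would render as `WATERMARK FOR event_timestamp AS event_timestamp - INTERVAL '5' SECOND` in the generated DDL.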

#### Example

```yaml
sources:
  - name: my_source
    tables:
      - name: clickstream
        config:
          type: streaming
          connector_properties:
            connector: 'kafka'
            properties.bootstrap.servers: 'kafka:29092'
            topic: 'clickstream'
          watermark:
            column: event_timestamp
            strategy: event_timestamp
        columns:
          - name: id
            data_type: BIGINT
          - name: id2
            column_type: computed
            expression: id + 1
          - name: event_timestamp
            data_type: TIMESTAMP(3)
          - name: ts2
            column_type: metadata
            data_type: TIMESTAMP(3)
            expression: timestamp
```

The SQL passed to Flink will look like:
```sql
CREATE TABLE IF NOT EXISTS my_source (
  `id` BIGINT,
  `id2` AS id + 1,
  `event_timestamp` TIMESTAMP(3),
  `ts2` TIMESTAMP(3) METADATA  FROM 'timestamp',
  WATERMARK FOR event_timestamp AS event_timestamp
) WITH (
  'connector' = 'kafka',
  'properties.bootstrap.servers' = 'kafka:29092',
  'topic' = 'clickstream'
)
```

### Model

This adapter currently supports two types of materialization: *table* and *view*. Because in Flink a table has to be
associated with a connector, `type` and `connector_properties` have to be provided, similar to defining sources.

#### Example

`models.yml`

```yaml
models:
  - name: my_model
    config:
      type: streaming
      connector_properties:
        connector: 'kafka'
        properties.bootstrap.servers: 'kafka:29092'
        topic: 'some-output'
```

`my_model.sql`

```sql
select *
from {{ source('my_source', 'clickstream') }}
where event = 'some-event'
```
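
Conceptually, a *table* materialization with the config above corresponds to Flink SQL along these lines (a sketch of the idea, not the adapter's literal output; column definitions are elided):

```sql
CREATE TABLE IF NOT EXISTS my_model (
  -- columns inferred from the query
) WITH (
  'connector' = 'kafka',
  'properties.bootstrap.servers' = 'kafka:29092',
  'topic' = 'some-output'
);

INSERT INTO my_model
select * from clickstream where event = 'some-event';
```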

### Seed

dbt-flink-adapter can use Flink to insert seed data into any Flink-supported connector.
As with sources and models, you have to provide the connector configuration.

#### Example

`seeds.yml`

```yaml
seeds:
  - name: clickstream
    config:
      connector_properties:
        connector: 'kafka'
        properties.bootstrap.servers: 'kafka:29092'
        topic: 'clickstream'
```
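
The seed rows themselves live in a CSV file under `seeds/`, as in standard dbt. A hypothetical `seeds/clickstream.csv` (column names and values are illustrative) might look like:

```csv
id,event,event_timestamp
1,some-event,2023-01-01 10:00:00
2,other-event,2023-01-01 10:00:05
```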

### Tests

dbt also allows executing assertions in the form of tests, to validate that input data or model output does not contain abnormal values.
A generic test is a select statement that is considered passed when it does not find any rows.

The problem is how to define such a thing in a streaming pipeline. It is not possible to assert that there are no such entries
in the entire stream, as a stream is by definition infinite. What we can do, however, is run the test for some specific time;
if no abnormal values appear in that time, the test is considered passed.

To facilitate this, dbt-flink-adapter supports the `fetch_timeout_ms` and `mode` directives in the test's SQL query.

```sql
select /** fetch_timeout_ms(5000) mode('streaming') */
  *
from {{ ref('my_model') }}
where
  event <> 'some-event'
```

In this example we are telling dbt-flink-adapter to fetch results for 5 seconds in streaming mode.

### dbt_project.yml

You can extract common configurations of your models and sources into `dbt_project.yml`; see [dbt-docs/general-configuration](https://docs.getdbt.com/reference/model-configs#general-configurations).
If you define the same key in `dbt_project.yml` and in your model or source, dbt will always override the entire key value.
If you wish to extract some keys from under `connector_properties`, you can specify them under `default_connector_properties`,
which will get merged with `connector_properties`.
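
The merge semantics can be sketched in Python (illustrative only, not the adapter's actual code; it assumes a shallow merge in which model-level `connector_properties` win over project-level defaults):

```python
def merge_connector_properties(defaults: dict, overrides: dict) -> dict:
    """Shallow merge: model-level keys take precedence over project defaults."""
    merged = dict(defaults)
    merged.update(overrides)
    return merged

# project-level +default_connector_properties
defaults = {
    "connector": "kafka",
    "properties.bootstrap.servers": "kafka:29092",
}
# model-level connector_properties
model_props = {"topic": "some-output"}

effective = merge_connector_properties(defaults, model_props)
print(effective)
# → {'connector': 'kafka', 'properties.bootstrap.servers': 'kafka:29092', 'topic': 'some-output'}
```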

#### Example

`dbt_project.yml`

```yaml
models:
  example1:
    +materialized: table
    +type: streaming
    +default_connector_properties:
      connector: 'kafka'
      properties.bootstrap.servers: 'kafka:29092'

sources:
  example1:
    +type: streaming
    +default_connector_properties:
      connector: 'kafka'
      properties.bootstrap.servers: 'kafka:29092'

seeds:
  example1:
    +default_connector_properties:
      connector: 'kafka'
      properties.bootstrap.servers: 'kafka:29092'
```

`models.yml`

```yaml
models:
  - name: my_model
    config:
      connector_properties:
        topic: 'some-output'
```

`sources.yml`

```yaml
sources:
  - name: my_source
    tables:
      - name: clickstream
        config:
          connector_properties:
            topic: 'clickstream'
          watermark:
            column: event_timestamp
            strategy: event_timestamp
        columns:
          - name: event_timestamp
            data_type: TIMESTAMP(3)
```

`seeds.yml`

```yaml
seeds:
  - name: clickstream
    config:
      connector_properties:
        topic: 'clickstream'
```

## Sessions

Our interaction with the Flink cluster is done in sessions; any table or view created in one session will not be visible in another session.
A session is by default valid for only 10 minutes. Because of that, if you run `dbt test` more than 10 minutes after `dbt run`,
it will fail, and in the Flink logs you will find that it cannot find your tables. Currently, the only way around this is to rerun the entire model.

The session handle is stored in the `~/.dbt/flink-session.yml` file; if you want to force a new session, simply delete that file.

## Releasing

To release a new version, first execute the [prepare-release](https://github.com/getindata/dbt-flink-adapter/actions/workflows/prepare-release.yml) action.
Please keep in mind that the major and minor version have to be exactly the same as the major and minor version of dbt-core.

This action will create a release branch with a bumped version and a changelog prepared for release. It will also open a Pull Request to the main branch; if everything looks good, merge it.

Next, execute [publish](https://github.com/getindata/dbt-flink-adapter/actions/workflows/publish.yml) on the branch that was just created by the prepare-release action.

            
