astro-extras

Name: astro-extras
Version: 0.1.8.1
Summary: Additional Astro SDK operators
Home page: https://github.com/skolchin/astro-extras
Author: Kol
Requires Python: >=3.10
Upload time: 2024-07-11 07:42:57

# Astro SDK extras project

We've been using [Apache Airflow](https://airflow.apache.org/docs/apache-airflow/stable/)
as an ETL tool for quite some time. Recently we came across
[Astronomer's Astro SDK](https://docs.astronomer.io/astro),
which greatly simplifies data processing with Airflow by providing
functionality to easily load and save data
and define custom transformations, transparent support for the newer
[Dataset](https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/datasets.html)
concept, automatic lineage, and so on.

The goal of this project is to add some useful features to these great products, such as:

* ETL session concept
* SQL file-based templates support
* Simplified table processing routines
* Heterogeneous data transfer
* and more

Please note that this project is at a very early stage of development,
so anything could change in the future.

## Concepts

### ETL Session

An ETL session is a logical group of data transfers united by a single identifier (`session_id`).
Sessions store information about the data transfer source, target, data loading period,
and completion state, thus providing all the necessary information about the data flow.

An ETL session object defines its own data loading interval and allows
the automatically calculated interval boundaries to be overridden via parameters when a DAG is started.

Sessions are stored in the `sessions` database table. An entry in that table is created
when a new session is instantiated within a DAG run and closed when the DAG
finishes.

The ETL session instance is pushed to XCom under the `session` key
and becomes available to all operators within the data pipeline (see the query example below).
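
For illustration, a plain Airflow task could read the session object back from XCom
like this (a minimal sketch; the attribute names `session_id`, `period_start` and
`period_end` follow the identifiers used elsewhere in this README):

``` python
from airflow.decorators import task

@task
def report_period(ti=None):
    # The ETL session object was pushed to XCom under the "session" key
    # when the session was opened
    session = ti.xcom_pull(key='session')
    print(session.session_id, session.period_start, session.period_end)
```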

### SQL Templates

An SQL template is a file containing SQL statements with Jinja macros that are substituted at runtime.
For example, this template could be created to load data from the `data_table` table
within a limited date range:

``` sql
select * from data_table
where some_date between '{{ ti.xcom_pull(key="session").period_start }}'::timestamp
                and '{{ ti.xcom_pull(key="session").period_end }}'::timestamp
```

To be used, a template file must be named after the target object
and placed in the `templates/<dag_id>` folder under DAGS_ROOT. In this case,
assuming it will be used in the `test-data-load` DAG,
the template file name must be `./dags/templates/data_table.sql`.

Note that the template uses the `ETLSession` object from XCom to limit the query to the current loading period.
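
To show how the substitution works, here is a minimal standalone sketch of the Jinja
rendering step (the real rendering is performed by Airflow at runtime; the stand-in
`ti` object and the period values below are made up for the example):

``` python
from types import SimpleNamespace
from jinja2 import Template

# Hypothetical stand-ins for the Airflow task instance and the session object
session = SimpleNamespace(period_start='2023-11-01', period_end='2023-11-02')
ti = SimpleNamespace(xcom_pull=lambda key: session)

sql = Template(
    "select * from data_table\n"
    "where some_date between '{{ ti.xcom_pull(key=\"session\").period_start }}'::timestamp\n"
    "                and '{{ ti.xcom_pull(key=\"session\").period_end }}'::timestamp"
).render(ti=ti)
print(sql)  # the macros are replaced with the session's period boundaries
```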

### Heterogeneous data transfer

The original Astro SDK lacks functionality to transfer data between different
database systems (for example, from Oracle to PostgreSQL or vice versa).

However, there is a [GenericTransfer](https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/operators/generic_transfer/index.html)
operator available in the core Airflow code, so this package adapts it to the
Astro SDK style, providing easy-to-use `transfer_table` and `transfer_tables` functions.
They are highly customizable and fully support SQL templates.
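
For reference, this is roughly what the underlying plain-Airflow operator looks like
on its own (a minimal sketch with a hypothetical DAG id and connection IDs;
`transfer_table` / `transfer_tables` wrap a similar transfer and add session and
template support):

``` python
import pendulum
from airflow import DAG
from airflow.operators.generic_transfer import GenericTransfer

with DAG(
    dag_id='generic-transfer-example',
    start_date=pendulum.datetime(2024, 1, 1),
    schedule=None,
) as dag:
    GenericTransfer(
        task_id='transfer-data_table',
        sql='select * from data_table',
        destination_table='data_table',
        source_conn_id='source',        # connection to the source database
        destination_conn_id='target',   # connection to the target database
    )
```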

### Timed dictionaries support

Timed dictionaries are a special kind of table used mostly in data warehousing.
They are designed to keep multiple values of the same attribute for a given key,
with timestamps indicating which value was in effect at any particular moment.

For example, this is a timed dictionary table (PostgreSQL notation is used):

```sql
create table dict_values(
    record_id serial primary key,
    effective_from timestamp not null,
    effective_to timestamp,
    src_id int not null,
    some_value text not null
);
```
Here, `src_id` is the ID of a record on the source, and `some_value` is a
time-dependent attribute of that record.
The `effective_from` / `effective_to` pair of columns denotes the period during which
a particular value of `some_value` was assigned to a given `src_id`.

If we have this table filled like this:


| record_id | effective_from | effective_to | src_id | some_value |
| --------- | -------------- | ---------    | ------ | ---------- |
| 1         | 2023-01-11 00:00:00 | null | 10 | Some value |


and then the `some_value` property of the record with `id = 10` on the source changes to `Some other value`,
then, after an update at 2023-11-02 00:01:00, this timed dictionary should look like this:

| record_id | effective_from | effective_to | src_id | some_value |
| --------- | -------------- | ---------    | ------ | ---------- |
| 1         | 2023-01-11 00:00:00 | 2023-11-02 00:00:59 | 10 | Some value |
| 2         | 2023-11-02 00:01:00 | null | 10 | Some other value |


As you can see, the record with `record_id = 1` is now "closed", meaning that its
`effective_to` attribute is set, and a new record with `record_id = 2` was
added with `effective_to` set to null.

Any query against this timed dictionary should include a check on the `effective_from` / `effective_to`
columns, for example:

```sql
select src_id as id, some_value from dict_values
where date_of_interest between effective_from and coalesce(effective_to, '2099-12-31')
```

where `date_of_interest` is the date for which the query is being run.

To update timed dictionaries, use the `update_timed_table` / `update_timed_tables`
functions.

## Usage examples

Create a DAG which opens a session, writes the session info to the log, and closes it:

``` python
with DAG(...) as dag, ETLSession('source', 'target') as session:
    @dag.task
    def print_session(session: ETLSession):
        print(session)
    print_session(session)
```

Create a DAG with an `open-session -> transfer-data_table -> close_session`
task sequence:

``` python
with DAG(...) as dag:
    session = open_session('source', 'target')
    transfer_table('data_table', session=session)
    close_session(session)
```

Upon execution, this DAG will transfer `data_table` data from the source to the target
database, wrapping the transfer in a session. This helps to easily identify when a particular record
was loaded, or to clean up after unsuccessful attempts.

The same, but using a context manager:

``` python
with DAG(...) as dag, ETLSession('source', 'target') as session:
    transfer_table('test_table', session=session)
```

## Installation

To build the package from source, run the Python build command and then install the resulting wheel:

``` console
python -m build .
pip install ./dist/astro_extras-xxxx-py3-none-any.whl
```

Airflow can be run directly in a Unix environment, but this is not possible on Windows.
In any case, using Docker is much easier.

The `docker-compose.yml` file should be used to start Airflow in a Docker
environment. After building the package, run this command in the top-level directory:

``` console
docker compose -p astro-extras up -d --build
```

To shut down the Docker stack, run:

``` console
docker compose -p astro-extras down -v
```

## Testing

To set up the testing environment, install the test requirements:

``` console
pip install -r ./tests/requirements.txt 
```

Then change to the `tests` directory and run pytest:

``` console
cd ./tests
pytest -s -v
```

This will build a Docker image and start the `astro-extras-test` stack using the `docker-compose-test.yml` file.
The package must be built beforehand.

After the tests complete, the test stack will be shut down, but you may add the `--keep` option
to keep it running.

            
