Name: polta
Version: 0.8.2
Summary: Data engineering tool combining Polars transformations with Delta tables/lakes
Author: JoshTG
License: MIT
Requires Python: >=3.13
Upload time: 2025-07-30 01:14:56
# Polta
_Data engineering tool combining Polars transformations with Delta tables/lakes._

![PyTest](https://github.com/JoshTG/polta/actions/workflows/run-pytest.yml/badge.svg) [![PyPI version](https://img.shields.io/pypi/v/polta.svg)](https://pypi.org/project/polta/)

# Core Concepts

The `polta` package allows you to declare simple building blocks that interact with each other to form small-to-medium-scale data pipelines.

In the Python ecosystem broadly, the existing `polars` and `deltalake` packages make a great team, but they can be tricky to interact with at times. The `polta` package aims to provide a unified wrapper for them, along with some homebrewed tools and objects, so that moving and managing data across layers of abstraction is intuitive and resilient.

## At a Glance

* A `Metastore` manages data in a uniform and consistent manner for `Pipelines`.
* A `Pipeline` connects `Pipes` together into a uniform execution plan.
* Each `Pipe` takes data from one location, transforms it, and saves it into another location. It does so in one of three ways:
  * By ingesting source data via an `Ingester`.
  * By transforming data across layers via a `Transformer`.
  * By exporting the data in a desired format via an `Exporter`.
* The data are managed in `Tables`, which use `deltalake` and `polars` under the hood.
* Each `Table` contains a `TableSchema`, which wraps both the polars and deltalake schemas so you can use whichever you need.

## Terminology

Throughout this README and in the repository's `sample` pipelines, various objects are aliased in a consistent manner when imported. Below is a table of such aliases for convenience.

| Object        | Alias                               | Example            |
| ------------- | ----------------------------------  | ------------------ |
| `Table`       | tab_<quality-prefix\>_<table-name\> | tab_raw_activity   |
| `Exporter`    | exp_<quality-prefix\>_<table-name\> | exp_can_user       | 
| `Ingester`    | ing_<quality-prefix\>_<table-name\> | ing_raw_activity   |
| `Transformer` | tra_<quality-prefix\>_<table-name\> | tra_con_user       |
| `Pipe`        | pip_<quality-prefix\>_<table-name\> | pip_std_category   |
| `Pipeline`    | ppl_<domain\>_<table-name\>         | ppl_standard_user  |

To illustrate, a `Table` is initially declared like this:

```python
# raw_table.py
from polta.enums import TableQuality
from polta.table import Table


table: Table = Table(
  domain='standard',
  quality=TableQuality.RAW,
  name='test'
)
```

And another like this:

```python
# conformed_table.py
from polta.enums import TableQuality
from polta.table import Table


table: Table = Table(
  domain='standard',
  quality=TableQuality.CONFORMED,
  name='test'
)
```

Then, whenever they are imported from another file, they are aliased like this:
```python
# other-file.py
from .raw_table import table as tab_raw_test
from .conformed_table import table as tab_con_test

...
```

The naming conventions are designed this way for the following reasons:
1. It keeps initial declarations simple.
2. It allows importing multiple objects (e.g., `Table` and `Pipe` objects) while avoiding variable collisions.
3. It adds consistent and descriptive identifiers to the objects throughout the codebase.

> Feel free to name and organize your objects however you wish in your own repository. However, make sure you understand how this repository organizes its objects so that the documentation and samples make the most sense.

## Metastore

Every `polta` integration should have a dedicated metastore for preserving data and logs. This is automatically created and managed by `polta` before executing any reads or writes.

There are two main aspects of a `Metastore`:

1. *Tables*: Contains every table across all layers.
2. *Volumes*: Contains file storage systems needed for transformations.

This structure is inspired by `deltalake` and follows similar metastore paradigms. It loosely follows the modern [Medallion Architecture](https://www.databricks.com/glossary/medallion-architecture) language for organizing the data layers, with these naming conventions for each layer:

1. *Raw*: Source data as a payload string.
2. *Conformed*: Source data already conformed to a schema.
3. *Standard*: Non-source, static data.
4. *Canonical*: Business-level data, built from *Raw*, *Conformed*, and/or *Standard* data.
5. *Export*: Cleaned, formatted export data.

The basic way to understand these layers is to think of three different data paths:

1. *raw* -> *conformed* -> *canonical*
2. *standard*
3. *raw/conformed/standard/canonical* -> *export* 

If you have complicated source data that needs to be unpacked, it should be brought into the *raw* or *conformed* layer and then cleaned into *canonical*.

If you have a simple table, like a crosswalk, it should be brought into the *standard* layer.

If you want to save data in different formats for external use, it should be exported via the *export* layer.
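
To make the first path concrete, here is a rough sketch of one logical dataset declared across its layers (`TableQuality.RAW` and `TableQuality.CONFORMED` appear in the samples below; `TableQuality.CANONICAL` is an assumption based on the layer names above):

```python
from polta.enums import TableQuality
from polta.table import Table

# One table per layer in the raw -> conformed -> canonical path.
tab_raw_user: Table = Table(domain='standard', quality=TableQuality.RAW, name='user')
tab_con_user: Table = Table(domain='standard', quality=TableQuality.CONFORMED, name='user')

# CANONICAL is assumed to be the quality name for the canonical layer.
tab_can_user: Table = Table(domain='standard', quality=TableQuality.CANONICAL, name='user')
```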

## Table

The `Table` is the primary way to read and write data.

It stores data using `deltalake`, and it transforms data using `polars`.

Because `Table` integrates the two modules, it has many fields and methods for communicating seamlessly between them. Most importantly, every `Table` exposes a `TableSchema` object in its `schema` field, which contains both the `polars` and `deltalake` versions of the schema for you to use as needed.

Each raw `Table` has a dedicated ingestion zone located in the `Metastore` to store source files ready to be loaded into the raw layer.
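
As a minimal sketch of working with a `Table` directly (the `get` method and the `schema` field are the ones described above):

```python
from polars import DataFrame

from polta.enums import TableQuality
from polta.table import Table

table: Table = Table(
  domain='standard',
  quality=TableQuality.RAW,
  name='test'
)

# Read the current contents of the table as a polars DataFrame.
df: DataFrame = table.get()

# The TableSchema in the `schema` field wraps both the polars and
# deltalake versions of the schema.
table_schema = table.schema
```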

## Pipe

The `Pipe` is the primary way to transform data from one location to another in a new format.

Currently, there are three kinds of supported pipes, each described below.

### Ingester

The `Ingester` is the primary way to load source files into the raw layer.

It currently supports ingesting these formats:

1. JSON
2. Excel
3. CSV
4. String payload

An instance can get passed into a `Pipe` to ingest data into a `Table`.
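
For example, a CSV ingester might be declared like this (a sketch only: `RawFileType.CSV` is an assumed enum member name, and the documented JSON-based pattern appears in the Sample Ingester Pipe under Usage):

```python
from polta.enums import DirectoryType, RawFileType, TableQuality
from polta.ingester import Ingester
from polta.pipe import Pipe
from polta.table import Table

# A raw table to ingest into; the sample under Usage also declares a
# raw_schema and metastore.
table: Table = Table(
  domain='standard',
  quality=TableQuality.RAW,
  name='activity'
)

# RawFileType.CSV is assumed here; the constructor arguments otherwise
# match the sample under Usage.
ingester: Ingester = Ingester(
  table=table,
  directory_type=DirectoryType.SHALLOW,
  raw_file_type=RawFileType.CSV
)

pipe: Pipe = Pipe(ingester)
```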

### Transformer

The `Transformer` reads one or more `Table` objects from a layer, applies transformation logic, and writes the output into a target `Table`.

This object accepts either a custom function or a SQL query as its transformation logic.

The custom function may look like this:

```python
from polars import DataFrame


def transform(dfs: dict[str, DataFrame]) -> DataFrame:
  """Applies a simple join between name and activity

  Args:
    dfs (dict[str, DataFrame]): the input DataFrames
  
  Returns:
    df (DataFrame): the resulting DataFrame
  """
  return dfs['name'].join(dfs['activity'], 'id', 'inner')
```

Alternatively, a SQL query might look like this:

```python
transform: str = '''
  SELECT
      n.*
    , a.active_ind
  FROM
    name n
  INNER JOIN activity a
  ON n.id = a.id
'''
```

### Exporter

The `Exporter` reads a `Table` and exports it in a desired format, usually into an export directory within the `Metastore`.

It currently supports exporting these formats:
1. JSON
2. CSV
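
This README does not include a standalone `Exporter` sample, so the following is a rough, hypothetical sketch: the `Exporter` module path and constructor arguments are assumptions rather than the documented signature (consult the `sample` directory for a real exporter), and it is shown only to illustrate that an `Exporter` is passed into a `Pipe` like the other pipe types:

```python
from polta.enums import TableQuality
from polta.exporter import Exporter  # module path assumed
from polta.pipe import Pipe
from polta.table import Table

# A table to export; CANONICAL is an assumed quality name.
table: Table = Table(
  domain='standard',
  quality=TableQuality.CANONICAL,
  name='user'
)

# Hypothetical constructor: the real Exporter arguments (such as the
# export format) are not documented in this README.
exporter: Exporter = Exporter(table=table)

pipe: Pipe = Pipe(exporter)
```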

## Pipeline

The `Pipeline` is the primary way to link `Pipes` together to create a unified data pipeline.

It takes in a list of raw, conformed, canonical, and export `Pipes` and executes them sequentially.

There are two kinds of pipelines you can build:

1. `Standard`: each step in the pipeline saves to `Table` objects in the metastore. During execution, pipes typically retrieve the current state of each of those `Table` objects and save the output in the target `Table`. This allows a full end-to-end pipeline that preserves all pipe outputs in the metastore for future use.
2. `In Memory`: each step in the pipeline preserves the `DataFrames` across layers and loads them into each subsequent pipe. This allows a full end-to-end pipeline that can export the results without relying on preserving the intermediate data in the metastore.

If you need to store each run over time, you should use a `Standard` pipeline. However, if you simply want to load a dataset, transform it, and export it in a given format, preserving only that export, then you should use an `In Memory` pipeline. The `sample` directory contains pipelines of both kinds.

# Installation

## Installing to a Project
This project is published to `PyPI` and can be installed this way:

```sh
pip install polta
```

## Initializing the Repository

To use the code from the repository itself, either for testing or contributing, follow these steps:

1. Clone the repository to your local machine.
2. Create a virtual environment, preferably using `venv`, that runs `Python 3.13`. 
3. Ensure you have `poetry` installed (installation instructions [here](https://python-poetry.org/docs/#installation)).
4. Point `poetry` at the virtual environment by running `poetry env use .venv/path/to/python`.
5. Download dependencies by executing `poetry install`.
6. Build a wheel file by executing `poetry build`.

# Testing

This project uses `pytest` for its tests, all of which exist in the `tests` directory. Below are recommended testing options.

## Run Tests via VS Code

There is a `Testing` tab in the left-most menu by default that allows you to run `pytest` tests in bulk or individually.

## Run Tests via Poetry
To execute tests using `poetry`, run this command in your terminal at the top-level directory:

```sh
poetry run pytest tests/ -vv -s
```

## Check Test Coverage

To check the overall test coverage, use the `pytest-cov` package by running this command in your terminal at the top-level directory:

```sh
poetry run pytest --cov=polta tests/ -vv -s
```

If you do not have 100% coverage, you can see which lines of code are not covered by running this command:

```sh
poetry run coverage report -m
```

## Linting

This repository uses `ruff` as its linter.

To lint the code, run the following command in your terminal at the top-level directory:

```sh
poetry run ruff check
```

# Usage

Below are sample code snippets to show basic usage. For full sample pipelines, consult the `sample` directory in the repository. These tables, pipes, and pipeline are used in the unit test located in the `tests/test_pipeline.py` pytest file.

## Sample Metastore

Creating a new metastore is simple. Below is a sample metastore that can be passed into the initialization of any `Table` so that the table writes to that metastore.

```python
from polta.metastore import Metastore


metastore: Metastore = Metastore('path/to/desired/store')
```

## Sample Ingester Pipe

This sample code illustrates a simple raw ingestion pipe.

A pipe file typically contains a `Table` and a `Pipe`, and a raw table might have an additional `Ingester`.

```python
from deltalake import Field, Schema

from polta.enums import (
  DirectoryType,
  RawFileType,
  TableQuality
)
from polta.ingester import Ingester
from polta.pipe import Pipe
from polta.table import Table

from .metastore import metastore


table: Table = Table(
  domain='sample',
  quality=TableQuality.RAW,
  name='table',
  raw_schema=Schema([
    Field('payload', 'string')
  ]),
  metastore=metastore
)

ingester: Ingester = Ingester(
  table=table,
  directory_type=DirectoryType.SHALLOW,
  raw_file_type=RawFileType.JSON
)

pipe: Pipe = Pipe(ingester)
```

Making `table.raw_schema` a single string `payload` field signals to the ingester that the transformation is a simple file read.

This code is all that is needed to load all data from the ingestion zone into the raw table. To do so, execute `pipe.execute()`.

If you want to read the data, execute `table.get()`.
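
Continuing the example above, a minimal driver might look like this:

```python
# Ingest all files currently in the ingestion zone into the raw table.
pipe.execute()

# Read the raw table back as a polars DataFrame.
df = table.get()
print(df)
```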

## Sample Transformer Pipe

For instances where transformation logic is required, you should use the `Transformer` class to transform data from one layer to another.

```python
from deltalake import Field, Schema
from polars import col, DataFrame
from polars.datatypes import DataType, List, Struct

from polta.enums import TableQuality, WriteLogic
from polta.maps import Maps
from polta.pipe import Pipe
from polta.table import Table
from polta.transformer import Transformer
from polta.udfs import string_to_struct
from sample.standard.raw.table import \
  table as tab_raw_table

from .metastore import metastore


table: Table = Table(
  domain='test',
  quality=TableQuality.CONFORMED,
  name='table',
  raw_schema=Schema([
    Field('id', 'string'),
    Field('active_ind', 'boolean')
  ]),
  metastore=metastore
)

def get_dfs() -> dict[str, DataFrame]:
  """Basic load logic:
    1. Get raw table data as a DataFrame
    2. Anti join against conformed layer to get net-new records
  
  Returns:
    dfs (dict[str, DataFrame]): The resulting data as 'table'
  """
  conformed_ids: DataFrame = table.get(select=['_raw_id'], unique=True)
  df: DataFrame = (tab_raw_table
    .get()
    .join(conformed_ids, '_raw_id', 'anti')
  )
  return {'table': df}

def transform(dfs: dict[str, DataFrame]) -> DataFrame:
  """Basic transformation logic:
    1. Retrieve the raw table DataFrame
    2. Convert 'payload' into a struct
    3. Explode the struct
    4. Convert the struct key-value pairs into column-cell values

  Returns:
    df (DataFrame): the resulting DataFrame
  """
  df: DataFrame = dfs['table']
  raw_polars_schema: dict[str, DataType] = Maps \
      .deltalake_schema_to_polars_schema(table.raw_schema)

  return (df
    .with_columns([
      col('payload')
        .map_elements(string_to_struct, return_dtype=List(Struct(raw_polars_schema)))
    ])
    .explode('payload')
    .with_columns([
      col('payload').struct.field(f).alias(f)
      for f in [n.name for n in table.raw_schema.fields]
    ])
    .drop('payload')
  )

transformer: Transformer = Transformer(
  table=table,
  load_logic=get_dfs,
  transform_logic=transform,
  write_logic=WriteLogic.APPEND
)

pipe: Pipe = Pipe(transformer)
```

This `Transformer` instance receives the raw data from the previous example, explodes the data, and extracts the declared fields into a conformed DataFrame.

This single file contains every object in a modular format, which means you can import any part of the pipe into another file as needed.

> This modular design also allows you to create integration and unit tests around your `load_logic` and `transform_logic` easily, as illustrated in the `tests/` directory.

You can execute the `Pipe` by running `pipe.execute()` wherever you want, and any new raw files will get transformed and loaded into the conformed layer.

## Sample Pipeline

To connect the above pipes together, you can create a `Pipeline`, as shown in the sample below.

```python
from polta.pipeline import Pipeline

from sample.standard.raw.table import \
  pipe as pip_raw_sample
from sample.standard.conformed.table import \
  pipe as pip_con_sample


pipeline: Pipeline = Pipeline(
  raw_pipes=[pip_raw_sample],
  conformed_pipes=[pip_con_sample]
)
```

You can then execute your pipeline by running `pipeline.execute()`.
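
Continuing the example above, a minimal entry point might look like this (the module path for the pipeline file is illustrative):

```python
# run_pipeline.py
# Illustrative import path; adjust to wherever the pipeline above is defined.
from sample.standard.pipeline import pipeline

if __name__ == '__main__':
  pipeline.execute()
```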

# License

This project exists under the `MIT License`.

## Acknowledgements

The `polta` project depends on third-party packages released under the following permissive open-source licenses:

1. `Apache Software License (Apache-2.0)`
2. `BSD-3-Clause License`
3. `MIT License`

Below are the top-level packages with their licenses.

| Package | Version | Purpose | License |
| ------- | ------- | ------- | ------- |
| [deltalake](https://github.com/delta-io/delta-rs) | >=0.25.5, <1.0.0 | Stores and reads data | Apache Software License (Apache-2.0) |
| [ipykernel](https://github.com/ipython/ipykernel) | >=6.29.5, <6.30.0 | Creates Jupyter notebooks for ad hoc analytics | BSD-3-Clause License |
| [openpyxl](https://foss.heptapod.net/openpyxl/openpyxl) | >=3.1.5, <3.2.0 | The underlying engine for pl.read_excel() | MIT License |
| [pip](https://github.com/pypa/pip) | >=25.1.1, <25.2.0 | Installs Python packages for the virtual environment | MIT License |
| [polars](https://github.com/pola-rs/polars) | >=1.30.0, <1.31.0 | Executes DataFrame transformation | MIT License |
| [pytest](https://github.com/pytest-dev/pytest) | >=8.3.5, <8.4.0 | Runs test cases for unit testing | MIT License |
| [pytest-cov](https://github.com/pytest-dev/pytest-cov) | >=6.2.1, <6.3.0 | Applies test coverage to pytest runs | MIT License |
| [ruff](https://github.com/astral-sh/ruff) | >=0.12.3, <0.13.0 | Executes linting checks in the repository | MIT License |
| [ruff-action](https://github.com/astral-sh/ruff-action) | latest | Executes a ruff check in the GitHub workflow | Apache Software License (Apache-2.0) |
| [tzdata](https://github.com/python/tzdata) | >=2025.2, <2026.1 | Contains timezone information for Datetime objects | Apache Software License (Apache-2.0) |

# Contributing

This project is open source, and contributions are most welcome. To contribute, follow these steps:

1. Submit the contribution request to the [issues page](https://github.com/JoshTG/polta/issues).
2. Await signoff/feedback from a repository administrator.
3. Clone the repository to your local machine.
4. Create a descriptive feature branch.
5. Make the desired changes.
6. Fully test the desired changes using the `tests` directory. Ensure you have 100% `pytest` test coverage and the code passes a `ruff check`.
7. Bump the `poetry` project version appropriately using standard semantic versioning.
8. Create a merge request into the `main` branch of the official `polta` project and assign it initially to @JoshTG.
9. Once the merge request is approved and merged, an administrator will schedule a release cycle and deploy the changes using a new release tag.

# Contact

You may contact the main contributor, [JoshTG](https://github.com/JoshTG), by sending an email to this address: jgillilanddata@gmail.com.


            
