dagster-delta


Name: dagster-delta
Version: 0.3.0
Summary: Deltalake IO Managers for Dagster with pyarrow and Polars support.
Author: Ion Koutsouris
Requires-Python: >=3.9
Keywords: dagster, datalake, delta, deltalake, io manager, polars, pyarrow
Upload time: 2025-02-11 11:27:43
# dagster-delta
Delta Lake IO managers for Dagster with PyArrow and Polars support, originally forked from dagster-deltalake with customizations.

The IO managers support partition mapping, custom write modes, and special metadata configuration for advanced use cases.

The supported write modes (set via `mode` on the IO manager; a sketch follows the list):

- **error**
- **append**
- **overwrite**
- **ignore**
- **merge**
- **create_or_replace**
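
For the non-merge modes, the mode is set directly on the IO manager. A minimal sketch (the bucket URI is a placeholder; the constructor arguments follow the merge example below):

```python
import polars as pl
from dagster import Definitions, asset
from dagster_delta import DeltaLakePolarsIOManager, WriteMode

@asset
def my_table() -> pl.DataFrame:
    ...

defs = Definitions(
    assets=[my_table],
    resources={
        "io_manager": DeltaLakePolarsIOManager(
            root_uri="s3://bucket",  # placeholder storage root
            mode=WriteMode.append,   # or just "append"
        ),
    },
)
```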

## Merge

dagster-delta supports MERGE execution with several predefined merge types (`dagster_delta.config.MergeType`):

- **deduplicate_insert** <- deduplicates on write
- **update_only** <- updates only matched records
- **upsert** <- updates matched records and inserts unmatched records (sketched in raw `deltalake` terms after the example below)
- **replace_and_delete_unmatched** <- updates matched records and deletes unmatched ones

Example:
```python
import polars as pl
from dagster import Definitions, asset
from dagster_delta import DeltaLakePolarsIOManager, MergeConfig, MergeType, WriteMode

@asset(
    key_prefix=["my_schema"]  # will be used as the schema (parent folder) in Delta Lake
)
def my_table() -> pl.DataFrame:  # the name of the asset will be the table name
    ...

defs = Definitions(
    assets=[my_table],
    resources={"io_manager": DeltaLakePolarsIOManager(
        root_uri="s3://bucket",
        mode=WriteMode.merge, # or just "merge"
        merge_config=MergeConfig(
            merge_type=MergeType.upsert,
            predicate="s.a = t.a",
            source_alias="s",
            target_alias="t",
        )
    )}
)
```
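
For reference, `MergeType.upsert` corresponds roughly to the following hand-rolled merge using the `deltalake` Python API. This is a sketch of the semantics only, not the IO manager's internal code; the table URI and source frame are hypothetical:

```python
import polars as pl
from deltalake import DeltaTable

dt = DeltaTable("s3://bucket/my_schema/my_table")      # hypothetical table URI
source = pl.DataFrame({"a": [1, 2], "b": ["x", "y"]})  # hypothetical source data

(
    dt.merge(
        source=source.to_arrow(),  # deltalake consumes Arrow data
        predicate="s.a = t.a",
        source_alias="s",
        target_alias="t",
    )
    .when_matched_update_all()      # update rows where the predicate matches
    .when_not_matched_insert_all()  # insert rows with no match
    .execute()
)
```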

## Special metadata configurations

### **Add** additional `table_configuration`
Specify additional table configuration for the `configuration` argument of `write_deltalake`.

```python
import dagster as dg
import polars as pl

@dg.asset(
    io_manager_key="deltalake_io_manager",
    metadata={"table_configuration": {
        "delta.enableChangeDataFeed": "true",
    }},
)
def my_asset() -> pl.DataFrame:
    ...
```

### **Override** the write `mode`
Override the write `mode` to be used in `write_deltalake`.

```python
@dg.asset(
    io_manager_key="deltalake_io_manager",
    metadata={"mode": "append"},
)
def my_asset() -> pl.DataFrame:
    ...
```

### **Override** the `custom_metadata`
Override the `custom_metadata` to be used in `write_deltalake`.

```python
@dg.asset(
    io_manager_key="deltalake_io_manager",
    metadata={"custom_metadata": {"owner": "John Doe"}},
)
def my_asset() -> pl.DataFrame:
    ...
```

### **Override** the write `schema_mode`
Override the `schema_mode` to be used in `write_deltalake`.

```python
@dg.asset(
    io_manager_key="deltalake_io_manager",
    metadata={"schema_mode": "merge"},
)
def my_asset() -> pl.DataFrame:
    ...
```

### **Override** the `writer_properties`
Override the `writer_properties` to be used in `write_deltalake`.

```python
@dg.asset(
    io_manager_key="deltalake_io_manager",
    metadata={"writer_properties": {
        "compression": "SNAPPY",
    }},
)
def my_asset() -> pl.DataFrame:
    ...
```

### **Override** the `merge_predicate`
Override the `merge_predicate` to be used with `merge` execution.

```python
@dg.asset(
    io_manager_key="deltalake_io_manager",
    metadata={"merge_predicate": "s.foo = t.foo AND s.bar = t.bar"},
)
def my_asset() -> pl.DataFrame:
    ...
```

### **Override** the `schema`
Override the `schema` under which the table will be saved.

```python
@dg.asset(
    io_manager_key="deltalake_io_manager",
    metadata={"schema": "custom_db_schema"},
)
def my_asset() -> pl.DataFrame:
    ...
```

### **Set** the `columns` that need to be read
Set `columns` on an input to load only those columns.

```python
@dg.asset(
    io_manager_key="deltalake_io_manager",
    ins={
        "upstream_asset": dg.AssetIn(metadata={"columns": ["foo", "bar"]}),
    },
)
def my_asset(upstream_asset) -> pl.DataFrame:
    ...
```

### **Override** table name using `root_name`

Instead of using the asset name for the table name, it's possible to set a custom table name using `root_name` in the asset definition metadata.

This is useful when two or more assets share the same table structure, but each asset covers only a subset of the table's full partition definition and they cannot be combined into a single asset because they require different underlying op logic and/or upstream assets:

```python
import polars as pl
import dagster as dg

@dg.asset(
    io_manager_key="deltalake_io_manager",
    partitions_def=dg.StaticPartitionsDefinition(["a", "b"]),
    metadata={
        "partition_expr": "foo",
        "root_name": "asset_partitioned",
    },
)
def asset_partitioned_1(upstream_1: pl.DataFrame, upstream_2: pl.DataFrame) -> pl.DataFrame:
    ...

@dg.asset(
    io_manager_key="deltalake_io_manager",
    partitions_def=dg.StaticPartitionsDefinition(["c", "d"]),
    metadata={
        "partition_expr": "foo",
        "root_name": "asset_partitioned",
    },
)
def asset_partitioned_2(upstream_3: pl.DataFrame, upstream_4: pl.DataFrame) -> pl.DataFrame:
    ...

```

Effectively, this would be the flow:

```                                                                             
                                                                                    
                 {static_partition_def: [a,b]}                                      
┌───────────┐                                                                       
│upstream 1 ├─┐ ┌────────────────────────┐                                          
└───────────┘ │ │                        │            write to storage on partition (a,b)                                 
┌───────────┐ └─►   asset_partitioned_1  ├──────────────────────┐                   
│upstream 2 ├───►                        │                      │                   
└───────────┘   └────────────────────────┘       ┌──────────────▼──────────────────┐
                                                 │                     partitions  │
                                                 │  asset_partitioned:             │
                                                 │                     [a,b,c,d]   │
┌───────────┐   ┌────────────────────────┐       └──────────────▲──────────────────┘
│upstream 3 ├──┐│                        │                      │                   
└───────────┘  └►   asset_partitioned_2  │                      │                   
┌───────────┐ ┌─►                        ├──────────────────────┘                   
│upstream 4 ├─┘ └────────────────────────┘            write to storage on partition (c,d)                            
└───────────┘                                                                       
                 {static_partition_def: [c,d]}                                      
                                            
```
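
Both partitioned assets can then be wired to one IO manager bound to the `deltalake_io_manager` key, so they write into the same `asset_partitioned` table. A minimal sketch following the earlier `Definitions` example (the bucket URI is a placeholder):

```python
import dagster as dg
from dagster_delta import DeltaLakePolarsIOManager

defs = dg.Definitions(
    assets=[asset_partitioned_1, asset_partitioned_2],
    resources={
        # Both assets resolve to <root_uri>/asset_partitioned via root_name.
        "deltalake_io_manager": DeltaLakePolarsIOManager(root_uri="s3://bucket"),
    },
)
```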
            
