# dagster-delta
A Dagster Delta Lake implementation for PyArrow & Polars. Originally forked from dagster-deltalake with customizations.
The IO managers support partition mapping, custom write modes, and special metadata configuration for advanced use cases.
The supported write modes (configured on the IO manager, as sketched after this list):
- **error**
- **append**
- **overwrite**
- **ignore**
- **merge**
- **create_or_replace**
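The write mode is set on the IO manager when it is registered as a resource; individual assets can still override it via metadata (see further below). A minimal sketch, assuming only `root_uri` and `mode` need to be configured here:

```python
import polars as pl
from dagster import Definitions, asset

from dagster_delta import WriteMode
from dagster_delta_polars import DeltaLakePolarsIOManager


@asset
def my_table() -> pl.DataFrame:
    ...


defs = Definitions(
    assets=[my_table],
    resources={
        "io_manager": DeltaLakePolarsIOManager(
            root_uri="s3://bucket",  # base location; each asset becomes a table under it
            mode=WriteMode.append,   # or simply "append"
        )
    },
)
```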
## Merge
dagster-delta supports MERGE execution with several pre-defined MERGE types (`dagster_delta.config.MergeType`):
- **deduplicate_insert** <- deduplicates on write
- **update_only** <- updates only the matched records
- **upsert** <- updates existing matches and inserts unmatched records
- **replace_and_delete_unmatched** <- updates existing matches and deletes unmatched records
Example:
```python
import polars as pl
from dagster import Definitions, asset

from dagster_delta import MergeConfig, MergeType, WriteMode
from dagster_delta_polars import DeltaLakePolarsIOManager


@asset(
    key_prefix=["my_schema"]  # will be used as the schema (parent folder) in Delta Lake
)
def my_table() -> pl.DataFrame:  # the name of the asset will be the table name
    ...


defs = Definitions(
    assets=[my_table],
    resources={
        "io_manager": DeltaLakePolarsIOManager(
            root_uri="s3://bucket",
            mode=WriteMode.merge,  # or just "merge"
            merge_config=MergeConfig(
                merge_type=MergeType.upsert,
                predicate="s.a = t.a",
                source_alias="s",
                target_alias="t",
            ),
        )
    },
)
```
## Special metadata configurations
### **Add** additional `table_configuration`
Specify additional table configuration to be passed as `configuration` to `write_deltalake`.
```python
import dagster as dg
import polars as pl


@dg.asset(
    io_manager_key="deltalake_io_manager",
    metadata={"table_configuration": {
        "delta.enableChangeDataFeed": "true",
    }},
)
def my_asset() -> pl.DataFrame:
    ...
```
### **Override** the write `mode`
Override the write `mode` to be used in `write_deltalake`.
```python
@dg.asset(
    io_manager_key="deltalake_io_manager",
    metadata={"mode": "append"},
)
def my_asset() -> pl.DataFrame:
    ...
```
### **Override** the `custom_metadata`
Override the `custom_metadata` to be used in `write_deltalake`.
```python
@dg.asset(
    io_manager_key="deltalake_io_manager",
    metadata={"custom_metadata": {"owner": "John Doe"}},
)
def my_asset() -> pl.DataFrame:
    ...
```
### **Override** the write `schema_mode`
Override the `schema_mode` to be used in `write_deltalake`.
```python
@dg.asset(
    io_manager_key="deltalake_io_manager",
    metadata={"schema_mode": "merge"},
)
def my_asset() -> pl.DataFrame:
    ...
```
### **Override** the `writer_properties`
Override the `writer_properties` to be used in `write_deltalake`.
```python
@dg.asset(
    io_manager_key="deltalake_io_manager",
    metadata={"writer_properties": {
        "compression": "SNAPPY",
    }},
)
def my_asset() -> pl.DataFrame:
    ...
```
### **Override** the `merge_predicate`
Override the `merge_predicate` to be used with `merge` execution.
```python
@dg.asset(
    io_manager_key="deltalake_io_manager",
    metadata={"merge_predicate": "s.foo = t.foo AND s.bar = t.bar"},
)
def my_asset() -> pl.DataFrame:
    ...
```
### **Override** the `schema`
Override the `schema` (parent folder) in which the table will be saved.
```python
@dg.asset(
    io_manager_key="deltalake_io_manager",
    metadata={"schema": "custom_db_schema"},
)
def my_asset() -> pl.DataFrame:
    ...
```
### **Set** the `columns` that need to be read
Specify the `columns` on the input so that only those columns are loaded from the upstream asset.
```python
@dg.asset(
    io_manager_key="deltalake_io_manager",
    ins={
        "upstream_asset": dg.AssetIn(metadata={"columns": ["foo", "bar"]})
    },
)
def my_asset(upstream_asset: pl.DataFrame) -> pl.DataFrame:
    ...
```
### **Override** table name using `root_name`
Instead of using the asset name as the table name, you can set a custom table name via `root_name` in the asset definition metadata.
This is useful when two or more assets share the same table structure, but each asset covers only a subset of the full table's partitions definition and they cannot be combined into a single asset because they require different underlying op logic and/or upstream assets:
```python
import polars as pl
import dagster as dg

@dg.asset(
    io_manager_key="deltalake_io_manager",
    partitions_def=dg.StaticPartitionsDefinition(["a", "b"]),
    metadata={
        "partition_expr": "foo",
        "root_name": "asset_partitioned",
    },
)
def asset_partitioned_1(upstream_1: pl.DataFrame, upstream_2: pl.DataFrame) -> pl.DataFrame:
    ...


@dg.asset(
    io_manager_key="deltalake_io_manager",
    partitions_def=dg.StaticPartitionsDefinition(["c", "d"]),
    metadata={
        "partition_expr": "foo",
        "root_name": "asset_partitioned",
    },
)
def asset_partitioned_2(upstream_3: pl.DataFrame, upstream_4: pl.DataFrame) -> pl.DataFrame:
    ...
```
Effectively this would be the flow:
```
                {static_partition_def: [a,b]}
┌───────────┐
│upstream 1 ├─┐ ┌──────────────────────┐
└───────────┘ │ │                      │  write to storage on partition (a,b)
┌───────────┐ └─►  asset_partitioned_1 ├──────────────────────┐
│upstream 2 ├───►                      │                      │
└───────────┘   └──────────────────────┘       ┌──────────────▼──────────────┐
                                               │  partitions                 │
                                               │  asset_partitioned:         │
                                               │  [a,b,c,d]                  │
┌───────────┐   ┌──────────────────────┐       └──────────────▲──────────────┘
│upstream 3 ├─┐ │                      │                      │
└───────────┘ └─►  asset_partitioned_2 │                      │
┌───────────┐ ┌─►                      ├──────────────────────┘
│upstream 4 ├─┘ └──────────────────────┘  write to storage on partition (c,d)
└───────────┘
                {static_partition_def: [c,d]}
```