| Field | Value |
|---|---|
| Name | dg-sqlmesh |
| Version | 1.2.2 |
| home_page | None |
| Summary | Seamless integration between Dagster and SQLMesh for modern data engineering workflows |
| upload_time | 2025-08-06 11:32:01 |
| maintainer | None |
| docs_url | None |
| author | None |
| requires_python | <3.13,>=3.11 |
| license | None |
| keywords | dagster, sqlmesh, data-engineering, etl, data-pipeline |
| VCS | |
| bugtrack_url | |
| requirements | No requirements were recorded. |
| Travis-CI | No Travis. |
| coveralls test coverage | No coveralls. |
# SQLMesh-Dagster Integration
This module provides a complete integration between SQLMesh and Dagster, allowing SQLMesh models to be materialized as Dagster assets with support for audits, metadata, and adaptive scheduling.
## Features
### 🎯 **SQLMesh Model to Dagster Asset Conversion**
- **Individual asset control** : Each SQLMesh model becomes a separate Dagster asset with granular success/failure control
- **Automatic materialization** : SQLMesh models are automatically converted to Dagster assets
- **External assets support** : SQLMesh sources (external assets) are mapped to Dagster AssetKeys
- **Automatic dependencies** : Dependencies between models are preserved in Dagster
- **Partitioning** : Partitioned SQLMesh models are supported, but partitions are managed by SQLMesh only. There is no integration with Dagster partitions, so backfills cannot be driven from Dagster (no Dagster → SQLMesh backfill)
### 📊 **SQLMesh Metadata Integration to Dagster**
- **Complete metadata** : Cron, tags, kind, dialect, query, partitioned_by, clustered_by
- **Code versioning** : Uses SQLMesh data_hash for Dagster versioning
- **Column descriptions** : Table metadata with descriptions
- **Customizable tags** : SQLMesh tags mapping to Dagster
### ✅ **SQLMesh Audits to Asset Checks Conversion**
- **Automatic audits** : SQLMesh audits become Dagster AssetCheckSpec
- **AssetCheckResult** : Automatic emission of audit results with proper output handling
- **Audit metadata** : SQL query, arguments, dialect, blocking status
- **Non-blocking** : Dagster checks are non-blocking (SQLMesh handles blocking)
- **Fallback handling** : Graceful handling when no evaluation events are found
### ⏰ **Adaptive Scheduling**
- **Automatic analysis** : Detection of the finest granularity from SQLMesh crons
- **Adaptive schedule** : Automatic creation of a Dagster schedule based on crons
- **Intelligent execution** : SQLMesh manages which models should be executed
- **Monitoring** : Detailed logs and granularity metadata
### 🔧 **All-in-One Factory**
- **Simple configuration** : Single function to configure everything
- **Extensible translator** : Customizable translator system
- **Automatic validation** : External dependencies validation
- **Retry policy** : Centralized retry policy configuration
## Basic Usage
### **Simple Factory (Recommended)**
```python
from dagster import RetryPolicy, AssetKey, Backoff
from dg_sqlmesh import sqlmesh_definitions_factory
from dg_sqlmesh import SQLMeshTranslator


class SlingToSqlmeshTranslator(SQLMeshTranslator):
    def get_external_asset_key(self, external_fqn: str) -> AssetKey:
        """
        Custom mapping for external assets.
        SQLMesh: 'jaffle_db.main.raw_source_customers' → Sling: ['target', 'main', 'raw_source_customers']
        """
        parts = external_fqn.replace('"', '').split('.')
        if len(parts) >= 3:
            catalog, schema, table = parts[0], parts[1], parts[2]
            return AssetKey(['target', 'main', table])
        return AssetKey(['external'] + parts[1:])


# All-in-one factory: everything configured in one line!
defs = sqlmesh_definitions_factory(
    project_dir="sqlmesh_project",
    gateway="postgres",
    translator=SlingToSqlmeshTranslator(),
    concurrency_limit=1,
    name="sqlmesh_multi_asset",
    group_name="sqlmesh",
    op_tags={"team": "data", "env": "prod"},
    retry_policy=RetryPolicy(max_retries=1, delay=30.0, backoff=Backoff.EXPONENTIAL),
    enable_schedule=True,  # Enable adaptive scheduling
)
```
### **Advanced Configuration**
```python
from dagster import Backoff, Definitions, RetryPolicy
from dg_sqlmesh import sqlmesh_assets_factory, sqlmesh_adaptive_schedule_factory
from dg_sqlmesh import SQLMeshResource
from dg_sqlmesh import SQLMeshTranslator

# SlingToSqlmeshTranslator is the custom translator defined in the
# Simple Factory example; define or import it before this point.

# SQLMesh resource configuration
sqlmesh_resource = SQLMeshResource(
    project_dir="sqlmesh_project",
    gateway="postgres",
    translator=SlingToSqlmeshTranslator(),
    concurrency_limit=1,
    ignore_cron=True,  # only for testing purposes
)

# SQLMesh assets configuration
sqlmesh_assets = sqlmesh_assets_factory(
    sqlmesh_resource=sqlmesh_resource,
    name="sqlmesh_multi_asset",
    group_name="sqlmesh",
    op_tags={"team": "data", "env": "prod"},
    retry_policy=RetryPolicy(max_retries=1, delay=30.0, backoff=Backoff.EXPONENTIAL),
)

# Adaptive schedule and job created automatically
sqlmesh_adaptive_schedule, sqlmesh_job, _ = sqlmesh_adaptive_schedule_factory(
    sqlmesh_resource=sqlmesh_resource
)

defs = Definitions(
    assets=[sqlmesh_assets],
    jobs=[sqlmesh_job],
    schedules=[sqlmesh_adaptive_schedule],
    resources={
        "sqlmesh": sqlmesh_resource,
    },
)
```
## Custom Translator
To map external assets (SQLMesh sources) to your Dagster conventions, you can create a custom translator:
```python
from dg_sqlmesh import SQLMeshTranslator
import dagster as dg


class MyCustomTranslator(SQLMeshTranslator):
    def get_external_asset_key(self, external_fqn: str) -> dg.AssetKey:
        """
        Custom mapping for external assets.
        Example: 'jaffle_db.main.raw_source_customers' → ['target', 'main', 'raw_source_customers']
        """
        parts = external_fqn.replace('"', '').split('.')
        # We ignore the catalog (jaffle_db), we take the rest
        return dg.AssetKey(['target'] + parts[1:])

    def get_group_name(self, context, model) -> str:
        """
        Custom mapping for groups.
        """
        model_name = getattr(model, "view_name", "")
        if model_name.startswith("stg_"):
            return "staging"
        elif model_name.startswith("mart_"):
            return "marts"
        return super().get_group_name(context, model)
```
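The key-path logic used by these translators can be tried out without Dagster installed. The following is a minimal sketch, assuming a three-part `catalog.schema.table` FQN; `fqn_to_key_path` and its `prefix` parameter are hypothetical names for illustration and are not part of `dg_sqlmesh`.

```python
def fqn_to_key_path(external_fqn: str, prefix: str = "target") -> list[str]:
    """Return the list of parts an AssetKey would be built from.

    Hypothetical helper for illustration only; not part of dg_sqlmesh.
    """
    # SQLMesh may quote identifiers, e.g. '"jaffle_db"."main"."raw_source_customers"'
    parts = external_fqn.replace('"', "").split(".")
    # Drop the catalog (first part) and prepend the chosen prefix
    return [prefix] + parts[1:]


key_path = fqn_to_key_path('"jaffle_db"."main"."raw_source_customers"')
print(key_path)
```

Stripping quotes before splitting is a simplification: an identifier that itself contains a dot inside quotes would be split incorrectly, so a real translator may need stricter parsing.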
## Translator Methods
The `SQLMeshTranslator` exposes several methods you can override:
### `get_external_asset_key(external_fqn: str) -> AssetKey`
Maps an external asset FQN to a Dagster AssetKey.
### `get_asset_key(model) -> AssetKey`
Maps a SQLMesh model to a Dagster AssetKey.
### `get_group_name(context, model) -> str`
Determines the group for a model.
### `get_tags(context, model) -> dict`
Generates tags for a model.
### `get_metadata(model, keys: list[str]) -> dict`
Extracts specified metadata from the model.
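To illustrate the idea behind `get_metadata(model, keys)`, here is a minimal sketch of key-based extraction from a model-like object. It is an assumption about the general approach, not the library's implementation: `extract_metadata` is a hypothetical function, and skipping missing or `None` attributes is a design choice made for this example.

```python
from types import SimpleNamespace
from typing import Any


def extract_metadata(model: Any, keys: list[str]) -> dict[str, Any]:
    """Collect the requested attributes, skipping missing or None values."""
    metadata = {}
    for key in keys:
        value = getattr(model, key, None)
        if value is not None:
            metadata[key] = value
    return metadata


# Stand-in for a SQLMesh model; real models expose many more attributes.
fake_model = SimpleNamespace(cron="@daily", dialect="postgres", tags=["core"])
print(extract_metadata(fake_model, ["cron", "dialect", "clustered_by"]))
```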
## Asset Checks and Audits
### **Automatic Audit Conversion**
SQLMesh audits are automatically converted to Dagster AssetCheckSpec:
```python
from dagster import AssetCheckSpec, AssetKey

# SQLMesh model definition (SQL, shown here as comments):
# MODEL (
#   name customers,
#   audits (
#     not_null(column=id),
#     unique_values(columns=[id, email])
#   )
# );

# In Dagster, the not_null audit automatically becomes:
AssetCheckSpec(
    name="not_null",
    asset=AssetKey(["customers"]),
    blocking=False,  # SQLMesh handles blocking
    description="SQLMesh audit: not_null(column=id)",
)
```
### **AssetCheckResult Emission**
During execution, audit results are emitted as AssetCheckResult:
```python
AssetCheckResult(
    passed=True,
    asset_key=AssetKey(["customers"]),
    check_name="not_null",
    metadata={
        "sqlmesh_model_name": "customers",
        "audit_query": "SELECT COUNT(*) FROM customers WHERE id IS NULL",
        "audit_blocking": False,
        "audit_dialect": "postgres",
        "audit_args": {"column": "id"},
    },
)
```
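The mapping from captured audit events to check results, including the fallback when no evaluation event is found, might look roughly like the sketch below. Everything here is an assumption made for illustration: `CheckOutcome` is a stand-in for Dagster's `AssetCheckResult` so the code runs without Dagster, the event dictionaries and their keys are invented, and treating a missing event as passed is a guess at what "graceful handling" could mean.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class CheckOutcome:
    """Stand-in for dagster.AssetCheckResult (hypothetical, for illustration)."""
    check_name: str
    passed: bool
    metadata: dict[str, Any] = field(default_factory=dict)


def outcomes_from_audits(
    expected_checks: list[str], audit_events: list[dict[str, Any]]
) -> list[CheckOutcome]:
    """Emit exactly one outcome per declared check, even without an event."""
    events_by_name = {event["audit_name"]: event for event in audit_events}
    outcomes = []
    for name in expected_checks:
        event = events_by_name.get(name)
        if event is None:
            # Fallback (assumed behaviour): no evaluation event was captured
            outcomes.append(
                CheckOutcome(name, True, {"note": "no evaluation event found"})
            )
        else:
            outcomes.append(
                CheckOutcome(name, event["passed"], {"audit_args": event.get("args", {})})
            )
    return outcomes


events = [{"audit_name": "not_null", "passed": False, "args": {"column": "id"}}]
for outcome in outcomes_from_audits(["not_null", "unique_values"], events):
    print(outcome)
```

Emitting one outcome per declared check matters in Dagster because every check spec attached to an asset is expected to produce a result for the run.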
## Adaptive Scheduling
### **Automatic Cron Analysis**
The system automatically analyzes all SQLMesh crons and determines the finest granularity:
```python
# If you have models with different crons:
# - customers: @daily
# - orders: @hourly
# - events: */5 * * * * (every 5 minutes)
# The adaptive schedule will be: */5 * * * * (every 5 minutes)
```
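One way to picture the "finest granularity" selection is the sketch below. It is a deliberately simplified assumption, not the library's algorithm: `approx_interval_minutes` and `finest_cron` are hypothetical names, and the estimator only understands the shorthands listed, `*`, `*/N`, and fixed values in the minute and hour fields.

```python
SHORTHAND_MINUTES = {
    "@hourly": 60,
    "@daily": 24 * 60,
    "@weekly": 7 * 24 * 60,
    "@monthly": 30 * 24 * 60,  # rough approximation
}


def approx_interval_minutes(cron: str) -> int:
    """Roughly estimate how often a cron expression fires, in minutes."""
    if cron in SHORTHAND_MINUTES:
        return SHORTHAND_MINUTES[cron]
    minute, hour, *_ = cron.split()
    if minute == "*":
        return 1
    if minute.startswith("*/"):
        return int(minute[2:])
    # Fixed minute: frequency is decided by the hour field
    if hour == "*":
        return 60
    if hour.startswith("*/"):
        return int(hour[2:]) * 60
    return 24 * 60


def finest_cron(crons: list[str]) -> str:
    """Return the cron expression with the shortest estimated interval."""
    return min(crons, key=approx_interval_minutes)


model_crons = {"customers": "@daily", "orders": "@hourly", "events": "*/5 * * * *"}
print(finest_cron(list(model_crons.values())))
```

Lists and ranges such as `0,30 * * * *` are not handled precisely here; a production implementation would more likely rely on a proper cron parser.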
### **Intelligent Execution**
The schedule triggers `sqlmesh run` for all models, and SQLMesh itself decides which models are actually due according to their crons:
```python
import datetime

# The schedule simply does:
sqlmesh_resource.context.run(
    ignore_cron=False,  # SQLMesh respects crons
    execution_time=datetime.datetime.now(),
)
```
## Architecture
### **Individual Asset Pattern**
Each SQLMesh model becomes a separate Dagster asset that:
- **Materializes independently** : Each asset calls `sqlmesh.materialize_assets_threaded()` for its specific model
- **Controls success/failure** : Each asset can succeed or fail individually based on SQLMesh execution results
- **Handles dependencies** : Uses `translator.get_model_deps_with_external()` for proper dependency mapping
- **Manages checks** : Each asset handles its own audit results with `AssetCheckResult` outputs
### **Benefits of Individual Assets**
- **Granular control** : Each asset can succeed or fail independently in the Dagster UI
- **Clear visibility** : See exactly which models are running, succeeded, or failed
- **Individual retries** : Failed assets can be retried without affecting others
- **Better monitoring** : Track performance and issues per model
- **Flexible scheduling** : Different assets can have different schedules if needed
### **SQLMeshResource**
- Manages SQLMesh context and caching
- Implements strict singleton pattern
- Uses AnyIO for multithreading
- Accepts a custom translator
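For readers unfamiliar with the singleton pattern mentioned above, the following is a generic sketch of a thread-safe singleton in plain Python. It is not taken from `dg_sqlmesh`: `SingletonResource` is a hypothetical class, and how the real `SQLMeshResource` enforces its "strict" behaviour is not shown in this document.

```python
import threading


class SingletonResource:
    """Generic thread-safe singleton (illustrative, not dg_sqlmesh code)."""

    _instance = None
    _lock = threading.Lock()

    def __new__(cls, *args, **kwargs):
        # Guard creation with a lock so concurrent callers share one object
        with cls._lock:
            if cls._instance is None:
                cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self, project_dir: str):
        # __init__ runs on every call, so only initialise the first time
        if getattr(self, "_initialised", False):
            return
        self.project_dir = project_dir
        self._initialised = True


first = SingletonResource("sqlmesh_project")
second = SingletonResource("another_project")
print(first is second, second.project_dir)
```

In this sketch the second call silently reuses the first configuration; a stricter variant could raise an error when the arguments differ.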
### **SQLMeshTranslator**
- Maps SQLMesh concepts to Dagster
- Extensible via inheritance
- Handles external assets and dependencies
### **SQLMesh Metadata via Tags**
You can pass metadata from SQLMesh models to Dagster assets using the tag convention `dagster:property:value`:
```sql
-- In your SQLMesh model
MODEL (
  name customers,
  tags ARRAY["dagster:group_name:sqlmesh_datamarts"],
  -- ... other model properties
);
```
#### **Supported Properties**
Currently supported Dagster properties via tags:
- **`dagster:group_name:value`** : Sets the Dagster asset group name
- Example: `"dagster:group_name:sqlmesh_datamarts"`
- Result: Asset will be in the "sqlmesh_datamarts" group
#### **Tag Convention**
The convention follows the pattern: `dagster:property:value`
- **`dagster`** : Prefix to indicate this is for Dagster
- **`property`** : The Dagster property to update on the asset
- **`value`** : The value to set for that property
#### **Priority Order**
When determining asset properties, the translator follows this priority:
1. **SQLMesh tags** : `dagster:group_name:value` (highest priority)
2. **Factory parameter** : `group_name="sqlmesh"` in factory call
3. **Default logic** : Automatic group determination based on model path
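The tag convention and priority order above can be sketched in a few lines. This is an illustration under stated assumptions rather than the translator's actual code: `parse_dagster_tags` and `resolve_group_name` are hypothetical helpers, and the default group is passed in as a plain string instead of being derived from the model path.

```python
from typing import Optional


def parse_dagster_tags(tags: list[str]) -> dict[str, str]:
    """Extract {property: value} from tags shaped like 'dagster:property:value'."""
    properties = {}
    for tag in tags:
        prefix, _, rest = tag.partition(":")
        if prefix != "dagster":
            continue  # ordinary SQLMesh tag, not meant for Dagster
        prop, sep, value = rest.partition(":")
        if sep and prop and value:
            properties[prop] = value
    return properties


def resolve_group_name(
    tags: list[str], factory_group_name: Optional[str], default_group_name: str
) -> str:
    """Apply the priority: SQLMesh tag, then factory parameter, then default."""
    tagged = parse_dagster_tags(tags).get("group_name")
    if tagged:
        return tagged
    if factory_group_name:
        return factory_group_name
    return default_group_name


print(resolve_group_name(["pii", "dagster:group_name:sqlmesh_datamarts"], "sqlmesh", "default"))
print(resolve_group_name(["pii"], "sqlmesh", "default"))
```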
### **sqlmesh_definitions_factory**
- All-in-one factory for simple configuration
- Automatically creates: resource, assets, job, schedule
- Validates external dependencies
- Returns Definitions directly
### **SQLMeshEventCaptureConsole**
- Custom SQLMesh console to capture events
- Captures audit results for AssetCheckResult
- Handles metadata serialization
## Plan + Run Architecture
### **Individual Asset Materialization**
Each Dagster asset materializes its specific SQLMesh model using:
1. **Model Selection** : `get_models_to_materialize()` selects the specific model for the asset
2. **Materialization** : `sqlmesh.materialize_assets_threaded()` executes the model
3. **Result Handling** : Console events determine success/failure and audit results
### **Implementation Details**
```python
# In each individual asset
def model_asset(context: AssetExecutionContext, sqlmesh: SQLMeshResource):
    # Materialize this specific model
    models_to_materialize = get_models_to_materialize(
        [current_asset_spec.key],
        sqlmesh.get_models,
        sqlmesh.translator,
    )
    # Execute materialization
    plan = sqlmesh.materialize_assets_threaded(models_to_materialize, context=context)
    # Check results via console events
    failed_models_events = sqlmesh._console.get_failed_models_events()
    evaluation_events = sqlmesh._console.get_evaluation_events()
    # Return MaterializeResult + AssetCheckResult for audits
    return MaterializeResult(...), *check_results
```
This approach provides granular control while maintaining all SQLMesh integration features.
## Performance
- **Individual execution** : Each asset runs its own SQLMesh materialization (may result in multiple `sqlmesh run` calls)
- **Strict singleton** : Only one active SQLMesh instance
- **Caching** : Contexts, models and translators are cached
- **Multithreading** : Uses AnyIO to avoid Dagster blocking
- **Lazy loading** : Resources are loaded on demand
- **Early validation** : External dependencies validation before execution
- **Optimized execution** : SQLMesh automatically skips models that don't need materialization
## Development Workflow
### **SQLMesh Development Philosophy**
This module follows SQLMesh's philosophy of **separation of concerns**:
- **Development** : Use SQLMesh CLI for development and schema changes
- **Production** : Use SQLMesh CLI for promoting changes
- **Orchestration** : Use this Dagster module only for running models
### **Development Workflow**
#### **1. Local Development**
```bash
# Develop your models locally
sqlmesh plan dev
sqlmesh apply dev
# Test your changes
sqlmesh run dev
```
#### **2. Production Promotion**
```bash
# Promote changes to production
sqlmesh plan prod  # manual step: review the plan, then confirm to apply it
# Or use CI/CD pipeline
```
#### **3. Dagster Orchestration**
```python
# Dagster takes over for production runs
# - Automatic scheduling via adaptive schedule
# - Manual runs via Dagster UI
# - Only executes: sqlmesh run prod
```
### **Module Responsibilities**
#### **What this module DOES:**
- ✅ **Orchestrates** `sqlmesh run` commands
- ✅ **Schedules** model execution
- ✅ **Monitors** execution and audits
- ✅ **Emits** Dagster events and metadata
#### **What this module DOES NOT do:**
- ❌ **Plan changes** (`sqlmesh plan`)
- ❌ **Apply changes** (`sqlmesh apply`)
- ❌ **Handle breaking changes**
- ❌ **Manage environments**
### **Breaking Changes Management**
Breaking changes are handled **outside** this module:
- **Development** : `sqlmesh plan dev` + manual review
- **Production** : `sqlmesh plan prod` + CI/CD approval
- **Orchestration** : This module only runs approved models
### **Environment Separation**
```bash
# Development (SQLMesh CLI)
sqlmesh plan dev
sqlmesh apply dev
sqlmesh run dev
# Production (Dagster module)
# Automatically runs: sqlmesh run prod
# Based on schedules and triggers
```
This separation ensures:
- ✅ **Clear responsibilities** : Development vs Orchestration
- ✅ **Safe deployments** : Breaking changes handled by SQLMesh CLI
- ✅ **Reliable orchestration** : Dagster only runs approved models
- ✅ **CI/CD friendly** : Standard SQLMesh workflow for deployments
## Installation
```bash
pip install dg-sqlmesh
```
## Requirements
- Python 3.11+
- Dagster 1.11.4+
- SQLMesh 0.206.1+
## Limitations
- **Multiple SQLMesh runs** : Each asset triggers its own `sqlmesh run` (may impact performance with many assets)
- **No Dagster → SQLMesh backfill** : Partitions managed only by SQLMesh itself (run a materialization to backfill)
- **Breaking changes** : Handled outside the module (SQLMesh CLI or CI/CD)
- **Environment management** : SQLMesh CLI or CI/CD
## Troubleshooting
### Common Issues
#### **"Invalid cron" errors**
- **Cause** : A model's cron runs more often than every 5 minutes
- **Solution** : Use `ignore_cron=True` for testing
#### **External asset mapping errors**
- **Cause** : Translator doesn't handle FQN format
- **Solution** : Check `get_external_asset_key` method
#### **Performance issues**
- **Cause** : Too many models loaded
- **Solution** : Use `concurrency_limit` and caching
## Contributing
1. Fork the repository
2. Create a feature branch
3. Make your changes
4. Add tests for new functionality
5. Ensure all tests pass
6. Submit a pull request
## License
Apache 2.0 License - see LICENSE file for details.
Raw data
{
"_id": null,
"home_page": null,
"name": "dg-sqlmesh",
"maintainer": null,
"docs_url": null,
"requires_python": "<3.13,>=3.11",
"maintainer_email": null,
"keywords": "dagster, sqlmesh, data-engineering, etl, data-pipeline",
"author": null,
"author_email": "Thomas Trividic <thomas.trividic@gmail.com>",
"download_url": "https://files.pythonhosted.org/packages/34/c6/3bcd0847cad993bf0b4bedffbb740d4f0941ae3e9638f837f93306a5cf7b/dg_sqlmesh-1.2.2.tar.gz",
"platform": null,
"bugtrack_url": null,
"license": null,
"summary": "Seamless integration between Dagster and SQLMesh for modern data engineering workflows",
"version": "1.2.2",
"project_urls": {
"Documentation": "https://github.com/fosk06/dagster-sqlmesh#readme",
"Homepage": "https://github.com/fosk06/dagster-sqlmesh",
"Issues": "https://github.com/fosk06/dagster-sqlmesh/issues",
"Repository": "https://github.com/fosk06/dagster-sqlmesh"
},
"split_keywords": [
"dagster",
" sqlmesh",
" data-engineering",
" etl",
" data-pipeline"
],
"urls": [
{
"comment_text": null,
"digests": {
"blake2b_256": "f6ec05e0360783ceac1b11f731fdc548e9e8e5b575108aa3c9dc6ba3f409719b",
"md5": "3bb04076404d4cf2c0a771f2d47767b8",
"sha256": "d90967a249a0c3bd7615707c61f651528e1e374cc81de165f131b254fdb34642"
},
"downloads": -1,
"filename": "dg_sqlmesh-1.2.2-py3-none-any.whl",
"has_sig": false,
"md5_digest": "3bb04076404d4cf2c0a771f2d47767b8",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": "<3.13,>=3.11",
"size": 36350,
"upload_time": "2025-08-06T11:31:59",
"upload_time_iso_8601": "2025-08-06T11:31:59.881736Z",
"url": "https://files.pythonhosted.org/packages/f6/ec/05e0360783ceac1b11f731fdc548e9e8e5b575108aa3c9dc6ba3f409719b/dg_sqlmesh-1.2.2-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": null,
"digests": {
"blake2b_256": "34c63bcd0847cad993bf0b4bedffbb740d4f0941ae3e9638f837f93306a5cf7b",
"md5": "9d300a5f530e236cb27866d3a768f041",
"sha256": "df20c9ff26b73430e768c6b92e09bf887798d0f2d0ab736406d6ef7393442635"
},
"downloads": -1,
"filename": "dg_sqlmesh-1.2.2.tar.gz",
"has_sig": false,
"md5_digest": "9d300a5f530e236cb27866d3a768f041",
"packagetype": "sdist",
"python_version": "source",
"requires_python": "<3.13,>=3.11",
"size": 38491,
"upload_time": "2025-08-06T11:32:01",
"upload_time_iso_8601": "2025-08-06T11:32:01.232252Z",
"url": "https://files.pythonhosted.org/packages/34/c6/3bcd0847cad993bf0b4bedffbb740d4f0941ae3e9638f837f93306a5cf7b/dg_sqlmesh-1.2.2.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2025-08-06 11:32:01",
"github": true,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"github_user": "fosk06",
"github_project": "dagster-sqlmesh#readme",
"travis_ci": false,
"coveralls": false,
"github_actions": false,
"lcname": "dg-sqlmesh"
}