dg-sqlmesh

Version: 1.2.2
Summary: Seamless integration between Dagster and SQLMesh for modern data engineering workflows
Author: Thomas Trividic <thomas.trividic@gmail.com>
Requires Python: >=3.11, <3.13
Keywords: dagster, sqlmesh, data-engineering, etl, data-pipeline
Homepage: https://github.com/fosk06/dagster-sqlmesh
Released: 2025-08-06

# SQLMesh-Dagster Integration

This module provides a complete integration between SQLMesh and Dagster, allowing SQLMesh models to be materialized as Dagster assets with support for audits, metadata, and adaptive scheduling.

## Features

### 🎯 **SQLMesh Model to Dagster Asset Conversion**

- **Individual asset control** : Each SQLMesh model becomes a separate Dagster asset with granular success/failure control
- **Automatic materialization** : SQLMesh models are automatically converted to Dagster assets
- **External assets support** : SQLMesh sources (external assets) are mapped to Dagster AssetKeys
- **Automatic dependencies** : Dependencies between models are preserved in Dagster
- **Partitioning** : Support for partitioned SQLMesh models (managed by SQLMesh, no integration with Dagster partitions - no Dagster → SQLMesh backfill)

### 📊 **SQLMesh Metadata Integration to Dagster**

- **Complete metadata** : Cron, tags, kind, dialect, query, partitioned_by, clustered_by
- **Code versioning** : Uses SQLMesh data_hash for Dagster versioning
- **Column descriptions** : Table metadata with descriptions
- **Customizable tags** : SQLMesh tags mapping to Dagster

### ✅ **SQLMesh Audits to Asset Checks Conversion**

- **Automatic audits** : SQLMesh audits become Dagster AssetCheckSpec
- **AssetCheckResult** : Automatic emission of audit results with proper output handling
- **Audit metadata** : SQL query, arguments, dialect, blocking status
- **Non-blocking** : Dagster checks are non-blocking (SQLMesh handles blocking)
- **Fallback handling** : Graceful handling when no evaluation events are found

### ⏰ **Adaptive Scheduling**

- **Automatic analysis** : Detection of the finest granularity from SQLMesh crons
- **Adaptive schedule** : Automatic creation of a Dagster schedule based on crons
- **Intelligent execution** : SQLMesh manages which models should be executed
- **Monitoring** : Detailed logs and granularity metadata

### 🔧 **All-in-One Factory**

- **Simple configuration** : Single function to configure everything
- **Extensible translator** : Customizable translator system
- **Automatic validation** : External dependencies validation
- **Retry policy** : Centralized retry policy configuration

## Basic Usage

### **Simple Factory (Recommended)**

```python
from dagster import RetryPolicy, AssetKey, Backoff
from dg_sqlmesh import sqlmesh_definitions_factory
from dg_sqlmesh import SQLMeshTranslator

class SlingToSqlmeshTranslator(SQLMeshTranslator):
    def get_external_asset_key(self, external_fqn: str) -> AssetKey:
        """
        Custom mapping for external assets.
        SQLMesh: 'jaffle_db.main.raw_source_customers' → Sling: ['target', 'main', 'raw_source_customers']
        """
        parts = external_fqn.replace('"', '').split('.')
        if len(parts) >= 3:
            catalog, schema, table = parts[0], parts[1], parts[2]
            return AssetKey(['target', 'main', table])
        return AssetKey(['external'] + parts[1:])

# All-in-one factory: everything configured in one line!
defs = sqlmesh_definitions_factory(
    project_dir="sqlmesh_project",
    gateway="postgres",
    translator=SlingToSqlmeshTranslator(),
    concurrency_limit=1,
    name="sqlmesh_multi_asset",
    group_name="sqlmesh",
    op_tags={"team": "data", "env": "prod"},
    retry_policy=RetryPolicy(max_retries=1, delay=30.0, backoff=Backoff.EXPONENTIAL),
    enable_schedule=True,  # Enable adaptive scheduling
)
```

### **Advanced Configuration**

```python
from dagster import Backoff, Definitions, RetryPolicy
from dg_sqlmesh import sqlmesh_assets_factory, sqlmesh_adaptive_schedule_factory
from dg_sqlmesh import SQLMeshResource
from dg_sqlmesh import SQLMeshTranslator

# SQLMesh resource configuration
sqlmesh_resource = SQLMeshResource(
    project_dir="sqlmesh_project",
    gateway="postgres",
    translator=SlingToSqlmeshTranslator(),  # custom translator from the previous example
    concurrency_limit=1,
    ignore_cron=True  # only for testing purposes
)

# SQLMesh assets configuration
sqlmesh_assets = sqlmesh_assets_factory(
    sqlmesh_resource=sqlmesh_resource,
    name="sqlmesh_multi_asset",
    group_name="sqlmesh",
    op_tags={"team": "data", "env": "prod"},
    retry_policy=RetryPolicy(max_retries=1, delay=30.0, backoff=Backoff.EXPONENTIAL),
)

# Adaptive schedule and job created automatically
sqlmesh_adaptive_schedule, sqlmesh_job, _ = sqlmesh_adaptive_schedule_factory(
    sqlmesh_resource=sqlmesh_resource
)

defs = Definitions(
    assets=[sqlmesh_assets],
    jobs=[sqlmesh_job],
    schedules=[sqlmesh_adaptive_schedule],
    resources={
        "sqlmesh": sqlmesh_resource,
    },
)
```

## Custom Translator

To map external assets (SQLMesh sources) to your Dagster conventions, you can create a custom translator:

```python
from dg_sqlmesh import SQLMeshTranslator
import dagster as dg

class MyCustomTranslator(SQLMeshTranslator):
    def get_external_asset_key(self, external_fqn: str) -> dg.AssetKey:
        """
        Custom mapping for external assets.
        Example: 'jaffle_db.main.raw_source_customers' → ['target', 'main', 'raw_source_customers']
        """
        parts = external_fqn.replace('"', '').split('.')
        # Ignore the catalog (jaffle_db) and keep the rest
        return dg.AssetKey(['target'] + parts[1:])

    def get_group_name(self, context, model) -> str:
        """
        Custom mapping for groups.
        """
        model_name = getattr(model, "view_name", "")
        if model_name.startswith("stg_"):
            return "staging"
        elif model_name.startswith("mart_"):
            return "marts"
        return super().get_group_name(context, model)
```

## Translator Methods

The `SQLMeshTranslator` exposes several methods you can override (see the combined example after this reference):

### `get_external_asset_key(external_fqn: str) -> AssetKey`

Maps an external asset FQN to a Dagster AssetKey.

### `get_asset_key(model) -> AssetKey`

Maps a SQLMesh model to a Dagster AssetKey.

### `get_group_name(context, model) -> str`

Determines the group for a model.

### `get_tags(context, model) -> dict`

Generates tags for a model.

### `get_metadata(model, keys: list[str]) -> dict`

Extracts specified metadata from the model.
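
As an illustration, here is a sketch combining two of these hooks. It reuses the documented signatures and assumes the defaults can be extended through `super()`; the `owner` tag and the filtered metadata keys are hypothetical choices, not part of the library:

```python
from dg_sqlmesh import SQLMeshTranslator

class TeamTranslator(SQLMeshTranslator):
    def get_tags(self, context, model) -> dict:
        # Start from the tags derived from the SQLMesh model, then add our own
        tags = dict(super().get_tags(context, model))
        tags["owner"] = "data-team"  # hypothetical extra tag
        return tags

    def get_metadata(self, model, keys: list[str]) -> dict:
        # Only surface the metadata keys we care about in the Dagster UI
        wanted = [k for k in keys if k in ("cron", "dialect", "partitioned_by")]
        return super().get_metadata(model, wanted)
```

Pass an instance via `translator=TeamTranslator()` to either factory, exactly as in the examples above.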

## Asset Checks and Audits

### **Automatic Audit Conversion**

SQLMesh audits are automatically converted to Dagster AssetCheckSpec:

```sql
-- SQLMesh audit
MODEL (
    name customers,
    audits (
        not_null(column=id),
        unique_values(columns=[id, email])
    )
);
```

```python
# Automatically becomes in Dagster
AssetCheckSpec(
    name="not_null",
    asset=AssetKey(["customers"]),
    blocking=False,  # SQLMesh handles blocking
    description="SQLMesh audit: not_null(column=id)"
)
```

### **AssetCheckResult Emission**

During execution, audit results are emitted as AssetCheckResult:

```python
AssetCheckResult(
    passed=True,
    asset_key=AssetKey(["customers"]),
    check_name="not_null",
    metadata={
        "sqlmesh_model_name": "customers",
        "audit_query": "SELECT COUNT(*) FROM customers WHERE id IS NULL",
        "audit_blocking": False,
        "audit_dialect": "postgres",
        "audit_args": {"column": "id"}
    }
)
```

## Adaptive Scheduling

### **Automatic Cron Analysis**

The system automatically analyzes all SQLMesh crons and determines the finest granularity:

```python
# If you have models with different crons:
# - customers: @daily
# - orders: @hourly
# - events: */5 * * * * (every 5 minutes)

# The adaptive schedule will be: */5 * * * * (every 5 minutes)
```
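
The detection itself is handled by the library, but a minimal sketch of the idea (assuming `croniter` is installed; it is not listed as a dependency here) is to sample two consecutive ticks per cron and keep the smallest gap:

```python
import datetime
from croniter import croniter

def finest_interval(crons: list[str]) -> datetime.timedelta:
    # Sample two consecutive occurrences per cron and keep the smallest gap
    base = datetime.datetime(2025, 1, 1)
    intervals = []
    for expr in crons:
        it = croniter(expr, base)
        first = it.get_next(datetime.datetime)
        second = it.get_next(datetime.datetime)
        intervals.append(second - first)
    return min(intervals)

print(finest_interval(["@daily", "@hourly", "*/5 * * * *"]))
# -> 0:05:00, so the adaptive schedule ticks every 5 minutes
```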

### **Intelligent Execution**

The schedule runs `sqlmesh run` on all models, but SQLMesh automatically manages which models should be executed:

```python
import datetime

# The schedule simply does:
sqlmesh_resource.context.run(
    ignore_cron=False,  # SQLMesh respects crons
    execution_time=datetime.datetime.now(),
)
```

## Architecture

### **Individual Asset Pattern**

Each SQLMesh model becomes a separate Dagster asset that:

- **Materializes independently** : Each asset calls `sqlmesh.materialize_assets_threaded()` for its specific model
- **Controls success/failure** : Each asset can succeed or fail individually based on SQLMesh execution results
- **Handles dependencies** : Uses `translator.get_model_deps_with_external()` for proper dependency mapping
- **Manages checks** : Each asset handles its own audit results with `AssetCheckResult` outputs

### **Benefits of Individual Assets**

- **Granular control** : Each asset can succeed or fail independently in the Dagster UI
- **Clear visibility** : See exactly which models are running, succeeded, or failed
- **Individual retries** : Failed assets can be retried without affecting others
- **Better monitoring** : Track performance and issues per model
- **Flexible scheduling** : Different assets can have different schedules if needed

### **SQLMeshResource**

- Manages SQLMesh context and caching
- Implements strict singleton pattern
- Uses AnyIO for multithreading (see the sketch below)
- Accepts a custom translator
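
The resource's internals are not reproduced here; the sketch below only illustrates the general AnyIO offloading pattern, with placeholder function names rather than the resource's real API:

```python
import anyio

def _blocking_sqlmesh_call(context):
    # Stand-in for a blocking SQLMesh operation, e.g. context.run(...)
    return context.run()

async def run_without_blocking(context):
    # Push the blocking work onto a worker thread so the surrounding
    # event loop (and Dagster) stays responsive
    return await anyio.to_thread.run_sync(_blocking_sqlmesh_call, context)

# result = anyio.run(run_without_blocking, sqlmesh_context)
```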

### **SQLMeshTranslator**

- Maps SQLMesh concepts to Dagster
- Extensible via inheritance
- Handles external assets and dependencies

### **SQLMesh Metadata via Tags**

You can pass metadata from SQLMesh models to Dagster assets using the tag convention `dagster:property:value`:

```sql
-- In your SQLMesh model
MODEL (
    name customers,
    tags ARRAY["dagster:group_name:sqlmesh_datamarts"],
    -- ... other model properties
);
```

#### **Supported Properties**

Currently supported Dagster properties via tags:

- **`dagster:group_name:value`** : Sets the Dagster asset group name
  - Example: `"dagster:group_name:sqlmesh_datamarts"`
  - Result: Asset will be in the "sqlmesh_datamarts" group

#### **Tag Convention**

The convention follows the pattern: `dagster:property:value`

- **`dagster`** : Prefix to indicate this is for Dagster
- **`property`** : The Dagster property to update on the asset
- **`value`** : The value to set for that property

#### **Priority Order**

When determining asset properties, the translator follows this priority (sketched in the example after this list):

1. **SQLMesh tags** : `dagster:group_name:value` (highest priority)
2. **Factory parameter** : `group_name="sqlmesh"` in factory call
3. **Default logic** : Automatic group determination based on model path
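
As an illustration only, the resolution can be pictured as a small pure function; this is a hypothetical helper mirroring the documented priority order, not the translator's actual code:

```python
def resolve_group_name(model_tags: list[str], factory_group_name: str | None, default_group: str) -> str:
    # 1. A SQLMesh tag of the form "dagster:group_name:<value>" wins
    for tag in model_tags:
        parts = tag.split(":")
        if len(parts) == 3 and parts[:2] == ["dagster", "group_name"]:
            return parts[2]
    # 2. Then the group_name passed to the factory
    if factory_group_name:
        return factory_group_name
    # 3. Finally the default path-based logic
    return default_group

resolve_group_name(["dagster:group_name:sqlmesh_datamarts"], "sqlmesh", "staging")
# -> "sqlmesh_datamarts"
```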

### **sqlmesh_definitions_factory**

- All-in-one factory for simple configuration
- Automatically creates: resource, assets, job, schedule
- Validates external dependencies
- Returns Definitions directly

### **SQLMeshEventCaptureConsole**

- Custom SQLMesh console to capture events
- Captures audit results for AssetCheckResult
- Handles metadata serialization

## Plan + Run Architecture

### **Individual Asset Materialization**

Each Dagster asset materializes its specific SQLMesh model using:

1. **Model Selection** : `get_models_to_materialize()` selects the specific model for the asset
2. **Materialization** : `sqlmesh.materialize_assets_threaded()` executes the model
3. **Result Handling** : Console events determine success/failure and audit results

### **Implementation Details**

```python
# In each individual asset
def model_asset(context: AssetExecutionContext, sqlmesh: SQLMeshResource):
    # Materialize this specific model
    models_to_materialize = get_models_to_materialize(
        [current_asset_spec.key],
        sqlmesh.get_models,
        sqlmesh.translator,
    )

    # Execute materialization
    plan = sqlmesh.materialize_assets_threaded(models_to_materialize, context=context)

    # Check results via console events
    failed_models_events = sqlmesh._console.get_failed_models_events()
    evaluation_events = sqlmesh._console.get_evaluation_events()

    # Return MaterializeResult + AssetCheckResult for audits
    return MaterializeResult(...), *check_results
```

This approach provides granular control while maintaining all SQLMesh integration features.

## Performance

- **Individual execution** : Each asset runs its own SQLMesh materialization (may result in multiple `sqlmesh run` calls)
- **Strict singleton** : Only one active SQLMesh instance
- **Caching** : Contexts, models and translators are cached
- **Multithreading** : Uses AnyIO to avoid Dagster blocking
- **Lazy loading** : Resources are loaded on demand
- **Early validation** : External dependencies validation before execution
- **Optimized execution** : SQLMesh automatically skips models that don't need materialization

## Development Workflow

### **SQLMesh Development Philosophy**

This module follows SQLMesh's philosophy of **separation of concerns**:

- **Development** : Use SQLMesh CLI for development and schema changes
- **Production** : Use SQLMesh CLI for promoting changes
- **Orchestration** : Use this Dagster module only for running models

### **Workflow Steps**

#### **1. Local Development**

```bash
# Develop your models locally
sqlmesh plan dev
sqlmesh apply dev

# Test your changes
sqlmesh run dev
```

#### **2. Production Promotion**

```bash
# Promote changes to production
sqlmesh plan prod  # -> manual step to review and apply the plan

# Or use CI/CD pipeline
```

#### **3. Dagster Orchestration**

```python
# Dagster takes over for production runs
# - Automatic scheduling via adaptive schedule
# - Manual runs via Dagster UI
# - Only executes: sqlmesh run prod
```

### **Module Responsibilities**

#### **What this module DOES:**

- ✅ **Orchestrates** `sqlmesh run` commands
- ✅ **Schedules** model execution
- ✅ **Monitors** execution and audits
- ✅ **Emits** Dagster events and metadata

#### **What this module DOES NOT:**

- ❌ **Plan changes** (`sqlmesh plan`)
- ❌ **Apply changes** (`sqlmesh apply`)
- ❌ **Handle breaking changes**
- ❌ **Manage environments**

### **Breaking Changes Management**

Breaking changes are handled **outside** this module:

- **Development** : `sqlmesh plan dev` + manual review
- **Production** : `sqlmesh plan prod` + CI/CD approval
- **Orchestration** : This module only runs approved models

### **Environment Separation**

```bash
# Development (SQLMesh CLI)
sqlmesh plan dev
sqlmesh apply dev
sqlmesh run dev

# Production (Dagster module)
# Automatically runs: sqlmesh run prod
# Based on schedules and triggers
```

This separation ensures:

- ✅ **Clear responsibilities** : Development vs Orchestration
- ✅ **Safe deployments** : Breaking changes handled by SQLMesh CLI
- ✅ **Reliable orchestration** : Dagster only runs approved models
- ✅ **CI/CD friendly** : Standard SQLMesh workflow for deployments

## Installation

```bash
pip install dg-sqlmesh
```

## Requirements

- Python 3.11+
- Dagster 1.11.4+
- SQLMesh 0.206.1+

## Limitations

- **Multiple SQLMesh runs** : Each asset triggers its own `sqlmesh run` (may impact performance with many assets)
- **No Dagster → SQLMesh backfill** : Partitions managed only by SQLMesh itself (run a materialization to backfill)
- **Breaking changes** : Handled outside the module (SQLMesh CLI or CI/CD)
- **Environment management** : SQLMesh CLI or CI/CD

## Troubleshooting

### Common Issues

#### **"Invalid cron" errors**

- **Cause** : Cron faster than 5 minutes
- **Solution** : Use `ignore_cron=True` for testing

#### **External asset mapping errors**

- **Cause** : Translator doesn't handle FQN format
- **Solution** : Check `get_external_asset_key` method

#### **Performance issues**

- **Cause** : Too many models loaded
- **Solution** : Use `concurrency_limit` and caching

## Contributing

1. Fork the repository
2. Create a feature branch
3. Make your changes
4. Add tests for new functionality
5. Ensure all tests pass
6. Submit a pull request

## License

Apache 2.0 License - see LICENSE file for details.

            
