airflow-blueprint 0.0.1a2

Summary: Create reusable Airflow DAG templates with validated configurations
Requires Python: >=3.8
License: MIT
Keywords: airflow, automation, blueprints, dags, templates
Upload time: 2025-07-27 21:52:38

# Blueprint

**⚠️ NOTE: This is currently an alpha project and may change significantly.**

Build reusable, validated Airflow DAG templates that anyone on your team can discover and use.

## What is Blueprint?

Blueprint helps data platform teams define reusable, parameterized DAG templates for Apache Airflow. These templates can be safely configured by other team members, like data analysts or less-experienced engineers, using simple YAML files.

With Blueprint, you can:

- ✅ Enforce **type-safe parameters** with validation
- 🚫 Get **clear error messages** when configs are invalid
- 🛠️ Use a **CLI** to validate configs before deployment
- 🔍 Automatically **discover available templates** and **generate new DAGs** from them

## Why Blueprint?

In most data teams, the same kind of DAG is built over and over with small variations. This usually means lots of copy-pasting and hard-to-maintain code. Blueprint solves this by letting you:

- **Create once, use everywhere** – Write a DAG pattern once as a template
- **Reduce errors** – Validate configurations before deployment
- **Build guardrails** – Enforce your standards and best practices
- **Help non-engineers** – Let others safely define DAGs without touching Python

## Example Workflow

### 1. Create a Blueprint template

Save this in `.astro/templates/etl_blueprints.py`:

```python
from blueprint import Blueprint, BaseModel, Field
from airflow import DAG

class DailyETLConfig(BaseModel):
    job_id: str = Field(description="Unique identifier for this job")
    source_table: str = Field(description="Table to read data from")
    target_table: str = Field(description="Table to write processed data to")
    schedule: str = Field(default="@daily", description="Cron expression or Airflow preset")
    retries: int = Field(default=2, description="Number of retry attempts on task failure")

class DailyETL(Blueprint[DailyETLConfig]):
    """Daily ETL job that moves data between tables with configurable scheduling."""

    # Name is auto-generated as "daily_etl" from class name
    # Or specify explicitly:
    # name = "daily_etl_job"

    def render(self, config: DailyETLConfig) -> DAG:
        from airflow.operators.python import PythonOperator
        from datetime import datetime

        with DAG(
            dag_id=config.job_id,
            schedule=config.schedule,
            start_date=datetime(2024, 1, 1),
            catchup=False,
            default_args={"retries": config.retries}
        ) as dag:
            PythonOperator(
                task_id="extract_transform_load",
                python_callable=lambda: print(
                    f"Moving data from {config.source_table} to {config.target_table}"
                )
            )
        return dag
```

### 2. Create a YAML config

Save this as `dags/configs/customer_etl.dag.yaml`:

```yaml
blueprint: daily_etl  # Auto-generated from class name DailyETL
job_id: customer-daily-sync
source_table: raw.customers
target_table: analytics.dim_customers
schedule: "@hourly"
retries: 4
```

### 3. Validate your config

```bash
$ blueprint lint
  customer_etl.dag.yaml - Valid
```

🎉 **Done!** Blueprint builds your DAG with ID `customer_etl`.

## Python API

Blueprint templates can also be consumed directly in Python, providing full type safety and IDE support:

```python
from etl_blueprints import DailyETL

# Create DAG with keyword arguments
dag = DailyETL.build(
    job_id="customer-daily-sync",
    source_table="raw.customers",
    target_table="analytics.dim_customers",
    schedule="@hourly",
    retries=4
)
```

### Benefits of Python API

- **Full IDE support** - Autocomplete, type checking, and inline documentation
- **Runtime validation** - Catch configuration errors before deployment
- **Dynamic DAG generation** - Create multiple DAGs programmatically
- **Testing support** - Easy unit testing of your DAG configurations

### Dynamic DAG Generation

The Python API shines when you need to create DAGs dynamically based on external configuration:

```python
from etl_blueprints import DailyETL
import json

# Load table configurations from external source
with open('etl_config.json') as f:
    table_configs = json.load(f)

# Generate a DAG for each table with custom logic
for config in table_configs:
    schedule = "@hourly" if config["priority"] == "high" else "@daily"
    retries = 5 if config["is_critical"] else 2

    dag = DailyETL.build(
        job_id=f"{config['name']}-etl",
        source_table=config["source"],
        target_table=config["target"],
        schedule=schedule,
        retries=retries
    )
```
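One caveat for loops like this: Airflow discovers DAGs by scanning module-level objects, so unless Blueprint registers the generated DAGs for you (an assumption worth verifying), expose each one in the module namespace at the end of the loop body, for example:

```python
# At the end of the loop body: make the generated DAG visible to Airflow's parser
globals()[dag.dag_id] = dag
```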

### Creating Conditional DAGs

```python
from etl_blueprints import DailyETL
import os

# Only create production DAGs in production environment
if os.getenv("AIRFLOW_ENV") == "production":
    critical_tables = ["users", "transactions", "orders"]

    for table in critical_tables:
        dag = DailyETL.build(
            job_id=f"prod-{table}-sync",
            source_table=f"raw.{table}",
            target_table=f"warehouse.{table}",
            schedule="@hourly",
            retries=5
        )
```

### Python Example

```python
from blueprint import Blueprint, BaseModel, Field, field_validator
from airflow import DAG
from datetime import datetime

class DailyETLConfig(BaseModel):
    job_id: str = Field(pattern=r'^[a-zA-Z0-9_-]+$', description="Unique job identifier")
    source_table: str = Field(description="Source table name")
    target_table: str = Field(description="Target table name")
    schedule: str = Field(default="@daily", description="Cron or preset schedule")
    retries: int = Field(default=2, ge=0, le=5, description="Number of retries")

    @field_validator('schedule')
    def validate_schedule(cls, v):
        valid_presets = ['@once', '@hourly', '@daily', '@weekly', '@monthly', '@yearly']
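        # Loose heuristic: accept an Airflow preset, or anything roughly shaped like a cron expression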
        if not (v in valid_presets or v.startswith('0 ') or v.count(' ') == 4):
            raise ValueError(f'Invalid schedule: {v}')
        return v

class DailyETL(Blueprint[DailyETLConfig]):
    """Daily ETL job that moves data between tables."""

    def render(self, config: DailyETLConfig) -> DAG:
        with DAG(
            dag_id=config.job_id,
            schedule=config.schedule,
            start_date=datetime(2024, 1, 1),
            catchup=False,
            default_args={"retries": config.retries}
        ) as dag:
            # Define your tasks here
            pass
        return dag
```

### Loading from YAML in Python

You can also load YAML configs in Python code:

```python
from blueprint import from_yaml

# Load existing YAML config
dag = from_yaml("configs/customer_etl.dag.yaml")

# Or with runtime overrides
dag = from_yaml("configs/customer_etl.dag.yaml", overrides={
    "retries": 5,
    "schedule": "@hourly"
})
```
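If you keep many configs in one directory, you can load them all from a single DAG file. This is an illustrative pattern, assuming `from_yaml` accepts a path as shown above and that each returned DAG needs to be visible at module level for Airflow's parser:

```python
from pathlib import Path
from blueprint import from_yaml

for path in Path("dags/configs").glob("*.dag.yaml"):
    dag = from_yaml(str(path))
    # Expose each DAG at module level so Airflow's DAG parser can discover it
    globals()[dag.dag_id] = dag
```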

### Testing Blueprints

```python
import pytest
from etl_blueprints import DailyETL

def test_daily_etl_config():
    # Test valid configuration
    dag = DailyETL.build(
        job_id="test-etl",
        source_table="test.source",
        target_table="test.target"
    )
    assert dag.dag_id == "test-etl"
    assert dag.schedule_interval == "@daily"

    # Test validation errors
    with pytest.raises(ValueError, match="Invalid schedule"):
        DailyETL.build(
            job_id="test-etl",
            source_table="test.source",
            target_table="test.target",
            schedule="invalid"
        )
```
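If several invalid schedules need coverage, a parametrized variant keeps the test compact (a sketch that relies on the same `DailyETL` blueprint and error message as above):

```python
import pytest
from etl_blueprints import DailyETL

@pytest.mark.parametrize("schedule", ["invalid", "every hour", "@fortnightly"])
def test_rejects_bad_schedules(schedule):
    # Each value should fail the schedule validator and surface its message
    with pytest.raises(ValueError, match="Invalid schedule"):
        DailyETL.build(
            job_id="test-etl",
            source_table="test.source",
            target_table="test.target",
            schedule=schedule,
        )
```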

## Type Safety and Validation

Blueprint uses Pydantic under the hood for robust validation with helpful error messages. This gives you:

- **Type coercion** - Automatically converts compatible types (e.g., string "5" to integer 5; see the snippet below)
- **Field validation** - Set constraints like min/max values, regex patterns, etc.
- **Custom validators** - Add your own validation logic for complex rules
- **Clear error messages** - Know exactly what went wrong and how to fix it
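As a quick illustration of the coercion and constraint behavior, here is a minimal sketch using plain Pydantic (the `RetryConfig` model is hypothetical and not part of Blueprint):

```python
from pydantic import BaseModel, Field, ValidationError

class RetryConfig(BaseModel):
    retries: int = Field(ge=0, le=5)

# Compatible types are coerced: the string "5" becomes the integer 5
print(RetryConfig(retries="5").retries)

# Out-of-range values fail with an error naming the field and the violated bound
try:
    RetryConfig(retries=10)
except ValidationError as exc:
    print(exc)
```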

When validation fails, you get clear feedback:

```bash
$ blueprint lint
✗ customer_etl.dag.yaml
  ValidationError: 3 validation errors for DailyETLConfig

  job_id
    String does not match pattern '^[a-zA-Z0-9_-]+$' (type=value_error.str.regex)
    Given: "customer sync!" (contains spaces)

  retries
    ensure this value is less than or equal to 5 (type=value_error.number.not_le)
    Given: 10

  schedule
    Invalid schedule format (type=value_error)
    Given: "every hour" (use "@hourly" or valid cron expression)
```

### Field Validation Examples

```python
from blueprint import BaseModel, Field, field_validator

class ETLConfig(BaseModel):
    # Basic constraints
    job_id: str = Field(pattern=r'^[a-zA-Z0-9_-]+$')
    retries: int = Field(ge=0, le=5)
    timeout_minutes: int = Field(gt=0, le=1440)  # 1-1440 minutes
    schedule: str = Field(default="@daily")

    # Custom validation
    @field_validator('schedule')
    def validate_schedule(cls, v):
        valid_presets = ['@once', '@hourly', '@daily', '@weekly', '@monthly']
        if v not in valid_presets and not cls._is_valid_cron(v):
            raise ValueError(f'Must be a preset ({", ".join(valid_presets)}) or valid cron')
        return v

    @classmethod
    def _is_valid_cron(cls, v):
        # Deliberately loose check: a cron expression has five space-separated fields
        return len(v.split()) == 5
```
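For example, constructing this config with malformed values raises a `ValidationError` that reports every failing field at once (a usage sketch; `ValidationError` comes from Pydantic, which Blueprint uses under the hood):

```python
from pydantic import ValidationError

try:
    ETLConfig(job_id="customer sync!", retries=10, timeout_minutes=30)
except ValidationError as exc:
    # Lists both the job_id pattern violation and the retries upper bound
    print(exc)
```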

## More Examples

### Complex Parameters

Blueprints support nested objects and lists:

```python
from blueprint import Blueprint, BaseModel, Field
from airflow import DAG
from typing import Optional, List

class SourceConfig(BaseModel):
    database: str = Field(description="Database connection name")
    table: str = Field(description="Table to extract data from")

class NotificationConfig(BaseModel):
    email: Optional[str] = Field(default=None, description="Email for notifications")
    slack: Optional[str] = Field(default=None, description="Slack channel (#data-alerts)")

class MultiSourceConfig(BaseModel):
    sources: List[SourceConfig] = Field(description="List of data sources")
    notifications: NotificationConfig = Field(default_factory=NotificationConfig)

class MultiSourceETL(Blueprint[MultiSourceConfig]):
    """ETL pipeline that processes multiple data sources in parallel."""

    def render(self, config: MultiSourceConfig) -> DAG:
        # Access nested data with type safety
        for source in config.sources:
            print(f"Processing {source.table} from {source.database}")
```

```yaml
blueprint: multi_source_etl
sources:
  - database: postgres
    table: users
  - database: mysql
    table: orders
notifications:
  email: data-team@company.com
  slack: "#data-alerts"
```
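The same nested structure can be expressed directly against the config models in Python (plain Pydantic construction; the values are illustrative):

```python
config = MultiSourceConfig(
    sources=[
        SourceConfig(database="postgres", table="users"),
        SourceConfig(database="mysql", table="orders"),
    ],
    notifications=NotificationConfig(email="data-team@company.com", slack="#data-alerts"),
)
```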

### Blueprint Inheritance

Use standard Python inheritance to share common parameters:

```python
from blueprint import Blueprint, BaseModel, Field
from airflow import DAG

class BaseETLConfig(BaseModel):
    owner: str = Field(default="data-team", description="Team responsible for DAG")
    retries: int = Field(default=2, ge=0, le=5, description="Number of retries")
    email_on_failure: str = Field(default="alerts@company.com", description="Alert email")

class S3ImportConfig(BaseETLConfig):
    bucket: str = Field(description="S3 bucket name")
    prefix: str = Field(description="S3 key prefix")

class BaseETL(Blueprint[BaseETLConfig]):
    """Base blueprint with common ETL parameters."""

    def get_default_args(self, config: BaseETLConfig):
        return {
            "owner": config.owner,
            "retries": config.retries,
            "email_on_failure": [config.email_on_failure]
        }

class S3Import(BaseETL, Blueprint[S3ImportConfig]):  # subclass BaseETL to reuse get_default_args
    """Import data from S3."""

    def render(self, config: S3ImportConfig) -> DAG:
        # Has access to all BaseETLConfig fields plus S3-specific ones
        default_args = self.get_default_args(config)
        # ... create DAG with S3 operators
```
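Because `S3ImportConfig` extends `BaseETLConfig`, a caller only supplies the S3-specific fields and the shared ones fall back to their defaults, as this small sketch (with illustrative values) shows:

```python
config = S3ImportConfig(bucket="my-data-lake", prefix="raw/events/")
print(config.owner, config.retries)  # -> data-team 2 (inherited defaults)
```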

## Installation

```bash
pip install airflow-blueprint
```

## Configuration

Blueprint looks for templates in `.astro/templates/` by default. Override with:

```bash
export BLUEPRINT_TEMPLATES_DIR=/path/to/templates
```

## CLI Commands

```bash
# Validate all configs
blueprint lint

# Validate specific config
blueprint lint dags/configs/my_job.dag.yaml

# List available blueprints
blueprint list

# Show blueprint parameters
blueprint describe daily_etl

# Interactive scaffolding (primary interface)
blueprint new
# Prompts for: DAG name, blueprint selection, parameters

# Direct scaffolding with specific blueprint
blueprint new daily_etl my_new_etl

# Scaffold to specific path
blueprint new dags/configs/prod/my_new_etl.dag.yaml daily_etl

# Quick mode with parameter overrides
blueprint new daily_etl my_new_etl --set job_id=customer-sync --set retries=3
```

## Error Messages

Blueprint provides clear, actionable error messages:

```bash
$ blueprint lint
✗ marketing_etl.dag.yaml
  Line 3: Missing required parameter 'source_table' for blueprint 'daily_etl'

  Your configuration:
    2 | blueprint: daily_etl
    3 | job_id: "marketing-sync"
    4 | target_table: "analytics.marketing_facts"

  Add the missing parameter:
    3 | job_id: "marketing-sync"
  + 4 | source_table: "raw.marketing_events"
    5 | target_table: "analytics.marketing_facts"
```

## Best Practices

1. **Keep blueprints focused** - Each blueprint should represent one type of workflow
2. **Use descriptive parameter names** - `source_table` is clearer than `src`
3. **Always add parameter descriptions** - Use `Field(description=...)`, as in the examples above, for every parameter
4. **Document your blueprints** - Add docstrings to blueprint classes explaining their purpose
5. **Provide defaults wisely** - Common values as defaults, critical values as required
6. **Validate in CI** - Add `blueprint lint` to your CI pipeline

## How is this different from DAG Factory?

[DAG Factory](https://github.com/astronomer/dag-factory) gives full control of Airflow via YAML.

Blueprint hides that complexity behind safe, pre-built templates with validation.

### DAG Factory

```yaml
my_dag:
  default_args:
    owner: 'data-team'
    retries: 2
    retry_delay_seconds: 300
  start_date: 2024-01-01
  schedule_interval: '@daily'
  tasks:
    extract_data:
      operator: airflow.operators.python.PythonOperator
      python_callable_name: extract_from_api
      python_callable_file: /opt/airflow/dags/etl/extract.py
    transform_data:
      operator: airflow.operators.python.PythonOperator
      dependencies: [extract_data]
      # ... many more Airflow-specific configurations
```

### Blueprint

```yaml
blueprint: daily_etl
job_id: customer-sync
source_table: raw.customers
target_table: analytics.dim_customers
schedule: "@hourly"
```

Or in Python:

```python
dag = DailyETL.build(
    job_id="customer-sync",
    source_table="raw.customers",
    target_table="analytics.dim_customers",
    schedule="@hourly"
)
```

**Use DAG Factory if:** You need full Airflow flexibility and your users understand Airflow concepts

**Use Blueprint if:** You want standardized, validated patterns with type safety for teams

## Contributing

We welcome contributions! Please see our [Contributing Guide](CONTRIBUTING.md) for details.

## License

Apache 2.0

            
