# Blueprint
**⚠️ NOTE: This is currently an alpha project and may change significantly.**
Build reusable, validated Airflow DAG templates that anyone on your team can discover and use.
## What is Blueprint?
Blueprint helps data platform teams define reusable, parameterized DAG templates for Apache Airflow. These templates can be safely configured by other team members, like data analysts or less-experienced engineers, using simple YAML files.
With Blueprint, you can:
- ✅ Enforce **type-safe parameters** with validation
- 🚫 Get **clear error messages** when configs are invalid
- 🛠️ Use a **CLI** to validate configs before deployment
- 🔍 Automatically **discover available templates** and **generate new DAGs** from them
## Why Blueprint?
In most data teams, the same kind of DAG is built over and over with small variations. This usually means lots of copy-pasting and hard-to-maintain code. Blueprint solves this by letting you:
- **Create once, use everywhere** – Write a DAG pattern once as a template
- **Reduce errors** – Validate configurations before deployment
- **Build guardrails** – Enforce your standards and best practices
- **Help non-engineers** – Let others safely define DAGs without touching Python
## Example Workflow
### 1. Create a Blueprint template
Save this in `.astro/templates/etl_blueprints.py`:
```python
from blueprint import Blueprint, BaseModel, Field
from airflow import DAG
class DailyETLConfig(BaseModel):
    job_id: str = Field(description="Unique identifier for this job")
    source_table: str = Field(description="Table to read data from")
    target_table: str = Field(description="Table to write processed data to")
    schedule: str = Field(default="@daily", description="Cron expression or Airflow preset")
    retries: int = Field(default=2, description="Number of retry attempts on task failure")

class DailyETL(Blueprint[DailyETLConfig]):
    """Daily ETL job that moves data between tables with configurable scheduling."""

    # Name is auto-generated as "daily_etl" from class name
    # Or specify explicitly:
    # name = "daily_etl_job"

    def render(self, config: DailyETLConfig) -> DAG:
        from airflow.operators.python import PythonOperator
        from datetime import datetime

        with DAG(
            dag_id=config.job_id,
            schedule=config.schedule,
            start_date=datetime(2024, 1, 1),
            catchup=False,
            default_args={"retries": config.retries}
        ) as dag:
            PythonOperator(
                task_id="extract_transform_load",
                python_callable=lambda: print(
                    f"Moving data from {config.source_table} to {config.target_table}"
                )
            )
        return dag
```
### 2. Create a YAML config
Save this as `dags/configs/customer_etl.dag.yaml`:
```yaml
blueprint: daily_etl # Auto-generated from class name DailyETL
job_id: customer-daily-sync
source_table: raw.customers
target_table: analytics.dim_customers
schedule: "@hourly"
retries: 4
```
### 3. Validate your config
```bash
$ blueprint lint
customer_etl.dag.yaml - Valid
```
🎉 **Done!** Blueprint builds your DAG with ID `customer_etl`.
## Python API
Blueprint templates can also be consumed directly in Python, providing full type safety and IDE support:
```python
from etl_blueprints import DailyETL
# Create DAG with keyword arguments
dag = DailyETL.build(
    job_id="customer-daily-sync",
    source_table="raw.customers",
    target_table="analytics.dim_customers",
    schedule="@hourly",
    retries=4
)
```
### Benefits of Python API
- **Full IDE support** - Autocomplete, type checking, and inline documentation
- **Runtime validation** - Catch configuration errors before deployment
- **Dynamic DAG generation** - Create multiple DAGs programmatically
- **Testing support** - Easy unit testing of your DAG configurations
### Dynamic DAG Generation
The Python API shines when you need to create DAGs dynamically based on external configuration:
```python
from etl_blueprints import DailyETL
import json
# Load table configurations from external source
with open('etl_config.json') as f:
    table_configs = json.load(f)

# Generate a DAG for each table with custom logic
# (each entry is expected to provide name, source, target, priority, is_critical)
for config in table_configs:
    schedule = "@hourly" if config["priority"] == "high" else "@daily"
    retries = 5 if config["is_critical"] else 2

    dag = DailyETL.build(
        job_id=f"{config['name']}-etl",
        source_table=config["source"],
        target_table=config["target"],
        schedule=schedule,
        retries=retries
    )
    # Keep a unique module-level reference so Airflow's DagBag discovers every DAG
    globals()[dag.dag_id] = dag
```
### Creating Conditional DAGs
```python
from etl_blueprints import DailyETL
import os
# Only create production DAGs in production environment
if os.getenv("AIRFLOW_ENV") == "production":
    critical_tables = ["users", "transactions", "orders"]

    for table in critical_tables:
        dag = DailyETL.build(
            job_id=f"prod-{table}-sync",
            source_table=f"raw.{table}",
            target_table=f"warehouse.{table}",
            schedule="@hourly",
            retries=5
        )
        # Keep a module-level reference per DAG so Airflow discovers all of them
        globals()[dag.dag_id] = dag
```
### Python Example
```python
from blueprint import Blueprint, BaseModel, Field, field_validator
from airflow import DAG
from datetime import datetime
class DailyETLConfig(BaseModel):
    job_id: str = Field(pattern=r'^[a-zA-Z0-9_-]+$', description="Unique job identifier")
    source_table: str = Field(description="Source table name")
    target_table: str = Field(description="Target table name")
    schedule: str = Field(default="@daily", description="Cron or preset schedule")
    retries: int = Field(default=2, ge=0, le=5, description="Number of retries")

    @field_validator('schedule')
    def validate_schedule(cls, v):
        valid_presets = ['@once', '@hourly', '@daily', '@weekly', '@monthly', '@yearly']
        if not (v in valid_presets or v.startswith('0 ') or v.count(' ') == 4):
            raise ValueError(f'Invalid schedule: {v}')
        return v

class DailyETL(Blueprint[DailyETLConfig]):
    """Daily ETL job that moves data between tables."""

    def render(self, config: DailyETLConfig) -> DAG:
        with DAG(
            dag_id=config.job_id,
            schedule=config.schedule,
            start_date=datetime(2024, 1, 1),
            catchup=False,
            default_args={"retries": config.retries}
        ) as dag:
            # Define your tasks here
            pass
        return dag
```
### Loading from YAML in Python
You can also load YAML configs in Python code:
```python
from blueprint import from_yaml
# Load existing YAML config
dag = from_yaml("configs/customer_etl.dag.yaml")
# Or with runtime overrides
dag = from_yaml("configs/customer_etl.dag.yaml", overrides={
    "retries": 5,
    "schedule": "@hourly"
})
```
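If your project keeps several configs side by side, the same helper can be applied across a whole directory from a regular DAG file. The sketch below is only an illustration under stated assumptions: it uses the `dags/configs/*.dag.yaml` layout from earlier, assumes `from_yaml` accepts any such path, and keeps a unique module-level reference per DAG so Airflow's DagBag discovers each one:

```python
from pathlib import Path

from blueprint import from_yaml

# Load every *.dag.yaml config in the configs directory (layout assumed above)
for config_path in Path("dags/configs").glob("*.dag.yaml"):
    dag = from_yaml(str(config_path))
    # Unique module-level name per DAG so Airflow picks all of them up
    globals()[dag.dag_id] = dag
```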
### Testing Blueprints
```python
import pytest
from etl_blueprints import DailyETL
def test_daily_etl_config():
    # Test valid configuration
    dag = DailyETL.build(
        job_id="test-etl",
        source_table="test.source",
        target_table="test.target"
    )
    assert dag.dag_id == "test-etl"
    assert dag.schedule_interval == "@daily"

    # Test validation errors
    with pytest.raises(ValueError, match="Invalid schedule"):
        DailyETL.build(
            job_id="test-etl",
            source_table="test.source",
            target_table="test.target",
            schedule="invalid"
        )
```
## Type Safety and Validation
Blueprint uses Pydantic under the hood for robust validation with helpful error messages. This gives you:
- **Type coercion** - Automatically converts compatible types (e.g., string "5" to integer 5); see the sketch below this list
- **Field validation** - Set constraints like min/max values, regex patterns, etc.
- **Custom validators** - Add your own validation logic for complex rules
- **Clear error messages** - Know exactly what went wrong and how to fix it
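As a standalone illustration of coercion and constraints, here is a minimal sketch using plain Pydantic, independent of Blueprint (the `RetryConfig` model is hypothetical):

```python
from pydantic import BaseModel, Field, ValidationError

class RetryConfig(BaseModel):
    retries: int = Field(default=2, ge=0, le=5)

# Compatible types are coerced: the string "5" becomes the integer 5
print(RetryConfig(retries="5").retries)  # 5

# Constraint violations raise a ValidationError naming the field and the rule
try:
    RetryConfig(retries=10)
except ValidationError as err:
    print(err)
```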
When validation fails, you get clear feedback:
```bash
$ blueprint lint
✗ customer_etl.dag.yaml
  ValidationError: 3 validation errors for DailyETLConfig

  job_id
    String does not match pattern '^[a-zA-Z0-9_-]+$' (type=value_error.str.regex)
    Given: "customer sync!" (contains spaces)

  retries
    ensure this value is less than or equal to 5 (type=value_error.number.not_le)
    Given: 10

  schedule
    Invalid schedule format (type=value_error)
    Given: "every hour" (use "@hourly" or valid cron expression)
```
### Field Validation Examples
```python
from blueprint import BaseModel, Field, field_validator
class ETLConfig(BaseModel):
    # Basic constraints
    job_id: str = Field(pattern=r'^[a-zA-Z0-9_-]+$')
    retries: int = Field(ge=0, le=5)
    timeout_minutes: int = Field(gt=0, le=1440)  # 1-1440 minutes
    schedule: str = Field(default="@daily")  # field referenced by the validator below

    # Custom validation
    @field_validator('schedule')
    def validate_schedule(cls, v):
        valid_presets = ['@once', '@hourly', '@daily', '@weekly', '@monthly']
        if v not in valid_presets and not cls._is_valid_cron(v):
            raise ValueError(f'Must be a preset ({", ".join(valid_presets)}) or valid cron')
        return v
```
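The `_is_valid_cron` helper referenced above is not defined in the snippet. A minimal sketch, assuming a simple field-count heuristic rather than a real cron parser, might look like this:

```python
from blueprint import BaseModel

class ETLConfig(BaseModel):
    # ... fields and the schedule validator from the snippet above ...

    @classmethod
    def _is_valid_cron(cls, value: str) -> bool:
        # Heuristic only: a cron expression has five space-separated fields.
        # A stricter check could delegate to a dedicated cron library.
        return len(value.split()) == 5
```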
## More Examples
### Complex Parameters
Blueprints support nested objects and lists:
```python
from blueprint import Blueprint, BaseModel, Field
from airflow import DAG
from typing import Optional, List

class SourceConfig(BaseModel):
    database: str = Field(description="Database connection name")
    table: str = Field(description="Table to extract data from")

class NotificationConfig(BaseModel):
    email: Optional[str] = Field(default=None, description="Email for notifications")
    slack: Optional[str] = Field(default=None, description="Slack channel (#data-alerts)")

class MultiSourceConfig(BaseModel):
    sources: List[SourceConfig] = Field(description="List of data sources")
    notifications: NotificationConfig = Field(default_factory=NotificationConfig)

class MultiSourceETL(Blueprint[MultiSourceConfig]):
    """ETL pipeline that processes multiple data sources in parallel."""

    def render(self, config: MultiSourceConfig) -> DAG:
        # Access nested data with type safety
        for source in config.sources:
            print(f"Processing {source.table} from {source.database}")
        # ... build and return the DAG (see the fuller sketch below the YAML example)
```
```yaml
blueprint: multi_source_etl
sources:
  - database: postgres
    table: users
  - database: mysql
    table: orders
notifications:
  email: data-team@company.com
  slack: "#data-alerts"
```
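To make the parallel-processing claim concrete, the `render` method above could expand each configured source into its own task. This is only a sketch: the hard-coded `dag_id`, the `process_source` callable, and the task naming are assumptions, while `PythonOperator` and `op_kwargs` are standard Airflow; `Blueprint` and `MultiSourceConfig` are the classes defined in the snippet above:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

class MultiSourceETL(Blueprint[MultiSourceConfig]):
    """ETL pipeline that processes multiple data sources in parallel."""

    def render(self, config: MultiSourceConfig) -> DAG:
        def process_source(database: str, table: str) -> None:
            print(f"Processing {table} from {database}")

        with DAG(
            dag_id="multi_source_etl",  # hard-coded here for illustration
            schedule="@daily",
            start_date=datetime(2024, 1, 1),
            catchup=False,
        ) as dag:
            # One independent task per configured source, so they run in parallel
            for source in config.sources:
                PythonOperator(
                    task_id=f"process_{source.database}_{source.table}",
                    python_callable=process_source,
                    op_kwargs={"database": source.database, "table": source.table},
                )
        return dag
```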
### Blueprint Inheritance
Use standard Python inheritance to share common parameters:
```python
class BaseETLConfig(BaseModel):
    owner: str = Field(default="data-team", description="Team responsible for DAG")
    retries: int = Field(default=2, ge=0, le=5, description="Number of retries")
    email_on_failure: str = Field(default="alerts@company.com", description="Alert email")

class S3ImportConfig(BaseETLConfig):
    bucket: str = Field(description="S3 bucket name")
    prefix: str = Field(description="S3 key prefix")

class BaseETL(Blueprint[BaseETLConfig]):
    """Base blueprint with common ETL parameters."""

    def get_default_args(self, config: BaseETLConfig):
        return {
            "owner": config.owner,
            "retries": config.retries,
            "email_on_failure": [config.email_on_failure]
        }

class S3Import(BaseETL):
    """Import data from S3."""

    def render(self, config: S3ImportConfig) -> DAG:
        # Has access to all BaseETLConfig fields plus S3-specific ones,
        # and inherits get_default_args from BaseETL
        default_args = self.get_default_args(config)
        # ... create DAG with S3 operators
```
## Installation
```bash
pip install airflow-blueprint
```
## Configuration
Blueprint looks for templates in `.astro/templates/` by default. Override with:
```bash
export BLUEPRINT_TEMPLATES_DIR=/path/to/templates
```
## CLI Commands
```bash
# Validate all configs
blueprint lint

# Validate specific config
blueprint lint dags/configs/my_job.dag.yaml

# List available blueprints
blueprint list

# Show blueprint parameters
blueprint describe daily_etl

# Interactive scaffolding (primary interface)
blueprint new
# Prompts for: DAG name, blueprint selection, parameters

# Direct scaffolding with specific blueprint
blueprint new daily_etl my_new_etl

# Scaffold to specific path
blueprint new dags/configs/prod/my_new_etl.dag.yaml daily_etl

# Quick mode with parameter overrides
blueprint new daily_etl my_new_etl --set job_id=customer-sync --set retries=3
```
## Error Messages
Blueprint provides clear, actionable error messages:
```bash
$ blueprint lint
✗ marketing_etl.dag.yaml
  Line 3: Missing required parameter 'source_table' for blueprint 'daily_etl'

  Your configuration:
    2 | blueprint: daily_etl
    3 | job_id: "marketing-sync"
    4 | target_table: "analytics.marketing_facts"

  Add the missing parameter:
    3 | job_id: "marketing-sync"
  + 4 | source_table: "raw.marketing_events"
    5 | target_table: "analytics.marketing_facts"
```
## Best Practices
1. **Keep blueprints focused** - Each blueprint should represent one type of workflow
2. **Use descriptive parameter names** - `source_table` is clearer than `src`
3. **Always add parameter descriptions** - Use `Field(description=...)` (or `Annotated[type, "description"]`) for every parameter
4. **Document your blueprints** - Add docstrings to blueprint classes explaining their purpose
5. **Provide defaults wisely** - Common values as defaults, critical values as required
6. **Validate in CI** - Add `blueprint lint` to your CI pipeline (see the sketch below)
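One lightweight way to wire item 6 into CI is a pytest check that shells out to the documented command. This is a sketch under stated assumptions: the test name is arbitrary, and the `blueprint` CLI must be installed on the CI runner's PATH:

```python
import subprocess

def test_all_blueprint_configs_are_valid():
    # Fails the build if any .dag.yaml config does not validate
    result = subprocess.run(["blueprint", "lint"], capture_output=True, text=True)
    assert result.returncode == 0, result.stdout + result.stderr
```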
## How is this different from DAG Factory?
[DAG Factory](https://github.com/astronomer/dag-factory) gives full control of Airflow via YAML.
Blueprint hides that complexity behind safe, pre-built templates with validation.
### DAG Factory
```yaml
my_dag:
  default_args:
    owner: 'data-team'
    retries: 2
    retry_delay_seconds: 300
    start_date: 2024-01-01
  schedule_interval: '@daily'
  tasks:
    extract_data:
      operator: airflow.operators.python.PythonOperator
      python_callable_name: extract_from_api
      python_callable_file: /opt/airflow/dags/etl/extract.py
    transform_data:
      operator: airflow.operators.python.PythonOperator
      dependencies: [extract_data]
      # ... many more Airflow-specific configurations
```
### Blueprint
```yaml
blueprint: daily_etl
job_id: customer-sync
source_table: raw.customers
target_table: analytics.dim_customers
schedule: "@hourly"
```
Or in Python:
```python
dag = DailyETL.build(
    job_id="customer-sync",
    source_table="raw.customers",
    target_table="analytics.dim_customers",
    schedule="@hourly"
)
```
**Use DAG Factory if:** You need full Airflow flexibility and your users understand Airflow concepts
**Use Blueprint if:** You want standardized, validated patterns with type safety for teams
## Contributing
We welcome contributions! Please see our [Contributing Guide](CONTRIBUTING.md) for details.
## License
Apache 2.0