mesonet-alerts


Namemesonet-alerts JSON
Version 0.2.4 PyPI version JSON
download
home_pageNone
SummaryShared email alerting, retries, and volume-drop checks for Mesonet workers
upload_time2025-09-12 14:16:46
maintainerNone
docs_urlNone
authorNone
requires_python>=3.11
licenseProprietary
keywords alerts dynamodb email jinja2 retry
VCS
bugtrack_url
requirements No requirements were recorded.
Travis-CI No Travis.
coveralls test coverage No coveralls.
            # Mesonet Alerts Package

A production-ready shared Python package for email alerting across mesonet microservices. Provides HTML + plaintext email templates, DynamoDB-backed configuration, SMTP integration, alert persistence, retry helpers, and volume drop detection.

## ๐Ÿš€ Quick Start

### Installation

Add to your service's `pyproject.toml`:

```toml
[project]
dependencies = [
    "mesonet_alerts @ file://../common/mesonet_alerts"
]
```

### Environment Variables

```bash
# SMTP Configuration (required for email sending)
ALERTS_SMTP_HOST=localhost
ALERTS_SMTP_PORT=1025
ALERTS_SMTP_USER=                    # Optional for local dev
ALERTS_SMTP_PASS=                    # Optional for local dev
ALERTS_FROM="alerts@local.test"
ALERTS_TO="admin@local.test,kevin@local.test"

# Optional persistence
ALERTS_TABLE_NAME=alerts             # Enable DynamoDB persistence

# Volume monitoring
EXPECTED_RECORDS_PER_PROVIDER_PER_HOUR=100
```

### Basic Usage

```python
from mesonet_alerts import EmailAlerter, AlertStore, run_with_retries

# Initialize components
emailer = EmailAlerter()
store = AlertStore()

# Send an alert
context = {
    "stage": "ingest",
    "severity": "ERROR", 
    "provider": "colorado",
    "run_id": "run_123",
    "error": "Connection timeout"
}

emailer.send("process_failure", "Alert: Colorado Ingest Failed", context)

# Store alert with deduplication
store.put_alert(
    provider="colorado",
    stage="ingest", 
    severity="ERROR",
    code="CONNECTION_TIMEOUT",
    message="Failed to connect to provider",
    dedupe_key="timeout#colorado#run_123"
)
```

## ๐Ÿ—„๏ธ DynamoDB-Backed Configuration

Version 0.2.0+ supports loading email configuration and templates from DynamoDB, providing centralized configuration management.

### Configuration Table Setup

Create a DynamoDB table for email configuration:

```bash
# Create config table
aws dynamodb create-table \
  --table-name MesonetEmailConfig \
  --attribute-definitions AttributeName=config_pk,AttributeType=S AttributeName=config_sk,AttributeType=S \
  --key-schema AttributeName=config_pk,KeyType=HASH AttributeName=config_sk,KeyType=RANGE \
  --billing-mode PAY_PER_REQUEST

# Insert configuration
aws dynamodb put-item \
  --table-name MesonetEmailConfig \
  --item '{
    "config_pk": {"S": "email_config"},
    "config_sk": {"S": "active"},
    "smtp": {"M": {"host": {"S": "smtp.gmail.com"}, "port": {"N": "587"}, "user": {"S": "alerts@company.com"}, "pass": {"S": "app_password"}, "from": {"S": "alerts@company.com"}}},
    "recipients": {"M": {"list": {"L": [{"S": "admin@company.com"}, {"S": "team@company.com"}]}}},
    "templates": {"M": {
      "process_failure": {"M": {"html": {"S": "<html><body><h2 style=\"color:#dc3545\">Process Failure</h2><p>Provider: {{provider}}</p><p>Error: {{error}}</p></body></html>"}, "text": {"S": "Process Failure\\nProvider: {{provider}}\\nError: {{error}}"}}},
      "provider_empty_data": {"M": {"html": {"S": "<html><body><h2 style=\"color:#ffc107\">No Data</h2><p>Provider {{provider}} returned no data</p></body></html>"}, "text": {"S": "No Data\\nProvider {{provider}} returned no data"}}},
      "harmonize_failure": {"M": {"html": {"S": "<html><body><h2 style=\"color:#dc3545\">Harmonize Failed</h2><p>Provider: {{provider}}</p></body></html>"}, "text": {"S": "Harmonize Failed\\nProvider: {{provider}}"}}},
      "volume_drop": {"M": {"html": {"S": "<html><body><h2 style=\"color:#e67e22\">Volume Drop</h2><p>{{drop_pct}}% drop detected</p></body></html>"}, "text": {"S": "Volume Drop\\n{{drop_pct}}% drop detected"}}}
    }}
  }'
```

### Using DynamoDB Configuration

```python
import boto3
from mesonet_alerts import EmailAlerter, AlertStore, ConfigRepository

# Initialize DynamoDB client
dynamodb_client = boto3.client('dynamodb', region_name='us-east-1')

# Initialize with DynamoDB-backed configuration
config_repo = ConfigRepository(dynamodb_client, 'MesonetEmailConfig')
emailer = EmailAlerter(config_repo=config_repo)
alert_store = AlertStore(table_name='MesonetAlerts', dynamodb_client=dynamodb_client)

# Templates and SMTP settings are now loaded from DynamoDB
emailer.send("process_failure", "Alert: Process Failed", context)
```

### Environment Variables for DynamoDB

```bash
# DynamoDB configuration
EMAIL_CONFIG_TABLE=MesonetEmailConfig
ALERTS_TABLE_NAME=MesonetAlerts
AWS_REGION=us-east-1

# Fallback environment variables (used if DynamoDB config fails)
ALERTS_SMTP_HOST=smtp.gmail.com
ALERTS_SMTP_PORT=587
# ... other SMTP settings
```

## ๐Ÿ“ง Email Templates

The package includes four pre-built templates with eye-catching HTML + plaintext versions:

- **`process_failure`** - General processing failures
- **`provider_empty_data`** - Empty data warnings  
- **`harmonize_failure`** - Data harmonization errors
- **`volume_drop`** - Volume drop alerts

### Template Variables

All templates support these variables:

```python
context = {
    "stage": "ingest",           # Processing stage
    "severity": "ERROR",         # ERROR, WARN, INFO
    "provider": "colorado",      # Provider name
    "run_id": "run_123",        # Optional run identifier
    "trace_id": "trace_456",    # Optional trace identifier
    "error": "Error message",    # Error details
    "attempts": 3,              # Number of attempts
    "timestamp_iso": "2025-01-15T10:30:00Z",
    
    # Volume drop specific
    "expected": 100,            # Expected record count
    "actual": 75,              # Actual record count  
    "drop_pct": "25.0",        # Drop percentage
    "window_start": "2025-01-15 13:00 UTC",
    "window_end": "2025-01-15 14:00 UTC",
}
```

## ๐Ÿ”„ Retry Helpers

### Basic Retry Usage

```python
from mesonet_alerts.retry import run_with_retries, ProviderEmptyDataError

def fetch_provider_data():
    # Your data fetching logic
    data = api_client.get_data()
    if not data:
        raise ProviderEmptyDataError("No data returned")
    return data

def is_retryable_error(e):
    return isinstance(e, (ProviderEmptyDataError, ConnectionError))

try:
    data = run_with_retries(fetch_provider_data, is_retryable_error, attempts=3)
except ProviderEmptyDataError:
    # Handle final failure after retries
    pass
```

### Decorator Usage

```python
from mesonet_alerts.retry import retry_on_exceptions

@retry_on_exceptions(ProviderEmptyDataError, ConnectionError)
def fetch_with_auto_retry():
    return api_client.get_data()
```

## ๐Ÿ“Š Volume Drop Detection

```python
from datetime import datetime, timezone, timedelta
from mesonet_alerts.dropcheck import check_and_alert_volume_drop

# Define time window
now = datetime.now(timezone.utc).replace(minute=0, second=0, microsecond=0)
window_end = now
window_start = now - timedelta(hours=1)

# Check for volume drops
check_and_alert_volume_drop(
    provider="colorado",
    stage="harmonize",
    actual_count=75,        # Only 75 records processed
    expected_count=100,     # Expected 100 records
    threshold=0.20,         # Alert on >20% drop
    window_start=window_start,
    window_end=window_end,
    emailer=emailer,
    store=store
)
```

## ๐Ÿ—๏ธ Integration Examples

### Ingest Worker Integration

```python
from mesonet_alerts.emailer import EmailAlerter
from mesonet_alerts.store import AlertStore
from mesonet_alerts.retry import run_with_retries, ProviderEmptyDataError

emailer = EmailAlerter()
store = AlertStore()

def _is_retryable(e: Exception) -> bool:
    return isinstance(e, ProviderEmptyDataError) or "timeout" in str(e).lower()

def fetch_and_process_with_alerts(provider: str, run_id: str, trace_id: str):
    def _do():
        data = fetch_from_provider(provider)  # your existing call
        if not data:
            raise ProviderEmptyDataError(f"Empty data from {provider}")
        return process_data(data)

    try:
        return run_with_retries(_do, _is_retryable, attempts=3)
    except ProviderEmptyDataError as e:
        ctx = {
            "stage": "ingest", "severity": "WARN", "provider": provider, 
            "run_id": run_id, "trace_id": trace_id, "error": str(e), "attempts": 3
        }
        emailer.send("provider_empty_data", f"[INGEST] Empty data: {provider}", ctx)
        store.put_alert(
            provider=provider, stage="ingest", severity="WARN", 
            code="PROVIDER_EMPTY", message="Empty data after retries", 
            metadata=ctx, dedupe_key=f"empty#{provider}#{run_id}"
        )
        raise
    except Exception as e:
        ctx = {
            "stage": "ingest", "severity": "ERROR", "provider": provider,
            "run_id": run_id, "trace_id": trace_id, "error": str(e), "attempts": 3
        }
        emailer.send("process_failure", f"[INGEST] Failure: {provider}", ctx)
        store.put_alert(
            provider=provider, stage="ingest", severity="ERROR",
            code="INGEST_FAILURE", message="Ingest failure after retries",
            metadata=ctx, dedupe_key=f"ingestfail#{provider}#{run_id}"
        )
        raise
```

### Harmonize Worker Integration

```python
from datetime import datetime, timezone, timedelta
from mesonet_alerts.emailer import EmailAlerter
from mesonet_alerts.store import AlertStore  
from mesonet_alerts.dropcheck import check_and_alert_volume_drop

emailer = EmailAlerter()
store = AlertStore()

# After harmonization run completes
now = datetime.now(timezone.utc).replace(minute=0, second=0, microsecond=0)
window_end = now
window_start = now - timedelta(hours=1)

actual_count = count_harmonized_records(provider, window_start, window_end)  # your logic
check_and_alert_volume_drop(
    provider=provider,
    stage="harmonize", 
    actual_count=actual_count,
    expected_count=None,  # use ENV default for now
    threshold=0.20,
    window_start=window_start,
    window_end=window_end,
    emailer=emailer,
    store=store
)
```

## ๐Ÿงช Local Development Testing

### Prerequisites

1. Build the wheel:
   ```bash
   cd micro-services/common/mesonet_alerts
   python -m build  # or uv build
   ```

2. Install into workers (already done in pyproject.toml):
   ```toml
   dependencies = [
       "mesonet-alerts @ file://../common/mesonet_alerts/dist/mesonet_alerts-0.1.0-py3-none-any.whl"
   ]
   ```

### Usage (Local Dev)

**Terminal 1 - Start Debug SMTP Server:**
```bash
export $(grep -v '^#' .env.dev | xargs)
./scripts/run_debug_smtp.sh
```

**Terminal 2 - Test Ingest Alerts:**
```bash
export $(grep -v '^#' .env.dev | xargs)
cd micro-services/mesonet_ingest_worker
python scripts/test_ingest_alerts.py
```

**Terminal 3 - Test Harmonize Alerts:**
```bash
export $(grep -v '^#' .env.dev | xargs)
cd micro-services/mesonet_harmonize-worker
python scripts/test_harmonize_alerts.py
```

**Expected Output:**
- Terminal 1 should print full HTML+text email bodies
- Terminal 2/3 should show "โœ… Alert sent" messages
- You should see nicely formatted emails with inline CSS

### Environment Variables for Testing

```bash
# Override test parameters
TEST_PROVIDER=colorado        # Provider name for tests
TEST_ACTUAL=70               # Actual record count (harmonize test)
TEST_EXPECTED=100            # Expected record count (harmonize test)
```

## ๐Ÿ—„๏ธ DynamoDB Schema

If `ALERTS_TABLE_NAME` is set, alerts are persisted with this schema:

```
Table: alerts
PK: alert_pk (String) = "{provider}#{stage}" 
SK: timestamp (String, ISO8601)
Attributes:
  - severity (String): ERROR, WARN, INFO
  - code (String): PROVIDER_EMPTY, INGEST_FAILURE, etc.
  - message (String): Human-readable message
  - metadata (Map): Additional context data
  - status (String): OPEN (default)
  - ttl (Number): Unix timestamp for auto-deletion
  - dedupe_key (String): Optional deduplication key
  - provider (String): Provider name
  - stage (String): Processing stage
```

## ๐Ÿงช Development

### Running Tests

```bash
# Install dev dependencies
uv sync --dev

# Run tests
pytest tests/ -v

# With coverage
pytest tests/ --cov=src --cov-report=html
```

### Local SMTP Testing

```bash
# Start local SMTP server for testing
python -m smtpd -c DebuggingServer -n localhost:1025

# Or use MailHog (recommended)
docker run -p 1025:1025 -p 8025:8025 mailhog/mailhog
# View emails at http://localhost:8025
```

## ๐Ÿ”ฎ Future Enhancements

The package includes commented hooks for future features:

### Database-Backed Configuration
```python
# TODO: Implement in config.py
EmailConfigRepo.get_active_config()  # SMTP from DB
RecipientRoutingRepo.get_recipients(provider, severity)  # Smart routing
```

### Template Overrides
```python  
# TODO: Implement in templates.py
TemplateRepo.get(template_name, format_type)  # Custom templates from DB
```

### EventBridge/SNS Integration
```python
# TODO: Implement in store.py  
AlertEventPublisher.publish_alert_event(alert_data)  # Fan-out to external systems
```

### Provider-Specific Volume Expectations
```python
# TODO: Implement in dropcheck.py
VolumeExpectationRepo.get_expected_volume(provider, stage, hours)  # Smart baselines
```

## ๐Ÿ“‹ API Reference

### EmailAlerter

- `__init__(config=None, recipients=None)` - Initialize with optional config override
- `send(template, subject, context, recipients=None)` - Send alert email
- `resolve_recipients(provider, severity)` - Future: smart recipient routing

### AlertStore

- `__init__(table_name=None)` - Initialize with optional table name
- `put_alert(provider, stage, severity, code, message, metadata=None, dedupe_key=None, ttl_seconds=86400)` - Store alert
- `get_recent_alerts(provider, stage, hours=24)` - Retrieve recent alerts

### Retry Functions

- `run_with_retries(fn, is_retryable, attempts=3, backoffs=[1,3,9])` - Execute with retry logic
- `retry_on_exceptions(*exception_types)` - Decorator for auto-retry
- `is_network_error(e)`, `is_rate_limit_error(e)`, `is_provider_error(e)` - Error classifiers

### Volume Drop Detection

- `check_and_alert_volume_drop(**kwargs)` - Check and alert on volume drops
- `get_volume_trend(provider, stage, hours_back=24, store=None)` - Analyze volume trends (placeholder)

## ๐Ÿ”’ Security Notes

- Credentials are read from environment variables only
- SMTP passwords are not logged
- DynamoDB uses IAM roles for authentication
- All database operations use conditional writes for consistency
- TTL automatically expires old alerts

## ๐Ÿ“„ License

MIT License - see LICENSE file for details. 
            

Raw data

            {
    "_id": null,
    "home_page": null,
    "name": "mesonet-alerts",
    "maintainer": null,
    "docs_url": null,
    "requires_python": ">=3.11",
    "maintainer_email": null,
    "keywords": "alerts, dynamodb, email, jinja2, retry",
    "author": null,
    "author_email": "Dawood Siddiq <dawood.siddiq@codingcops.com>",
    "download_url": "https://files.pythonhosted.org/packages/15/72/6c6b4cf1908524c208fbc2faebb92d5c75fad402d4f043d5d2d3637ebdb3/mesonet_alerts-0.2.4.tar.gz",
    "platform": null,
    "description": "# Mesonet Alerts Package\n\nA production-ready shared Python package for email alerting across mesonet microservices. Provides HTML + plaintext email templates, DynamoDB-backed configuration, SMTP integration, alert persistence, retry helpers, and volume drop detection.\n\n## \ud83d\ude80 Quick Start\n\n### Installation\n\nAdd to your service's `pyproject.toml`:\n\n```toml\n[project]\ndependencies = [\n    \"mesonet_alerts @ file://../common/mesonet_alerts\"\n]\n```\n\n### Environment Variables\n\n```bash\n# SMTP Configuration (required for email sending)\nALERTS_SMTP_HOST=localhost\nALERTS_SMTP_PORT=1025\nALERTS_SMTP_USER=                    # Optional for local dev\nALERTS_SMTP_PASS=                    # Optional for local dev\nALERTS_FROM=\"alerts@local.test\"\nALERTS_TO=\"admin@local.test,kevin@local.test\"\n\n# Optional persistence\nALERTS_TABLE_NAME=alerts             # Enable DynamoDB persistence\n\n# Volume monitoring\nEXPECTED_RECORDS_PER_PROVIDER_PER_HOUR=100\n```\n\n### Basic Usage\n\n```python\nfrom mesonet_alerts import EmailAlerter, AlertStore, run_with_retries\n\n# Initialize components\nemailer = EmailAlerter()\nstore = AlertStore()\n\n# Send an alert\ncontext = {\n    \"stage\": \"ingest\",\n    \"severity\": \"ERROR\", \n    \"provider\": \"colorado\",\n    \"run_id\": \"run_123\",\n    \"error\": \"Connection timeout\"\n}\n\nemailer.send(\"process_failure\", \"Alert: Colorado Ingest Failed\", context)\n\n# Store alert with deduplication\nstore.put_alert(\n    provider=\"colorado\",\n    stage=\"ingest\", \n    severity=\"ERROR\",\n    code=\"CONNECTION_TIMEOUT\",\n    message=\"Failed to connect to provider\",\n    dedupe_key=\"timeout#colorado#run_123\"\n)\n```\n\n## \ud83d\uddc4\ufe0f DynamoDB-Backed Configuration\n\nVersion 0.2.0+ supports loading email configuration and templates from DynamoDB, providing centralized configuration management.\n\n### Configuration Table Setup\n\nCreate a DynamoDB table for email configuration:\n\n```bash\n# Create config table\naws dynamodb create-table \\\n  --table-name MesonetEmailConfig \\\n  --attribute-definitions AttributeName=config_pk,AttributeType=S AttributeName=config_sk,AttributeType=S \\\n  --key-schema AttributeName=config_pk,KeyType=HASH AttributeName=config_sk,KeyType=RANGE \\\n  --billing-mode PAY_PER_REQUEST\n\n# Insert configuration\naws dynamodb put-item \\\n  --table-name MesonetEmailConfig \\\n  --item '{\n    \"config_pk\": {\"S\": \"email_config\"},\n    \"config_sk\": {\"S\": \"active\"},\n    \"smtp\": {\"M\": {\"host\": {\"S\": \"smtp.gmail.com\"}, \"port\": {\"N\": \"587\"}, \"user\": {\"S\": \"alerts@company.com\"}, \"pass\": {\"S\": \"app_password\"}, \"from\": {\"S\": \"alerts@company.com\"}}},\n    \"recipients\": {\"M\": {\"list\": {\"L\": [{\"S\": \"admin@company.com\"}, {\"S\": \"team@company.com\"}]}}},\n    \"templates\": {\"M\": {\n      \"process_failure\": {\"M\": {\"html\": {\"S\": \"<html><body><h2 style=\\\"color:#dc3545\\\">Process Failure</h2><p>Provider: {{provider}}</p><p>Error: {{error}}</p></body></html>\"}, \"text\": {\"S\": \"Process Failure\\\\nProvider: {{provider}}\\\\nError: {{error}}\"}}},\n      \"provider_empty_data\": {\"M\": {\"html\": {\"S\": \"<html><body><h2 style=\\\"color:#ffc107\\\">No Data</h2><p>Provider {{provider}} returned no data</p></body></html>\"}, \"text\": {\"S\": \"No Data\\\\nProvider {{provider}} returned no data\"}}},\n      \"harmonize_failure\": {\"M\": {\"html\": {\"S\": \"<html><body><h2 style=\\\"color:#dc3545\\\">Harmonize Failed</h2><p>Provider: {{provider}}</p></body></html>\"}, \"text\": {\"S\": \"Harmonize Failed\\\\nProvider: {{provider}}\"}}},\n      \"volume_drop\": {\"M\": {\"html\": {\"S\": \"<html><body><h2 style=\\\"color:#e67e22\\\">Volume Drop</h2><p>{{drop_pct}}% drop detected</p></body></html>\"}, \"text\": {\"S\": \"Volume Drop\\\\n{{drop_pct}}% drop detected\"}}}\n    }}\n  }'\n```\n\n### Using DynamoDB Configuration\n\n```python\nimport boto3\nfrom mesonet_alerts import EmailAlerter, AlertStore, ConfigRepository\n\n# Initialize DynamoDB client\ndynamodb_client = boto3.client('dynamodb', region_name='us-east-1')\n\n# Initialize with DynamoDB-backed configuration\nconfig_repo = ConfigRepository(dynamodb_client, 'MesonetEmailConfig')\nemailer = EmailAlerter(config_repo=config_repo)\nalert_store = AlertStore(table_name='MesonetAlerts', dynamodb_client=dynamodb_client)\n\n# Templates and SMTP settings are now loaded from DynamoDB\nemailer.send(\"process_failure\", \"Alert: Process Failed\", context)\n```\n\n### Environment Variables for DynamoDB\n\n```bash\n# DynamoDB configuration\nEMAIL_CONFIG_TABLE=MesonetEmailConfig\nALERTS_TABLE_NAME=MesonetAlerts\nAWS_REGION=us-east-1\n\n# Fallback environment variables (used if DynamoDB config fails)\nALERTS_SMTP_HOST=smtp.gmail.com\nALERTS_SMTP_PORT=587\n# ... other SMTP settings\n```\n\n## \ud83d\udce7 Email Templates\n\nThe package includes four pre-built templates with eye-catching HTML + plaintext versions:\n\n- **`process_failure`** - General processing failures\n- **`provider_empty_data`** - Empty data warnings  \n- **`harmonize_failure`** - Data harmonization errors\n- **`volume_drop`** - Volume drop alerts\n\n### Template Variables\n\nAll templates support these variables:\n\n```python\ncontext = {\n    \"stage\": \"ingest\",           # Processing stage\n    \"severity\": \"ERROR\",         # ERROR, WARN, INFO\n    \"provider\": \"colorado\",      # Provider name\n    \"run_id\": \"run_123\",        # Optional run identifier\n    \"trace_id\": \"trace_456\",    # Optional trace identifier\n    \"error\": \"Error message\",    # Error details\n    \"attempts\": 3,              # Number of attempts\n    \"timestamp_iso\": \"2025-01-15T10:30:00Z\",\n    \n    # Volume drop specific\n    \"expected\": 100,            # Expected record count\n    \"actual\": 75,              # Actual record count  \n    \"drop_pct\": \"25.0\",        # Drop percentage\n    \"window_start\": \"2025-01-15 13:00 UTC\",\n    \"window_end\": \"2025-01-15 14:00 UTC\",\n}\n```\n\n## \ud83d\udd04 Retry Helpers\n\n### Basic Retry Usage\n\n```python\nfrom mesonet_alerts.retry import run_with_retries, ProviderEmptyDataError\n\ndef fetch_provider_data():\n    # Your data fetching logic\n    data = api_client.get_data()\n    if not data:\n        raise ProviderEmptyDataError(\"No data returned\")\n    return data\n\ndef is_retryable_error(e):\n    return isinstance(e, (ProviderEmptyDataError, ConnectionError))\n\ntry:\n    data = run_with_retries(fetch_provider_data, is_retryable_error, attempts=3)\nexcept ProviderEmptyDataError:\n    # Handle final failure after retries\n    pass\n```\n\n### Decorator Usage\n\n```python\nfrom mesonet_alerts.retry import retry_on_exceptions\n\n@retry_on_exceptions(ProviderEmptyDataError, ConnectionError)\ndef fetch_with_auto_retry():\n    return api_client.get_data()\n```\n\n## \ud83d\udcca Volume Drop Detection\n\n```python\nfrom datetime import datetime, timezone, timedelta\nfrom mesonet_alerts.dropcheck import check_and_alert_volume_drop\n\n# Define time window\nnow = datetime.now(timezone.utc).replace(minute=0, second=0, microsecond=0)\nwindow_end = now\nwindow_start = now - timedelta(hours=1)\n\n# Check for volume drops\ncheck_and_alert_volume_drop(\n    provider=\"colorado\",\n    stage=\"harmonize\",\n    actual_count=75,        # Only 75 records processed\n    expected_count=100,     # Expected 100 records\n    threshold=0.20,         # Alert on >20% drop\n    window_start=window_start,\n    window_end=window_end,\n    emailer=emailer,\n    store=store\n)\n```\n\n## \ud83c\udfd7\ufe0f Integration Examples\n\n### Ingest Worker Integration\n\n```python\nfrom mesonet_alerts.emailer import EmailAlerter\nfrom mesonet_alerts.store import AlertStore\nfrom mesonet_alerts.retry import run_with_retries, ProviderEmptyDataError\n\nemailer = EmailAlerter()\nstore = AlertStore()\n\ndef _is_retryable(e: Exception) -> bool:\n    return isinstance(e, ProviderEmptyDataError) or \"timeout\" in str(e).lower()\n\ndef fetch_and_process_with_alerts(provider: str, run_id: str, trace_id: str):\n    def _do():\n        data = fetch_from_provider(provider)  # your existing call\n        if not data:\n            raise ProviderEmptyDataError(f\"Empty data from {provider}\")\n        return process_data(data)\n\n    try:\n        return run_with_retries(_do, _is_retryable, attempts=3)\n    except ProviderEmptyDataError as e:\n        ctx = {\n            \"stage\": \"ingest\", \"severity\": \"WARN\", \"provider\": provider, \n            \"run_id\": run_id, \"trace_id\": trace_id, \"error\": str(e), \"attempts\": 3\n        }\n        emailer.send(\"provider_empty_data\", f\"[INGEST] Empty data: {provider}\", ctx)\n        store.put_alert(\n            provider=provider, stage=\"ingest\", severity=\"WARN\", \n            code=\"PROVIDER_EMPTY\", message=\"Empty data after retries\", \n            metadata=ctx, dedupe_key=f\"empty#{provider}#{run_id}\"\n        )\n        raise\n    except Exception as e:\n        ctx = {\n            \"stage\": \"ingest\", \"severity\": \"ERROR\", \"provider\": provider,\n            \"run_id\": run_id, \"trace_id\": trace_id, \"error\": str(e), \"attempts\": 3\n        }\n        emailer.send(\"process_failure\", f\"[INGEST] Failure: {provider}\", ctx)\n        store.put_alert(\n            provider=provider, stage=\"ingest\", severity=\"ERROR\",\n            code=\"INGEST_FAILURE\", message=\"Ingest failure after retries\",\n            metadata=ctx, dedupe_key=f\"ingestfail#{provider}#{run_id}\"\n        )\n        raise\n```\n\n### Harmonize Worker Integration\n\n```python\nfrom datetime import datetime, timezone, timedelta\nfrom mesonet_alerts.emailer import EmailAlerter\nfrom mesonet_alerts.store import AlertStore  \nfrom mesonet_alerts.dropcheck import check_and_alert_volume_drop\n\nemailer = EmailAlerter()\nstore = AlertStore()\n\n# After harmonization run completes\nnow = datetime.now(timezone.utc).replace(minute=0, second=0, microsecond=0)\nwindow_end = now\nwindow_start = now - timedelta(hours=1)\n\nactual_count = count_harmonized_records(provider, window_start, window_end)  # your logic\ncheck_and_alert_volume_drop(\n    provider=provider,\n    stage=\"harmonize\", \n    actual_count=actual_count,\n    expected_count=None,  # use ENV default for now\n    threshold=0.20,\n    window_start=window_start,\n    window_end=window_end,\n    emailer=emailer,\n    store=store\n)\n```\n\n## \ud83e\uddea Local Development Testing\n\n### Prerequisites\n\n1. Build the wheel:\n   ```bash\n   cd micro-services/common/mesonet_alerts\n   python -m build  # or uv build\n   ```\n\n2. Install into workers (already done in pyproject.toml):\n   ```toml\n   dependencies = [\n       \"mesonet-alerts @ file://../common/mesonet_alerts/dist/mesonet_alerts-0.1.0-py3-none-any.whl\"\n   ]\n   ```\n\n### Usage (Local Dev)\n\n**Terminal 1 - Start Debug SMTP Server:**\n```bash\nexport $(grep -v '^#' .env.dev | xargs)\n./scripts/run_debug_smtp.sh\n```\n\n**Terminal 2 - Test Ingest Alerts:**\n```bash\nexport $(grep -v '^#' .env.dev | xargs)\ncd micro-services/mesonet_ingest_worker\npython scripts/test_ingest_alerts.py\n```\n\n**Terminal 3 - Test Harmonize Alerts:**\n```bash\nexport $(grep -v '^#' .env.dev | xargs)\ncd micro-services/mesonet_harmonize-worker\npython scripts/test_harmonize_alerts.py\n```\n\n**Expected Output:**\n- Terminal 1 should print full HTML+text email bodies\n- Terminal 2/3 should show \"\u2705 Alert sent\" messages\n- You should see nicely formatted emails with inline CSS\n\n### Environment Variables for Testing\n\n```bash\n# Override test parameters\nTEST_PROVIDER=colorado        # Provider name for tests\nTEST_ACTUAL=70               # Actual record count (harmonize test)\nTEST_EXPECTED=100            # Expected record count (harmonize test)\n```\n\n## \ud83d\uddc4\ufe0f DynamoDB Schema\n\nIf `ALERTS_TABLE_NAME` is set, alerts are persisted with this schema:\n\n```\nTable: alerts\nPK: alert_pk (String) = \"{provider}#{stage}\" \nSK: timestamp (String, ISO8601)\nAttributes:\n  - severity (String): ERROR, WARN, INFO\n  - code (String): PROVIDER_EMPTY, INGEST_FAILURE, etc.\n  - message (String): Human-readable message\n  - metadata (Map): Additional context data\n  - status (String): OPEN (default)\n  - ttl (Number): Unix timestamp for auto-deletion\n  - dedupe_key (String): Optional deduplication key\n  - provider (String): Provider name\n  - stage (String): Processing stage\n```\n\n## \ud83e\uddea Development\n\n### Running Tests\n\n```bash\n# Install dev dependencies\nuv sync --dev\n\n# Run tests\npytest tests/ -v\n\n# With coverage\npytest tests/ --cov=src --cov-report=html\n```\n\n### Local SMTP Testing\n\n```bash\n# Start local SMTP server for testing\npython -m smtpd -c DebuggingServer -n localhost:1025\n\n# Or use MailHog (recommended)\ndocker run -p 1025:1025 -p 8025:8025 mailhog/mailhog\n# View emails at http://localhost:8025\n```\n\n## \ud83d\udd2e Future Enhancements\n\nThe package includes commented hooks for future features:\n\n### Database-Backed Configuration\n```python\n# TODO: Implement in config.py\nEmailConfigRepo.get_active_config()  # SMTP from DB\nRecipientRoutingRepo.get_recipients(provider, severity)  # Smart routing\n```\n\n### Template Overrides\n```python  \n# TODO: Implement in templates.py\nTemplateRepo.get(template_name, format_type)  # Custom templates from DB\n```\n\n### EventBridge/SNS Integration\n```python\n# TODO: Implement in store.py  \nAlertEventPublisher.publish_alert_event(alert_data)  # Fan-out to external systems\n```\n\n### Provider-Specific Volume Expectations\n```python\n# TODO: Implement in dropcheck.py\nVolumeExpectationRepo.get_expected_volume(provider, stage, hours)  # Smart baselines\n```\n\n## \ud83d\udccb API Reference\n\n### EmailAlerter\n\n- `__init__(config=None, recipients=None)` - Initialize with optional config override\n- `send(template, subject, context, recipients=None)` - Send alert email\n- `resolve_recipients(provider, severity)` - Future: smart recipient routing\n\n### AlertStore\n\n- `__init__(table_name=None)` - Initialize with optional table name\n- `put_alert(provider, stage, severity, code, message, metadata=None, dedupe_key=None, ttl_seconds=86400)` - Store alert\n- `get_recent_alerts(provider, stage, hours=24)` - Retrieve recent alerts\n\n### Retry Functions\n\n- `run_with_retries(fn, is_retryable, attempts=3, backoffs=[1,3,9])` - Execute with retry logic\n- `retry_on_exceptions(*exception_types)` - Decorator for auto-retry\n- `is_network_error(e)`, `is_rate_limit_error(e)`, `is_provider_error(e)` - Error classifiers\n\n### Volume Drop Detection\n\n- `check_and_alert_volume_drop(**kwargs)` - Check and alert on volume drops\n- `get_volume_trend(provider, stage, hours_back=24, store=None)` - Analyze volume trends (placeholder)\n\n## \ud83d\udd12 Security Notes\n\n- Credentials are read from environment variables only\n- SMTP passwords are not logged\n- DynamoDB uses IAM roles for authentication\n- All database operations use conditional writes for consistency\n- TTL automatically expires old alerts\n\n## \ud83d\udcc4 License\n\nMIT License - see LICENSE file for details. ",
    "bugtrack_url": null,
    "license": "Proprietary",
    "summary": "Shared email alerting, retries, and volume-drop checks for Mesonet workers",
    "version": "0.2.4",
    "project_urls": {
        "Homepage": "https://github.com/dwd94/mesonet-alerts"
    },
    "split_keywords": [
        "alerts",
        " dynamodb",
        " email",
        " jinja2",
        " retry"
    ],
    "urls": [
        {
            "comment_text": null,
            "digests": {
                "blake2b_256": "b15d3e9f90233103c83528bd07f404d7d8891e00be3bc894ca7c44e9e77b3252",
                "md5": "86f615eb1075201cffcc22c1baf3a4d1",
                "sha256": "6407dfc4be9a0dbac45410acbfa4783b6b8b2ddb1dac2b0924c8421e1007bdcb"
            },
            "downloads": -1,
            "filename": "mesonet_alerts-0.2.4-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "86f615eb1075201cffcc22c1baf3a4d1",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": ">=3.11",
            "size": 24672,
            "upload_time": "2025-09-12T14:16:36",
            "upload_time_iso_8601": "2025-09-12T14:16:36.444614Z",
            "url": "https://files.pythonhosted.org/packages/b1/5d/3e9f90233103c83528bd07f404d7d8891e00be3bc894ca7c44e9e77b3252/mesonet_alerts-0.2.4-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": null,
            "digests": {
                "blake2b_256": "15726c6b4cf1908524c208fbc2faebb92d5c75fad402d4f043d5d2d3637ebdb3",
                "md5": "36d121b8cbfc988e2034c70a87db4bd6",
                "sha256": "a22a7d9abb439d81d130b0f56d963544821e3aa1bcc034d87947320b051680a5"
            },
            "downloads": -1,
            "filename": "mesonet_alerts-0.2.4.tar.gz",
            "has_sig": false,
            "md5_digest": "36d121b8cbfc988e2034c70a87db4bd6",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": ">=3.11",
            "size": 26379,
            "upload_time": "2025-09-12T14:16:46",
            "upload_time_iso_8601": "2025-09-12T14:16:46.004509Z",
            "url": "https://files.pythonhosted.org/packages/15/72/6c6b4cf1908524c208fbc2faebb92d5c75fad402d4f043d5d2d3637ebdb3/mesonet_alerts-0.2.4.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2025-09-12 14:16:46",
    "github": true,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "github_user": "dwd94",
    "github_project": "mesonet-alerts",
    "github_not_found": true,
    "lcname": "mesonet-alerts"
}
        
Elapsed time: 1.76680s