Name | mesonet-alerts |
Version | 0.2.4 |
home_page | None |
Summary | Shared email alerting, retries, and volume-drop checks for Mesonet workers |
upload_time | 2025-09-12 14:16:46 |
maintainer | None |
docs_url | None |
author | None |
requires_python | >=3.11 |
license | Proprietary |
keywords | alerts, dynamodb, email, jinja2, retry |
requirements | No requirements were recorded. |
# Mesonet Alerts Package
A production-ready shared Python package for email alerting across mesonet microservices. Provides HTML + plaintext email templates, DynamoDB-backed configuration, SMTP integration, alert persistence, retry helpers, and volume drop detection.
## 🚀 Quick Start
### Installation
Add to your service's `pyproject.toml`:
```toml
[project]
dependencies = [
"mesonet_alerts @ file://../common/mesonet_alerts"
]
```
### Environment Variables
```bash
# SMTP Configuration (required for email sending)
ALERTS_SMTP_HOST=localhost
ALERTS_SMTP_PORT=1025
ALERTS_SMTP_USER= # Optional for local dev
ALERTS_SMTP_PASS= # Optional for local dev
ALERTS_FROM="alerts@local.test"
ALERTS_TO="admin@local.test,kevin@local.test"
# Optional persistence
ALERTS_TABLE_NAME=alerts # Enable DynamoDB persistence
# Volume monitoring
EXPECTED_RECORDS_PER_PROVIDER_PER_HOUR=100
```
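As a rough illustration of how these variables fit together, the sketch below collects them into a plain dictionary and splits `ALERTS_TO` on commas. The helper name `load_smtp_settings` and the returned keys are hypothetical and are not part of the package's API; the defaults mirror the local-dev values shown above.

```python
import os


def load_smtp_settings(env=None):
    """Hypothetical helper: gather the ALERTS_* variables into one dict."""
    env = os.environ if env is None else env
    # ALERTS_TO is a comma-separated list; drop empty entries and stray spaces.
    recipients = [a.strip() for a in env.get("ALERTS_TO", "").split(",") if a.strip()]
    return {
        "host": env.get("ALERTS_SMTP_HOST", "localhost"),
        "port": int(env.get("ALERTS_SMTP_PORT", "1025")),
        # Empty user/password (local dev) are normalised to None.
        "user": env.get("ALERTS_SMTP_USER") or None,
        "password": env.get("ALERTS_SMTP_PASS") or None,
        "sender": env.get("ALERTS_FROM", ""),
        "recipients": recipients,
    }


settings = load_smtp_settings({
    "ALERTS_SMTP_HOST": "localhost",
    "ALERTS_SMTP_PORT": "1025",
    "ALERTS_FROM": "alerts@local.test",
    "ALERTS_TO": "admin@local.test,kevin@local.test",
})
print(settings["recipients"])
```

Passing an explicit mapping instead of reading `os.environ` directly keeps the helper easy to exercise in tests.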
### Basic Usage
```python
from mesonet_alerts import EmailAlerter, AlertStore, run_with_retries

# Initialize components
emailer = EmailAlerter()
store = AlertStore()

# Send an alert
context = {
    "stage": "ingest",
    "severity": "ERROR",
    "provider": "colorado",
    "run_id": "run_123",
    "error": "Connection timeout",
}
emailer.send("process_failure", "Alert: Colorado Ingest Failed", context)

# Store alert with deduplication
store.put_alert(
    provider="colorado",
    stage="ingest",
    severity="ERROR",
    code="CONNECTION_TIMEOUT",
    message="Failed to connect to provider",
    dedupe_key="timeout#colorado#run_123",
)
```
## 🗄️ DynamoDB-Backed Configuration
Version 0.2.0+ supports loading email configuration and templates from DynamoDB, providing centralized configuration management.
### Configuration Table Setup
Create a DynamoDB table for email configuration:
```bash
# Create config table
aws dynamodb create-table \
--table-name MesonetEmailConfig \
--attribute-definitions AttributeName=config_pk,AttributeType=S AttributeName=config_sk,AttributeType=S \
--key-schema AttributeName=config_pk,KeyType=HASH AttributeName=config_sk,KeyType=RANGE \
--billing-mode PAY_PER_REQUEST
# Insert configuration
aws dynamodb put-item \
--table-name MesonetEmailConfig \
--item '{
"config_pk": {"S": "email_config"},
"config_sk": {"S": "active"},
"smtp": {"M": {"host": {"S": "smtp.gmail.com"}, "port": {"N": "587"}, "user": {"S": "alerts@company.com"}, "pass": {"S": "app_password"}, "from": {"S": "alerts@company.com"}}},
"recipients": {"M": {"list": {"L": [{"S": "admin@company.com"}, {"S": "team@company.com"}]}}},
"templates": {"M": {
"process_failure": {"M": {"html": {"S": "<html><body><h2 style=\"color:#dc3545\">Process Failure</h2><p>Provider: {{provider}}</p><p>Error: {{error}}</p></body></html>"}, "text": {"S": "Process Failure\\nProvider: {{provider}}\\nError: {{error}}"}}},
"provider_empty_data": {"M": {"html": {"S": "<html><body><h2 style=\"color:#ffc107\">No Data</h2><p>Provider {{provider}} returned no data</p></body></html>"}, "text": {"S": "No Data\\nProvider {{provider}} returned no data"}}},
"harmonize_failure": {"M": {"html": {"S": "<html><body><h2 style=\"color:#dc3545\">Harmonize Failed</h2><p>Provider: {{provider}}</p></body></html>"}, "text": {"S": "Harmonize Failed\\nProvider: {{provider}}"}}},
"volume_drop": {"M": {"html": {"S": "<html><body><h2 style=\"color:#e67e22\">Volume Drop</h2><p>{{drop_pct}}% drop detected</p></body></html>"}, "text": {"S": "Volume Drop\\n{{drop_pct}}% drop detected"}}}
}}
}'
```
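The item above uses DynamoDB's typed attribute format (`S`, `N`, `L`, `M`). The sketch below shows one way such an item maps onto plain Python values. The function `from_dynamodb` is a hypothetical illustration that handles only the four type tags used here; it is not how the package necessarily reads its configuration. In real code, boto3's `TypeDeserializer` covers the full type set.

```python
def from_dynamodb(value):
    """Convert one typed attribute value, e.g. {"S": "x"}, to a Python value."""
    # Each attribute value is a single-entry dict: {type_tag: payload}.
    (type_tag, payload), = value.items()
    if type_tag == "S":
        return payload
    if type_tag == "N":
        # Numbers travel as strings in the wire format.
        return float(payload) if "." in payload else int(payload)
    if type_tag == "L":
        return [from_dynamodb(v) for v in payload]
    if type_tag == "M":
        return {k: from_dynamodb(v) for k, v in payload.items()}
    raise ValueError(f"unsupported type tag: {type_tag}")


item = {
    "config_pk": {"S": "email_config"},
    "smtp": {"M": {"host": {"S": "smtp.gmail.com"}, "port": {"N": "587"}}},
    "recipients": {"M": {"list": {"L": [{"S": "admin@company.com"},
                                        {"S": "team@company.com"}]}}},
}
config = {key: from_dynamodb(value) for key, value in item.items()}
print(config["smtp"]["port"], config["recipients"]["list"])
```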
### Using DynamoDB Configuration
```python
import boto3
from mesonet_alerts import EmailAlerter, AlertStore, ConfigRepository

# Initialize DynamoDB client
dynamodb_client = boto3.client('dynamodb', region_name='us-east-1')

# Initialize with DynamoDB-backed configuration
config_repo = ConfigRepository(dynamodb_client, 'MesonetEmailConfig')
emailer = EmailAlerter(config_repo=config_repo)
alert_store = AlertStore(table_name='MesonetAlerts', dynamodb_client=dynamodb_client)

# Templates and SMTP settings are now loaded from DynamoDB
# (`context` is the dict built in the Basic Usage example)
emailer.send("process_failure", "Alert: Process Failed", context)
```
### Environment Variables for DynamoDB
```bash
# DynamoDB configuration
EMAIL_CONFIG_TABLE=MesonetEmailConfig
ALERTS_TABLE_NAME=MesonetAlerts
AWS_REGION=us-east-1
# Fallback environment variables (used if DynamoDB config fails)
ALERTS_SMTP_HOST=smtp.gmail.com
ALERTS_SMTP_PORT=587
# ... other SMTP settings
```
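The fallback behaviour described in the comments above can be pictured roughly as follows. This is a sketch of the general pattern only; `load_email_config` and its `fetch_from_dynamodb` argument are hypothetical names, and the package's actual fallback logic may differ.

```python
import os


def load_email_config(fetch_from_dynamodb, env=None):
    """Try the DynamoDB loader first; fall back to ALERTS_* variables on failure."""
    env = os.environ if env is None else env
    try:
        config = fetch_from_dynamodb()
        if config:
            return config, "dynamodb"
    except Exception:
        # Any loader failure (missing table, no credentials, ...) triggers the fallback.
        pass
    fallback = {
        "host": env.get("ALERTS_SMTP_HOST", "localhost"),
        "port": int(env.get("ALERTS_SMTP_PORT", "1025")),
    }
    return fallback, "env"


def broken_fetch():
    # Stand-in for a DynamoDB read that fails.
    raise ConnectionError("table unreachable")


config, source = load_email_config(
    broken_fetch,
    {"ALERTS_SMTP_HOST": "smtp.gmail.com", "ALERTS_SMTP_PORT": "587"},
)
print(source, config)
```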
## 📧 Email Templates
The package includes four pre-built templates with eye-catching HTML + plaintext versions:
- **`process_failure`** - General processing failures
- **`provider_empty_data`** - Empty data warnings
- **`harmonize_failure`** - Data harmonization errors
- **`volume_drop`** - Volume drop alerts
### Template Variables
All templates support these variables:
```python
context = {
    "stage": "ingest",            # Processing stage
    "severity": "ERROR",          # ERROR, WARN, INFO
    "provider": "colorado",       # Provider name
    "run_id": "run_123",          # Optional run identifier
    "trace_id": "trace_456",      # Optional trace identifier
    "error": "Error message",     # Error details
    "attempts": 3,                # Number of attempts
    "timestamp_iso": "2025-01-15T10:30:00Z",

    # Volume drop specific
    "expected": 100,              # Expected record count
    "actual": 75,                 # Actual record count
    "drop_pct": "25.0",           # Drop percentage
    "window_start": "2025-01-15 13:00 UTC",
    "window_end": "2025-01-15 14:00 UTC",
}
```
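To show how a context dictionary fills the `{{variable}}` placeholders, here is a minimal stand-in renderer built on a regular expression. The package's keywords list Jinja2, so the real rendering presumably goes through that library; `render_placeholders` below is a hypothetical simplification that supports only bare `{{name}}` substitution.

```python
import re

PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def render_placeholders(template, context):
    """Replace each {{name}} with str(context[name]); unknown names become ''."""
    return PLACEHOLDER.sub(lambda m: str(context.get(m.group(1), "")), template)


# Plaintext volume_drop template from the DynamoDB configuration example.
template = "Volume Drop\n{{drop_pct}}% drop detected"
rendered = render_placeholders(template, {"drop_pct": "25.0", "provider": "colorado"})
print(rendered)
```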
## 🔄 Retry Helpers
### Basic Retry Usage
```python
from mesonet_alerts.retry import run_with_retries, ProviderEmptyDataError


def fetch_provider_data():
    # Your data fetching logic (`api_client` is a placeholder for your own client)
    data = api_client.get_data()
    if not data:
        raise ProviderEmptyDataError("No data returned")
    return data


def is_retryable_error(e):
    return isinstance(e, (ProviderEmptyDataError, ConnectionError))


try:
    data = run_with_retries(fetch_provider_data, is_retryable_error, attempts=3)
except ProviderEmptyDataError:
    # Handle final failure after retries
    pass
```
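For readers who want a mental model of the retry loop, the sketch below re-implements the idea with the `attempts=3, backoffs=[1,3,9]` defaults listed in the API reference. It is an assumption about the behaviour, not the package's source: `run_with_retries_sketch` and its injectable `sleep` parameter are hypothetical.

```python
import time


def run_with_retries_sketch(fn, is_retryable, attempts=3, backoffs=(1, 3, 9),
                            sleep=time.sleep):
    """Call fn() up to `attempts` times, sleeping between retryable failures."""
    for attempt in range(1, attempts + 1):
        try:
            return fn()
        except Exception as exc:
            # Give up on the last attempt or on errors marked non-retryable.
            if attempt == attempts or not is_retryable(exc):
                raise
            # Reuse the last backoff value if attempts outnumber backoffs.
            sleep(backoffs[min(attempt - 1, len(backoffs) - 1)])


calls = []


def flaky():
    calls.append(1)
    if len(calls) < 3:
        raise ConnectionError("temporary")
    return "ok"


waits = []  # record the requested delays instead of actually sleeping
result = run_with_retries_sketch(
    flaky, lambda e: isinstance(e, ConnectionError), sleep=waits.append
)
print(result, waits)
```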
### Decorator Usage
```python
from mesonet_alerts.retry import retry_on_exceptions, ProviderEmptyDataError


@retry_on_exceptions(ProviderEmptyDataError, ConnectionError)
def fetch_with_auto_retry():
    return api_client.get_data()
```
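A decorator of this kind is typically a thin wrapper around a retry loop. The following sketch shows the shape such a wrapper could take; `retry_on_exceptions_sketch` and its `attempts` keyword are hypothetical, and backoff delays are omitted for brevity.

```python
import functools


def retry_on_exceptions_sketch(*exception_types, attempts=3):
    """Retry the wrapped function when it raises one of `exception_types`."""
    def decorator(fn):
        @functools.wraps(fn)  # keep the wrapped function's name and docstring
        def wrapper(*args, **kwargs):
            for attempt in range(1, attempts + 1):
                try:
                    return fn(*args, **kwargs)
                except exception_types:
                    # Re-raise once the attempt budget is exhausted.
                    if attempt == attempts:
                        raise
        return wrapper
    return decorator


attempt_log = []


@retry_on_exceptions_sketch(ConnectionError)
def fetch():
    attempt_log.append("try")
    if len(attempt_log) < 2:
        raise ConnectionError("temporary")
    return {"records": 3}


data = fetch()
print(data, len(attempt_log))
```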
## 📊 Volume Drop Detection
```python
from datetime import datetime, timezone, timedelta
from mesonet_alerts.dropcheck import check_and_alert_volume_drop

# Define time window
now = datetime.now(timezone.utc).replace(minute=0, second=0, microsecond=0)
window_end = now
window_start = now - timedelta(hours=1)

# Check for volume drops (`emailer` and `store` as created in Basic Usage)
check_and_alert_volume_drop(
    provider="colorado",
    stage="harmonize",
    actual_count=75,       # Only 75 records processed
    expected_count=100,    # Expected 100 records
    threshold=0.20,        # Alert on >20% drop
    window_start=window_start,
    window_end=window_end,
    emailer=emailer,
    store=store,
)
```
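The arithmetic behind the example is simple: 75 of 100 expected records is a 25% drop, which exceeds the 20% threshold. The sketch below spells that out. `volume_drop_fraction` is a hypothetical helper, and the strict `>` comparison follows the "Alert on >20% drop" comment rather than any confirmed implementation detail.

```python
def volume_drop_fraction(actual_count, expected_count):
    """Fraction by which actual falls short of expected (0.0 if not short)."""
    if expected_count <= 0:
        return 0.0  # nothing expected, so nothing can be missing
    return max(0.0, (expected_count - actual_count) / expected_count)


drop = volume_drop_fraction(actual_count=75, expected_count=100)
should_alert = drop > 0.20          # threshold from the example above
drop_pct = f"{drop * 100:.1f}"      # formatted like the `drop_pct` template variable
print(drop, should_alert, drop_pct)
```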
## 🏗️ Integration Examples
### Ingest Worker Integration
```python
from mesonet_alerts.emailer import EmailAlerter
from mesonet_alerts.store import AlertStore
from mesonet_alerts.retry import run_with_retries, ProviderEmptyDataError

emailer = EmailAlerter()
store = AlertStore()


def _is_retryable(e: Exception) -> bool:
    return isinstance(e, ProviderEmptyDataError) or "timeout" in str(e).lower()


def fetch_and_process_with_alerts(provider: str, run_id: str, trace_id: str):
    def _do():
        data = fetch_from_provider(provider)  # your existing call
        if not data:
            raise ProviderEmptyDataError(f"Empty data from {provider}")
        return process_data(data)

    try:
        return run_with_retries(_do, _is_retryable, attempts=3)
    except ProviderEmptyDataError as e:
        ctx = {
            "stage": "ingest", "severity": "WARN", "provider": provider,
            "run_id": run_id, "trace_id": trace_id, "error": str(e), "attempts": 3
        }
        emailer.send("provider_empty_data", f"[INGEST] Empty data: {provider}", ctx)
        store.put_alert(
            provider=provider, stage="ingest", severity="WARN",
            code="PROVIDER_EMPTY", message="Empty data after retries",
            metadata=ctx, dedupe_key=f"empty#{provider}#{run_id}"
        )
        raise
    except Exception as e:
        ctx = {
            "stage": "ingest", "severity": "ERROR", "provider": provider,
            "run_id": run_id, "trace_id": trace_id, "error": str(e), "attempts": 3
        }
        emailer.send("process_failure", f"[INGEST] Failure: {provider}", ctx)
        store.put_alert(
            provider=provider, stage="ingest", severity="ERROR",
            code="INGEST_FAILURE", message="Ingest failure after retries",
            metadata=ctx, dedupe_key=f"ingestfail#{provider}#{run_id}"
        )
        raise
```
### Harmonize Worker Integration
```python
from datetime import datetime, timezone, timedelta
from mesonet_alerts.emailer import EmailAlerter
from mesonet_alerts.store import AlertStore
from mesonet_alerts.dropcheck import check_and_alert_volume_drop

emailer = EmailAlerter()
store = AlertStore()

# After harmonization run completes
now = datetime.now(timezone.utc).replace(minute=0, second=0, microsecond=0)
window_end = now
window_start = now - timedelta(hours=1)

actual_count = count_harmonized_records(provider, window_start, window_end)  # your logic
check_and_alert_volume_drop(
    provider=provider,
    stage="harmonize",
    actual_count=actual_count,
    expected_count=None,  # use ENV default for now
    threshold=0.20,
    window_start=window_start,
    window_end=window_end,
    emailer=emailer,
    store=store,
)
```
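When `expected_count=None` is passed, the expectation comes from the environment. A plausible reading, sketched below, is that `EXPECTED_RECORDS_PER_PROVIDER_PER_HOUR` is used as the baseline. The helper `resolve_expected_count` is hypothetical, and scaling by the window length in hours is an assumption that the source does not confirm.

```python
import os


def resolve_expected_count(expected_count, window_hours=1, env=None):
    """Return the explicit expectation, or derive one from the environment."""
    env = os.environ if env is None else env
    if expected_count is not None:
        return expected_count
    per_hour = int(env.get("EXPECTED_RECORDS_PER_PROVIDER_PER_HOUR", "0"))
    # Assumption: the hourly baseline scales linearly with the window length.
    return per_hour * window_hours


env = {"EXPECTED_RECORDS_PER_PROVIDER_PER_HOUR": "100"}
from_env = resolve_expected_count(None, env=env)
explicit = resolve_expected_count(50, env=env)
print(from_env, explicit)
```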
## 🧪 Local Development Testing
### Prerequisites
1. Build the wheel:
```bash
cd micro-services/common/mesonet_alerts
python -m build # or uv build
```
2. Install into workers (already done in pyproject.toml):
```toml
dependencies = [
    "mesonet-alerts @ file://../common/mesonet_alerts/dist/mesonet_alerts-0.2.4-py3-none-any.whl"
]
```
### Usage (Local Dev)
**Terminal 1 - Start Debug SMTP Server:**
```bash
export $(grep -v '^#' .env.dev | xargs)
./scripts/run_debug_smtp.sh
```
**Terminal 2 - Test Ingest Alerts:**
```bash
export $(grep -v '^#' .env.dev | xargs)
cd micro-services/mesonet_ingest_worker
python scripts/test_ingest_alerts.py
```
**Terminal 3 - Test Harmonize Alerts:**
```bash
export $(grep -v '^#' .env.dev | xargs)
cd micro-services/mesonet_harmonize-worker
python scripts/test_harmonize_alerts.py
```
**Expected Output:**
- Terminal 1 should print full HTML+text email bodies
- Terminal 2/3 should show "✅ Alert sent" messages
- You should see nicely formatted emails with inline CSS
### Environment Variables for Testing
```bash
# Override test parameters
TEST_PROVIDER=colorado # Provider name for tests
TEST_ACTUAL=70 # Actual record count (harmonize test)
TEST_EXPECTED=100 # Expected record count (harmonize test)
```
## 🗄️ DynamoDB Schema
If `ALERTS_TABLE_NAME` is set, alerts are persisted with this schema:
```
Table: alerts
PK: alert_pk (String) = "{provider}#{stage}"
SK: timestamp (String, ISO8601)
Attributes:
- severity (String): ERROR, WARN, INFO
- code (String): PROVIDER_EMPTY, INGEST_FAILURE, etc.
- message (String): Human-readable message
- metadata (Map): Additional context data
- status (String): OPEN (default)
- ttl (Number): Unix timestamp for auto-deletion
- dedupe_key (String): Optional deduplication key
- provider (String): Provider name
- stage (String): Processing stage
```
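To make the schema concrete, the sketch below assembles one item as a plain dictionary: the partition key joins provider and stage with `#`, the sort key is an ISO 8601 timestamp, and `ttl` is the current Unix time plus `ttl_seconds` (86400 by default, per the API reference). `build_alert_item` is a hypothetical helper written for illustration; the package's own `put_alert` may shape the item differently.

```python
from datetime import datetime, timezone


def build_alert_item(provider, stage, severity, code, message,
                     ttl_seconds=86400, now=None):
    """Assemble an alert record matching the documented attribute names."""
    now = now or datetime.now(timezone.utc)
    return {
        "alert_pk": f"{provider}#{stage}",           # partition key
        "timestamp": now.isoformat(),                # sort key, ISO 8601
        "severity": severity,
        "code": code,
        "message": message,
        "status": "OPEN",
        "ttl": int(now.timestamp()) + ttl_seconds,   # expiry as Unix seconds
        "provider": provider,
        "stage": stage,
    }


fixed = datetime(2025, 1, 15, 10, 30, tzinfo=timezone.utc)
item = build_alert_item("colorado", "ingest", "ERROR", "INGEST_FAILURE",
                        "Ingest failure after retries", now=fixed)
print(item["alert_pk"], item["timestamp"])
```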
## 🧪 Development
### Running Tests
```bash
# Install dev dependencies
uv sync --dev
# Run tests
pytest tests/ -v
# With coverage
pytest tests/ --cov=src --cov-report=html
```
### Local SMTP Testing
```bash
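# Start local SMTP server for testing (the stdlib smtpd module was removed in Python 3.12)
python -m smtpd -c DebuggingServer -n localhost:1025
# On Python 3.12+, install aiosmtpd and run instead:
#   python -m aiosmtpd -n -l localhost:1025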
# Or use MailHog (recommended)
docker run -p 1025:1025 -p 8025:8025 mailhog/mailhog
# View emails at http://localhost:8025
```
## 🔮 Future Enhancements
The package includes commented hooks for future features:
### Database-Backed Configuration
```python
# TODO: Implement in config.py
EmailConfigRepo.get_active_config() # SMTP from DB
RecipientRoutingRepo.get_recipients(provider, severity) # Smart routing
```
### Template Overrides
```python
# TODO: Implement in templates.py
TemplateRepo.get(template_name, format_type) # Custom templates from DB
```
### EventBridge/SNS Integration
```python
# TODO: Implement in store.py
AlertEventPublisher.publish_alert_event(alert_data) # Fan-out to external systems
```
### Provider-Specific Volume Expectations
```python
# TODO: Implement in dropcheck.py
VolumeExpectationRepo.get_expected_volume(provider, stage, hours) # Smart baselines
```
## 📋 API Reference
### EmailAlerter
- `__init__(config=None, recipients=None)` - Initialize with optional config override; the DynamoDB example above additionally passes a `config_repo` keyword argument
- `send(template, subject, context, recipients=None)` - Send alert email
- `resolve_recipients(provider, severity)` - Future: smart recipient routing
### AlertStore
- `__init__(table_name=None)` - Initialize with optional table name; the DynamoDB example above additionally passes a `dynamodb_client` keyword argument
- `put_alert(provider, stage, severity, code, message, metadata=None, dedupe_key=None, ttl_seconds=86400)` - Store alert
- `get_recent_alerts(provider, stage, hours=24)` - Retrieve recent alerts
### Retry Functions
- `run_with_retries(fn, is_retryable, attempts=3, backoffs=[1,3,9])` - Execute with retry logic
- `retry_on_exceptions(*exception_types)` - Decorator for auto-retry
- `is_network_error(e)`, `is_rate_limit_error(e)`, `is_provider_error(e)` - Error classifiers
### Volume Drop Detection
- `check_and_alert_volume_drop(**kwargs)` - Check and alert on volume drops
- `get_volume_trend(provider, stage, hours_back=24, store=None)` - Analyze volume trends (placeholder)
## 🔒 Security Notes
- Credentials are read from environment variables, or from the DynamoDB configuration item when a `ConfigRepository` is used
- SMTP passwords are not logged
- DynamoDB uses IAM roles for authentication
- All database operations use conditional writes for consistency
- TTL automatically expires old alerts
## 📄 License
MIT License - see LICENSE file for details.
Raw data
{
"_id": null,
"home_page": null,
"name": "mesonet-alerts",
"maintainer": null,
"docs_url": null,
"requires_python": ">=3.11",
"maintainer_email": null,
"keywords": "alerts, dynamodb, email, jinja2, retry",
"author": null,
"author_email": "Dawood Siddiq <dawood.siddiq@codingcops.com>",
"download_url": "https://files.pythonhosted.org/packages/15/72/6c6b4cf1908524c208fbc2faebb92d5c75fad402d4f043d5d2d3637ebdb3/mesonet_alerts-0.2.4.tar.gz",
"platform": null,
"bugtrack_url": null,
"license": "Proprietary",
"summary": "Shared email alerting, retries, and volume-drop checks for Mesonet workers",
"version": "0.2.4",
"project_urls": {
"Homepage": "https://github.com/dwd94/mesonet-alerts"
},
"split_keywords": [
"alerts",
" dynamodb",
" email",
" jinja2",
" retry"
],
"urls": [
{
"comment_text": null,
"digests": {
"blake2b_256": "b15d3e9f90233103c83528bd07f404d7d8891e00be3bc894ca7c44e9e77b3252",
"md5": "86f615eb1075201cffcc22c1baf3a4d1",
"sha256": "6407dfc4be9a0dbac45410acbfa4783b6b8b2ddb1dac2b0924c8421e1007bdcb"
},
"downloads": -1,
"filename": "mesonet_alerts-0.2.4-py3-none-any.whl",
"has_sig": false,
"md5_digest": "86f615eb1075201cffcc22c1baf3a4d1",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": ">=3.11",
"size": 24672,
"upload_time": "2025-09-12T14:16:36",
"upload_time_iso_8601": "2025-09-12T14:16:36.444614Z",
"url": "https://files.pythonhosted.org/packages/b1/5d/3e9f90233103c83528bd07f404d7d8891e00be3bc894ca7c44e9e77b3252/mesonet_alerts-0.2.4-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": null,
"digests": {
"blake2b_256": "15726c6b4cf1908524c208fbc2faebb92d5c75fad402d4f043d5d2d3637ebdb3",
"md5": "36d121b8cbfc988e2034c70a87db4bd6",
"sha256": "a22a7d9abb439d81d130b0f56d963544821e3aa1bcc034d87947320b051680a5"
},
"downloads": -1,
"filename": "mesonet_alerts-0.2.4.tar.gz",
"has_sig": false,
"md5_digest": "36d121b8cbfc988e2034c70a87db4bd6",
"packagetype": "sdist",
"python_version": "source",
"requires_python": ">=3.11",
"size": 26379,
"upload_time": "2025-09-12T14:16:46",
"upload_time_iso_8601": "2025-09-12T14:16:46.004509Z",
"url": "https://files.pythonhosted.org/packages/15/72/6c6b4cf1908524c208fbc2faebb92d5c75fad402d4f043d5d2d3637ebdb3/mesonet_alerts-0.2.4.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2025-09-12 14:16:46",
"github": true,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"github_user": "dwd94",
"github_project": "mesonet-alerts",
"github_not_found": true,
"lcname": "mesonet-alerts"
}