fast-bi-replication-control

Name: fast-bi-replication-control
Version: 0.1.0
Summary: Airflow utilities to monitor and cancel long-running Airbyte jobs with advanced job tracking
Upload time: 2025-09-01 11:26:22
Requires Python: >=3.9, <3.13
License: MIT
Keywords: airflow, airbyte, data-replication, job-monitoring, job-tracking, etl, data-pipeline, fast-bi, data-orchestration, workflow, data-workflow
Repository: https://github.com/fast-bi/data-replication-control

# Fast.Bi Replication Control

Airflow utilities to monitor and cancel long-running Airbyte jobs with advanced job tracking.

## Overview

This package provides Airflow operators and hooks to:
- Discover all Airbyte workspaces and connections
- Monitor running jobs across connections using proper API filters
- Track job durations across DAG runs using XCom persistence
- Cancel jobs that exceed configurable time limits
- Provide full context (connection names, workspace info)
- Calculate actual duration from start time (not Airbyte's unreliable duration field)

## Key Features

### 🎯 **Advanced Job Tracking**
- **Robust Persistence**: Uses Airflow Variables for cross-worker persistence (works with HPA scaling)
- **Worker Consistency**: Pinned to specific worker (`data-orchestration-worker-0`) for consistency
- **Accurate Duration Calculation**: Calculates actual runtime from stored start times
- **Full Context**: Provides connection names, workspace info, and job details

### 🔧 **Persistence Strategy**
The system uses a multi-layered persistence approach to handle dynamic worker scaling:

1. **Primary**: Airflow Variables - Works across all workers and DAG runs
2. **Fallback**: XCom - For same DAG run persistence
3. **Worker Pinning**: Queue assignment to `default` for consistency

This ensures job tracking data persists even with HPA scaling and worker restarts.
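
A minimal sketch of what this layered persistence could look like is shown below. The variable name and helper function are illustrative assumptions, not the package's internals; the actual tracker lives inside `fast_bi_replication_control`.

```python
import json
from datetime import datetime, timezone

from airflow.models import Variable

# Hypothetical illustration of the layered persistence described above.
TRACKING_VAR = "airbyte_job_tracking"  # assumed Variable key, for illustration only

def store_job_start(job_id: str, context: dict) -> None:
    """Record a job's start time in an Airflow Variable, with XCom as fallback."""
    entry = {str(job_id): datetime.now(timezone.utc).isoformat()}
    try:
        # Primary: Airflow Variable, visible to every worker and DAG run.
        tracked = json.loads(Variable.get(TRACKING_VAR, default_var="{}"))
        tracked.update(entry)
        Variable.set(TRACKING_VAR, json.dumps(tracked))
    except Exception:
        # Fallback: XCom, which only survives within the current DAG run.
        context["ti"].xcom_push(key=f"job_start_{job_id}", value=entry)
```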

### 🔍 **Smart Job Discovery**
- **API-First Approach**: Uses proper Airbyte API filters (`status=running`) instead of hardcoded job IDs
- **Dynamic Detection**: Automatically finds all running jobs without manual configuration
- **Fallback Support**: Graceful degradation if API filters fail
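
A sketch of this API-first discovery, assuming direct `requests` calls against the in-cluster service from the connection setup section; the parameter names follow the filters mentioned above and may differ from the hook's internals.

```python
from typing import Optional

import requests

# Illustrative only: query the Airbyte API for running jobs with a status
# filter instead of iterating over hardcoded job IDs.
BASE_URL = "http://data-replication-airbyte-webapp-svc.data-replication.svc.cluster.local/api/public"

def find_running_jobs(connection_id: Optional[str] = None) -> list:
    params = {"status": "running"}
    if connection_id:
        params["connectionId"] = connection_id
    resp = requests.get(f"{BASE_URL}/v1/jobs", params=params, timeout=30)
    resp.raise_for_status()
    return resp.json().get("data", [])

# Example: list every running job across all connections
for job in find_running_jobs():
    print(job)
```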

### 🛡️ **Safety Features**
- **Dry Run Mode**: Default `dry_run=True` for safety - preview what would be canceled
- **Configurable Limits**: Set different time limits for different connections
- **Error Handling**: Robust error handling with detailed logging
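
A minimal sketch of the dry-run gate described above, assuming a `hook` exposing the `cancel_job` method shown later in this README; the function and its arguments are illustrative, not the package's internal structure.

```python
def maybe_cancel(hook, job_id: str, runtime_hours: float,
                 max_runtime_hours: float, dry_run: bool = True) -> None:
    """Cancel a job only when it exceeds the limit and dry_run is disabled."""
    if runtime_hours <= max_runtime_hours:
        return
    if dry_run:
        # Preview mode: report what would happen, but change nothing.
        print(f"[DRY RUN] Would cancel job {job_id} "
              f"({runtime_hours:.1f}h > {max_runtime_hours}h limit)")
    else:
        hook.cancel_job(job_id)
        print(f"Canceled job {job_id} after {runtime_hours:.1f}h")
```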

## Installation

```bash
pip install fast_bi_replication_control
```

## Airflow Connection Setup

Create an Airflow connection with the following details:

- **Connection Id**: `Data-Replication` (or customize via parameter)
- **Connection Type**: `airbyte`
- **Host**: `http://data-replication-airbyte-webapp-svc.data-replication.svc.cluster.local`
- **Port**: (leave empty or specify if needed)
- **No authentication required** (internal network)
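
If you prefer to create the connection in code rather than through the UI, a generic Airflow snippet like the following should work; this is standard Airflow usage, not something the package provides.

```python
from airflow import settings
from airflow.models import Connection

# Create the 'Data-Replication' connection programmatically (idempotent).
conn = Connection(
    conn_id="Data-Replication",
    conn_type="airbyte",
    host="http://data-replication-airbyte-webapp-svc.data-replication.svc.cluster.local",
    # No login/password: the in-cluster service requires no authentication.
)

session = settings.Session()
if not session.query(Connection).filter(Connection.conn_id == conn.conn_id).first():
    session.add(conn)
    session.commit()
session.close()
```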

## Environment Variables (Fallback)

If no Airflow connection is configured, the package will use these environment variables:

```bash
AIRBYTE_API_LINK=http://data-replication-airbyte-webapp-svc.data-replication.svc.cluster.local
AIRBYTE_API_BASE=api/public
```
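
For reference, the fallback resolution amounts to something like the following sketch; the exact logic inside the hook may differ.

```python
import os

# Build the API base URL from the fallback environment variables above.
api_link = os.environ.get(
    "AIRBYTE_API_LINK",
    "http://data-replication-airbyte-webapp-svc.data-replication.svc.cluster.local",
)
api_base = os.environ.get("AIRBYTE_API_BASE", "api/public")
base_url = f"{api_link.rstrip('/')}/{api_base.strip('/')}"
print(base_url)
```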

## Usage Examples

### 1. Monitor All Connections (Recommended)

```python
from fast_bi_replication_control import AirbyteJobMonitorOperator

# Monitor all connections with job tracking
# Note: Uses queue='default' for worker consistency
monitor_task = AirbyteJobMonitorOperator(
    task_id='monitor_all_connections',
    airbyte_conn_id='Data-Replication',
    connection_ids=None,  # Monitor all connections
    max_runtime_hours=3.0,  # Cancel jobs running longer than 3 hours
    job_type='sync',  # Monitor sync jobs only
    dry_run=True,  # Default to True for safety - set to False to actually cancel
    queue='default',  # Pin to specific worker for consistency
    dag=dag,
)
```

### 2. Monitor Specific Connections

```python
# Monitor only specific connection IDs with job tracking
# Note: Uses queue='default' for worker consistency
monitor_task = AirbyteJobMonitorOperator(
    task_id='monitor_specific_connections',
    airbyte_conn_id='Data-Replication',
    connection_ids=['fccd3766-624e-478f-bb0d-3dc31d8a4efb'],  # Specific connection IDs
    max_runtime_hours=2.0,  # More aggressive threshold for specific connections
    job_type='sync',  # Monitor sync jobs only
    dry_run=True,  # Default to True for safety - set to False to actually cancel
    queue='default',  # Pin to specific worker for consistency
    dag=dag,
)
```

### 3. Use Job Tracking System Directly

```python
from fast_bi_replication_control import track_and_monitor_jobs

def custom_monitoring(**context):
    """Custom monitoring logic using the job tracking system."""
    from fast_bi_replication_control import AirbyteApiHook
    
    hook = AirbyteApiHook(airbyte_conn_id='Data-Replication')
    
    # Use the new tracking system
    result = track_and_monitor_jobs(
        hook=hook,
        context=context,
        max_runtime_hours=1.0,  # 1 hour threshold
        dry_run=True  # Don't actually cancel
    )
    
    print(f"Tracked jobs: {result['tracked_jobs_count']}")
    print(f"Running jobs found: {result['running_jobs_found']}")
    print(f"Long running jobs: {result['long_running_jobs']}")
    
    return result

# Use in PythonOperator
custom_task = PythonOperator(
    task_id='custom_monitoring',
    python_callable=custom_monitoring,
    dag=dag,
)
```

### 4. Generate Tracking Reports

```python
from fast_bi_replication_control import create_job_tracker

def generate_report(**context):
    """Generate a report of currently tracked jobs."""
    tracker = create_job_tracker(context)
    summary = tracker.get_tracking_summary()
    
    print(f"Total tracked jobs: {summary['total_tracked']}")
    
    for job in summary['jobs']:
        print(f"Job {job['job_id']}: {job['formatted']}")
        print(f"  Connection: {job['connection_name']}")
        print(f"  Workspace: {job['workspace_name']}")
    
    return summary

report_task = PythonOperator(
    task_id='generate_report',
    python_callable=generate_report,
    dag=dag,
)
```

### 5. Use the API Hook Directly

```python
from fast_bi_replication_control import AirbyteApiHook

hook = AirbyteApiHook(airbyte_conn_id='Data-Replication')

# List workspaces
workspaces = hook.list_workspaces()
print(f"Found {len(workspaces.get('data', []))} workspaces")

# List connections
connections = hook.list_connections()
print(f"Found {len(connections.get('data', []))} connections")

# List jobs for a specific connection
jobs = hook.list_jobs(connection_id='your-connection-id')
print(f"Found {len(jobs.get('data', []))} jobs")

# Get job status
status, details = hook.get_job_status('your-job-id')
print(f"Job status: {status}")

# Cancel a job
result = hook.cancel_job('your-job-id')
print(f"Cancel result: {result}")
```

## API Endpoints Used

The package interacts with the following Airbyte API endpoints:

- `GET /v1/workspaces` - List all workspaces
- `GET /v1/connections` - List connections (with optional workspace filtering)
- `GET /v1/jobs` - List jobs (with optional connection/workspace filtering)
- `GET /v1/jobs/{jobId}` - Get job status and details
- `DELETE /v1/jobs/{jobId}` - Cancel a running job
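
These endpoints can also be exercised directly with `requests` if needed; for example, a cancel call might look like the sketch below (the base URL is assumed from the sections above).

```python
import requests

BASE_URL = "http://data-replication-airbyte-webapp-svc.data-replication.svc.cluster.local/api/public"

def cancel_job(job_id: str) -> dict:
    """Cancel a running Airbyte job via the public API."""
    resp = requests.delete(f"{BASE_URL}/v1/jobs/{job_id}", timeout=30)
    resp.raise_for_status()
    return resp.json()
```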

## Configuration Options

### AirbyteJobMonitorOperator

| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `airbyte_conn_id` | str | `'Data-Replication'` | Airflow connection ID |
| `connection_ids` | List[str] | `None` | List of connection IDs to monitor (None = all) |
| `max_runtime_hours` | float | `3.0` | Maximum runtime before canceling |
| `job_type` | str | `None` | Filter by job type ('sync', 'reset', or None for all) |
| `dry_run` | bool | `True` | If True, log what would be canceled but don't actually cancel |

### AirbyteJobMonitorSensor

| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `job_id` | str | Required | Job ID to monitor |
| `airbyte_conn_id` | str | `'Data-Replication'` | Airflow connection ID |
| `cancel_after_seconds` | int | `None` | Cancel job after this many seconds |
| `poke_interval` | int | `60` | How often to check job status (seconds) |
| `timeout` | int | `None` | Maximum time to wait for job completion |
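
The examples above only cover the operator. Assuming the sensor is importable from the package in the same way, usage with the parameters documented above might look like this (the `job_id` value is a placeholder):

```python
from fast_bi_replication_control import AirbyteJobMonitorSensor

wait_for_job = AirbyteJobMonitorSensor(
    task_id='wait_for_airbyte_job',
    job_id='your-job-id',               # placeholder: the job to watch
    airbyte_conn_id='Data-Replication',
    cancel_after_seconds=3 * 3600,      # cancel if still running after 3 hours
    poke_interval=60,                   # check status every minute
    timeout=4 * 3600,                   # stop waiting after 4 hours
    dag=dag,
)
```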

## Job Duration Tracking

**Important**: The Airbyte API does not report a reliable runtime duration, so the package estimates runtime from the job creation time (`createdAt`). This means:

1. On the first run, if a job is already running, there is no tracked start time yet, so its runtime is estimated from `createdAt` alone
2. For accurate duration tracking across DAG runs, the job tracking system persists start times (Airflow Variables with an XCom fallback; see the Persistence Strategy section above)
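
For reference, estimating elapsed runtime from `createdAt` amounts to something like the sketch below; the timestamp format is assumed to be either epoch seconds or ISO 8601, and the package's internal calculation may differ.

```python
from datetime import datetime, timezone

def runtime_hours(created_at) -> float:
    """Estimate how long a job has been running from its createdAt value."""
    if isinstance(created_at, (int, float)):
        # Epoch seconds
        started = datetime.fromtimestamp(created_at, tz=timezone.utc)
    else:
        # ISO 8601 string, e.g. "2025-09-01T08:00:00Z"
        started = datetime.fromisoformat(str(created_at).replace("Z", "+00:00"))
    return (datetime.now(timezone.utc) - started).total_seconds() / 3600.0

print(f"{runtime_hours('2025-09-01T08:00:00Z'):.1f} hours")
```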

## Example DAG

See `example_dag.py` for a comprehensive example showing all usage patterns.
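
If `example_dag.py` is not at hand, a minimal sketch of wiring the monitor operator into a scheduled DAG looks like this; the DAG id, schedule, and dates are illustrative, not taken from the example file.

```python
from datetime import datetime

from airflow import DAG
from fast_bi_replication_control import AirbyteJobMonitorOperator

with DAG(
    dag_id='airbyte_replication_control',
    start_date=datetime(2025, 1, 1),
    schedule='@hourly',   # Airflow 2.4+; use schedule_interval on older versions
    catchup=False,
) as dag:
    AirbyteJobMonitorOperator(
        task_id='monitor_all_connections',
        airbyte_conn_id='Data-Replication',
        max_runtime_hours=3.0,
        dry_run=True,      # flip to False to actually cancel long-running jobs
        queue='default',   # pin to a consistent worker, as noted above
    )
```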

## Error Handling

The package includes robust error handling:
- API failures are logged but don't stop the entire operation
- Individual connection/job failures are isolated
- Detailed logging for debugging

## Development

To build the package locally:

```bash
cd fast_bi_replication_control
python -m build
```

## License

MIT License 

            
