- Name: airflow-xtended-api
- Version: 0.1.6
- Home page: https://github.com/anr007/airflow-xtended-api
- Summary: Exposes 'Xtended' Apache Airflow management capabilities via secure API
- Upload time: 2024-02-04 10:34:06
- Author: anr007
- Requires Python: >=3.6
- License: GNU AGPLv3
- Keywords: apache airflow, plugin, flask

# Airflow Xtended API - Plugin

Apache Airflow plugin that exposes 'Xtended' secure API endpoints similar to the official [Airflow API (Stable) (1.0.0)](https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html), providing richer capabilities to support more powerful DAG and job management. Apache Airflow version 2.8.0 or higher is required.

## Requirements

- [apache-airflow](https://github.com/apache/airflow)
- [pymongo](https://github.com/mongodb/mongo-python-driver)
- [boto3](https://github.com/boto/boto3)
- [requests](https://github.com/psf/requests)

## Installation

```bash
python3 -m pip install airflow-xtended-api
```

## Build from source

Build a custom version of this plugin by following the instructions in this [doc](BUILD.md).

## Screenshots

![screen1](https://raw.githubusercontent.com/anr007/airflow-xtended-api/main/grabs/screen1.jpg)
![screen2](https://raw.githubusercontent.com/anr007/airflow-xtended-api/main/grabs/screen2.jpg)
![screen3](https://raw.githubusercontent.com/anr007/airflow-xtended-api/main/grabs/screen3.jpg)

## Authentication

The Airflow Xtended API plugin uses the same auth mechanism as the [Airflow API (Stable) (1.0.0)](https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html#section/Trying-the-API). By default, the APIs exposed via this plugin respect the auth mechanism used by your Airflow webserver and comply with the existing RBAC policies. Note that you will need to pass credentials as part of each request. Here is a snippet from the official docs using basic authentication:

```bash
curl -X POST 'http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/dags/{dag_id}?update_mask=is_paused' \
-H 'Content-Type: application/json' \
--user "username:password" \
-d '{
    "is_paused": true
}'
```

## Using the Custom API

After installing the plugin package and restarting your Airflow webserver, a link called 'Reference Docs' appears under the 'Xtended API' tab on the Airflow webserver homepage. All the necessary documentation for the supported API endpoints resides on that page. You can also navigate to that page directly using the link below.

```text
http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/xtended_api/
```

All supported endpoints are exposed in the following format:

```text
http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/{ENDPOINT_NAME}
```
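
For example, with a webserver reachable at localhost:8080 (a hypothetical host and port), the scan_dags endpoint would resolve to:

```text
http://localhost:8080/api/v1/xtended/scan_dags
```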

The following endpoints are currently supported:

- [deploy_dag](#deploy_dag)
- [create_dag](#create_dag)
- [s3_sync](#s3_sync)
- [mongo_sync](#mongo_sync)
- [scan_dags](#scan_dags)
- [purge_dags](#purge_dags)
- [refresh_all_dags](#refresh_all_dags)
- [delete_dag](#delete_dag)
- [upload_file](#upload_file)
- [restart_failed_task](#restart_failed_task)
- [kill_running_tasks](#kill_running_tasks)
- [run_task_instance](#run_task_instance)
- [skip_task_instance](#skip_task_instance)

### **_<span id="deploy_dag">deploy_dag</span>_**

##### Description:

- Deploy a new DAG File to the DAGs directory.

##### Endpoint:

```text
http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/deploy_dag
```

##### Method:

- POST

##### POST request Arguments:

- dag_file - file - Upload and deploy a DAG from a .py or .zip file
- force (optional) - boolean - Force uploading the file if it already exists
- unpause (optional) - boolean - The DAG will be unpaused on creation (works only when uploading .py files)
- otf_sync (optional) - boolean - Check for newly created DAGs On The Fly!

##### Examples:

```bash
curl -X POST -H 'Content-Type: multipart/form-data' \
 --user "username:password" \
 -F 'dag_file=@test_dag.py' \
 -F 'force=y' \
 -F 'unpause=y' \
 http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/deploy_dag
```

##### response:

```json
{
  "message": "DAG File [<module '{MODULE_NAME}' from '/{DAG_FOLDER}/exam.py'>] has been uploaded",
  "status": "success"
}
```

##### Method:

- GET

##### GET request Arguments:

- dag_file_url - string - A valid URL for fetching .py, .pyc, or .zip DAG files
- filename - string - A valid filename ending with .py, .pyc, or .zip
- force (optional) - boolean - Force uploading the file if it already exists
- unpause (optional) - boolean - The DAG will be unpaused on creation (works only when uploading .py files)
- otf_sync (optional) - boolean - Check for newly created DAGs On The Fly!

##### Examples:

```bash
curl -X GET --user "username:password" \
'http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/deploy_dag?dag_file_url={DAG_FILE_URL}&filename=test_dag.py&force=on&unpause=on'
```

##### response:

```json
{
  "message": "DAG File [<module '{MODULE_NAME}' from '/{DAG_FOLDER}/exam.py'>] has been uploaded",
  "status": "success"
}
```

### **_<span id="create_dag">create_dag</span>_**

##### Description:

- Create a new DAG File in the DAGs directory.

##### Endpoint:

```text
http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/create_dag
```

##### Method:

- POST

##### POST request Arguments:

- filename - string - Name of the Python DAG file
- dag_code - string (multiline) - Python source code of the DAG file
- force (optional) - boolean - Force uploading the file if it already exists
- unpause (optional) - boolean - The DAG will be unpaused on creation (works only when uploading .py files)
- otf_sync (optional) - boolean - Check for newly created DAGs On The Fly!
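
##### Examples:

This endpoint ships without an example in the docs above; here is a minimal sketch, assuming the same multipart form conventions as deploy_dag. The local file test_dag.py is hypothetical; curl's `<` prefix sends that file's contents as the value of the dag_code field.

```bash
# Sketch: create a DAG from inline source read out of a local file (hypothetical file name)
curl -X POST -H 'Content-Type: multipart/form-data' \
 --user "username:password" \
 -F 'filename=test_dag.py' \
 -F 'dag_code=<test_dag.py' \
 -F 'force=y' \
 -F 'unpause=y' \
 http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/create_dag
```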

### **_<span id="s3_sync">s3_sync</span>_**

##### Description:

- Sync DAG files from an S3 bucket.

##### Endpoint:

```text
http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/s3_sync
```

##### Method:

- POST

##### POST request Arguments:

- s3_bucket_name - string - S3 bucket name where DAG files exist
- s3_region - string - S3 region name where the specified bucket exists
- s3_access_key - string - IAM access key having at least S3 bucket read access
- s3_secret_key - string - IAM secret key for the specified access key
- s3_object_prefix (optional) - string - Filter results by object prefix
- s3_object_keys (optional) - string - Sync DAG files specified by the object keys. Multiple object keys are separated by a comma (,)
- skip_purge (optional) - boolean - Skip emptying DAGs directory
- otf_sync (optional) - boolean - Check for newly created DAGs On The Fly!

##### Examples:

```bash
curl -X POST -H 'Content-Type: multipart/form-data' \
 --user "username:password" \
 -F 's3_bucket_name=test-bucket' \
 -F 's3_region=us-east-1' \
 -F 's3_access_key={IAM_ACCESS_KEY}' \
 -F 's3_secret_key={IAM_SECRET_KEY}' \
 -F 'skip_purge=y' \
 http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/s3_sync
```
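
To sync only selected objects, the optional s3_object_prefix or s3_object_keys filters from the argument list above can be added; a sketch with hypothetical object keys:

```bash
# Sketch: sync only the listed object keys (hypothetical keys), keeping existing DAG files
curl -X POST -H 'Content-Type: multipart/form-data' \
 --user "username:password" \
 -F 's3_bucket_name=test-bucket' \
 -F 's3_region=us-east-1' \
 -F 's3_access_key={IAM_ACCESS_KEY}' \
 -F 's3_secret_key={IAM_SECRET_KEY}' \
 -F 's3_object_keys=dags/test_dag0.py,dags/test_dag1.py' \
 -F 'skip_purge=y' \
 http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/s3_sync
```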

##### response:

```json
{
  "message": "dag files synced from s3",
  "sync_status": {
    "synced": ["test_dag0.py", "test_dag1.py", "test_dag2.py"],
    "failed": []
  },
  "status": "success"
}
```

### **_<span id="mongo_sync">mongo_sync</span>_**

##### Description:

- Sync DAG files from a MongoDB collection.

##### Endpoint:

```text
http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/mongo_sync
```

##### Method:

- POST

##### POST request Arguments:

- connection_string - string - Source MongoDB server connection string
- db_name - string - Source MongoDB database name
- collection_name - string - Collection name where DAG data exists in the specified database
- field_filename - string - DAGs are named using the value of this document field from the specified collection
- field_dag_source - string - The document field containing the Python source for the yet-to-be-created DAGs
- query_filter (optional) - string - JSON query string to filter required documents
- skip_purge (optional) - boolean - Skip emptying DAGs directory
- otf_sync (optional) - boolean - Check for newly created DAGs On The Fly!

##### Examples:

```bash
curl -X POST -H 'Content-Type: multipart/form-data' \
 --user "username:password" \
 -F 'connection_string={MONGO_SERVER_CONNECTION_STRING}' \
 -F 'db_name=test_db' \
 -F 'collection_name=test_collection' \
 -F 'field_dag_source=dag_source' \
 -F 'field_filename=dag_filename' \
 -F 'skip_purge=y' \
 http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/mongo_sync
```
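
The optional query_filter narrows which documents are synced; a sketch assuming the documents carry a hypothetical 'enabled' field:

```bash
# Sketch: sync only documents matching a JSON filter (the 'enabled' field is hypothetical)
curl -X POST -H 'Content-Type: multipart/form-data' \
 --user "username:password" \
 -F 'connection_string={MONGO_SERVER_CONNECTION_STRING}' \
 -F 'db_name=test_db' \
 -F 'collection_name=test_collection' \
 -F 'field_dag_source=dag_source' \
 -F 'field_filename=dag_filename' \
 -F 'query_filter={"enabled": true}' \
 -F 'skip_purge=y' \
 http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/mongo_sync
```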

##### response:

```json
{
  "message": "dag files synced from mongo",
  "sync_status": {
    "synced": ["test_dag0.py", "test_dag1.py", "test_dag2.py"],
    "failed": []
  },
  "status": "success"
}
```

### **_<span id="refresh_all_dags">refresh_all_dags</span>_**

##### Description:

- Refresh all DAGs in the webserver.

##### Endpoint:

```text
http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/refresh_all_dags
```

##### Method:

- GET

##### GET request Arguments:

- None

##### Examples:

```bash
curl -X GET --user "username:password" \
http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/refresh_all_dags
```

##### response:

```json
{
  "message": "All DAGs are now up-to-date!!",
  "status": "success"
}
```

### **_<span id="scan_dags">scan_dags</span>_**

##### Description:

- Check for newly created DAGs.

##### Endpoint:

```text
http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/scan_dags
```

##### Method:

- GET

##### GET request Arguments:

- otf_sync (optional) - boolean - Check for newly created DAGs On The Fly!

##### Examples:

```bash
curl -X GET --user "username:password" \
http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/scan_dags
```

##### response:

```json
{
  "message": "Ondemand DAG scan complete!!",
  "status": "success"
}
```

### **_<span id="purge_dags">purge_dags</span>_**

##### Description:

- Empty the DAGs directory.

##### Endpoint:

```text
http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/purge_dags
```

##### Method:

- GET

##### GET request Arguments:

- None

##### Examples:

```bash
curl -X GET --user "username:password" \
http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/purge_dags
```

##### response:

```json
{
  "message": "DAG directory purged!!",
  "status": "success"
}
```

### **_<span id="delete_dag">delete_dag</span>_**

##### Description:

- Delete a DAG from the Airflow database and the filesystem.

##### Endpoint:

```text
http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/delete_dag
```

##### Method:

- GET

##### GET request Arguments:

- dag_id (optional) - string - DAG id
- filename (optional) - string - Name of the DAG file that needs to be deleted
- Note: At least one of 'dag_id' or 'filename' must be specified

##### Examples:

```bash
curl -X GET --user "username:password" \
'http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/delete_dag?dag_id=test_dag&filename=test_dag.py'
```

##### response:

```json
{
  "message": "DAG [dag_test] deleted",
  "status": "success"
}
```

### **_<span id="upload_file">upload_file</span>_**

##### Description:

- Upload a new file to the specified directory.

##### Endpoint:

```text
http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/upload_file
```

##### Method:

- POST

##### POST request Arguments:

- file - file - File to be uploaded
- force (optional) - boolean - Force uploading the file if it already exists
- path (optional) - string - Location where the file is to be uploaded (Default is the DAGs directory)

##### Examples:

```bash
curl -X POST -H 'Content-Type: multipart/form-data' \
 --user "username:password" \
 -F 'file=@test_file.py' \
 -F 'force=y' \
 http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/upload_file
```

##### response:

```json
{
  "message": "File [/{DAG_FOLDER}/dag_test.txt] has been uploaded",
  "status": "success"
}
```

### **_<span id="restart_failed_task">restart_failed_task</span>_**

##### Description:

- Restart failed tasks along with their downstream tasks.

##### Endpoint:

```text
http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/restart_failed_task
```

##### Method:

- GET

##### GET request Arguments:

- dag_id - string - DAG id
- run_id - string - DagRun id

##### Examples:

```bash
curl -X GET --user "username:password" \
'http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/restart_failed_task?dag_id=test_dag&run_id=test_run'
```

##### response:

```json
{
  "message": {
    "failed_task_count": 1,
    "clear_task_count": 7
  },
  "status": "success"
}
```

### **_<span id="kill_running_tasks">kill_running_tasks</span>_**

##### Description:

- Kill running tasks having status in ['none', 'running'].

##### Endpoint:

```text
http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/kill_running_tasks
```

##### Method:

- GET

##### GET request Arguments:

- dag_id - string - DAG id
- run_id - string - DagRun id
- task_id - string - If task_id is empty, all running tasks in the DagRun are killed; otherwise only the specified task is killed.

##### Examples:

```bash
curl -X GET --user "username:password" \
'http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/kill_running_tasks?dag_id=test_dag&run_id=test_run&task_id=test_task'
```

##### response:

```json
{
  "message": "tasks in test_run killed!!",
  "status": "success"
}
```

### **_<span id="run_task_instance">run_task_instance</span>_**

##### Description:

- Create a DagRun, run the specified tasks, and skip the rest.

##### Endpoint:

```text
http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/run_task_instance
```

##### Method:

- POST

##### POST request Arguments:

- dag_id - string - DAG id
- run_id - string - DagRun id
- tasks - string - Task id(s); multiple tasks are separated by a comma (,)
- conf (optional) - string - Optional configuration for creating the DagRun

##### Examples:

```bash
curl -X POST -H 'Content-Type: multipart/form-data' \
 --user "username:password" \
 -F 'dag_id=test_dag' \
 -F 'run_id=test_run' \
 -F 'tasks=test_task' \
 http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/run_task_instance
```
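
A DagRun conf can also be supplied as a JSON string via the optional conf argument; a sketch with hypothetical conf keys and task ids:

```bash
# Sketch: run two tasks and pass an optional DagRun conf (the JSON payload is hypothetical)
curl -X POST -H 'Content-Type: multipart/form-data' \
 --user "username:password" \
 -F 'dag_id=test_dag' \
 -F 'run_id=test_run' \
 -F 'tasks=test_task0,test_task1' \
 -F 'conf={"env": "staging"}' \
 http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/run_task_instance
```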

##### response:

```json
{
  "execution_date": "2021-06-21T05:50:19.740803+0000",
  "status": "success"
}
```

### **_<span id="skip_task_instance">skip_task_instance</span>_**

##### Description:

- Skip one task instance.

##### Endpoint:

```text
http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/skip_task_instance
```

##### Method:

- GET

##### GET request Arguments:

- dag_id - string - DAG id
- run_id - string - DagRun id
- task_id - string - task id

##### Examples:

```bash
curl -X GET --user "username:password" \
'http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/skip_task_instance?dag_id=test_dag&run_id=test_run&task_id=test_task'
```

##### response:

```json
{
  "message": "<TaskInstance: test_dag.test_task 2021-06-21 19:59:34.638794+00:00 [skipped]> skipped!!",
  "status": "success"
}
```

## Acknowledgements

Huge shout out to these awesome plugins that contributed to the growth of the Airflow ecosystem and inspired this plugin.

- [airflow-rest-api-plugin](https://github.com/teamclairvoyant/airflow-rest-api-plugin)
- [airflow-rest-api-plugin](https://github.com/eBay/airflow-rest-api-plugin)
- [simple-dag-editor](https://github.com/ohadmata/simple-dag-editor)
- [airflow-code-editor](https://github.com/andreax79/airflow-code-editor)

            
