| Field | Value |
| ----- | ----- |
| Name | starlake-airflow |
| Version | 0.1.3.1 |
| Summary | Starlake Python Distribution For Airflow |
| Author | Stéphane Manciot |
| License | Apache 2.0 |
| Upload time | 2024-10-17 13:43:47 |
| Requires Python | None |
| Maintainer | None |
| Home page | None |
| Docs URL | None |
| Keywords | None |
| Requirements | No requirements were recorded. |
# starlake-airflow
**starlake-airflow** is the **[Starlake](https://starlake-ai.github.io/starlake/index.html)** Python Distribution for **Airflow**.
It is recommended to use it in combination with **[starlake dag generation](https://starlake-ai.github.io/starlake/docs/concepts/orchestration)**, but it can also be used directly in your **DAGs**.
## Prerequisites
Before installing starlake-airflow, ensure the following minimum versions are installed on your system:
- starlake: 1.0.0 or higher
- python: 3.8 or higher
- Apache Airflow: 2.4.0 or higher (2.6.0 or higher is recommended with Cloud Run)
## Installation
```bash
pip install starlake-airflow --upgrade
```
## StarlakeAirflowJob
`ai.starlake.airflow.StarlakeAirflowJob` is an **abstract factory class** that extends the generic factory interface `ai.starlake.job.IStarlakeJob` and is responsible for **generating** the **Airflow tasks** that will run the [import](https://starlake-ai.github.io/starlake/docs/user-guide/load#import-step), [load](https://starlake-ai.github.io/starlake/docs/concepts/load) and [transform](https://starlake-ai.github.io/starlake/docs/concepts/transform) starlake commands.
### sl_import
It generates the Airflow task that will run the starlake [import](https://starlake-ai.github.io/starlake/docs/cli/import) command.
```python
def sl_import(
    self,
    task_id: str,
    domain: str,
    **kwargs) -> BaseOperator:
    #...
```
| name | type | description |
| ------- | ---- | --------------------------------------------------- |
| task_id | str | the optional task id (`{domain}_import` by default) |
| domain | str | the required domain to import |
### sl_load
It generates the Airflow task that will run the starlake [load](https://starlake-ai.github.io/starlake/docs/cli/load) command.
```python
def sl_load(
    self,
    task_id: str,
    domain: str,
    table: str,
    spark_config: StarlakeSparkConfig=None,
    **kwargs) -> BaseOperator:
    #...
```
| name | type | description |
| ------------ | ------------------- | --------------------------------------------------------- |
| task_id | str | the optional task id (`{domain}_{table}_load` by default) |
| domain | str | the required domain of the table to load |
| table | str | the required table to load |
| spark_config | StarlakeSparkConfig | the optional `ai.starlake.job.StarlakeSparkConfig` |
### sl_transform
It generates the Airflow task that will run the starlake [transform](https://starlake-ai.github.io/starlake/docs/cli/transform) command.
```python
def sl_transform(
    self,
    task_id: str,
    transform_name: str,
    transform_options: str=None,
    spark_config: StarlakeSparkConfig=None,
    **kwargs) -> BaseOperator:
    #...
```
| name | type | description |
| ----------------- | ------------------- | ---------------------------------------------------- |
| task_id | str | the optional task id (`{transform_name}` by default) |
| transform_name | str | the transform to run |
| transform_options | str | the optional transform options |
| spark_config | StarlakeSparkConfig | the optional `ai.starlake.job.StarlakeSparkConfig` |
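These factory methods return regular Airflow operators that can be wired together in a DAG. Below is an illustrative sketch only (it assumes the `StarlakeAirflowBashJob` concrete class described in the *On premise* section, placeholder paths and names, and the default task-id naming documented above):
```python
from datetime import datetime
from airflow import DAG
from ai.starlake.airflow.bash import StarlakeAirflowBashJob

# placeholder options; SL_STARLAKE_PATH is required by the bash job (see "On premise" below)
sl_job = StarlakeAirflowBashJob(options={'SL_STARLAKE_PATH': '/starlake/starlake.sh'})

with DAG(dag_id="starbake_example", schedule_interval=None, start_date=datetime(2024, 1, 1), catchup=False) as dag:
    # task ids default to {domain}_import, {domain}_{table}_load and {transform_name}
    import_task = sl_job.sl_import(task_id=None, domain="starbake")
    load_task = sl_job.sl_load(task_id=None, domain="starbake", table="Orders")
    transform_task = sl_job.sl_transform(task_id=None, transform_name="Customers.CustomerLifeTimeValue")
    import_task >> load_task >> transform_task
```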
### sl_job
Ultimately, all of these methods will call the `sl_job` method that needs to be **implemented** in all **concrete** factory classes.
```python
def sl_job(
    self,
    task_id: str,
    arguments: list,
    spark_config: StarlakeSparkConfig=None,
    **kwargs) -> BaseOperator:
    #...
```
| name | type | description |
| ------------ | ------------------- | ----------------------------------------------------- |
| task_id | str | the required task id |
| arguments | list | The required arguments of the starlake command to run |
| spark_config | StarlakeSparkConfig | the optional `ai.starlake.job.StarlakeSparkConfig` |
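As an illustration only (this is not the source of any shipped class), a concrete factory could implement `sl_job` by turning the starlake arguments into a bash command; the executable path below is a placeholder, and the real classes do considerably more (pools, environment variables, outlets):
```python
from airflow.models.baseoperator import BaseOperator
from airflow.operators.bash import BashOperator

from ai.starlake.airflow import StarlakeAirflowJob
from ai.starlake.job import StarlakeSparkConfig

class MyBashJob(StarlakeAirflowJob):
    """Hypothetical concrete factory: runs each starlake command through bash."""
    def sl_job(self, task_id: str, arguments: list, spark_config: StarlakeSparkConfig=None, **kwargs) -> BaseOperator:
        # e.g. arguments == ["import", "--domains", "starbake"]
        command = "/starlake/starlake.sh " + " ".join(arguments)  # placeholder executable path
        return BashOperator(task_id=task_id, bash_command=command, **kwargs)
```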
### Init
To initialize this class, you may specify the optional **pre load strategy** and **options** to use.
```python
def __init__(self, pre_load_strategy: Union[StarlakePreLoadStrategy, str, None], options: dict=None, **kwargs) -> None:
"""Overrides IStarlakeJob.__init__()
Args:
pre_load_strategy (Union[StarlakePreLoadStrategy, str, None]): The pre-load strategy to use.
options (dict): The options to use.
"""
super().__init__(pre_load_strategy, options, **kwargs)
#...
```
#### StarlakePreLoadStrategy
`ai.starlake.job.StarlakePreLoadStrategy` is an enum that defines the different **pre-load strategies** that can be used to conditionally load a domain.
The pre-load strategy is implemented by the `sl_pre_load` method, which generates the Airflow group of tasks corresponding to the chosen strategy.
```python
def sl_pre_load(
    self,
    domain: str,
    pre_load_strategy: Union[StarlakePreLoadStrategy, str, None]=None,
    **kwargs) -> BaseOperator:
    #...
```
| name | type | description |
| ----------------- | ---- | ------------------------------------------------------------------ |
| domain | str | the domain to load |
| pre_load_strategy | str | the optional pre load strategy (self.pre_load_strategy by default) |
##### NONE
The load of the domain is unconditional: no pre-load tasks will be executed.

##### IMPORTED
This strategy implies that at least one file is present in the landing area (`SL_ROOT/importing/{domain}` by default, if the `incoming_path` option has not been specified). If one or more files are present, the `sl_import` method will be called to import the domain before loading it; otherwise the load of the domain will be skipped.

##### PENDING
This strategy implies that at least one file is present in the pending datasets area of the domain (`SL_ROOT/datasets/pending/{domain}` by default, if the `pending_path` option has not been specified); otherwise the load of the domain will be skipped.

##### ACK
This strategy implies that an **ack file** is present at the specified path (option `global_ack_file_path`), otherwise the loading of the domain will be skipped.

#### Options
The following options can be specified in all concrete factory classes:
| name | type | description |
| ------------------------ | ---- | ----------------------------------------------------------------------------------------- |
| **default_pool** | str | pool of slots to use (`default_pool` by default) |
| **sl_env_var** | str | optional starlake environment variables passed as an encoded json string |
| **pre_load_strategy** | str | one of `none` (default), `imported`, `pending` or `ack` |
| **incoming_path** | str | path to the landing area for the domain to load (`{SL_ROOT}/incoming` by default) |
| **pending_path** | str | path to the pending datasets for the domain to load (`{SL_DATASETS}/pending` by default) |
| **global_ack_file_path** | str | path to the ack file (`{SL_DATASETS}/pending/{domain}/{{{{ds}}}}.ack` by default) |
| **ack_wait_timeout** | int | timeout in seconds to wait for the ack file (one hour by default) |
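For example, these options can be passed when instantiating a concrete job (here the `StarlakeAirflowBashJob` described in the *On premise* section; all paths and values are placeholders):
```python
from ai.starlake.airflow.bash import StarlakeAirflowBashJob

options = {
    'sl_env_var': '{"SL_ROOT": "/starlake/samples/starbake"}',  # encoded JSON of starlake env vars
    'pre_load_strategy': 'ack',                                 # wait for an ack file before loading
    'global_ack_file_path': '/starlake/samples/starbake/datasets/pending/starbake/{{ds}}.ack',
    'ack_wait_timeout': 3600,                                   # one hour, in seconds
    'SL_STARLAKE_PATH': '/starlake/starlake.sh',                # required by the bash job
}

sl_job = StarlakeAirflowBashJob(options=options)
```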
## Data-aware scheduling
The `ai.starlake.airflow.StarlakeAirflowJob` class is also responsible for recording the `outlets` related to the execution of each starlake command, which is useful for scheduling DAGs using **data-aware scheduling**.
All the outlets that have been recorded are available in the `outlets` property of the instance of the concrete class.
```python
def __init__(
    self,
    pre_load_strategy: Union[StarlakePreLoadStrategy, str, None],
    options: dict=None,
    **kwargs) -> None:
    #...
    self.outlets: List[Dataset] = kwargs.get('outlets', [])

def sl_import(self, task_id: str, domain: str, **kwargs) -> BaseOperator:
    #...
    dataset = Dataset(keep_ascii_only(domain).lower())
    self.outlets += kwargs.get('outlets', []) + [dataset]
    #...

def sl_load(
    self,
    task_id: str,
    domain: str,
    table: str,
    spark_config: StarlakeSparkConfig=None,
    **kwargs) -> BaseOperator:
    #...
    dataset = Dataset(keep_ascii_only(f'{domain}.{table}').lower())
    self.outlets += kwargs.get('outlets', []) + [dataset]
    #...

def sl_transform(
    self,
    task_id: str,
    transform_name: str,
    transform_options: str=None,
    spark_config: StarlakeSparkConfig=None,
    **kwargs) -> BaseOperator:
    #...
    dataset = Dataset(keep_ascii_only(transform_name).lower())
    self.outlets += kwargs.get('outlets', []) + [dataset]
    #...
```
In conjunction with the starlake dag generation, the `outlets` property can be used to effortlessly schedule the DAGs that run the **transform** commands.
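For example, a downstream DAG can be scheduled on those datasets (a sketch: the dataset URIs below assume the lowercased `{domain}.{table}` naming shown above and would normally come from `sl_job.outlets` in a generated DAG):
```python
from datetime import datetime
from airflow import DAG
from airflow.datasets import Dataset
from airflow.operators.empty import EmptyOperator

with DAG(
    dag_id="starbake_transforms_trigger",
    schedule=[Dataset("starbake.customers"), Dataset("starbake.orders")],  # data-aware scheduling (Airflow >= 2.4)
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:
    # placeholder: replace with sl_job.sl_transform(...) tasks
    run_transforms = EmptyOperator(task_id="run_transforms")
```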
## On premise
### StarlakeAirflowBashJob
This class is a concrete implementation of `StarlakeAirflowJob` that generates tasks using `airflow.operators.bash.BashOperator`. It is useful for **on-premise** execution.
An additional `SL_STARLAKE_PATH` option is required to specify the **path** to the `starlake` **executable**.
#### StarlakeAirflowBashJob load Example
The following example shows how to use `StarlakeAirflowBashJob` to dynamically generate DAGs that **load** domains using `starlake` and record the corresponding `outlets`.
```python
description="""example to load domain(s) using airflow starlake bash job"""

options = {
    # General options
    'sl_env_var':'{"SL_ROOT": "/starlake/samples/starbake"}',
    'pre_load_strategy':'imported',
    # Bash options
    'SL_STARLAKE_PATH':'/starlake/starlake.sh',
}

from ai.starlake.airflow.bash import StarlakeAirflowBashJob

sl_job = StarlakeAirflowBashJob(options=options)

schedules = [{
    'schedule': 'None',
    'cron': None,
    'domains': [{
        'name':'starbake',
        'final_name':'starbake',
        'tables': [
            {
                'name': 'Customers',
                'final_name': 'Customers'
            },
            {
                'name': 'Ingredients',
                'final_name': 'Ingredients'
            },
            {
                'name': 'Orders',
                'final_name': 'Orders'
            },
            {
                'name': 'Products',
                'final_name': 'Products'
            }
        ]
    }]
}]

def generate_dag_name(schedule):
    dag_name = os.path.basename(__file__).replace(".py", "").replace(".pyc", "").lower()
    return (f"{dag_name}-{schedule['schedule']}" if len(schedules) > 1 else dag_name)

from ai.starlake.common import keep_ascii_only, sanitize_id
from ai.starlake.airflow import DEFAULT_DAG_ARGS

import os

from airflow import DAG
from airflow.datasets import Dataset
from airflow.utils.task_group import TaskGroup

# tags collected from the domains to load
tags = []

# [START instantiate_dag]
for schedule in schedules:
    for domain in schedule["domains"]:
        tags.append(domain["name"])
    with DAG(dag_id=generate_dag_name(schedule),
             schedule_interval=schedule['cron'],
             default_args=DEFAULT_DAG_ARGS,
             catchup=False,
             tags=set([tag.upper() for tag in tags]),
             description=description) as dag:
        start = sl_job.dummy_op(task_id="start")

        post_tasks = sl_job.post_tasks()

        pre_load_tasks = sl_job.sl_pre_load(domain=domain["name"])

        def generate_task_group_for_domain(domain):
            with TaskGroup(group_id=sanitize_id(f'{domain["name"]}_load_tasks')) as domain_load_tasks:
                for table in domain["tables"]:
                    load_task_id = sanitize_id(f'{domain["name"]}_{table["name"]}_load')
                    sl_job.sl_load(
                        task_id=load_task_id,
                        domain=domain["name"],
                        table=table["name"]
                    )
            return domain_load_tasks

        all_load_tasks = [generate_task_group_for_domain(domain) for domain in schedule["domains"]]

        if pre_load_tasks:
            start >> pre_load_tasks >> all_load_tasks
        else:
            start >> all_load_tasks

        all_done = sl_job.dummy_op(task_id="all_done", outlets=[Dataset(keep_ascii_only(dag.dag_id))]+sl_job.outlets)

        if post_tasks:
            all_load_tasks >> all_done >> post_tasks
        else:
            all_load_tasks >> all_done
```

#### StarlakeAirflowBashJob Transform Examples
The following example shows how to use `StarlakeAirflowBashJob` to dynamically generate DAGs that run **transform** commands using `starlake` and record the corresponding `outlets`.
```python
# description of the generated DAG (defined here because it is referenced below)
description = """example to transform data using airflow starlake bash job"""

options = {
    # General options
    'sl_env_var':'{"SL_ROOT": "/starlake/samples/starbake"}',
    'pre_load_strategy':'imported',
    # Bash options
    'SL_STARLAKE_PATH':'/starlake/starlake.sh',
}

from ai.starlake.airflow.bash import StarlakeAirflowBashJob

sl_job = StarlakeAirflowBashJob(options=options)

from ai.starlake.common import keep_ascii_only, sanitize_id
from ai.starlake.job import StarlakeSparkConfig
from ai.starlake.airflow import StarlakeAirflowJob, DEFAULT_DAG_ARGS

import json
import os
import sys
from typing import List, Union

from airflow import DAG
from airflow.datasets import Dataset
from airflow.utils.task_group import TaskGroup

cron = "None"

task_deps = json.loads("""[ {
  "data" : {
    "name" : "Customers.HighValueCustomers",
    "typ" : "task",
    "parent" : "Customers.CustomerLifeTimeValue",
    "parentTyp" : "task",
    "parentRef" : "CustomerLifetimeValue",
    "sink" : "Customers.HighValueCustomers"
  },
  "children" : [ {
    "data" : {
      "name" : "Customers.CustomerLifeTimeValue",
      "typ" : "task",
      "parent" : "starbake.Customers",
      "parentTyp" : "table",
      "parentRef" : "starbake.Customers",
      "sink" : "Customers.CustomerLifeTimeValue"
    },
    "children" : [ {
      "data" : {
        "name" : "starbake.Customers",
        "typ" : "table",
        "parentTyp" : "unknown"
      },
      "task" : false
    }, {
      "data" : {
        "name" : "starbake.Orders",
        "typ" : "table",
        "parentTyp" : "unknown"
      },
      "task" : false
    } ],
    "task" : true
  } ],
  "task" : true
} ]""")

load_dependencies = StarlakeAirflowJob.get_context_var(var_name='load_dependencies', default_value='False', options=options)

schedule = None

datasets: List[str] = []

_extra_dataset: Union[dict, None] = sys.modules[__name__].__dict__.get('extra_dataset', None)

_extra_dataset_parameters = '?' + '&'.join(list(f'{k}={v}' for (k, v) in _extra_dataset.items())) if _extra_dataset else ''

# if you choose not to load the dependencies, a schedule will be created to check if the dependencies are met
def _load_datasets(task: dict):
    if 'children' in task:
        for child in task['children']:
            datasets.append(keep_ascii_only(child['data']['name']).lower())
            _load_datasets(child)

if load_dependencies.lower() != 'true':
    for task in task_deps:
        _load_datasets(task)
    schedule = list(map(lambda dataset: Dataset(dataset + _extra_dataset_parameters), datasets))

tags = StarlakeAirflowJob.get_context_var(var_name='tags', default_value="", options=options).split()

# [START instantiate_dag]
with DAG(dag_id=os.path.basename(__file__).replace(".py", "").replace(".pyc", "").lower(),
         schedule_interval=None if cron == "None" else cron,
         schedule=schedule,
         default_args=sys.modules[__name__].__dict__.get('default_dag_args', DEFAULT_DAG_ARGS),
         catchup=False,
         user_defined_macros=sys.modules[__name__].__dict__.get('user_defined_macros', None),
         user_defined_filters=sys.modules[__name__].__dict__.get('user_defined_filters', None),
         tags=set([tag.upper() for tag in tags]),
         description=description) as dag:

    start = sl_job.dummy_op(task_id="start")

    pre_tasks = sl_job.pre_tasks(dag=dag)

    post_tasks = sl_job.post_tasks(dag=dag)

    def create_task(airflow_task_id: str, task_name: str, task_type: str):
        # spark_config is expected to be provided elsewhere (e.g. by the generated DAG template)
        # and to return a StarlakeSparkConfig for the given configuration name
        spark_config_name = StarlakeAirflowJob.get_context_var('spark_config_name', task_name.lower(), options)
        if task_type == 'task':
            return sl_job.sl_transform(
                task_id=airflow_task_id,
                transform_name=task_name,
                spark_config=spark_config(spark_config_name, **sys.modules[__name__].__dict__.get('spark_properties', {}))
            )
        else:
            load_domain_and_table = task_name.split(".", 1)
            domain = load_domain_and_table[0]
            table = load_domain_and_table[1]
            return sl_job.sl_load(
                task_id=airflow_task_id,
                domain=domain,
                table=table,
                spark_config=spark_config(spark_config_name, **sys.modules[__name__].__dict__.get('spark_properties', {}))
            )

    # build task groups recursively
    def generate_task_group_for_task(task):
        task_name = task['data']['name']
        airflow_task_group_id = sanitize_id(task_name)
        airflow_task_id = airflow_task_group_id
        task_type = task['data']['typ']
        if task_type == 'task':
            airflow_task_id = airflow_task_group_id + "_task"
        else:
            airflow_task_id = airflow_task_group_id + "_table"

        if load_dependencies.lower() == 'true' and 'children' in task:
            with TaskGroup(group_id=airflow_task_group_id) as airflow_task_group:
                for transform_sub_task in task['children']:
                    generate_task_group_for_task(transform_sub_task)
                upstream_tasks = list(airflow_task_group.children.values())
                airflow_task = create_task(airflow_task_id, task_name, task_type)
                airflow_task.set_upstream(upstream_tasks)
            return airflow_task_group
        else:
            airflow_task = create_task(airflow_task_id=airflow_task_id, task_name=task_name, task_type=task_type)
            return airflow_task

    all_transform_tasks = [generate_task_group_for_task(task) for task in task_deps]

    if pre_tasks:
        start >> pre_tasks >> all_transform_tasks
    else:
        start >> all_transform_tasks

    end = sl_job.dummy_op(task_id="end", outlets=[Dataset(keep_ascii_only(dag.dag_id))]+list(map(lambda x: Dataset(x.uri + _extra_dataset_parameters), sl_job.outlets)))

    all_transform_tasks >> end

    if post_tasks:
        all_done = sl_job.dummy_op(task_id="all_done")
        all_transform_tasks >> all_done >> post_tasks >> end
```

If you want to load the dependencies, you just need to set the `load_dependencies` option to `True`:
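For example, extending the options used above:
```python
options = {
    # ... same options as above ...
    'load_dependencies': 'True',
}
```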

## Google Cloud Platform
### StarlakeAirflowDataprocJob
This class is a concrete implementation of `StarlakeAirflowJob` that overrides the `sl_job` method to run the starlake command by submitting a **Dataproc job** to the configured **Dataproc cluster**.
It delegates to an instance of the `ai.starlake.airflow.gcp.StarlakeAirflowDataprocCluster` class the responsibility to:
* **create** the **Dataproc cluster** by instantiating `airflow.providers.google.cloud.operators.dataproc.DataprocCreateClusterOperator`
* **submit Dataproc job** to the latter by instantiating `airflow.providers.google.cloud.operators.dataproc.DataprocSubmitJobOperator`
* **delete** the **Dataproc cluster** by instantiating `airflow.providers.google.cloud.operators.dataproc.DataprocDeleteClusterOperator`
This instance is available in the `cluster` property of the `StarlakeAirflowDataprocJob` class and can be configured using the `ai.starlake.airflow.gcp.StarlakeAirflowDataprocClusterConfig` class.
The creation of the **Dataproc cluster** can be performed by calling the `create_cluster` method of the `cluster` property or by calling the `pre_tasks` method of the `StarlakeAirflowDataprocJob` (the call to the `pre_load` method will, behind the scenes, call the `pre_tasks` method and add the optional resulting task to the group of Airflow tasks).
The deletion of the **Dataproc cluster** can be performed by calling the `delete_cluster` method of the `cluster` property or by calling the `post_tasks` method of the StarlakeAirflowDataprocJob.
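A minimal sketch of this lifecycle (options, ids and names are placeholders; it assumes `pre_tasks`/`post_tasks` return the cluster creation and deletion tasks described above):
```python
from datetime import datetime
from airflow import DAG
from ai.starlake.airflow import DEFAULT_DAG_ARGS
from ai.starlake.airflow.gcp import StarlakeAirflowDataprocJob

options = {
    'dataproc_project_id': 'starbake',                 # placeholder project
    'spark_bucket': 'my-bucket',                       # placeholder bucket
    'spark_jar_list': 'gcs://artifacts/starlake.jar',  # placeholder jar location
}

sl_job = StarlakeAirflowDataprocJob(options=options)

with DAG(dag_id="starbake_dataproc", schedule_interval=None, default_args=DEFAULT_DAG_ARGS,
         start_date=datetime(2024, 1, 1), catchup=False) as dag:
    create_cluster = sl_job.pre_tasks(dag=dag)    # wraps DataprocCreateClusterOperator
    load = sl_job.sl_load(task_id=None, domain="starbake", table="Orders")
    delete_cluster = sl_job.post_tasks(dag=dag)   # wraps DataprocDeleteClusterOperator
    create_cluster >> load >> delete_cluster
```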
#### Dataproc cluster configuration
Additional options may be specified to configure the **Dataproc cluster**.
| name | type | description |
| -------------------------------- | ---- | -------------------------------------------------------------------- |
| **cluster_id** | str | the optional unique id of the cluster that will participate in the definition of the Dataproc cluster name (if not specified) |
| **dataproc_name** | str | the optional dataproc name of the cluster that will participate in the definition of the Dataproc cluster name (if not specified) |
| **dataproc_project_id** | str | the optional dataproc project id (the project id on which the composer has been instantiated by default) |
| **dataproc_region** | str | the optional region (`europe-west1` by default) |
| **dataproc_subnet** | str | the optional subnet (the `default` subnet if not specified) |
| **dataproc_service_account** | str | the optional service account (`service-{self.project_id}@dataproc-accounts.iam.gserviceaccount.com` by default) |
| **dataproc_image_version** | str | the image version of the dataproc cluster (`2.2-debian1` by default) |
| **dataproc_master_machine_type** | str | the optional master machine type (`n1-standard-4` by default) |
| **dataproc_master_disk_type** | str | the optional master disk type (`pd-standard` by default) |
| **dataproc_master_disk_size** | int | the optional master disk size (`1024` by default) |
| **dataproc_worker_machine_type** | str | the optional worker machine type (`n1-standard-4` by default) |
| **dataproc_worker_disk_type**    | str  | the optional worker disk type (`pd-standard` by default)             |
| **dataproc_worker_disk_size** | int | the optional worker disk size (`1024` by default) |
| **dataproc_num_workers** | int | the optional number of workers (`4` by default) |
All of these options will be used by default if no **StarlakeAirflowDataprocClusterConfig** was defined when instantiating **StarlakeAirflowDataprocCluster** or if the latter was not defined when instantiating **StarlakeAirflowDataprocJob**.
#### Dataproc Job configuration
Additional options may be specified to configure the **Dataproc job**.
| name | type | description |
| ---------------------------- | ---- | ---------------------------------------------------------------------------- |
| **spark_jar_list**           | str  | the required comma-separated list of Spark jars to be used                   |
| **spark_bucket**             | str  | the required bucket to use for Spark and BigQuery temporary storage          |
| **spark_job_main_class** | str | the optional main class of the spark job (`ai.starlake.job.Main` by default) |
| **spark_executor_memory** | str | the optional amount of memory to use per executor process (`11g` by default) |
| **spark_executor_cores** | int | the optional number of cores to use on each executor (`4` by default) |
| **spark_executor_instances** | int | the optional number of executor instances (`1` by default) |
`spark_executor_memory`, `spark_executor_cores` and `spark_executor_instances` options will be used by default if no **StarlakeSparkConfig** was passed to the `sl_load` and `sl_transform` methods.
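For instance, the default executor sizing can be supplied through the job options (illustrative values); an explicit `StarlakeSparkConfig` passed to `sl_load`/`sl_transform` takes precedence:
```python
options = {
    # ... Dataproc cluster and job options ...
    'spark_executor_memory': '11g',
    'spark_executor_cores': 4,
    'spark_executor_instances': 1,
}
```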
#### StarlakeAirflowDataprocJob load Example
The following example shows how to use `StarlakeAirflowDataprocJob` to dynamically generate DAGs that **load** domains using `starlake` and record the corresponding `outlets`.
```python
description="""example to load domain(s) using airflow starlake dataproc job"""
options = {
    # General options
    'sl_env_var':'{"SL_ROOT": "gcs://starlake/samples/starbake"}',
    'pre_load_strategy':'pending',
    # Dataproc cluster configuration
    'dataproc_project_id':'starbake',
    # Dataproc job configuration
    'spark_bucket':'my-bucket',
    'spark_jar_list':'gcs://artifacts/starlake.jar',
}
from ai.starlake.airflow.gcp import StarlakeAirflowDataprocJob
sl_job = StarlakeAirflowDataprocJob(options=options)
# all the code following the instantiation of the starlake job is exactly the same as that defined for StarlakeAirflowBashJob
#...
```

### StarlakeAirflowCloudRunJob
This class is a concrete implementation of `StarlakeAirflowJob` that overrides the `sl_job` method to run the starlake command by executing a **Cloud Run job**.
#### Cloud Run job configuration
Additional options may be specified to configure the **Cloud Run job**.
| name | type | description |
| ---------------------------- | ---- | ---------------------------------------------------------------------------- |
| **cloud_run_project_id** | str | the optional cloud run project id (the project id on which the composer has been instantiated by default) |
| **cloud_run_job_name** | str | the required name of the cloud run job |
| **cloud_run_region** | str | the optional region (`europe-west1` by default) |
| **cloud_run_service_account** | str | the optional cloud run service account |
| **cloud_run_async** | bool | the optional flag to run the cloud run job asynchronously (`True` by default)|
| **retry_on_failure** | bool | the optional flag to retry the cloud run job on failure (`False` by default) |
| **retry_delay_in_seconds** | int | the optional delay in seconds to wait before retrying the cloud run job (`10` by default) |
If the execution has been parameterized to be **asynchronous**, an `airflow.sensors.bash.BashSensor` will be instantiated to wait for the completion of the **Cloud Run job** execution.
#### StarlakeAirflowCloudRunJob load Examples
The following examples show how to use `StarlakeAirflowCloudRunJob` to dynamically generate DAGs that **load** domains using `starlake` and record the corresponding `outlets`.
##### Synchronous execution
```python
description="""example to load domain(s) using airflow starlake cloud run job synchronously"""
options = {
    # General options
    'sl_env_var':'{"SL_ROOT": "gs://my-bucket/starbake"}',
    'pre_load_strategy':'ack',
    'global_ack_file_path':'gs://my-bucket/starbake/pending/HighValueCustomers/2024-22-01.ack',
    # Cloud run options
    'cloud_run_job_name':'starlake',
    'cloud_run_project_id':'starbake',
    'cloud_run_async':'False'
}
from ai.starlake.airflow.gcp import StarlakeAirflowCloudRunJob
sl_job = StarlakeAirflowCloudRunJob(options=options)
# all the code following the instantiation of the starlake job is exactly the same as that defined for StarlakeAirflowBashJob
#...
```

##### Asynchronous execution
```python
description="""example to load domain(s) using airflow starlake cloud run job asynchronously"""
options = {
    # General options
    'sl_env_var':'{"SL_ROOT": "gs://my-bucket/starbake"}',
    'pre_load_strategy':'pending',
    # Cloud run options
    'cloud_run_job_name':'starlake',
    'cloud_run_project_id':'starbake',
    # 'cloud_run_async':'True'
    'retry_on_failure':'True',
}
# all the code following the options is exactly the same as that defined above
#...
```

## Amazon Web Services
## Azure