acceldata-airflow-sdk

Name: acceldata-airflow-sdk
Version: 3.2.0
Summary: Acceldata Airflow SDK.
Author: acceldata
License: MIT License
Keywords: acceldata-airflow
Upload time: 2024-03-21 11:24:41

# ACCELDATA-AIRFLOW-SDK

The Acceldata Airflow SDK provides observability for Airflow DAGs in the Torch catalog. With the SDK, users get end-to-end observability of Airflow DAG runs in the Torch UI. Every DAG is associated with a pipeline in Torch.
<br />
While configuring Airflow, make sure the following 4 environment variables are set in the Airflow environment docker container:


* TORCH_CATALOG_URL - URL of the torch catalog
* TORCH_ACCESS_KEY - API access key generated from torch UI
* TORCH_SECRET_KEY - API secret key generated from torch UI
* ENABLE_VERSION_CHECK - Enables or disables the version compatibility check between Torch and the SDK. The default value is 'True'. To disable the check, set it to 'False'.
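
For example, these could be exported in the container environment (a sketch; the URL and keys below are placeholders):

```bash
# Placeholder values - replace with the details of your Torch deployment
export TORCH_CATALOG_URL="https://torch.acceldata.local:5443"
export TORCH_ACCESS_KEY="<access-key-from-torch-ui>"
export TORCH_SECRET_KEY="<secret-key-from-torch-ui>"
export ENABLE_VERSION_CHECK="True"
```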


### Creating Airflow connection
If you want to avoid using environment variables, you can instead create a connection in the Airflow UI as described below and provide its connection id to TorchInitializer.
Set the following fields in the connection:
* Conn id: Create a unique ID for the connection
* Conn Type: HTTP
* Host - URL of the torch catalog
* Login - API access key generated from torch UI
* Password - API secret key generated from torch UI
* Extra - {"ENABLE_VERSION_CHECK": "{{value}}"}. This value enables or disables the version compatibility check between Torch and the SDK. The default value is 'True'. To disable the check, set it to 'False'.
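
For example, such a connection could also be created from the Airflow CLI (a sketch assuming Airflow 2.x; the connection id `torch_connection` and the credential values are placeholders):

```bash
airflow connections add 'torch_connection' \
    --conn-type 'http' \
    --conn-host 'https://torch.acceldata.local:5443' \
    --conn-login '<access-key-from-torch-ui>' \
    --conn-password '<secret-key-from-torch-ui>' \
    --conn-extra '{"ENABLE_VERSION_CHECK": "True"}'
```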

First, install the 2 PyPI packages mentioned below to expose the ETL in Torch.
```bash
pip install acceldata-sdk
```

Read more about acceldata-sdk from [here](https://pypi.org/project/acceldata-sdk/)

```bash
pip install acceldata-airflow-sdk
```


Read more about acceldata-airflow-sdk from [here](https://pypi.org/project/acceldata-airflow-sdk/)

## Create TorchClient

While creating a TorchClient connection to Torch, the version compatibility check between Torch and the SDK is enabled by default. To disable that check, pass `do_version_check` as `False`.


```python
from acceldata_sdk.torch_client import TorchClient

torch_client = TorchClient(
    url="https://torch.acceldata.local:5443",
    access_key="OY2VVIN2N6LJ",
    secret_key="da6bDBimQfXSMsyyhlPVJJfk7Zc2gs",
    do_version_check=False
)
```


## Create DAG
In the Airflow DAG code, import the Torch DAG instead of the Airflow DAG. All parameters are the same as for a standard Apache Airflow DAG, but there are 2 additional parameters: `override_success_callback` and `override_failure_callback`.

Params:

`override_success_callback`: A Boolean parameter that allows the user to override the success callback provided by the SDK. The success callback ends the pipeline run when the DAG ends successfully. Default value is False. Set it to True if you do not want the pipeline run to be ended at the end of a successful DAG run.

`override_failure_callback`: A Boolean parameter that allows the user to override the failure callback provided by the SDK. The failure callback ends the pipeline run with an error when the DAG ends in failure. Default value is False. Set it to True if you do not want the pipeline run to be ended at the end of an unsuccessful DAG run.

These can be useful if a few steps of the pipeline are executed outside of the Airflow DAG.
```python
from datetime import datetime

from acceldata_airflow_sdk.dag import DAG

# default_args, failure_callback and success_callback are user-defined,
# exactly as in a standard Airflow DAG.
dag = DAG(
   dag_id='pipeline_demo_final',
   schedule_interval='@daily',
   default_args=default_args,
   start_date=datetime(2020, 2, 2),
   catchup=False,
   on_failure_callback=failure_callback,
   on_success_callback=success_callback,
   override_success_callback=False,
   override_failure_callback=False,
)
```




## Create Job and Span using decorator
This was added in version 0.0.36 <br />
To create a job and span in the pipeline, the user needs to decorate the Python function with the job decorator as shown in the example below.


A `Node` object should have either asset_uid ({data source}.{asset path from its root}) or job_uid (the uid of the next job) as a parameter.

Params:

`span_uid`: A String parameter to specify the UID of the span to be created. Default value is None. If `span_uid` is not provided, a span corresponding to the job is created using the value of job_uid.

`job_uid`: A String parameter to specify the job UID of the pipeline. Default value is None. If `job_uid` is not provided, the uid is constructed using the task id and function name.

`inputs`: An Array parameter of Node type objects used by the job as input. Default value is an empty array.

`outputs`: An Array parameter of Node type objects returned by the job as output. Default value is an empty array.

`metadata`: A parameter of type JobMetadata specifying the metadata of the job. Default value is None.

`xcom_to_event_mapper_ids`: A list parameter of xcom keys used to send xcom variables in the span event. Default value is an empty list.

`bounded_by_span`: A boolean parameter deciding whether to create a span along with the Job. Default value is True. If it is set to True, make sure the function has a `**context` parameter in its argument list. That gives access to the context of the task, and various span events can be sent inside the function using it. Use span_context = context['span_context_parent'] to get the span context.

| NOTE: Passing context is mandatory to the function being decorated as our decorators use context to share information through xcom. |
|-------------------------------------------------------------------------------------------------------------------------------------|

| NOTE: The job_id for a task should be unique in a pipeline. |
|-------------------------------------------------------------|

If multiple tasks are created from a single job-decorated function, do not pass job_uid, as that would end up using the same job_uid for multiple tasks. In this scenario, if job_uid is not passed, the autogenerated job_uid will be unique per task (see the sketch after the example below).

```python
from acceldata_airflow_sdk.decorators.job import job
from acceldata_sdk.models.job import JobMetadata, Node
@job(job_uid='monthly.order.aggregate.job',
   inputs=[Node(asset_uid='POSTGRES_LOCAL_DS.pipeline.pipeline.customer_orders')],
   outputs=[Node(job_uid='Job2_uid')],
   metadata=JobMetadata(name = 'Vaishvik_brahmbhatt', team = 'backend', code_location ='https://github.com/acme/reporting/report.scala'),
   span_uid='customer.orders.datagen.span',
   xcom_to_event_mapper_ids = ['run_id', 'event_id'],
   bounded_by_span=True
   )
def monthly_order_aggregate(**context):
    pass
```
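
As a sketch of the multi-task scenario mentioned above (the `dag` object is assumed to be the one created earlier; the task ids and asset uid are illustrative), the same decorated function can back two tasks with `job_uid` omitted:

```python
from airflow.operators.python import PythonOperator
from acceldata_airflow_sdk.decorators.job import job
from acceldata_sdk.models.job import Node

# No job_uid here: the SDK derives a unique uid from each task id and the function name
@job(inputs=[Node(asset_uid='POSTGRES_LOCAL_DS.pipeline.pipeline.customer_orders')],
     bounded_by_span=True)
def aggregate_orders(**context):
    pass

# Two tasks reuse the same decorated function without sharing a job_uid
aggregate_q1 = PythonOperator(task_id='aggregate_orders_q1', python_callable=aggregate_orders, dag=dag)
aggregate_q2 = PythonOperator(task_id='aggregate_orders_q2', python_callable=aggregate_orders, dag=dag)
```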



## Create Span Using Decorator
To create a span for a Python function, the user can decorate the function with the span decorator, which takes the span uid as a parameter. To decorate a function with span, make sure it has a `**context` parameter in its argument list. That gives access to the context of the task, and various span events can be sent inside the function using it. To get the parent span context, use the key `span_context_parent` in an xcom pull on the task instance. Its value is a span context instance which can be used to create child spans and send custom events (as shown in the example below).

Params:

`span_uid`: A String parameter to specify the UID of the span to be created. Default value is None. If `span_uid` is not provided, the uid is constructed using the task id and function name.

`xcom_to_event_mapper_ids`: A parameter holding a list of xcom keys used to send xcom variables in the span event. Default value is an empty list.

| NOTE: Passing context is mandatory to the function being decorated as our decorators use context to share information through xcom. |
|-------------------------------------------------------------------------------------------------------------------------------------|
```python
from datetime import datetime

from acceldata_airflow_sdk.decorators.span import span
from acceldata_sdk.events.generic_event import GenericEvent


@span(span_uid='customer.orders.datagen.span',
      associated_job_uids=['monthly.order.aggregate.transfer'],
      xcom_to_event_mapper_ids=['run_id', 'event_id'])
def data_gen(**context):
    # Parent span context pushed by the SDK
    datagen_span_context = context['span_context_parent']
    # Send an event for the current span
    datagen_span_context.send_event(
        GenericEvent(
            context_data={
                'client_time': str(datetime.now()),
                'detail': 'Generating data'
            },
            event_uid="order.customer.join.result"
        )
    )
    # Create a child span
    customer_datagen_span = datagen_span_context.create_child_span(
        uid="customer.data.gen",
        context_data={'client_time': str(datetime.now())}
    )
    # Send an event for the child span
    # (rows and customer_ids are placeholders for data produced by the task)
    customer_datagen_span.send_event(
        GenericEvent(
            context_data={
                'client_time': str(datetime.now()),
                'row_count': len(rows)
            },
            event_uid="order.customer.join.result"
        )
    )
    customer_datagen_span.end(
        context_data={'client_time': str(datetime.now()), 'customers_count': len(customer_ids)}
    )
```


## Custom Operators
The Acceldata Airflow SDK contains 4 custom operators.
##### TorchInitializer Operator:
The user needs to add a task with this operator at the root of the DAG. This operator creates a new pipeline if one does not already exist. Additionally, it creates a new pipeline run and a root span for that run of the Airflow DAG.

Params:

`create_pipeline`: A Boolean parameter deciding whether to create a pipeline (if it does not exist) and a pipeline run. Default value is True. This can be useful if the pipeline/pipeline run has been created outside of the Airflow DAG.

`span_name`: A string parameter specifying the name of the root span. Default value is None. If not provided, pipeline_uid.span is used as the span name.

`meta`: A parameter specifying the metadata for the pipeline (PipelineMetadata). Default value is None. If not provided, PipelineMetadata(owner='sdk/pipeline-user', team='TORCH', codeLocation='...') is set as meta.

`pipeline_uid`: A string parameter specifying the UID of the pipeline. It is a mandatory parameter.

`pipeline_name`: A string parameter specifying the name of the pipeline. Default value is None. If not provided, pipeline_uid is used as the name.

`continuation_id`: A string parameter that uniquely identifies a pipeline run. This parameter can accept jinja templates as well. Default value is None. This parameter is useful when we want a pipeline run to span multiple DAGs. To use it, provide a continuation id while creating the pipeline in the first DAG with create_pipeline=True, and then provide the same continuation id in the second DAG, where the same pipeline run is continued with create_pipeline=False.

`connection_id`: A string parameter that uniquely identifies a connection storing the Torch credentials. Default value is None. This parameter is useful when we want to use Torch credentials from an Airflow connection instead of environment variables. For details on creating a connection, refer to the 'Creating Airflow connection' section above. A sketch combining `continuation_id` and `connection_id` follows the example below.


```python
from acceldata_airflow_sdk.operators.torch_initialiser_operator import TorchInitializer
from acceldata_sdk.models.pipeline import PipelineMetadata

# example of jinja templates being used in continuation_id
# jinja template to pull value from config json
# continuation_id=f"{{{{ dag_run.conf['continuation_id']  }}}}"
# jinja template to pull value from xcom
# continuation_id=f"{{{{ task_instance.xcom_pull(key='continuation_id') }}}}"

torch_initializer_task = TorchInitializer(
   task_id='torch_pipeline_initializer',
   pipeline_uid='customer.orders.monthly.agg.demo',
   pipeline_name='CUSTOMERS ORDERS MONTHLY AGG',
   continuation_id='heterogeneous_test',
   create_pipeline=True,
   span_name='customer.orders.monthly.agg.demo.span',
   meta=PipelineMetadata(owner='test', team='testing', codeLocation='...'),
   dag=dag
)

```
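
As a sketch of the `continuation_id` and `connection_id` parameters described above (the `dag2` object and the `torch_connection` connection id are assumptions), a second DAG could continue the same pipeline run and read the Torch credentials from an Airflow connection:

```python
from acceldata_airflow_sdk.operators.torch_initialiser_operator import TorchInitializer

# Sketch: TorchInitializer task in a second DAG (dag2 is assumed to exist).
# create_pipeline=False plus the same continuation_id continues the pipeline run
# started by the first DAG; connection_id points at an Airflow connection holding
# the Torch credentials instead of environment variables.
torch_continuation_task = TorchInitializer(
   task_id='torch_pipeline_continuation',
   pipeline_uid='customer.orders.monthly.agg.demo',
   continuation_id='heterogeneous_test',
   create_pipeline=False,
   connection_id='torch_connection',
   dag=dag2
)
```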

##### SpanOperator Operator :
SpanOperator will execute any standard operator passed as the `operator` parameter and send span start and end events for it. Just wrap the standard operator with a SpanOperator.
Make sure that the wrapped operator is not added to the DAG. If the operator is wrapped with a SpanOperator, the SpanOperator takes care of executing that operator's task inside its own execution.

Params:

`span_uid`: A string parameter specifying the UID of the span to be created. If `span_uid` is not provided, the uid is constructed using the task id and operator name.

`xcom_to_event_mapper_ids`: A list parameter of xcom keys used to send xcom variables in the span event. Default value is an empty list.

`operator`: A parameter specifying the standard Airflow operator to wrap. It is a mandatory parameter.

Other parameters will be the same as the airflow standard base operator.

| WARNING: Do not specify the `dag` parameter in std airflow operator being passed as an argument to SpanOperator as the execution of operator task is taken care of by SpanOperator.   |
|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
 
```python
from airflow.providers.postgres.operators.postgres import PostgresOperator  # requires the postgres provider package
from acceldata_airflow_sdk.operators.span_operator import SpanOperator

# Standard operator to be wrapped; do not add it to the DAG directly
get_order_agg_for_q4 = PostgresOperator(
   task_id="get_monthly_order_aggregate_last_quarter",
   postgres_conn_id='example_db',
   sql="select * from information_schema.attributes",
)

# SpanOperator wraps the operator above and executes it within a span
get_order_agg_for_q4 = SpanOperator(
   task_id="get_monthly_order_aggregate_last_quarter",
   span_uid='monthly.order.agg.q4.span',
   operator=get_order_agg_for_q4,
   associated_job_uids=['monthly.order.aggregate.transfer'],
   xcom_to_event_mapper_ids=['run_id', 'event_id'],
   dag=dag
)
```

##### JobOperator Operator :
JobOperator will execute any standard operator passed as the `operator` parameter, create a job, and send span start and end events. Just wrap the standard operator with a JobOperator.
Make sure that the wrapped operator is not added to the DAG. If the operator is wrapped with a JobOperator, the JobOperator takes care of executing that operator's task inside its own execution.

A `Node` object should have either asset_uid ({data source}.{asset path from its root}) or job_uid (the uid of the next job) as a parameter.

Params:

`span_uid`: A string parameter to specify the UID of the span to be created. Default value is None. If `span_uid` is not provided, a span corresponding to the job is created using the value of job_uid.

`job_uid`: A string parameter to specify the job UID of the pipeline. Default value is None. If `job_uid` is not provided, the uid is constructed using the task id and operator name.

`inputs`: An array parameter of Node type objects used by the job as input. Default value is an empty array.

`outputs`: An array parameter of Node type objects returned by the job as output. Default value is an empty array.

`metadata`: A parameter of type JobMetadata specifying the metadata of the job. Default value is None.

`xcom_to_event_mapper_ids`: A list parameter of xcom keys used to send xcom variables in the span event. Default value is an empty list.

`bounded_by_span`: A boolean parameter deciding whether to create a span along with the job. Default value is True. If it is set to True, the wrapped task gets access to the context, and various span events can be sent using it. Use span_context = context['span_context_parent'] to get the span context.

`operator`: A parameter specifying the standard Airflow operator to wrap. It is a mandatory parameter.

Other parameters are the same as the Airflow standard base operator.
Make sure each `Node` object has either asset_uid ({data source}.{asset path from its root}) or job_uid (the uid of the next job) as a parameter.

| WARNING: Do not specify the `dag` parameter in std airflow operator being passed as an argument to JobOperator as the execution of operator task is taken care of by JobOperator.  |
|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
 
```python
from airflow.providers.postgres.operators.postgres import PostgresOperator  # requires the postgres provider package
from acceldata_airflow_sdk.operators.job_operator import JobOperator
from acceldata_sdk.models.job import Node, JobMetadata

# Standard operator to be wrapped; do not add it to the DAG directly
get_order_agg_for_q4 = PostgresOperator(
   task_id="get_monthly_order_aggregate_last_quarter",
   postgres_conn_id='example_db',
   sql="select * from information_schema.attributes",
)

# JobOperator wraps the operator above, creates a job and executes it within a span
get_order_agg_for_q4 = JobOperator(
   task_id="get_monthly_order_aggregate_last_quarter",
   job_uid='customer.order.join.job',
   inputs=[Node(asset_uid='POSTGRES_LOCAL_DS.pipeline.pipeline.orders'), Node(asset_uid='POSTGRES_LOCAL_DS.pipeline.pipeline.customers')],
   outputs=[Node(job_uid='next_job_uid')],
   metadata=JobMetadata('name', 'team', 'code_location'),
   span_uid='monthly.order.agg.q4.span',
   operator=get_order_agg_for_q4,
   xcom_to_event_mapper_ids=['run_id', 'event_id'],
   bounded_by_span=True,
   dag=dag
)
```


##### ExecutePolicyOperator Operator : 
`ExecutePolicyOperator` is used to execute a policy by passing `policy_type` and `policy_id`.

Params:

`sync`: A boolean parameter used to decide if the policy should be executed synchronously or asynchronously. It is a mandatory parameter. If it is set to `True`, the operator returns only after the execution ends. If it is set to `False`, it returns immediately after starting the execution.

`policy_type`: A PolicyType parameter used to specify the policy type. It is a mandatory parameter. It is an enum that takes values from constants: PolicyType.DATA_QUALITY or PolicyType.RECONCILIATION.

`policy_id`: A parameter used to specify the id of the policy to be executed. It is a mandatory parameter.

`incremental`: A boolean parameter used to specify if the policy execution should be incremental or full. Default value is False.

`failure_strategy`: An enum parameter used to decide the behaviour in case of failure. Default value is DoNotFail.

* `failure_strategy` takes an enum of type `FailureStrategy`, which can have 3 values: DoNotFail, FailOnError and FailOnWarning.

* DoNotFail will never throw. In case of failure it logs the error.
* FailOnError throws an exception only on errors. In case of a warning, it returns without any errors.
* FailOnWarning throws an exception on warnings as well as errors.

```python
from acceldata_airflow_sdk.operators.execute_policy_operator import ExecutePolicyOperator
from acceldata_sdk.constants import FailureStrategy, PolicyType
operator_task = ExecutePolicyOperator(
    task_id='torch_pipeline_operator_test',
    policy_type=PolicyType.DATA_QUALITY,
    policy_id=46,
    sync=True,
    failure_strategy=FailureStrategy.DoNotFail,
    dag=dag
)
```

`ExecutePolicyOperator` stores the execution id of the executed policy in xcom using the key {`policy_type.name`}_{`policy_id`}_execution_id. Replace the policy_type and policy_id based on the policy.

Hence, to query the result in another task you need to pull the execution id from xcom using the same key {`policy_type.name`}_{`policy_id`}_execution_id.

`get_polcy_execution_result` can be used to query the result using the execution id pulled from xcom.
In the example below, the policy_type is PolicyType.DATA_QUALITY.name and the policy_id is 46.

Params:

`policy_type`: A PolicyType parameter used to specify the policy type. It is a mandatory parameter. It is an enum that takes values from constants: PolicyType.DATA_QUALITY or PolicyType.RECONCILIATION.

`execution_id`: A string parameter specifying the execution id for which we want to query the results. It is a mandatory parameter.

`failure_strategy`: An enum parameter used to decide the behaviour in case of failure. Default value is DoNotFail.

* `failure_strategy` takes an enum of type `FailureStrategy`, which can have 3 values: DoNotFail, FailOnError and FailOnWarning.

* DoNotFail will never throw. In case of failure it logs the error.
* FailOnError throws an exception only on errors. In case of a warning, it returns without any errors.
* FailOnWarning throws an exception on warnings as well as errors.


```python
from acceldata_sdk.torch_client import TorchClient
from acceldata_airflow_sdk.initialiser import torch_credentials
from acceldata_sdk.constants import FailureStrategy, PolicyType, RuleExecutionStatus

def ruleoperator_result(**context):
    xcom_key = f'{PolicyType.DATA_QUALITY.name}_46_execution_id'
    task_instance = context['ti']
    # Pull the execution id from xcom
    execution_id = task_instance.xcom_pull(key=xcom_key)
    if execution_id is not None:
        torch_client = TorchClient(**torch_credentials)
        result = torch_client.get_polcy_execution_result(
            policy_type=PolicyType.DATA_QUALITY,
            execution_id=execution_id,
            failure_strategy=FailureStrategy.DoNotFail
        )
        if result.execution.resultStatus == RuleExecutionStatus.ERRORED:
            print(result.execution.executionError)
```


`get_policy_status` can be used to query the current status of an execution.

Params:

`policy_type`: A PolicyType parameter used to specify the policy type. It is a mandatory parameter. It is an enum that takes values from constants: PolicyType.DATA_QUALITY or PolicyType.RECONCILIATION.

`execution_id`: A string parameter specifying the execution id for which we want to query the status. It is a mandatory parameter.

You need to pull the execution id from xcom using the same key {`policy_type.name`}_{`policy_id`}_execution_id which was pushed by `ExecutePolicyOperator`. Replace the policy_type and policy_id based on the policy. In the example below, the policy_type is PolicyType.DATA_QUALITY.name and the policy_id is 46.




```python
from acceldata_sdk.torch_client import TorchClient
from acceldata_airflow_sdk.initialiser import torch_credentials
import acceldata_sdk.constants as const
def ruleoperator_status(**context):
    xcom_key = f'{const.PolicyType.DATA_QUALITY.name}_46_execution_id'
    task_instance = context['ti']
    # pull the execution id from xcom
    execution_id = task_instance.xcom_pull(key=xcom_key)
    if execution_id is not None:
        torch_client = TorchClient(**torch_credentials)
        result = torch_client.get_policy_status(policy_type=const.PolicyType.DATA_QUALITY, execution_id=execution_id)
        if result == const.RuleExecutionStatus.ERRORED:
            print("Policy execution encountered an error.")

```

Version Log
==========

3.2.0 (14/03/2024)
-------------------
- Acceldata Airflow SDK - a wrapper on Apache Airflow
- The Acceldata Airflow SDK provides observability for Airflow DAGs in the Torch catalog. With the SDK, users get end-to-end observability of Airflow DAG runs in the Torch UI.




            
