acceldata-sdk


Name: acceldata-sdk
Version: 3.2.0
Summary: Acceldata SDK
Author: acceldata
License: MIT License
Keywords: acceldata-sdk
Upload time: 2024-03-21 11:23:12
# Pipeline APIs

Acceldata Torch is a complete solution to observe the quality of the data present in your data lake and warehouse. Using Torch, you can ensure that high-quality data backs your business decisions. Torch provides you with tools to measure the quality of data in a data catalog and to never miss significant data sources. All users, including analysts, data scientists, and developers, can rely on Torch to observe the data flowing in the warehouse or data lake and can rest assured that there is no loss of data.
<br />
Acceldata SDK is used to trigger Torch catalog and pipeline APIs. By creating a Torch client, all the Torch APIs can be accessed.

Install `acceldata-sdk` pypi package in a python environment.
```bash
pip install acceldata-sdk
```

## Create Torch Client
The Torch client is used to send data to the Torch servers. It consists of various methods to communicate with the Torch server and has access to both catalog and pipeline APIs. To create a Torch client, the Torch URL and API keys are required. To create Torch API keys, go to the Torch UI's settings and generate keys for the client.

When creating a TorchClient connection to Torch, version compatibility checks between Torch and the SDK are enabled by default. This check can be disabled by passing `do_version_check` as `False`.


```python
from acceldata_sdk.torch_client import TorchClient

torch_client = TorchClient(url='https://acceldata.host.dev:9999', access_key='******',
                         secret_key='*****************', do_version_check=True)

```

## Pipeline API 
Various pipeline APIs are supported through Acceldata SDK, such as creating a pipeline, adding jobs and spans, and initiating a pipeline run. Acceldata SDK can also send various events during the span life cycle. Hence, Acceldata SDK has full control over the pipelines.

##### Create a Pipeline and Job, and a span to bound the Job
`Pipeline` represents the ETL pipeline in its entirety and will contain Asset nodes and Jobs associated. The complete pipeline definition forms the Lineage graph for all the data assets.
<br/>
`Job Node` or `Process Node` represents an entity that performs some work in the ETL workflow. In this representation, a Job's inputs are some assets or other Jobs, and its outputs are other assets or other Jobs.
Torch will use the set of Job definitions in the workflow to create the Lineage, and also to track version changes for the Pipeline.

Acceldata SDK provides the `CreateJob` class, which needs to be passed to the `create_job` function as a parameter to create a job.

Params for `CreateJob`:

`uid`: uid of the job. It should be unique for the job. It is a mandatory parameter.<br/>
`name`: name of the job. It is a mandatory parameter.

#### NOTE: This changed in the 2.4.1 release

`pipeline_run_id`: id of the pipeline_run for which you want to add a job. It is a mandatory parameter if the `job` is being created using `pipeline`. It is not needed if the job is being created using `pipeline_run`.<br/>
`description`: description of the job<br/>
`inputs`: (list[Node]) inputs for the job. Each can be the uid of an asset specified using the asset_uid parameter of the Node object,
        or the uid of another job specified using the job_uid parameter of the Node object.<br/>
`outputs`: (list[Node]) outputs for the job. Each can be the uid of an asset specified using the asset_uid parameter of the Node object,
        or the uid of another job specified using the job_uid parameter of the Node object.<br/>
`meta`: metadata of the Job<br/>
`context`: context of the job<br/>
`bounded_by_span`: (Boolean) This has to be set to True if the job has to be bounded with a span. Default value is False. It is an optional parameter.<br/>
`span_uid`: (String) This is the uid of the new span to be created. This is a mandatory parameter if bounded_by_span is set to True.<br/>
`with_explicit_time`: An optional boolean parameter used when a job is bounded by a span.
- If set to True, the child span will be started at the specified time provided in the subsequent events.
- If not set, the span will be automatically started with the current time at the moment of creation.

```python
from acceldata_sdk.torch_client import TorchClient
from acceldata_sdk.models.job import CreateJob, JobMetadata, Node
from acceldata_sdk.models.pipeline import CreatePipeline, PipelineMetadata, PipelineRunResult, PipelineRunStatus

# Create pipeline
pipeline = CreatePipeline(
    uid='monthly_reporting_pipeline',
    name='Monthly reporting Pipeline',
    description='Pipeline to create monthly reporting tables',
    meta=PipelineMetadata('Vaishvik', 'acceldata_sdk_code', '...'),
    context={'key1': 'value1'}
)
torch_client = TorchClient(url="https://torch.acceldata.local", access_key="*******",
                          secret_key="******************************",do_version_check=False)
pipeline_response = torch_client.create_pipeline(pipeline=pipeline)
pipeline_run = pipeline_response.create_pipeline_run()

# Create a job using pipeline object.
# Passing of pipeline_run_id is mandatory
job = CreateJob(
    uid='monthly_sales_aggregate',
    name='Monthly Sales Aggregate',
    description='Generates the monthly sales aggregate tables for the complete year',
    inputs=[Node(asset_uid='datasource-name.database.schema.table_1')],
    outputs=[Node(job_uid='job2_uid')],
    meta=JobMetadata('vaishvik', 'backend', 'https://github.com/'),
    context={'key21': 'value21'},
    bounded_by_span=True,
    pipeline_run_id=pipeline_run.id,
    span_uid="test_shubh"
)
job_response = pipeline_response.create_job(job)

# Create a job using pipeline_run object.
# Passing of pipeline_run_id is not needed
job = CreateJob(
        uid='monthly_sales_aggregate',
        name='Monthly Sales Aggregate',
        description='Generates the monthly sales aggregate tables for the complete year',
        inputs=[Node(asset_uid='datasource-name.database.schema.table_1')],
        outputs=[Node(job_uid='job2_uid')],
        meta=JobMetadata('vaishvik', 'backend', 'https://github.com/'),
        context={'key21': 'value21'}
)
job_response_using_run = pipeline_run.create_job(job)
```
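
The `with_explicit_time` parameter described above is not shown in the example. Below is a minimal sketch, assuming the `pipeline_run` created above is still in scope; the uid, name, and span_uid values are placeholders:
```python
from acceldata_sdk.models.job import CreateJob, Node

# with_explicit_time=True: the bounded span is started at the time supplied in
# subsequent events rather than at creation time.
job_with_time = CreateJob(
    uid='monthly_sales_aggregate_explicit',               # placeholder uid
    name='Monthly Sales Aggregate (explicit span time)',
    inputs=[Node(asset_uid='datasource-name.database.schema.table_1')],
    outputs=[Node(job_uid='job2_uid')],
    bounded_by_span=True,
    span_uid='monthly_sales_aggregate_explicit.span',     # placeholder span uid
    with_explicit_time=True
)
job_with_time_response = pipeline_run.create_job(job_with_time)
```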
##### Create Pipeline Run And Generate Spans And Send Span Events

A pipeline run indicates an execution of the pipeline. The same pipeline can be executed multiple times, and each execution (run) has a new snapshot version. Each pipeline run has a hierarchical group of spans. A `Span` is a way to group a bunch of metrics, and spans are hierarchical, so they can be as granular as needed. The APIs support creating a span object from a pipeline object, and hierarchical spans are then started from parent spans. A Span typically encompasses a process or a task and can be granular. This hierarchical system is powerful enough to model extremely complex pipeline observability flows. Optionally, a span can also be associated with a Job. This way, we can track the start and completion of a Job, including failure tracking. Start and stop are implicitly tracked for a span.

Acceldata SDK also supports creating a new pipeline run and adding spans to it. During the span life cycle, the SDK can send custom and standard span events to collect pipeline run metrics for observability.

Params for the `create_span` function, which is available under a `pipeline_run`:

`uid`: uid of the span being created. This should be unique. This is a mandatory parameter.<br/>
`associatedJobUids`: list of job uids with which the span needs to be associated.<br/>
`context_data`: a dict of key-value pairs providing custom context information related to the span.<br/>

Params for the `create_child_span` function, which is available under `span_context`. This is used to create a hierarchy of spans by creating a span under another span.

`uid`: uid of the span being created. This should be unique. This is a mandatory parameter.<br/>
`context_data`: a dict of key-value pairs providing custom context information related to the span.<br/>
`associatedJobUids`: list of job uids with which the span needs to be associated.
```python

from acceldata_sdk.events.generic_event import GenericEvent
from datetime import datetime

# create a pipeline run of the pipeline
pipeline_run = pipeline_response.create_pipeline_run()

# get root span of a pipeline run
root_span = pipeline_run.get_root_span()

# create span in the pipeline run
span_context = pipeline_run.create_span(uid='monthly.generate.data.span')

# check current span is root or not
span_context.is_root()

# end the span 
span_context.end()

# check if the current span has children or not
span_context.has_children()

# create a child span
child_span_context = span_context.create_child_span('monthly.generate.customer.span')

# send custom event
child_span_context.send_event(
    GenericEvent(context_data={'client_time': str(datetime.now()), 'row_count': 100}, 
                 event_uid="order.customer.join.result")
)


# abort span
child_span_context.abort()

# failed span
child_span_context.failed()

# update a pipeline run of the pipeline
updatePipelineRunRes = pipeline_run.update_pipeline_run(context_data={'key1': 'value2', 'name': 'backend'},
                                                               result=PipelineRunResult.SUCCESS,
                                                               status=PipelineRunStatus.COMPLETED)

```

##### Get Latest Pipeline Run
Acceldata SDK can get the latest pipeline run of a pipeline. Using the latest pipeline run instance, the user can continue the ETL pipeline and add spans, jobs, and events. Hence, Acceldata SDK has complete access to the Torch pipeline service.
Params for `get_pipeline`:

`pipeline_identity`: String parameter used to filter the pipeline. It can be either the id or the uid of the pipeline.

```python
pipeline = torch_client.get_pipeline('monthly_reporting_pipeline')
pipeline_run = pipeline.get_latest_pipeline_run()

```
##### Get Pipeline Run with a particular pipeline run id
Acceldata SDK can get a pipeline run of the pipeline with a particular pipeline run id. Using the pipeline run
instance, the user can continue the ETL pipeline and add spans, jobs, and events. Hence, Acceldata SDK has complete access to the Torch pipeline service.

Params for `get_pipeline_run`:

`pipeline_run_id`: run id of the pipeline run<br/>
`continuation_id`: continuation id of the pipeline run<br/>
`pipeline_id`: id of the pipeline to which the run belongs<br/>
```python
pipeline_run = torch_client.get_pipeline_run(pipeline_run_id=pipeline_run_id)
pipeline = torch_client.get_pipeline(pipeline_id=pipeline_id)
pipeline_run = torch_client.get_pipeline_run(continuation_id=continuation_id, pipeline_id=pipeline.id)
pipeline_run = pipeline.get_run(continuation_id=continuation_id)
```

##### Get Pipeline details for a particular pipeline run id
Acceldata SDK can get pipeline details for a particular pipeline run.

```python
pipeline_details = pipeline_run.get_details()
```
##### Get all spans for a particular pipeline run id
Acceldata SDK can get all spans for a particular pipeline run id.

```python
pipeline_run_spans = pipeline_run.get_spans()
```
##### Get Pipeline Runs for a pipeline
Acceldata SDK can get all runs of a pipeline.
Params for `get_pipeline_runs`:

`pipeline_id`: id of the pipeline
```python
runs = torch_client.get_pipeline_runs(pipeline_id)
runs = pipeline.get_runs()
```

##### Get all Pipelines
Acceldata SDK can get all pipelines.

```python
pipelines = torch_client.get_pipelines()
```

##### Delete a Pipeline
Acceldata SDK can delete a pipeline.
```python
delete_response = pipeline.delete()
```

##### Execute policy synchronously and asynchronously
Acceldata SDK provides the utility function `execute_policy` to execute policies synchronously and asynchronously. This returns an object on which `get_result` and `get_status` can be called to get the result and status of the execution, respectively.

Params for `execute_policy`:

`sync`: Boolean parameter used to decide if the policy should be executed synchronously or asynchronously. It is a mandatory parameter. If it is set to `True`, the call returns only after the execution ends. If it is set to `False`, it returns immediately after starting the execution.

`policy_type`: Enum parameter used to specify the policy type. It is a mandatory parameter. It is an enum that takes values from constants, such as PolicyType.DATA_QUALITY or PolicyType.RECONCILIATION.

`policy_id`: String parameter used to specify the policy id to be executed. It is a mandatory parameter.

`incremental`: Boolean parameter used to specify whether the policy execution should be incremental or full. Default value is False.

`pipeline_run_id`: Long parameter used to specify the run id of the pipeline run in which the policy is being executed. This can
be used to link the policy execution with a particular pipeline run (see the sketch after the failure strategy notes below).

`failure_strategy`: Enum parameter used to decide the behaviour in case of failure. Default value is DoNotFail.

* `failure_strategy` takes an enum of type `FailureStrategy`, which can have 3 values: DoNotFail, FailOnError and FailOnWarning.

* DoNotFail will never throw. In case of failure it will log the error.
* FailOnError will throw an exception only if it is an error. In case of a warning it returns without any errors.
* FailOnWarning will throw an exception on a warning as well as an error.
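
As referenced above, here is a minimal sketch of linking a policy execution to a pipeline run and using `FailOnError`. The policy id is a placeholder, the broad exception handling is an assumption since the exact exception type is not documented here, and it assumes `torch_client` and `pipeline_run` were created as in the earlier examples.
```python
import acceldata_sdk.constants as const

try:
    executor = torch_client.execute_policy(
        const.PolicyType.DATA_QUALITY,
        46,                                     # placeholder policy id
        sync=True,
        incremental=False,
        failure_strategy=const.FailureStrategy.FailOnError,
        pipeline_run_id=pipeline_run.id         # link this execution to the pipeline run
    )
    result = executor.get_result()
except Exception as err:                        # exact exception type is an assumption
    print(f"Policy execution failed: {err}")
```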

To get the execution result, we can call `get_policy_execution_result` on the torch_client or call `get_result` on the execution object, which will return a result object.

Params for `get_policy_execution_result`:

`policy_type`: Enum parameter used to specify the policy type. It is a mandatory parameter. It is an enum that takes values from constants, such as PolicyType.DATA_QUALITY or PolicyType.RECONCILIATION.

`execution_id`: String parameter used to specify the execution id to be queried for the result. It is a mandatory parameter.

`failure_strategy`: Enum parameter used to decide the behaviour in case of failure. Default value is DoNotFail.

Params for `get_result`:

`failure_strategy`: Enum parameter used to decide the behaviour in case of failure. Default value is DoNotFail.

To get the current status, we can call `get_policy_status` on the torch_client or call `get_status` on the execution object, which will return the current `resultStatus` of the execution.

Params for `get_policy_status`:

`policy_type`: Enum parameter used to specify the policy type. It is a mandatory parameter. It is an enum that takes values from constants, such as PolicyType.DATA_QUALITY or PolicyType.RECONCILIATION.

`execution_id`: String parameter used to specify the execution id to be queried for the result. It is a mandatory parameter.

`get_status` does not take any parameters.
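
A minimal sketch of checking the status directly via the client, assuming a `torch_client` and an `execution_id` obtained from a previous `execute_policy` call:
```python
import acceldata_sdk.constants as const

# Query the current status of a policy execution by its execution id.
current_status = torch_client.get_policy_status(
    policy_type=const.PolicyType.DATA_QUALITY,
    execution_id=execution_id
)
print(current_status)
```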

Asynchronous execution example
```python
from acceldata_sdk.torch_client import TorchClient
import acceldata_sdk.constants as const
torch_credentials = {
    'url':  'https://torch.acceldata.local:5443/torch',
    'access_key':'PJSAJALFHSHU',
    'secret_key': 'E6LLJHKGSHJJTRHGK540E5',
    'do_version_check': True
}
torch_client = TorchClient(**torch_credentials)
async_executor = torch_client.execute_policy(const.PolicyType.DATA_QUALITY, 46, sync=False, failure_strategy=const.FailureStrategy.DoNotFail, pipeline_run_id=None)
# Wait for execution to get final result
execution_result = async_executor.get_result(failure_strategy=const.FailureStrategy.DoNotFail)
# Get the current status
execution_status = async_executor.get_status()
```

Synchronous execution example.
```python
from acceldata_sdk.torch_client import TorchClient
import acceldata_sdk.constants as const
torch_credentials = {
    'url':  'https://torch.acceldata.local:5443/torch',
    'access_key':'PJSAJALFHSHU',
    'secret_key': 'E6LLJHKGSHJJTRHGK540E5',
    'do_version_check': True
}
torch_client = TorchClient(**torch_credentials)
# This will wait for execution to get final result
sync_executor = torch_client.execute_policy(const.PolicyType.DATA_QUALITY, 46, sync=True, failure_strategy=const.FailureStrategy.DoNotFail, pipeline_run_id=None)
# Wait for execution to get final result
execution_result = sync_executor.get_result(failure_strategy=const.FailureStrategy.DoNotFail)
# Get the current status
execution_status = sync_executor.get_status()
```

Cancel execution example.
```python
execution_result = sync_executor.cancel()
```


Example of continuing the same pipeline run across multiple ETL scripts using continuation_id

ETL1 - Here a new pipeline_run is created using a continuation_id, but the pipeline_run is not closed:
```python
from acceldata_sdk.torch_client import TorchClient
from acceldata_sdk.models.pipeline import CreatePipeline, PipelineMetadata, PipelineRunResult, PipelineRunStatus

# Create pipeline
pipeline_uid = 'monthly_reporting_pipeline'
pipeline = CreatePipeline(
    uid=pipeline_uid,
    name='Monthly reporting Pipeline',
    description='Pipeline to create monthly reporting tables',
    meta=PipelineMetadata('Vaishvik', 'acceldata_sdk_code', '...'),
    context={'key1': 'value1'}
)
torch_client = TorchClient(url="https://torch.acceldata.local", access_key="*******",
                          secret_key="******************************",do_version_check=False)
pipeline_response = torch_client.create_pipeline(pipeline=pipeline)

# A new continuation id should be generated on every run. Same continuation id cannot be reused.
cont_id = "continuationid_demo_1"
pipeline_run = pipeline_response.create_pipeline_run(continuation_id=cont_id)

# Make sure pipeline_run is not ended using the update_pipeline_run call so that same run can be used in next ETL script
```


ETL2 - This script will continue the same pipeline run from ETL1
```python
from acceldata_sdk.torch_client import TorchClient
from acceldata_sdk.models.pipeline import PipelineRunResult, PipelineRunStatus

torch_client = TorchClient(url="https://torch.acceldata.local", access_key="*******",
                          secret_key="******************************",do_version_check=False)

pipeline_uid = 'monthly_reporting_pipeline'
# First get the same pipeline using the previously used UID. Then we will get the previously started pipeline_run using the continuation_id
pipeline = torch_client.get_pipeline(pipeline_uid)

# continuation_id should be the same ID used in the ETL1 script so that the same pipeline_run is continued in the pipeline.
cont_id = "continuationid_demo_1"
pipeline_run = pipeline.get_run(continuation_id=cont_id)
# Use this pipeline run to create span and jobs
# At the end of this script close the pipeline run using update_pipeline_run if we do not want to continue the same pipeline_run further
updatePipelineRunRes = pipeline_run.update_pipeline_run(context_data={'key1': 'value2', 'name': 'backend'},
                                                               result=PipelineRunResult.SUCCESS,
                                                               status=PipelineRunStatus.COMPLETED)

```



# Datasource APIs

Acceldata SDK has full access to the catalog APIs as well.

##### Datasource API
Torch supports crawling 15+ types of datasources.

```python
import acceldata_sdk.constants as const

# Get datasource
ds_res = torch_client.get_datasource('snowflake_ds_local')
ds_res = torch_client.get_datasource(5, properties=True)

# Get datasources based on type
datasources = torch_client.get_datasources(const.AssetSourceType.SNOWFLAKE)

```


##### Assets APIs
Acceldata SDK has methods to get assets in a given datasource.
```python
from acceldata_sdk.models.create_asset import AssetMetadata

# Get asset by id/uid
asset = torch_client.get_asset(1)
asset = torch_client.get_asset('Feature_bag_datasource.feature_1')
```
##### Asset's tags, labels, metadata and sample data
Users can add tags, labels, and custom metadata, and also get sample data of an asset using the SDK.
Tags and labels can be used to filter assets easily.

```python
# asset metadata
from acceldata_sdk.models.tags import AssetLabel, CustomAssetMetadata
asset = torch_client.get_asset(asset_id)

# Get metadata of an asset
asset.get_metadata()

# Get all tags
tags = asset.get_tags()

# Add tag asset
tag_add = asset.add_tag(tag='asset_tag')

# Add asset labels
labels = asset.add_labels(labels=[AssetLabel('test1', 'demo1'), AssetLabel('test2', 'demo2')])

# Get asset labels
labels = asset.get_labels()

# Add custom metadata
asset.add_custom_metadata(custom_metadata=[CustomAssetMetadata('testcm1', 'democm1'), CustomAssetMetadata('testcm2', 'democm2')])
```

##### Crawler Operations
Users can start a crawler as well as check the status of a running crawler.
```python
# Start a crawler
datasource.start_crawler()
torch_client.start_crawler('datasource_name')

# Get running crawler status
datasource.get_crawler_status()
torch_client.get_crawler_status('datasource_name')

```

##### Trigger policies, Profiling and sampling of an asset
Crawled assets can be profiled and sampled using Spark jobs running on Livy.
Furthermore, created policies (Reconciliation and Data Quality) can be triggered too.

```python
import acceldata_sdk.constants as const

# profile an asset, get profile req details, cancel profile
profile_res = asset.start_profile(profiling_type=ProfilingType.FULL)

profile_req_details = profile_res.get_status()

cancel_profile_res = profile_res.cancel()

profile_res = asset.get_latest_profile_status()

profile_req_details_by_req_id = torch_client.get_profile_status(asset_id=profile_req_details.assetId,
                                                                req_id=profile_req_details.id)

# sample data
sample_data = asset.sample_data()

# Rule execution and status
# Execute policy
execute_dq_rule = torch_client.execute_policy(const.PolicyType.DATA_QUALITY, 1114, incremental=False)
failure_strategy = const.FailureStrategy.DoNotFail
# Get policy execution result
result = torch_client.get_policy_execution_result(
    policy_type=const.PolicyType.DATA_QUALITY,
    execution_id=execute_dq_rule.id,
    failure_strategy=failure_strategy
)

# Get policy and execute
from acceldata_sdk.models.ruleExecutionResult import RuleType, PolicyFilter

rule = torch_client.get_policy(const.PolicyType.RECONCILIATION, "auth001_reconciliation")

# Execute policy
async_execution = rule.execute(sync=False)
# Get execution result
async_execution_result = async_execution.get_result()
# Get current execution status
async_execution_status = async_execution.get_status()
# Cancel policy execution job
cancel_rule = async_execution.cancel()

# List all executions
# List executions by id
dq_rule_executions = torch_client.policy_executions(1114, RuleType.DATA_QUALITY)
# List executions by name
dq_rule_executions = torch_client.policy_executions('dq-scala', RuleType.DATA_QUALITY)

# List executions by rule
recon_rule_executions = rule.get_executions()
filter = PolicyFilter(policyType=RuleType.RECONCILIATION, enable=True)
# List all rules
recon_rules = torch_client.list_all_policies(filter=filter)
```

Version Log
==========

3.2.0 (14/03/2024)
-------------------
- Implemented profile response handling fixes


            
