# Tasks for AWS Step Functions
<!--BEGIN STABILITY BANNER-->

---
![End-of-Support](https://img.shields.io/badge/End--of--Support-critical.svg?style=for-the-badge)
> AWS CDK v1 has reached End-of-Support on 2023-06-01.
> This package is no longer being updated, and users should migrate to AWS CDK v2.
>
> For more information on how to migrate, see the [*Migrating to AWS CDK v2* guide](https://docs.aws.amazon.com/cdk/v2/guide/migrating-v2.html).
---
<!--END STABILITY BANNER-->
[AWS Step Functions](https://docs.aws.amazon.com/step-functions/latest/dg/welcome.html) is a web service that enables you to coordinate the
components of distributed applications and microservices using visual workflows.
You build applications from individual components that each perform a discrete
function, or task, allowing you to scale and change applications quickly.

A [Task](https://docs.aws.amazon.com/step-functions/latest/dg/amazon-states-language-task-state.html) state represents a single unit of work performed by a state machine.
All work in your state machine is performed by tasks.

This module is part of the [AWS Cloud Development Kit](https://github.com/aws/aws-cdk) project.
## Table Of Contents
* [Tasks for AWS Step Functions](#tasks-for-aws-step-functions)
  * [Table Of Contents](#table-of-contents)
  * [Task](#task)
  * [Paths](#paths)
    * [InputPath](#inputpath)
    * [OutputPath](#outputpath)
    * [ResultSelector](#resultselector)
    * [ResultPath](#resultpath)
  * [Task parameters from the state JSON](#task-parameters-from-the-state-json)
  * [Evaluate Expression](#evaluate-expression)
  * [API Gateway](#api-gateway)
    * [Call REST API Endpoint](#call-rest-api-endpoint)
    * [Call HTTP API Endpoint](#call-http-api-endpoint)
  * [AWS SDK](#aws-sdk)
  * [Athena](#athena)
    * [StartQueryExecution](#startqueryexecution)
    * [GetQueryExecution](#getqueryexecution)
    * [GetQueryResults](#getqueryresults)
    * [StopQueryExecution](#stopqueryexecution)
  * [Batch](#batch)
    * [SubmitJob](#submitjob)
  * [CodeBuild](#codebuild)
    * [StartBuild](#startbuild)
  * [DynamoDB](#dynamodb)
    * [GetItem](#getitem)
    * [PutItem](#putitem)
    * [DeleteItem](#deleteitem)
    * [UpdateItem](#updateitem)
  * [ECS](#ecs)
    * [RunTask](#runtask)
      * [EC2](#ec2)
      * [Fargate](#fargate)
  * [EMR](#emr)
    * [Create Cluster](#create-cluster)
    * [Termination Protection](#termination-protection)
    * [Terminate Cluster](#terminate-cluster)
    * [Add Step](#add-step)
    * [Cancel Step](#cancel-step)
    * [Modify Instance Fleet](#modify-instance-fleet)
    * [Modify Instance Group](#modify-instance-group)
  * [EMR on EKS](#emr-on-eks)
    * [Create Virtual Cluster](#create-virtual-cluster)
    * [Delete Virtual Cluster](#delete-virtual-cluster)
    * [Start Job Run](#start-job-run)
  * [EKS](#eks)
    * [Call](#call)
  * [EventBridge](#eventbridge)
    * [Put Events](#put-events)
  * [Glue](#glue)
  * [Glue DataBrew](#glue-databrew)
  * [Lambda](#lambda)
  * [SageMaker](#sagemaker)
    * [Create Training Job](#create-training-job)
    * [Create Transform Job](#create-transform-job)
    * [Create Endpoint](#create-endpoint)
    * [Create Endpoint Config](#create-endpoint-config)
    * [Create Model](#create-model)
    * [Update Endpoint](#update-endpoint)
  * [SNS](#sns)
  * [Step Functions](#step-functions)
    * [Start Execution](#start-execution)
    * [Invoke Activity](#invoke-activity)
  * [SQS](#sqs)
## Task
A Task state represents a single unit of work performed by a state machine. In the
CDK, the exact work to be done is determined by a class that implements `IStepFunctionsTask`.
AWS Step Functions [integrates](https://docs.aws.amazon.com/step-functions/latest/dg/concepts-service-integrations.html) with some AWS services so that you can call API
actions, and coordinate executions directly from the Amazon States Language in
Step Functions. You can directly call and pass parameters to the APIs of those
services.
## Paths
In the Amazon States Language, a [path](https://docs.aws.amazon.com/step-functions/latest/dg/amazon-states-language-paths.html) is a string beginning with `$` that you
can use to identify components within JSON text.
Learn more about [input and output processing in Step Functions](https://docs.aws.amazon.com/step-functions/latest/dg/concepts-input-output-filtering.html).
### InputPath
Both `InputPath` and `Parameters` fields provide a way to manipulate JSON as it
moves through your workflow. AWS Step Functions applies the `InputPath` field first,
and then the `Parameters` field. You can first filter your raw input to a selection
you want using InputPath, and then apply Parameters to manipulate that input
further, or add new values. If you don't specify an `InputPath`, a default value
of `$` will be used.
The following example provides the field named `input` as the input to the `Task`
state that runs a Lambda function.
```python
# fn: lambda.Function
submit_job = tasks.LambdaInvoke(self, "Invoke Handler",
    lambda_function=fn,
    input_path="$.input"
)
```
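To make the filtering concrete, here is a minimal pure-Python sketch of what an `InputPath` of `$.input` does to a state's raw input. It is an illustration only: `select_path` is a hypothetical helper (not part of the CDK or the Step Functions service), and it supports just simple dotted reference paths such as `$.a.b`, with no array indexing or filters.

```python
from typing import Any


def select_path(document: Any, path: str) -> Any:
    """Resolve a simple dotted reference path such as "$" or "$.a.b".

    Illustration only: real Step Functions paths also support array
    indexing and other JsonPath syntax that this sketch does not handle.
    """
    if path == "$":
        return document
    if not path.startswith("$."):
        raise ValueError(f"unsupported path: {path!r}")
    value = document
    for key in path[2:].split("."):
        value = value[key]  # a KeyError stands in for a path that matches nothing
    return value


raw_input = {"input": {"orderId": "42"}, "debug": True}

# InputPath "$.input": only this subtree is handed to the Lambda task.
task_input = select_path(raw_input, "$.input")
print(task_input)  # {'orderId': '42'}

# The default InputPath "$" passes the whole document through unchanged.
assert select_path(raw_input, "$") is raw_input
```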
### OutputPath
Tasks also allow you to select a portion of the state output to pass to the next
state. This enables you to filter out unwanted information, and pass only the
portion of the JSON that you care about. If you don't specify an `OutputPath`,
a default value of `$` will be used. This passes the entire JSON node to the next
state.
The [response](https://docs.aws.amazon.com/lambda/latest/dg/API_Invoke.html#API_Invoke_ResponseSyntax) from a Lambda function includes the response from the function
as well as other metadata.
The following example passes only the `result` field of the Lambda function's payload (`$.Payload.result`) to the next state:
```python
# fn: lambda.Function
submit_job = tasks.LambdaInvoke(self, "Invoke Handler",
    lambda_function=fn,
    output_path="$.Payload.result"
)
```
### ResultSelector
You can use [`ResultSelector`](https://docs.aws.amazon.com/step-functions/latest/dg/input-output-inputpath-params.html#input-output-resultselector)
to manipulate the raw result of a Task, Map or Parallel state before it is
passed to [`ResultPath`](#resultpath). For service integrations, the raw
result contains metadata in addition to the response payload. You can use
ResultSelector to construct a JSON payload that becomes the effective result
using static values or references to the raw result or context object.
The following example extracts the output payload of a Lambda function Task and combines
it with some static values and the state name from the context object.
```python
# fn: lambda.Function
tasks.LambdaInvoke(self, "Invoke Handler",
    lambda_function=fn,
    result_selector={
        "lambda_output": sfn.JsonPath.string_at("$.Payload"),
        "invoke_request_id": sfn.JsonPath.string_at("$.SdkResponseMetadata.RequestId"),
        "static_value": {
            "foo": "bar"
        },
        # "$$" refers to the context object rather than the raw task result
        "state_name": sfn.JsonPath.string_at("$$.State.Name")
    }
)
```
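In the rendered state definition, each `JsonPath.string_at(...)` entry becomes a key with a `.$` suffix whose value is a path, while literal values are copied as-is; paths starting with `$$` read the context object instead of the raw result. The pure-Python sketch below imitates that evaluation on a hypothetical raw Lambda result and a trimmed-down context object. It supports only dotted paths and is a model of the behavior, not the service's implementation.

```python
from typing import Any, Dict


def resolve(document: Any, path: str) -> Any:
    """Follow a simple dotted path such as "$" or "$.a.b" through nested dicts."""
    value = document
    if path != "$":
        for key in path[2:].split("."):
            value = value[key]
    return value


def apply_result_selector(template: Dict[str, Any], raw_result: Any, context: Dict[str, Any]) -> Dict[str, Any]:
    """Build the effective result from a ResultSelector-style template.

    Keys ending in ".$" hold paths: "$..." reads the raw task result and
    "$$..." reads the context object. Other values are copied as static data.
    """
    selected: Dict[str, Any] = {}
    for key, value in template.items():
        if key.endswith(".$"):
            name = key[:-2]
            if value.startswith("$$"):
                selected[name] = resolve(context, value[1:])  # "$$.State.Name" -> "$.State.Name"
            else:
                selected[name] = resolve(raw_result, value)
        elif isinstance(value, dict):
            selected[key] = apply_result_selector(value, raw_result, context)
        else:
            selected[key] = value
    return selected


# Hypothetical raw result of a Lambda invoke and a trimmed-down context object.
raw_result = {
    "Payload": {"ok": True},
    "StatusCode": 200,
    "SdkResponseMetadata": {"RequestId": "req-123"},
}
context = {"State": {"Name": "Invoke Handler"}}

# Roughly the shape the CDK example renders to in the state definition.
template = {
    "lambda_output.$": "$.Payload",
    "invoke_request_id.$": "$.SdkResponseMetadata.RequestId",
    "static_value": {"foo": "bar"},
    "state_name.$": "$$.State.Name",
}

selected = apply_result_selector(template, raw_result, context)
print(selected)
```

Note that `StatusCode` is dropped: only what the template asks for survives into the effective result.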
### ResultPath
The output of a state can be a copy of its input, the result it produces (for
example, output from a Task state’s Lambda function), or a combination of its
input and result. Use [`ResultPath`](https://docs.aws.amazon.com/step-functions/latest/dg/input-output-resultpath.html) to control which combination of these is
passed to the state output. If you don't specify a `ResultPath`, a default
value of `$` is used, which replaces the state input with the result.
The following example adds the item from calling DynamoDB's `getItem` API to the state
input and passes it to the next state.
```python
# my_table: dynamodb.Table
tasks.DynamoGetItem(self, "GetItem",
    key={
        "MessageId": tasks.DynamoAttributeValue.from_string("message-id")
    },
    table=my_table,
    result_path="$.Item"
)
```
⚠️ The `OutputPath` is computed after applying `ResultPath`. All service integrations
return metadata as part of their response. When using `ResultPath`, it's not possible to
merge a subset of the task output to the input.
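The ordering above can be modelled in a few lines of plain Python. In this sketch, `apply_result_path` and `get_path` are hypothetical helpers, the task result shape is made up for illustration, and only dotted paths are supported; a `None` result path stands for what `JsonPath.DISCARD` expresses (keep the input, drop the result).

```python
import copy
from typing import Any, Optional


def get_path(document: Any, path: str) -> Any:
    """Follow a simple dotted path such as "$" or "$.a.b"."""
    value = document
    if path != "$":
        for key in path[2:].split("."):
            value = value[key]
    return value


def apply_result_path(state_input: Any, result: Any, result_path: Optional[str]) -> Any:
    """Combine the state input and the task result the way ResultPath does.

    None  -> discard the result and keep the input (like JsonPath.DISCARD)
    "$"   -> replace the input with the result (the default)
    "$.x" -> store the result at that location in a copy of the input
    """
    if result_path is None:
        return state_input
    if result_path == "$":
        return result
    merged = copy.deepcopy(state_input)
    keys = result_path[2:].split(".")
    target = merged
    for key in keys[:-1]:
        target = target.setdefault(key, {})  # this sketch creates missing parents
    target[keys[-1]] = result
    return merged


state_input = {"MessageId": "message-id"}
# Hypothetical GetItem result: the item plus service metadata.
task_result = {
    "Item": {"MessageId": {"S": "message-id"}},
    "SdkResponseMetadata": {"RequestId": "req-123"},
}

# 1. ResultPath "$.Item" adds the whole result, metadata included, to the input.
state_output = apply_result_path(state_input, task_result, "$.Item")

# 2. OutputPath is applied afterwards, to the combined document.
next_input = get_path(state_output, "$.Item.Item")
print(next_input)  # {'MessageId': {'S': 'message-id'}}
```

Step 1 shows why a subset cannot be merged with `ResultPath` alone: the metadata travels with the item unless a `ResultSelector` trims the result first.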
## Task parameters from the state JSON
Most tasks take parameters. Parameter values can be either static values, supplied directly
in the workflow definition, or values available at runtime in the state machine's
execution (either its input or the output of a prior state).
Parameter values available at runtime can be specified via the `JsonPath` class,
using methods such as `JsonPath.string_at()`.
The following example provides the field named `input` as the input to the Lambda function
and invokes it asynchronously.
```python
# fn: lambda.Function
submit_job = tasks.LambdaInvoke(self, "Invoke Handler",
    lambda_function=fn,
    payload=sfn.TaskInput.from_json_path_at("$.input"),
    invocation_type=tasks.LambdaInvocationType.EVENT
)
```
You can also use [intrinsic functions](https://docs.aws.amazon.com/step-functions/latest/dg/amazon-states-language-intrinsic-functions.html) available on `JsonPath`, for example `JsonPath.format()`.
Here is an example of starting an Athena query that is dynamically created using the task input:
```python
start_query_execution_job = tasks.AthenaStartQueryExecution(self, "Athena Start Query",
    query_string=sfn.JsonPath.format("select contacts where year={};", sfn.JsonPath.string_at("$.year")),
    query_execution_context=tasks.QueryExecutionContext(
        database_name="interactions"
    ),
    result_configuration=tasks.ResultConfiguration(
        encryption_configuration=tasks.EncryptionConfiguration(
            encryption_option=tasks.EncryptionOption.S3_MANAGED
        ),
        output_location=s3.Location(
            bucket_name="mybucket",
            object_key="myprefix"
        )
    ),
    integration_pattern=sfn.IntegrationPattern.RUN_JOB
)
```
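`JsonPath.format()` is rendered as the `States.Format` intrinsic function, which substitutes each `{}` placeholder in the template with the next argument, in order. Below is a rough pure-Python model of that substitution. `states_format` is a hypothetical name, the real intrinsic's escaping rules (for example, for literal braces) are not modelled, and raising on a placeholder/argument count mismatch is this sketch's choice.

```python
def states_format(template: str, *args: object) -> str:
    """Replace each "{}" in template with the next argument, in order."""
    parts = template.split("{}")
    if len(parts) - 1 != len(args):
        raise ValueError("number of {} placeholders must match the arguments")
    pieces = [parts[0]]
    for arg, tail in zip(args, parts[1:]):
        pieces.append(str(arg))
        pieces.append(tail)
    return "".join(pieces)


# Hypothetical state input; "$.year" would resolve to 2021 here.
state_input = {"year": 2021}
query = states_format("select contacts where year={};", state_input["year"])
print(query)  # select contacts where year=2021;
```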
Each service integration has its own set of parameters that can be supplied.
## Evaluate Expression
Use `EvaluateExpression` to perform simple operations that reference state paths. The
`expression` referenced in the task is evaluated with JavaScript `eval()` inside a Lambda
function, so you don't have to write Lambda code for simple operations.
Example: convert a wait time from milliseconds to seconds, include it in a message, and wait:
```python
convert_to_seconds = tasks.EvaluateExpression(self, "Convert to seconds",
    expression="$.waitMilliseconds / 1000",
    result_path="$.waitSeconds"
)

create_message = tasks.EvaluateExpression(self, "Create message",
    # Note: this is a string inside a string.
    expression="`Now waiting ${$.waitSeconds} seconds...`",
    runtime=lambda_.Runtime.NODEJS_14_X,
    result_path="$.message"
)

publish_message = tasks.SnsPublish(self, "Publish message",
    topic=sns.Topic(self, "cool-topic"),
    message=sfn.TaskInput.from_json_path_at("$.message"),
    result_path="$.sns"
)

wait = sfn.Wait(self, "Wait",
    time=sfn.WaitTime.seconds_path("$.waitSeconds")
)

sfn.StateMachine(self, "StateMachine",
    definition=convert_to_seconds.next(create_message).next(publish_message).next(wait)
)
```
The `EvaluateExpression` supports a `runtime` prop to specify the Lambda
runtime to use to evaluate the expression. Currently, only runtimes
of the Node.js family are supported.
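The idea behind `EvaluateExpression` can be sketched in Python: replace each `$.path` reference in the expression with the matching value from the state input, then evaluate what remains. The hypothetical `evaluate_expression` below is not the code the construct deploys (that runs on Node.js); it deliberately restricts evaluation to `+ - * /` on numbers instead of calling `eval()`.

```python
import ast
import operator
import re
from typing import Any

# Arithmetic operators this sketch is willing to evaluate.
_OPS = {
    ast.Add: operator.add,
    ast.Sub: operator.sub,
    ast.Mult: operator.mul,
    ast.Div: operator.truediv,
}

# Matches simple dotted references such as $.waitMilliseconds or $.a.b
_PATH = re.compile(r"\$(?:\.[A-Za-z_][A-Za-z0-9_]*)+")


def _lookup(state: dict, path: str) -> Any:
    value: Any = state
    for key in path[2:].split("."):
        value = value[key]
    return value


def _eval(node: ast.AST) -> Any:
    if isinstance(node, ast.Expression):
        return _eval(node.body)
    if isinstance(node, ast.Constant) and isinstance(node.value, (int, float)):
        return node.value
    if isinstance(node, ast.BinOp) and type(node.op) in _OPS:
        return _OPS[type(node.op)](_eval(node.left), _eval(node.right))
    raise ValueError(f"unsupported syntax: {ast.dump(node)}")


def evaluate_expression(expression: str, state: dict) -> Any:
    """Substitute $.path references with literal values, then evaluate."""
    source = _PATH.sub(lambda m: repr(_lookup(state, m.group(0))), expression)
    return _eval(ast.parse(source, mode="eval"))


print(evaluate_expression("$.waitMilliseconds / 1000", {"waitMilliseconds": 2500}))  # 2.5
```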
## API Gateway
Step Functions supports [API Gateway](https://docs.aws.amazon.com/step-functions/latest/dg/connect-api-gateway.html) through the service integration pattern.
HTTP APIs are designed for low-latency, cost-effective integrations with AWS services, including AWS Lambda, and HTTP endpoints.
HTTP APIs support OIDC and OAuth 2.0 authorization, and come with built-in support for CORS and automatic deployments.
Previous-generation REST APIs currently offer more features. More details can be found [here](https://docs.aws.amazon.com/apigateway/latest/developerguide/http-api-vs-rest.html).
### Call REST API Endpoint
The `CallApiGatewayRestApiEndpoint` calls the REST API endpoint.
```python
import aws_cdk.aws_apigateway as apigateway
rest_api = apigateway.RestApi(self, "MyRestApi")
invoke_task = tasks.CallApiGatewayRestApiEndpoint(self, "Call REST API",
    api=rest_api,
    stage_name="prod",
    method=tasks.HttpMethod.GET
)
```
Be aware that header values must be arrays. When passing the task token
in the headers field with the `WAIT_FOR_TASK_TOKEN` integration pattern, use
`JsonPath.array()` to wrap the token in an array:
```python
import aws_cdk.aws_apigateway as apigateway
# api: apigateway.RestApi
tasks.CallApiGatewayRestApiEndpoint(self, "Endpoint",
    api=api,
    stage_name="Stage",
    method=tasks.HttpMethod.PUT,
    integration_pattern=sfn.IntegrationPattern.WAIT_FOR_TASK_TOKEN,
    headers=sfn.TaskInput.from_object({
        "TaskToken": sfn.JsonPath.array(sfn.JsonPath.task_token)
    })
)
```
### Call HTTP API Endpoint
The `CallApiGatewayHttpApiEndpoint` calls the HTTP API endpoint.
```python
import aws_cdk.aws_apigatewayv2 as apigatewayv2
http_api = apigatewayv2.HttpApi(self, "MyHttpApi")
invoke_task = tasks.CallApiGatewayHttpApiEndpoint(self, "Call HTTP API",
    api_id=http_api.api_id,
    api_stack=Stack.of(http_api),
    method=tasks.HttpMethod.GET
)
```
## AWS SDK
Step Functions supports calling [AWS services' API actions](https://docs.aws.amazon.com/step-functions/latest/dg/supported-services-awssdk.html)
through the service integration pattern.
You can use Step Functions' AWS SDK integrations to call over two hundred AWS services
directly from your state machine, giving you access to over nine thousand API actions.
```python
# my_bucket: s3.Bucket
get_object = tasks.CallAwsService(self, "GetObject",
    service="s3",
    action="getObject",
    parameters={
        "Bucket": my_bucket.bucket_name,
        "Key": sfn.JsonPath.string_at("$.key")
    },
    iam_resources=[my_bucket.arn_for_objects("*")]
)
```
Use camelCase for actions and PascalCase for parameter names.
The task automatically adds an IAM statement to the state machine role's policy based on the
service and action called. The resources for this statement must be specified in `iam_resources`.
Use the `iam_action` parameter to specify the IAM action name manually when the IAM
action name does not match the API service/action name:
```python
list_buckets = tasks.CallAwsService(self, "ListBuckets",
    service="s3",
    action="listBuckets",
    iam_resources=["*"],
    iam_action="s3:ListAllMyBuckets"
)
```
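As a rough model of what the task derives from `service` and `action`, the sketch below builds the `aws-sdk` integration resource ARN (documented by Step Functions in the form `arn:aws:states:::aws-sdk:serviceName:apiAction`) and the IAM action for the policy statement, preferring an explicit `iam_action`. Both helper names are hypothetical, and the sketch ignores any service-name remapping the construct itself might apply.

```python
from typing import Optional


def sdk_integration_arn(service: str, action: str, partition: str = "aws") -> str:
    """Task resource ARN for an AWS SDK service integration."""
    return f"arn:{partition}:states:::aws-sdk:{service}:{action}"


def policy_action(service: str, action: str, iam_action: Optional[str] = None) -> str:
    """IAM action for the state machine role: the override wins, else service:action."""
    # IAM action names are case-insensitive, so "s3:getObject" matches s3:GetObject.
    return iam_action if iam_action is not None else f"{service}:{action}"


print(sdk_integration_arn("s3", "getObject"))  # arn:aws:states:::aws-sdk:s3:getObject
print(policy_action("s3", "listBuckets", iam_action="s3:ListAllMyBuckets"))  # s3:ListAllMyBuckets
```

The `listBuckets` case shows why the override exists: the API action is `ListBuckets`, but the IAM permission is `s3:ListAllMyBuckets`, so a derived name would grant the wrong action.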
## Athena
Step Functions supports [Athena](https://docs.aws.amazon.com/step-functions/latest/dg/connect-athena.html) through the service integration pattern.
### StartQueryExecution
The [StartQueryExecution](https://docs.aws.amazon.com/athena/latest/APIReference/API_StartQueryExecution.html) API runs the SQL query statement.
```python
start_query_execution_job = tasks.AthenaStartQueryExecution(self, "Start Athena Query",
    query_string=sfn.JsonPath.string_at("$.queryString"),
    query_execution_context=tasks.QueryExecutionContext(
        database_name="mydatabase"
    ),
    result_configuration=tasks.ResultConfiguration(
        encryption_configuration=tasks.EncryptionConfiguration(
            encryption_option=tasks.EncryptionOption.S3_MANAGED
        ),
        output_location=s3.Location(
            bucket_name="query-results-bucket",
            object_key="folder"
        )
    )
)
```
### GetQueryExecution
The [GetQueryExecution](https://docs.aws.amazon.com/athena/latest/APIReference/API_GetQueryExecution.html) API gets information about a single execution of a query.
```python
get_query_execution_job = tasks.AthenaGetQueryExecution(self, "Get Query Execution",
    query_execution_id=sfn.JsonPath.string_at("$.QueryExecutionId")
)
```
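When a query is started without the `RUN_JOB` integration pattern, Step Functions does not wait for it, so a workflow typically checks `GetQueryExecution` repeatedly until `QueryExecution.Status.State` reaches a terminal value (`SUCCEEDED`, `FAILED`, or `CANCELLED`). In a state machine that loop is usually built from `Wait` and `Choice` states around this task; the plain-Python sketch below only shows the polling logic, with a hypothetical `get_state` callable standing in for the API call, so nothing here talks to Athena.

```python
import time
from typing import Callable, Iterator

TERMINAL_STATES = {"SUCCEEDED", "FAILED", "CANCELLED"}


def wait_for_query(get_state: Callable[[], str], poll_seconds: float = 1.0, max_polls: int = 60) -> str:
    """Poll until the query reaches a terminal state and return that state."""
    for _ in range(max_polls):
        state = get_state()
        if state in TERMINAL_STATES:
            return state
        time.sleep(poll_seconds)
    raise TimeoutError("query did not finish within the polling budget")


# Stand-in for reading QueryExecution.Status.State from GetQueryExecution.
states: Iterator[str] = iter(["QUEUED", "RUNNING", "SUCCEEDED"])
final_state = wait_for_query(lambda: next(states), poll_seconds=0)
print(final_state)  # SUCCEEDED
```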
### GetQueryResults
The [GetQueryResults](https://docs.aws.amazon.com/athena/latest/APIReference/API_GetQueryResults.html) API streams the results of a single query execution, specified by QueryExecutionId, from S3.
```python
get_query_results_job = tasks.AthenaGetQueryResults(self, "Get Query Results",
    query_execution_id=sfn.JsonPath.string_at("$.QueryExecutionId")
)
```
### StopQueryExecution
The [StopQueryExecution](https://docs.aws.amazon.com/athena/latest/APIReference/API_StopQueryExecution.html) API stops a query execution.
```python
stop_query_execution_job = tasks.AthenaStopQueryExecution(self, "Stop Query Execution",
    query_execution_id=sfn.JsonPath.string_at("$.QueryExecutionId")
)
```
## Batch
Step Functions supports [Batch](https://docs.aws.amazon.com/step-functions/latest/dg/connect-batch.html) through the service integration pattern.
### SubmitJob
The [SubmitJob](https://docs.aws.amazon.com/batch/latest/APIReference/API_SubmitJob.html) API submits an AWS Batch job from a job definition.
```python
import aws_cdk.aws_batch as batch
# batch_job_definition: batch.JobDefinition
# batch_queue: batch.JobQueue
task = tasks.BatchSubmitJob(self, "Submit Job",
    job_definition_arn=batch_job_definition.job_definition_arn,
    job_name="MyJob",
    job_queue_arn=batch_queue.job_queue_arn
)
```
## CodeBuild
Step Functions supports [CodeBuild](https://docs.aws.amazon.com/step-functions/latest/dg/connect-codebuild.html) through the service integration pattern.
### StartBuild
[StartBuild](https://docs.aws.amazon.com/codebuild/latest/APIReference/API_StartBuild.html) starts running a build for a CodeBuild project, identified by its project name.
```python
import aws_cdk.aws_codebuild as codebuild
codebuild_project = codebuild.Project(self, "Project",
    project_name="MyTestProject",
    build_spec=codebuild.BuildSpec.from_object({
        "version": "0.2",
        "phases": {
            "build": {
                "commands": ["echo \"Hello, CodeBuild!\""]
            }
        }
    })
)

task = tasks.CodeBuildStartBuild(self, "Task",
    project=codebuild_project,
    integration_pattern=sfn.IntegrationPattern.RUN_JOB,
    environment_variables_override={
        "ZONE": codebuild.BuildEnvironmentVariable(
            type=codebuild.BuildEnvironmentVariableType.PLAINTEXT,
            value=sfn.JsonPath.string_at("$.envVariables.zone")
        )
    }
)
```
## DynamoDB
You can call DynamoDB APIs from a `Task` state.
Read more about [calling DynamoDB APIs from Step Functions](https://docs.aws.amazon.com/step-functions/latest/dg/connect-ddb.html).
### GetItem
The [GetItem](https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_GetItem.html) operation returns a set of attributes for the item with the given primary key.
```python
# my_table: dynamodb.Table
tasks.DynamoGetItem(self, "Get Item",
    key={"MessageId": tasks.DynamoAttributeValue.from_string("message-007")},
    table=my_table
)
```
### PutItem
The [PutItem](https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_PutItem.html) operation creates a new item, or replaces an old item with a new item.
```python
# my_table: dynamodb.Table
tasks.DynamoPutItem(self, "PutItem",
    item={
        "MessageId": tasks.DynamoAttributeValue.from_string("message-007"),
        "Text": tasks.DynamoAttributeValue.from_string(sfn.JsonPath.string_at("$.bar")),
        "TotalCount": tasks.DynamoAttributeValue.from_number(10)
    },
    table=my_table
)
```
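DynamoDB's low-level API wraps every attribute in a type descriptor, such as `{"S": ...}` for strings and `{"N": ...}` for numbers, with numbers sent as strings; `DynamoAttributeValue.from_string()` and `from_number()` produce this shape in the state definition. The hypothetical `to_attribute_value` helper below shows the same encoding for plain Python values. It covers only a subset of DynamoDB types (no sets or binary) and is for illustration, not a replacement for `DynamoAttributeValue`.

```python
from typing import Any, Dict


def to_attribute_value(value: Any) -> Dict[str, Any]:
    """Encode a Python value as a DynamoDB typed attribute value."""
    if isinstance(value, bool):  # check bool first: bool is a subclass of int
        return {"BOOL": value}
    if isinstance(value, (int, float)):
        return {"N": str(value)}  # numbers travel as strings
    if isinstance(value, str):
        return {"S": value}
    if value is None:
        return {"NULL": True}
    if isinstance(value, list):
        return {"L": [to_attribute_value(v) for v in value]}
    if isinstance(value, dict):
        return {"M": {k: to_attribute_value(v) for k, v in value.items()}}
    raise TypeError(f"unsupported type: {type(value).__name__}")


item = {"MessageId": "message-007", "TotalCount": 10}
encoded = {name: to_attribute_value(value) for name, value in item.items()}
print(encoded)  # {'MessageId': {'S': 'message-007'}, 'TotalCount': {'N': '10'}}
```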
### DeleteItem
The [DeleteItem](https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_DeleteItem.html) operation deletes a single item in a table by primary key.
```python
# my_table: dynamodb.Table
tasks.DynamoDeleteItem(self, "DeleteItem",
    key={"MessageId": tasks.DynamoAttributeValue.from_string("message-007")},
    table=my_table,
    result_path=sfn.JsonPath.DISCARD
)
```
### UpdateItem
The [UpdateItem](https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_UpdateItem.html) operation edits an existing item's attributes, or adds a new item
to the table if it does not already exist.
```python
# my_table: dynamodb.Table
tasks.DynamoUpdateItem(self, "UpdateItem",
    key={
        "MessageId": tasks.DynamoAttributeValue.from_string("message-007")
    },
    table=my_table,
    expression_attribute_values={
        ":val": tasks.DynamoAttributeValue.number_from_string(sfn.JsonPath.string_at("$.Item.TotalCount.N")),
        ":rand": tasks.DynamoAttributeValue.from_number(20)
    },
    update_expression="SET TotalCount = :val + :rand"
)
```
## ECS
Step Functions supports [ECS/Fargate](https://docs.aws.amazon.com/step-functions/latest/dg/connect-ecs.html) through the service integration pattern.
### RunTask
[RunTask](https://docs.aws.amazon.com/step-functions/latest/dg/connect-ecs.html) starts a new task using the specified task definition.
#### EC2
The EC2 launch type allows you to run your containerized applications on a cluster
of Amazon EC2 instances that you manage.
When a task that uses the EC2 launch type is launched, Amazon ECS must determine where
to place the task based on the requirements specified in the task definition, such as
CPU and memory. Similarly, when you scale down the task count, Amazon ECS must determine
which tasks to terminate. You can apply task placement strategies and constraints to
customize how Amazon ECS places and terminates tasks. Learn more about [task placement](https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-placement.html).
The latest ACTIVE revision of the passed task definition is used for running the task.
The following example runs a job from a task definition on EC2:
```python
vpc = ec2.Vpc.from_lookup(self, "Vpc",
    is_default=True
)

cluster = ecs.Cluster(self, "Ec2Cluster", vpc=vpc)
cluster.add_capacity("DefaultAutoScalingGroup",
    instance_type=ec2.InstanceType("t2.micro"),
    vpc_subnets=ec2.SubnetSelection(subnet_type=ec2.SubnetType.PUBLIC)
)

task_definition = ecs.TaskDefinition(self, "TD",
    compatibility=ecs.Compatibility.EC2
)

task_definition.add_container("TheContainer",
    image=ecs.ContainerImage.from_registry("foo/bar"),
    memory_limit_mib=256
)

run_task = tasks.EcsRunTask(self, "Run",
    integration_pattern=sfn.IntegrationPattern.RUN_JOB,
    cluster=cluster,
    task_definition=task_definition,
    launch_target=tasks.EcsEc2LaunchTarget(
        placement_strategies=[
            ecs.PlacementStrategy.spread_across_instances(),
            ecs.PlacementStrategy.packed_by_cpu(),
            ecs.PlacementStrategy.randomly()
        ],
        placement_constraints=[
            ecs.PlacementConstraint.member_of("blieptuut")
        ]
    )
)
```
#### Fargate
AWS Fargate is a serverless compute engine for containers that works with Amazon
Elastic Container Service (ECS). Fargate removes the need to provision and manage servers, lets you
specify and pay for resources per application, and improves security through application
isolation by design. Learn more about [Fargate](https://aws.amazon.com/fargate/).

The Fargate launch type allows you to run your containerized applications without the need
to provision and manage the backend infrastructure. Register your task definition and
Fargate launches the container for you. The latest ACTIVE revision of the passed
task definition is used for running the task. Learn more about
[Fargate Versioning](https://docs.aws.amazon.com/AmazonECS/latest/APIReference/API_DescribeTaskDefinition.html).
The following example runs a job from a task definition on Fargate:
```python
vpc = ec2.Vpc.from_lookup(self, "Vpc",
    is_default=True
)

cluster = ecs.Cluster(self, "FargateCluster", vpc=vpc)

task_definition = ecs.TaskDefinition(self, "TD",
    memory_mib="512",
    cpu="256",
    compatibility=ecs.Compatibility.FARGATE
)

container_definition = task_definition.add_container("TheContainer",
    image=ecs.ContainerImage.from_registry("foo/bar"),
    memory_limit_mib=256
)

run_task = tasks.EcsRunTask(self, "RunFargate",
    integration_pattern=sfn.IntegrationPattern.RUN_JOB,
    cluster=cluster,
    task_definition=task_definition,
    assign_public_ip=True,
    container_overrides=[tasks.ContainerOverride(
        container_definition=container_definition,
        environment=[tasks.TaskEnvironmentVariable(name="SOME_KEY", value=sfn.JsonPath.string_at("$.SomeKey"))]
    )],
    launch_target=tasks.EcsFargateLaunchTarget()
)
```
## EMR
Step Functions supports Amazon EMR through the service integration pattern.
The service integration APIs correspond to Amazon EMR APIs but differ in the
parameters that are used.
[Read more](https://docs.aws.amazon.com/step-functions/latest/dg/connect-emr.html) about the differences when using these service integrations.
### Create Cluster
Creates and starts running a cluster (job flow).
Corresponds to the [`runJobFlow`](https://docs.aws.amazon.com/emr/latest/APIReference/API_RunJobFlow.html) API in EMR.
```python
cluster_role = iam.Role(self, "ClusterRole",
    assumed_by=iam.ServicePrincipal("ec2.amazonaws.com")
)

service_role = iam.Role(self, "ServiceRole",
    assumed_by=iam.ServicePrincipal("elasticmapreduce.amazonaws.com")
)

auto_scaling_role = iam.Role(self, "AutoScalingRole",
    assumed_by=iam.ServicePrincipal("elasticmapreduce.amazonaws.com")
)

auto_scaling_role.assume_role_policy.add_statements(
    iam.PolicyStatement(
        effect=iam.Effect.ALLOW,
        principals=[
            iam.ServicePrincipal("application-autoscaling.amazonaws.com")
        ],
        actions=["sts:AssumeRole"]
    )
)

tasks.EmrCreateCluster(self, "Create Cluster",
    instances=tasks.EmrCreateCluster.InstancesConfigProperty(),
    cluster_role=cluster_role,
    name=sfn.TaskInput.from_json_path_at("$.ClusterName").value,
    service_role=service_role,
    auto_scaling_role=auto_scaling_role
)
```
If you want to run multiple steps in [parallel](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-concurrent-steps.html),
you can specify the `step_concurrency_level` property. The value must be between 1
and 256 inclusive; the default of 1 means steps run one at a time, with no concurrency.
`step_concurrency_level` requires the EMR release label to be 5.28.0 or above.
```python
tasks.EmrCreateCluster(self, "Create Cluster",
    instances=tasks.EmrCreateCluster.InstancesConfigProperty(),
    name=sfn.TaskInput.from_json_path_at("$.ClusterName").value,
    step_concurrency_level=10
)
```
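The two constraints above (a range of 1 to 256, and release 5.28.0 or later) can be checked before synthesis. The hypothetical `validate_step_concurrency` below is a sketch of such a check, assuming release labels of the form `emr-X.Y.Z`; enforcing the release requirement only when concurrency is actually enabled (a level above 1) is an assumption of this sketch, not documented behavior.

```python
import re


def validate_step_concurrency(level: int, release_label: str) -> None:
    """Raise ValueError if level/release_label break the documented constraints."""
    if not 1 <= level <= 256:
        raise ValueError("step concurrency level must be between 1 and 256")
    match = re.fullmatch(r"emr-(\d+)\.(\d+)\.(\d+)", release_label)
    if match is None:
        raise ValueError(f"unrecognized release label: {release_label!r}")
    version = tuple(int(part) for part in match.groups())
    # Assumption: a level of 1 (no concurrency) is allowed on any release.
    if level > 1 and version < (5, 28, 0):
        raise ValueError("step concurrency requires EMR release 5.28.0 or later")


validate_step_concurrency(10, "emr-6.2.0")  # passes silently
```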
### Termination Protection
Locks a cluster (job flow) so the EC2 instances in the cluster cannot be
terminated by user intervention, an API call, or a job-flow error. Setting
`termination_protected` to `False`, as in the example below, removes the lock.
Corresponds to the [`setTerminationProtection`](https://docs.aws.amazon.com/emr/latest/APIReference/API_SetTerminationProtection.html) API in EMR.
```python
tasks.EmrSetClusterTerminationProtection(self, "Task",
    cluster_id="ClusterId",
    termination_protected=False
)
```
### Terminate Cluster
Shuts down a cluster (job flow).
Corresponds to the [`terminateJobFlows`](https://docs.aws.amazon.com/emr/latest/APIReference/API_TerminateJobFlows.html) API in EMR.
```python
tasks.EmrTerminateCluster(self, "Task",
    cluster_id="ClusterId"
)
```
### Add Step
Adds a new step to a running cluster.
Corresponds to the [`addJobFlowSteps`](https://docs.aws.amazon.com/emr/latest/APIReference/API_AddJobFlowSteps.html) API in EMR.
```python
tasks.EmrAddStep(self, "Task",
    cluster_id="ClusterId",
    name="StepName",
    jar="Jar",
    action_on_failure=tasks.ActionOnFailure.CONTINUE
)
```
### Cancel Step
Cancels a pending step in a running cluster.
Corresponds to the [`cancelSteps`](https://docs.aws.amazon.com/emr/latest/APIReference/API_CancelSteps.html) API in EMR.
```python
tasks.EmrCancelStep(self, "Task",
    cluster_id="ClusterId",
    step_id="StepId"
)
```
### Modify Instance Fleet
Modifies the target On-Demand and target Spot capacities for the instance
fleet with the specified InstanceFleetName.
Corresponds to the [`modifyInstanceFleet`](https://docs.aws.amazon.com/emr/latest/APIReference/API_ModifyInstanceFleet.html) API in EMR.
```python
tasks.EmrModifyInstanceFleetByName(self, "Task",
    cluster_id="ClusterId",
    instance_fleet_name="InstanceFleetName",
    target_on_demand_capacity=2,
    target_spot_capacity=0
)
```
### Modify Instance Group
Modifies the number of nodes and configuration settings of an instance group.
Corresponds to the [`modifyInstanceGroups`](https://docs.aws.amazon.com/emr/latest/APIReference/API_ModifyInstanceGroups.html) API in EMR.
```python
tasks.EmrModifyInstanceGroupByName(self, "Task",
    cluster_id="ClusterId",
    instance_group_name=sfn.JsonPath.string_at("$.InstanceGroupName"),
    instance_group=tasks.EmrModifyInstanceGroupByName.InstanceGroupModifyConfigProperty(
        instance_count=1
    )
)
```
## EMR on EKS
Step Functions supports Amazon EMR on EKS through the service integration pattern.
The service integration APIs correspond to Amazon EMR on EKS APIs, but differ in the parameters that are used.
[Read more](https://docs.aws.amazon.com/step-functions/latest/dg/connect-emr-eks.html) about the differences when using these service integrations.
You must first [set up](https://docs.aws.amazon.com/emr/latest/EMR-on-EKS-DevelopmentGuide/setting-up.html) the EKS cluster for Amazon EMR on EKS.
### Create Virtual Cluster
The [CreateVirtualCluster](https://docs.aws.amazon.com/emr-on-eks/latest/APIReference/API_CreateVirtualCluster.html) API creates a single virtual cluster that's mapped to a single Kubernetes namespace.
The EKS cluster containing the Kubernetes namespace where the virtual cluster will be mapped can be passed in from the task input.
```python
tasks.EmrContainersCreateVirtualCluster(self, "Create a Virtual Cluster",
    eks_cluster=tasks.EksClusterInput.from_task_input(sfn.TaskInput.from_text("clusterId"))
)
```
The EKS cluster can also be passed in directly.
```python
import aws_cdk.aws_eks as eks
# eks_cluster: eks.Cluster
tasks.EmrContainersCreateVirtualCluster(self, "Create a Virtual Cluster",
    eks_cluster=tasks.EksClusterInput.from_cluster(eks_cluster)
)
```
By default, the Kubernetes namespace that a virtual cluster maps to is "default", but a specific namespace within an EKS cluster can be selected.
```python
tasks.EmrContainersCreateVirtualCluster(self, "Create a Virtual Cluster",
    eks_cluster=tasks.EksClusterInput.from_task_input(sfn.TaskInput.from_text("clusterId")),
    eks_namespace="specified-namespace"
)
```
### Delete Virtual Cluster
The [DeleteVirtualCluster](https://docs.aws.amazon.com/emr-on-eks/latest/APIReference/API_DeleteVirtualCluster.html) API deletes a virtual cluster.
```python
tasks.EmrContainersDeleteVirtualCluster(self, "Delete a Virtual Cluster",
    virtual_cluster_id=sfn.TaskInput.from_json_path_at("$.virtualCluster")
)
```
### Start Job Run
The [StartJobRun](https://docs.aws.amazon.com/emr-on-eks/latest/APIReference/API_StartJobRun.html) API starts a job run. A job is a unit of work that you submit to Amazon EMR on EKS for execution. The work performed by the job can be defined by a Spark jar, PySpark script, or SparkSQL query. A job run is an execution of the job on the virtual cluster.
Required setup:

* If not done already, follow the [steps](https://docs.aws.amazon.com/emr/latest/EMR-on-EKS-DevelopmentGuide/setting-up.html) to set up EMR on EKS and [create an EKS Cluster](https://docs.aws.amazon.com/cdk/api/latest/docs/aws-eks-readme.html#quick-start).
* Enable [Cluster access](https://docs.aws.amazon.com/emr/latest/EMR-on-EKS-DevelopmentGuide/setting-up-cluster-access.html)
* Enable [IAM Role access](https://docs.aws.amazon.com/emr/latest/EMR-on-EKS-DevelopmentGuide/setting-up-enable-IAM.html)

The following actions must be performed if the virtual cluster ID is supplied from the task input. Otherwise, if it is supplied statically in the state machine definition, these actions are done automatically.

* Create an [IAM role](https://docs.aws.amazon.com/cdk/api/latest/docs/@aws-cdk_aws-iam.Role.html)
* Update the [Role Trust Policy](https://docs.aws.amazon.com/emr/latest/EMR-on-EKS-DevelopmentGuide/setting-up-trust-policy.html) of the Job Execution Role.

The job can be configured with Spark submit parameters:
```python
tasks.EmrContainersStartJobRun(self, "EMR Containers Start Job Run",
    virtual_cluster=tasks.VirtualClusterInput.from_virtual_cluster_id("de92jdei2910fwedz"),
    release_label=tasks.ReleaseLabel.EMR_6_2_0,
    job_driver=tasks.JobDriver(
        spark_submit_job_driver=tasks.SparkSubmitJobDriver(
            entry_point=sfn.TaskInput.from_text("local:///usr/lib/spark/examples/src/main/python/pi.py"),
            spark_submit_parameters="--conf spark.executor.instances=2 --conf spark.executor.memory=2G --conf spark.executor.cores=2 --conf spark.driver.cores=1"
        )
    )
)
```
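`spark_submit_parameters` is a single string of `spark-submit` flags. To check what a given string configures, a small helper such as the hypothetical `parse_spark_conf` below can pull out the `--conf key=value` pairs; it skips every other flag and does not handle the `--conf=key=value` spelling.

```python
import shlex
from typing import Dict


def parse_spark_conf(spark_submit_parameters: str) -> Dict[str, str]:
    """Collect the --conf key=value pairs from a spark-submit argument string.

    Other flags (such as --class or --jars) are skipped.
    """
    tokens = iter(shlex.split(spark_submit_parameters))
    conf: Dict[str, str] = {}
    for token in tokens:
        if token == "--conf":
            setting = next(tokens, None)
            if setting is None:
                raise ValueError("--conf must be followed by key=value")
            key, _, value = setting.partition("=")
            conf[key] = value
    return conf


params = (
    "--conf spark.executor.instances=2 --conf spark.executor.memory=2G "
    "--conf spark.executor.cores=2 --conf spark.driver.cores=1"
)
conf = parse_spark_conf(params)
print(conf["spark.executor.memory"])  # 2G
```

The resulting keys are ordinary Spark properties, which is why the same settings can also be supplied as `properties` under the `SPARK_DEFAULTS` classification.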
Configuring the job can also be done via application configuration:
```python
tasks.EmrContainersStartJobRun(self, "EMR Containers Start Job Run",
    virtual_cluster=tasks.VirtualClusterInput.from_virtual_cluster_id("de92jdei2910fwedz"),
    release_label=tasks.ReleaseLabel.EMR_6_2_0,
    job_name="EMR-Containers-Job",
    job_driver=tasks.JobDriver(
        spark_submit_job_driver=tasks.SparkSubmitJobDriver(
            entry_point=sfn.TaskInput.from_text("local:///usr/lib/spark/examples/src/main/python/pi.py")
        )
    ),
    application_config=[tasks.ApplicationConfiguration(
        classification=tasks.Classification.SPARK_DEFAULTS,
        properties={
            "spark.executor.instances": "1",
            "spark.executor.memory": "512M"
        }
    )]
)
```
Job monitoring can be enabled by setting `monitoring.logging` to `True`. This automatically creates an S3 bucket and CloudWatch logs for the job.
```python
tasks.EmrContainersStartJobRun(self, "EMR Containers Start Job Run",
    virtual_cluster=tasks.VirtualClusterInput.from_virtual_cluster_id("de92jdei2910fwedz"),
    release_label=tasks.ReleaseLabel.EMR_6_2_0,
    job_driver=tasks.JobDriver(
        spark_submit_job_driver=tasks.SparkSubmitJobDriver(
            entry_point=sfn.TaskInput.from_text("local:///usr/lib/spark/examples/src/main/python/pi.py"),
            spark_submit_parameters="--conf spark.executor.instances=2 --conf spark.executor.memory=2G --conf spark.executor.cores=2 --conf spark.driver.cores=1"
        )
    ),
    monitoring=tasks.Monitoring(
        logging=True
    )
)
```
Alternatively, you can enable monitoring with an existing log group and log bucket.
```python
import aws_cdk.aws_logs as logs
import aws_cdk.aws_s3 as s3
log_group = logs.LogGroup(self, "Log Group")
log_bucket = s3.Bucket(self, "S3 Bucket")
tasks.EmrContainersStartJobRun(self, "EMR Containers Start Job Run",
virtual_cluster=tasks.VirtualClusterInput.from_virtual_cluster_id("de92jdei2910fwedz"),
release_label=tasks.ReleaseLabel.EMR_6_2_0,
job_driver=tasks.JobDriver(
spark_submit_job_driver=tasks.SparkSubmitJobDriver(
entry_point=sfn.TaskInput.from_text("local:///usr/lib/spark/examples/src/main/python/pi.py"),
spark_submit_parameters="--conf spark.executor.instances=2 --conf spark.executor.memory=2G --conf spark.executor.cores=2 --conf spark.driver.cores=1"
)
),
monitoring=tasks.Monitoring(
log_group=log_group,
log_bucket=log_bucket
)
)
```
Users can provide their own existing Job Execution Role.
```python
tasks.EmrContainersStartJobRun(self, "EMR Containers Start Job Run",
virtual_cluster=tasks.VirtualClusterInput.from_task_input(sfn.TaskInput.from_json_path_at("$.VirtualClusterId")),
release_label=tasks.ReleaseLabel.EMR_6_2_0,
job_name="EMR-Containers-Job",
execution_role=iam.Role.from_role_arn(self, "Job-Execution-Role", "arn:aws:iam::xxxxxxxxxxxx:role/JobExecutionRole"),
job_driver=tasks.JobDriver(
spark_submit_job_driver=tasks.SparkSubmitJobDriver(
entry_point=sfn.TaskInput.from_text("local:///usr/lib/spark/examples/src/main/python/pi.py"),
spark_submit_parameters="--conf spark.executor.instances=2 --conf spark.executor.memory=2G --conf spark.executor.cores=2 --conf spark.driver.cores=1"
)
)
)
```
## EKS
Step Functions supports Amazon EKS through the service integration pattern.
The service integration APIs correspond to Amazon EKS APIs.
[Read more](https://docs.aws.amazon.com/step-functions/latest/dg/connect-eks.html) about the differences when using these service integrations.
### Call
Read and write Kubernetes resource objects via a Kubernetes API endpoint.
Corresponds to the [`call`](https://docs.aws.amazon.com/step-functions/latest/dg/connect-eks.html) API in Step Functions Connector.
The following code snippet includes a Task state that uses eks:call to list the pods.
```python
import aws_cdk.aws_eks as eks
my_eks_cluster = eks.Cluster(self, "my sample cluster",
version=eks.KubernetesVersion.V1_18,
cluster_name="myEksCluster"
)
tasks.EksCall(self, "Call a EKS Endpoint",
cluster=my_eks_cluster,
http_method=tasks.HttpMethods.GET,
http_path="/api/v1/namespaces/default/pods"
)
```
## EventBridge
Step Functions supports Amazon EventBridge through the service integration pattern.
The service integration APIs correspond to Amazon EventBridge APIs.
[Read more](https://docs.aws.amazon.com/step-functions/latest/dg/connect-eventbridge.html) about the differences when using these service integrations.
### Put Events
Send events to an EventBridge bus.
Corresponds to the [`put-events`](https://docs.aws.amazon.com/step-functions/latest/dg/connect-eventbridge.html) API in Step Functions Connector.
The following code snippet includes a Task state that uses events:putevents to send an event to the default bus.
```python
import aws_cdk.aws_events as events
my_event_bus = events.EventBus(self, "EventBus",
event_bus_name="MyEventBus1"
)
tasks.EventBridgePutEvents(self, "Send an event to EventBridge",
entries=[tasks.EventBridgePutEventsEntry(
detail=sfn.TaskInput.from_object({
"Message": "Hello from Step Functions!"
}),
event_bus=my_event_bus,
detail_type="MessageFromStepFunctions",
source="step.functions"
)]
)
```
## Glue
Step Functions supports [AWS Glue](https://docs.aws.amazon.com/step-functions/latest/dg/connect-glue.html) through the service integration pattern.
You can call the [`StartJobRun`](https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-jobs-runs.html#aws-glue-api-jobs-runs-StartJobRun) API from a `Task` state.
```python
tasks.GlueStartJobRun(self, "Task",
glue_job_name="my-glue-job",
arguments=sfn.TaskInput.from_object({
"key": "value"
}),
timeout=Duration.minutes(30),
notify_delay_after=Duration.minutes(5)
)
```
## Glue DataBrew
Step Functions supports [AWS Glue DataBrew](https://docs.aws.amazon.com/step-functions/latest/dg/connect-databrew.html) through the service integration pattern.
You can call the [`StartJobRun`](https://docs.aws.amazon.com/databrew/latest/dg/API_StartJobRun.html) API from a `Task` state.
```python
tasks.GlueDataBrewStartJobRun(self, "Task",
name="databrew-job"
)
```
## Lambda
[Invoke](https://docs.aws.amazon.com/lambda/latest/dg/API_Invoke.html) a Lambda function.
You can specify the input to your Lambda function through the `payload` attribute.
By default, Step Functions invokes the Lambda function with the state input (JSON path `$`)
as the input.
The following snippet invokes a Lambda Function with the state input as the payload
by referencing the `$` path.
```python
# fn: lambda.Function
tasks.LambdaInvoke(self, "Invoke with state input",
lambda_function=fn
)
```
When a function is invoked, the Lambda service sends [these response
elements](https://docs.aws.amazon.com/lambda/latest/dg/API_Invoke.html#API_Invoke_ResponseElements)
back.
⚠️ The response from the Lambda function is in an attribute called `Payload`.

The following snippet shows two invocations: the first passes an empty object as the payload,
and the second uses the `$.Payload` path to pass the output of a previously executed Lambda function as the payload.
```python
# fn: lambda.Function
tasks.LambdaInvoke(self, "Invoke with empty object as payload",
lambda_function=fn,
payload=sfn.TaskInput.from_object({})
)
# use the output of fn as input
tasks.LambdaInvoke(self, "Invoke with payload field in the state input",
lambda_function=fn,
payload=sfn.TaskInput.from_json_path_at("$.Payload")
)
```
The following snippet invokes a Lambda and sets the task output to only include
the Lambda function response.
```python
# fn: lambda.Function
tasks.LambdaInvoke(self, "Invoke and set function response as task output",
lambda_function=fn,
output_path="$.Payload"
)
```
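To make the effect of `output_path="$.Payload"` concrete, the sketch below models a trimmed raw result of a `LambdaInvoke` task and resolves a simple path against it. The field values are illustrative, and `select_path` is a hypothetical helper that only understands `$` and dotted keys, not the full JSONPath syntax that Step Functions accepts.

```python
# Illustrative, trimmed raw result of a LambdaInvoke task: the function's
# return value sits under "Payload", next to invocation metadata.
raw_result = {
    "ExecutedVersion": "$LATEST",
    "Payload": {"result": "ok"},
    "StatusCode": 200,
}


def select_path(document, path):
    """Resolve a simple path such as "$" or "$.Payload.result".

    Only "$" followed by ".key" segments is supported: no arrays, filters,
    or wildcards.
    """
    if path == "$":
        return document
    if not path.startswith("$."):
        raise ValueError(f"unsupported path: {path}")
    value = document
    for key in path[2:].split("."):
        value = value[key]
    return value


# What the task passes on when output_path="$.Payload".
print(select_path(raw_result, "$.Payload"))  # {'result': 'ok'}
```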
To combine the task input with the Lambda function response, set the
`payloadResponseOnly` property and specify `resultPath`. This puts the
Lambda function ARN directly in the `Resource` field, but it cannot be combined with the
`integrationPattern`, `invocationType`, `clientContext`, or `qualifier` properties.
```python
# fn: lambda.Function
tasks.LambdaInvoke(self, "Invoke and combine function response with task input",
lambda_function=fn,
payload_response_only=True,
result_path="$.fn"
)
```
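The sketch below models how `result_path="$.fn"` merges the function response into the state input. With `payload_response_only=True`, the result is the function's return value itself, with no `Payload` wrapper. `apply_result_path` is a hypothetical helper that handles only `$`, `None` (a `null` ResultPath, which discards the result), and dotted keys.

```python
import copy


def apply_result_path(state_input, result, result_path):
    """Model where a ResultPath places a task result (simplified sketch).

    "$"    -> the result replaces the state input entirely.
    None   -> ResultPath null: the result is discarded, the input passes through.
    "$.a"  -> the result is written under key "a" (nested keys allowed);
              missing intermediate objects are created. The input is not mutated.
    """
    if result_path == "$":
        return result
    output = copy.deepcopy(state_input)
    if result_path is None:
        return output
    keys = result_path[2:].split(".")
    target = output
    for key in keys[:-1]:
        target = target.setdefault(key, {})
    target[keys[-1]] = result
    return output


state_input = {"orderId": "123"}
fn_response = {"status": "shipped"}  # hypothetical function return value
print(apply_result_path(state_input, fn_response, "$.fn"))
# {'orderId': '123', 'fn': {'status': 'shipped'}}
```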
You can have Step Functions pause a task and wait for an external process to
return a task token. Read more about the [callback pattern](https://docs.aws.amazon.com/step-functions/latest/dg/callback-task-sample-sqs.html#call-back-lambda-example).
To use the callback pattern, set `integrationPattern` to `IntegrationPattern.WAIT_FOR_TASK_TOKEN`
and pass the task token (`JsonPath.taskToken`) to the external process. Call the Step
Functions `SendTaskSuccess` or `SendTaskFailure` API with the token to
indicate that the task has completed and the state machine should resume execution.
The following snippet invokes a Lambda with the task token as part of the input
to the Lambda.
```python
# fn: lambda.Function
tasks.LambdaInvoke(self, "Invoke with callback",
lambda_function=fn,
integration_pattern=sfn.IntegrationPattern.WAIT_FOR_TASK_TOKEN,
payload=sfn.TaskInput.from_object({
"token": sfn.JsonPath.task_token,
"input": sfn.JsonPath.string_at("$.someField")
})
)
```
⚠️ The task will pause until it receives that task token back with a `SendTaskSuccess` or `SendTaskFailure`
call. Learn more about [Callback with the Task
Token](https://docs.aws.amazon.com/step-functions/latest/dg/connect-to-resource.html#connect-wait-token).
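
On the other side of the callback, the invoked function (or whichever process it hands the token to) reports back to Step Functions. The following is a minimal sketch of that step, assuming the payload shape from the example above (`token` and `input`) and a boto3 Step Functions client, `boto3.client("stepfunctions")`, which provides `send_task_success` and `send_task_failure`. The client is passed in so the logic can be exercised locally with the stand-in `RecordingClient`; `complete_callback` and its upper-casing "work" are hypothetical.

```python
import json


def complete_callback(event, sfn_client):
    """Finish a WAIT_FOR_TASK_TOKEN task by reporting success or failure.

    event: the task payload, e.g. {"token": "...", "input": "..."}.
    sfn_client: a boto3 Step Functions client or an object with the same methods.
    In a Lambda handler: complete_callback(event, boto3.client("stepfunctions")).
    """
    token = event["token"]
    try:
        # Hypothetical unit of work on the forwarded input.
        result = {"processed": str(event["input"]).upper()}
    except KeyError as err:
        # Report the failure so the execution does not wait until it times out.
        sfn_client.send_task_failure(taskToken=token, error="MissingInput", cause=str(err))
        return None
    # `output` must be a JSON string; it becomes the task's result.
    sfn_client.send_task_success(taskToken=token, output=json.dumps(result))
    return result


class RecordingClient:
    """Local stand-in for the boto3 client that records calls instead of sending them."""

    def __init__(self):
        self.calls = []

    def send_task_success(self, taskToken, output):
        self.calls.append(("success", taskToken, output))

    def send_task_failure(self, taskToken, error, cause):
        self.calls.append(("failure", taskToken, error))


client = RecordingClient()
complete_callback({"token": "example-token", "input": "hello"}, client)
print(client.calls)
```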
AWS Lambda can occasionally experience transient service errors. In this case, the invocation
fails with a 500 error, such as `ServiceException`, `AWSLambdaException`, or `SdkClientException`.
As a best practice, the `LambdaInvoke` task retries on those errors with an interval of 2 seconds,
a back-off rate of 2, and a maximum of 6 attempts. Set the `retryOnServiceExceptions` prop to `false` to
disable this behavior.
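
These retry settings translate into a fixed wait schedule. The sketch below computes it, assuming the Amazon States Language Retrier semantics in which `MaxAttempts` counts retries (not the initial call) and each wait is the previous one multiplied by `BackoffRate`. It ignores the time each invocation itself takes; `retry_delays` is a hypothetical helper.

```python
def retry_delays(interval_seconds, backoff_rate, max_attempts):
    """Seconds waited before each retry of a Step Functions Retrier.

    The first retry waits interval_seconds; every later retry waits the
    previous delay multiplied by backoff_rate, for max_attempts retries.
    """
    return [interval_seconds * backoff_rate ** n for n in range(max_attempts)]


# LambdaInvoke defaults described above: 2 s interval, back-off rate 2, 6 attempts.
delays = retry_delays(2, 2, 6)
print(delays)       # [2, 4, 8, 16, 32, 64]
print(sum(delays))  # 126
```

Under these assumptions, a persistently failing invocation spends about two minutes waiting between retries before the task fails.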
## SageMaker
Step Functions supports [Amazon SageMaker](https://docs.aws.amazon.com/step-functions/latest/dg/connect-sagemaker.html) through the service integration pattern.
If your training job or model uses resources from AWS Marketplace,
[network isolation is required](https://docs.aws.amazon.com/sagemaker/latest/dg/mkt-algo-model-internet-free.html).
To do so, set the `enableNetworkIsolation` property to `true` for `SageMakerCreateModel` or `SageMakerCreateTrainingJob`.
To set environment variables for the Docker container, use the `environment` property.
### Create Training Job
You can call the [`CreateTrainingJob`](https://docs.aws.amazon.com/sagemaker/latest/dg/API_CreateTrainingJob.html) API from a `Task` state.
```python
tasks.SageMakerCreateTrainingJob(self, "TrainSagemaker",
training_job_name=sfn.JsonPath.string_at("$.JobName"),
algorithm_specification=tasks.AlgorithmSpecification(
algorithm_name="BlazingText",
training_input_mode=tasks.InputMode.FILE
),
input_data_config=[tasks.Channel(
channel_name="train",
data_source=tasks.DataSource(
s3_data_source=tasks.S3DataSource(
s3_data_type=tasks.S3DataType.S3_PREFIX,
s3_location=tasks.S3Location.from_json_expression("$.S3Bucket")
)
)
)],
output_data_config=tasks.OutputDataConfig(
s3_output_location=tasks.S3Location.from_bucket(s3.Bucket.from_bucket_name(self, "Bucket", "mybucket"), "myoutputpath")
),
resource_config=tasks.ResourceConfig(
instance_count=1,
instance_type=ec2.InstanceType(sfn.JsonPath.string_at("$.InstanceType")),
volume_size=Size.gibibytes(50)
), # optional: default is 1 instance of EC2 `M4.XLarge` with `10GB` volume
stopping_condition=tasks.StoppingCondition(
max_runtime=Duration.hours(2)
)
)
```
### Create Transform Job
You can call the [`CreateTransformJob`](https://docs.aws.amazon.com/sagemaker/latest/dg/API_CreateTransformJob.html) API from a `Task` state.
```python
tasks.SageMakerCreateTransformJob(self, "Batch Inference",
transform_job_name="MyTransformJob",
model_name="MyModelName",
model_client_options=tasks.ModelClientOptions(
invocations_max_retries=3, # default is 0
invocations_timeout=Duration.minutes(5)
),
transform_input=tasks.TransformInput(
transform_data_source=tasks.TransformDataSource(
s3_data_source=tasks.TransformS3DataSource(
s3_uri="s3://inputbucket/train",
s3_data_type=tasks.S3DataType.S3_PREFIX
)
)
),
transform_output=tasks.TransformOutput(
s3_output_path="s3://outputbucket/TransformJobOutputPath"
),
transform_resources=tasks.TransformResources(
instance_count=1,
instance_type=ec2.InstanceType.of(ec2.InstanceClass.M4, ec2.InstanceSize.XLARGE)
)
)
```
### Create Endpoint
You can call the [`CreateEndpoint`](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_CreateEndpoint.html) API from a `Task` state.
```python
tasks.SageMakerCreateEndpoint(self, "SagemakerEndpoint",
endpoint_name=sfn.JsonPath.string_at("$.EndpointName"),
endpoint_config_name=sfn.JsonPath.string_at("$.EndpointConfigName")
)
```
### Create Endpoint Config
You can call the [`CreateEndpointConfig`](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_CreateEndpointConfig.html) API from a `Task` state.
```python
tasks.SageMakerCreateEndpointConfig(self, "SagemakerEndpointConfig",
endpoint_config_name="MyEndpointConfig",
production_variants=[tasks.ProductionVariant(
initial_instance_count=2,
instance_type=ec2.InstanceType.of(ec2.InstanceClass.M5, ec2.InstanceSize.XLARGE),
model_name="MyModel",
variant_name="awesome-variant"
)]
)
```
### Create Model
You can call the [`CreateModel`](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_CreateModel.html) API from a `Task` state.
```python
tasks.SageMakerCreateModel(self, "Sagemaker",
model_name="MyModel",
primary_container=tasks.ContainerDefinition(
image=tasks.DockerImage.from_json_expression(sfn.JsonPath.string_at("$.Model.imageName")),
mode=tasks.Mode.SINGLE_MODEL,
model_s3_location=tasks.S3Location.from_json_expression("$.TrainingJob.ModelArtifacts.S3ModelArtifacts")
)
)
```
### Update Endpoint
You can call the [`UpdateEndpoint`](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_UpdateEndpoint.html) API from a `Task` state.
```python
tasks.SageMakerUpdateEndpoint(self, "SagemakerEndpoint",
endpoint_name=sfn.JsonPath.string_at("$.Endpoint.Name"),
endpoint_config_name=sfn.JsonPath.string_at("$.Endpoint.EndpointConfig")
)
```
## SNS
Step Functions supports [Amazon SNS](https://docs.aws.amazon.com/step-functions/latest/dg/connect-sns.html) through the service integration pattern.
You can call the [`Publish`](https://docs.aws.amazon.com/sns/latest/api/API_Publish.html) API from a `Task` state to publish to an SNS topic.
```python
topic = sns.Topic(self, "Topic")
# Use a field from the execution data as message.
task1 = tasks.SnsPublish(self, "Publish1",
topic=topic,
integration_pattern=sfn.IntegrationPattern.REQUEST_RESPONSE,
message=sfn.TaskInput.from_data_at("$.state.message"),
message_attributes={
"place": tasks.MessageAttribute(
value=sfn.JsonPath.string_at("$.place")
),
"pic": tasks.MessageAttribute(
# BINARY must be explicitly set
data_type=tasks.MessageAttributeDataType.BINARY,
value=sfn.JsonPath.string_at("$.pic")
),
"people": tasks.MessageAttribute(
value=4
),
"handles": tasks.MessageAttribute(
value=["@kslater", "@jjf", None, "@mfanning"]
)
}
)
# Combine a field from the execution data with
# a literal object.
task2 = tasks.SnsPublish(self, "Publish2",
topic=topic,
message=sfn.TaskInput.from_object({
"field1": "somedata",
"field2": sfn.JsonPath.string_at("$.field2")
})
)
```
## Step Functions
### Start Execution
You can manage [AWS Step Functions](https://docs.aws.amazon.com/step-functions/latest/dg/connect-stepfunctions.html) executions.
AWS Step Functions supports its own [`StartExecution`](https://docs.aws.amazon.com/step-functions/latest/apireference/API_StartExecution.html) API as a service integration.
```python
# Define a state machine with one Pass state
child = sfn.StateMachine(self, "ChildStateMachine",
definition=sfn.Chain.start(sfn.Pass(self, "PassState"))
)
# Include the state machine in a Task state with callback pattern
task = tasks.StepFunctionsStartExecution(self, "ChildTask",
state_machine=child,
integration_pattern=sfn.IntegrationPattern.WAIT_FOR_TASK_TOKEN,
input=sfn.TaskInput.from_object({
"token": sfn.JsonPath.task_token,
"foo": "bar"
}),
name="MyExecutionName"
)
# Define a second state machine with the Task state above
sfn.StateMachine(self, "ParentStateMachine",
definition=task
)
```
You can utilize [Associate Workflow Executions](https://docs.aws.amazon.com/step-functions/latest/dg/concepts-nested-workflows.html#nested-execution-startid)
via the `associateWithParent` property. This allows the Step Functions UI to link child
executions from parent executions, making it easier to trace execution flow across state machines.
```python
# child: sfn.StateMachine
task = tasks.StepFunctionsStartExecution(self, "ChildTask",
state_machine=child,
associate_with_parent=True
)
```
This adds the payload `AWS_STEP_FUNCTIONS_STARTED_BY_EXECUTION_ID.$: $$.Execution.Id` to the
`input` property for you, which passes the execution ID from the context object to the
execution input. It requires `input` to be an object or not to be set at all.
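
As a rough model of that rule, the sketch below builds the child execution input from a dictionary. `with_parent_execution_id` is a hypothetical helper written for illustration, not part of the CDK. The `.$` suffix on the key is what tells Step Functions to resolve `$$.Execution.Id` from the context object at runtime.

```python
PARENT_ID_KEY = "AWS_STEP_FUNCTIONS_STARTED_BY_EXECUTION_ID.$"


def with_parent_execution_id(task_input=None):
    """Return a copy of the child input with the parent execution ID reference added.

    task_input must be a dict or None, mirroring the requirement that `input`
    be an object or not set at all. The value is a context-object path that
    Step Functions resolves when the child execution starts.
    """
    if task_input is None:
        task_input = {}
    if not isinstance(task_input, dict):
        raise TypeError("input must be an object or not set at all")
    return {**task_input, PARENT_ID_KEY: "$$.Execution.Id"}


print(with_parent_execution_id({"foo": "bar"}))
# {'foo': 'bar', 'AWS_STEP_FUNCTIONS_STARTED_BY_EXECUTION_ID.$': '$$.Execution.Id'}
```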
### Invoke Activity
You can invoke a [Step Functions Activity](https://docs.aws.amazon.com/step-functions/latest/dg/concepts-activities.html) which enables you to have
a task in your state machine where the work is performed by a *worker* that can
be hosted on Amazon EC2, Amazon ECS, AWS Lambda, or virtually anywhere else. Activities
are a way to associate code running somewhere (known as an activity worker) with
a specific task in a state machine.
When Step Functions reaches an activity task state, the workflow waits for an
activity worker to poll for a task. An activity worker polls Step Functions by
calling `GetActivityTask` with the ARN of the related activity.
After the activity worker completes its work, it can report its
success or failure by calling `SendTaskSuccess` or `SendTaskFailure`. These two
calls use the `taskToken` provided by `GetActivityTask` to associate the result
with that task.
The following example creates an activity and creates a task that invokes the activity.
```python
submit_job_activity = sfn.Activity(self, "SubmitJob")
tasks.StepFunctionsInvokeActivity(self, "Submit Job",
activity=submit_job_activity
)
```
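The activity worker itself runs outside the state machine. The following is a minimal single-poll sketch of the `GetActivityTask`, `SendTaskSuccess`, and `SendTaskFailure` cycle described above, assuming a boto3 Step Functions client (`boto3.client("stepfunctions")`) is passed in. `run_worker_once`, the `jobId` input field, and the `StubClient` stand-in are hypothetical; a production worker would call the function in a loop.

```python
import json


def run_worker_once(sfn_client, activity_arn, worker_name="example-worker"):
    """Poll an activity once and report the outcome back to Step Functions.

    sfn_client: a boto3 Step Functions client or an object with the same methods.
    Returns the output sent with SendTaskSuccess, or None if there was no task
    or the task failed.
    """
    # Long poll; if no task arrives in time, taskToken comes back empty.
    task = sfn_client.get_activity_task(activityArn=activity_arn, workerName=worker_name)
    token = task.get("taskToken")
    if not token:
        return None
    try:
        job = json.loads(task["input"])
        # Hypothetical work: acknowledge the job ID found in the input.
        output = {"jobId": job["jobId"], "status": "SUBMITTED"}
    except (KeyError, TypeError, ValueError) as err:
        # Malformed input: fail the task so the state machine can handle it.
        sfn_client.send_task_failure(taskToken=token, error=type(err).__name__, cause=str(err))
        return None
    sfn_client.send_task_success(taskToken=token, output=json.dumps(output))
    return output


class StubClient:
    """Local stand-in for the boto3 client; returns a canned task and records calls."""

    def __init__(self, task):
        self.task = task
        self.calls = []

    def get_activity_task(self, activityArn, workerName):
        self.calls.append(("get_activity_task", activityArn, workerName))
        return self.task

    def send_task_success(self, taskToken, output):
        self.calls.append(("send_task_success", taskToken, output))

    def send_task_failure(self, taskToken, error, cause):
        self.calls.append(("send_task_failure", taskToken, error))


client = StubClient({"taskToken": "token-1", "input": '{"jobId": "job-42"}'})
result = run_worker_once(client, "arn:aws:states:us-east-1:123456789012:activity:SubmitJob")
print(result)
```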
## SQS
Step Functions supports [Amazon SQS](https://docs.aws.amazon.com/step-functions/latest/dg/connect-sqs.html) through the service integration pattern.
You can call the [`SendMessage`](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SendMessage.html) API from a `Task` state
to send a message to an SQS queue.
```python
queue = sqs.Queue(self, "Queue")
# Use a field from the execution data as message.
task1 = tasks.SqsSendMessage(self, "Send1",
queue=queue,
message_body=sfn.TaskInput.from_json_path_at("$.message")
)
# Combine a field from the execution data with
# a literal object.
task2 = tasks.SqsSendMessage(self, "Send2",
queue=queue,
message_body=sfn.TaskInput.from_object({
"field1": "somedata",
"field2": sfn.JsonPath.string_at("$.field2")
})
)
```
Raw data
{
"_id": null,
"home_page": "https://github.com/aws/aws-cdk",
"name": "aws-cdk.aws-stepfunctions-tasks",
"maintainer": "",
"docs_url": null,
"requires_python": "~=3.7",
"maintainer_email": "",
"keywords": "",
"author": "Amazon Web Services",
"author_email": "",
"download_url": "https://files.pythonhosted.org/packages/7b/e9/86e60931abcb9f2b30248117bcddb6277ed2beffa7377d6e27507066cc0c/aws-cdk.aws-stepfunctions-tasks-1.204.0.tar.gz",
"platform": null,
"description": "# Tasks for AWS Step Functions\n\n<!--BEGIN STABILITY BANNER-->---\n\n\n![End-of-Support](https://img.shields.io/badge/End--of--Support-critical.svg?style=for-the-badge)\n\n> AWS CDK v1 has reached End-of-Support on 2023-06-01.\n> This package is no longer being updated, and users should migrate to AWS CDK v2.\n>\n> For more information on how to migrate, see the [*Migrating to AWS CDK v2* guide](https://docs.aws.amazon.com/cdk/v2/guide/migrating-v2.html).\n\n---\n<!--END STABILITY BANNER-->\n\n[AWS Step Functions](https://docs.aws.amazon.com/step-functions/latest/dg/welcome.html) is a web service that enables you to coordinate the\ncomponents of distributed applications and microservices using visual workflows.\nYou build applications from individual components that each perform a discrete\nfunction, or task, allowing you to scale and change applications quickly.\n\nA [Task](https://docs.aws.amazon.com/step-functions/latest/dg/amazon-states-language-task-state.html) state represents a single unit of work performed by a state machine.\nAll work in your state machine is performed by tasks.\n\nThis module is part of the [AWS Cloud Development Kit](https://github.com/aws/aws-cdk) project.\n\n## Table Of Contents\n\n* [Tasks for AWS Step Functions](#tasks-for-aws-step-functions)\n\n * [Table Of Contents](#table-of-contents)\n * [Task](#task)\n * [Paths](#paths)\n\n * [InputPath](#inputpath)\n * [OutputPath](#outputpath)\n * [ResultPath](#resultpath)\n * [Task parameters from the state JSON](#task-parameters-from-the-state-json)\n * [Evaluate Expression](#evaluate-expression)\n * [API Gateway](#api-gateway)\n\n * [Call REST API Endpoint](#call-rest-api-endpoint)\n * [Call HTTP API Endpoint](#call-http-api-endpoint)\n * [AWS SDK](#aws-sdk)\n * [Athena](#athena)\n\n * [StartQueryExecution](#startqueryexecution)\n * [GetQueryExecution](#getqueryexecution)\n * [GetQueryResults](#getqueryresults)\n * [StopQueryExecution](#stopqueryexecution)\n * 
[Batch](#batch)\n\n * [SubmitJob](#submitjob)\n * [CodeBuild](#codebuild)\n\n * [StartBuild](#startbuild)\n * [DynamoDB](#dynamodb)\n\n * [GetItem](#getitem)\n * [PutItem](#putitem)\n * [DeleteItem](#deleteitem)\n * [UpdateItem](#updateitem)\n * [ECS](#ecs)\n\n * [RunTask](#runtask)\n\n * [EC2](#ec2)\n * [Fargate](#fargate)\n * [EMR](#emr)\n\n * [Create Cluster](#create-cluster)\n * [Termination Protection](#termination-protection)\n * [Terminate Cluster](#terminate-cluster)\n * [Add Step](#add-step)\n * [Cancel Step](#cancel-step)\n * [Modify Instance Fleet](#modify-instance-fleet)\n * [Modify Instance Group](#modify-instance-group)\n * [EMR on EKS](#emr-on-eks)\n\n * [Create Virtual Cluster](#create-virtual-cluster)\n * [Delete Virtual Cluster](#delete-virtual-cluster)\n * [Start Job Run](#start-job-run)\n * [EKS](#eks)\n\n * [Call](#call)\n * [EventBridge](#eventbridge)\n\n * [Put Events](#put-events)\n * [Glue](#glue)\n * [Glue DataBrew](#glue-databrew)\n * [Lambda](#lambda)\n * [SageMaker](#sagemaker)\n\n * [Create Training Job](#create-training-job)\n * [Create Transform Job](#create-transform-job)\n * [Create Endpoint](#create-endpoint)\n * [Create Endpoint Config](#create-endpoint-config)\n * [Create Model](#create-model)\n * [Update Endpoint](#update-endpoint)\n * [SNS](#sns)\n * [Step Functions](#step-functions)\n\n * [Start Execution](#start-execution)\n * [Invoke Activity](#invoke-activity)\n * [SQS](#sqs)\n\n## Task\n\nA Task state represents a single unit of work performed by a state machine. In the\nCDK, the exact work to be done is determined by a class that implements `IStepFunctionsTask`.\n\nAWS Step Functions [integrates](https://docs.aws.amazon.com/step-functions/latest/dg/concepts-service-integrations.html) with some AWS services so that you can call API\nactions, and coordinate executions directly from the Amazon States Language in\nStep Functions. 
You can directly call and pass parameters to the APIs of those\nservices.\n\n## Paths\n\nIn the Amazon States Language, a [path](https://docs.aws.amazon.com/step-functions/latest/dg/amazon-states-language-paths.html) is a string beginning with `$` that you\ncan use to identify components within JSON text.\n\nLearn more about input and output processing in Step Functions [here](https://docs.aws.amazon.com/step-functions/latest/dg/concepts-input-output-filtering.html)\n\n### InputPath\n\nBoth `InputPath` and `Parameters` fields provide a way to manipulate JSON as it\nmoves through your workflow. AWS Step Functions applies the `InputPath` field first,\nand then the `Parameters` field. You can first filter your raw input to a selection\nyou want using InputPath, and then apply Parameters to manipulate that input\nfurther, or add new values. If you don't specify an `InputPath`, a default value\nof `$` will be used.\n\nThe following example provides the field named `input` as the input to the `Task`\nstate that runs a Lambda function.\n\n```python\n# fn: lambda.Function\n\nsubmit_job = tasks.LambdaInvoke(self, \"Invoke Handler\",\n lambda_function=fn,\n input_path=\"$.input\"\n)\n```\n\n### OutputPath\n\nTasks also allow you to select a portion of the state output to pass to the next\nstate. This enables you to filter out unwanted information, and pass only the\nportion of the JSON that you care about. If you don't specify an `OutputPath`,\na default value of `$` will be used. 
This passes the entire JSON node to the next\nstate.\n\nThe [response](https://docs.aws.amazon.com/lambda/latest/dg/API_Invoke.html#API_Invoke_ResponseSyntax) from a Lambda function includes the response from the function\nas well as other metadata.\n\nThe following example assigns the output from the Task to a field named `result`\n\n```python\n# fn: lambda.Function\n\nsubmit_job = tasks.LambdaInvoke(self, \"Invoke Handler\",\n lambda_function=fn,\n output_path=\"$.Payload.result\"\n)\n```\n\n### ResultSelector\n\nYou can use [`ResultSelector`](https://docs.aws.amazon.com/step-functions/latest/dg/input-output-inputpath-params.html#input-output-resultselector)\nto manipulate the raw result of a Task, Map or Parallel state before it is\npassed to [`ResultPath`](###ResultPath). For service integrations, the raw\nresult contains metadata in addition to the response payload. You can use\nResultSelector to construct a JSON payload that becomes the effective result\nusing static values or references to the raw result or context object.\n\nThe following example extracts the output payload of a Lambda function Task and combines\nit with some static values and the state name from the context object.\n\n```python\n# fn: lambda.Function\n\ntasks.LambdaInvoke(self, \"Invoke Handler\",\n lambda_function=fn,\n result_selector={\n \"lambda_output\": sfn.JsonPath.string_at(\"$.Payload\"),\n \"invoke_request_id\": sfn.JsonPath.string_at(\"$.SdkResponseMetadata.RequestId\"),\n \"static_value\": {\n \"foo\": \"bar\"\n },\n \"state_name\": sfn.JsonPath.string_at(\"$.State.Name\")\n }\n)\n```\n\n### ResultPath\n\nThe output of a state can be a copy of its input, the result it produces (for\nexample, output from a Task state\u2019s Lambda function), or a combination of its\ninput and result. Use [`ResultPath`](https://docs.aws.amazon.com/step-functions/latest/dg/input-output-resultpath.html) to control which combination of these is\npassed to the state output. 
If you don't specify an `ResultPath`, a default\nvalue of `$` will be used.\n\nThe following example adds the item from calling DynamoDB's `getItem` API to the state\ninput and passes it to the next state.\n\n```python\n# my_table: dynamodb.Table\n\ntasks.DynamoPutItem(self, \"PutItem\",\n item={\n \"MessageId\": tasks.DynamoAttributeValue.from_string(\"message-id\")\n },\n table=my_table,\n result_path=\"$.Item\"\n)\n```\n\n\u26a0\ufe0f The `OutputPath` is computed after applying `ResultPath`. All service integrations\nreturn metadata as part of their response. When using `ResultPath`, it's not possible to\nmerge a subset of the task output to the input.\n\n## Task parameters from the state JSON\n\nMost tasks take parameters. Parameter values can either be static, supplied directly\nin the workflow definition (by specifying their values), or a value available at runtime\nin the state machine's execution (either as its input or an output of a prior state).\nParameter values available at runtime can be specified via the `JsonPath` class,\nusing methods such as `JsonPath.stringAt()`.\n\nThe following example provides the field named `input` as the input to the Lambda function\nand invokes it asynchronously.\n\n```python\n# fn: lambda.Function\n\n\nsubmit_job = tasks.LambdaInvoke(self, \"Invoke Handler\",\n lambda_function=fn,\n payload=sfn.TaskInput.from_json_path_at(\"$.input\"),\n invocation_type=tasks.LambdaInvocationType.EVENT\n)\n```\n\nYou can also use [intrinsic functions](https://docs.aws.amazon.com/step-functions/latest/dg/amazon-states-language-intrinsic-functions.html) available on `JsonPath`, for example `JsonPath.format()`.\nHere is an example of starting an Athena query that is dynamically created using the task input:\n\n```python\nstart_query_execution_job = tasks.AthenaStartQueryExecution(self, \"Athena Start Query\",\n query_string=sfn.JsonPath.format(\"select contacts where year={};\", sfn.JsonPath.string_at(\"$.year\")),\n 
query_execution_context=tasks.QueryExecutionContext(\n database_name=\"interactions\"\n ),\n result_configuration=tasks.ResultConfiguration(\n encryption_configuration=tasks.EncryptionConfiguration(\n encryption_option=tasks.EncryptionOption.S3_MANAGED\n ),\n output_location=s3.Location(\n bucket_name=\"mybucket\",\n object_key=\"myprefix\"\n )\n ),\n integration_pattern=sfn.IntegrationPattern.RUN_JOB\n)\n```\n\nEach service integration has its own set of parameters that can be supplied.\n\n## Evaluate Expression\n\nUse the `EvaluateExpression` to perform simple operations referencing state paths. The\n`expression` referenced in the task will be evaluated in a Lambda function\n(`eval()`). This allows you to not have to write Lambda code for simple operations.\n\nExample: convert a wait time from milliseconds to seconds, concat this in a message and wait:\n\n```python\nconvert_to_seconds = tasks.EvaluateExpression(self, \"Convert to seconds\",\n expression=\"$.waitMilliseconds / 1000\",\n result_path=\"$.waitSeconds\"\n)\n\ncreate_message = tasks.EvaluateExpression(self, \"Create message\",\n # Note: this is a string inside a string.\n expression=\"`Now waiting ${$.waitSeconds} seconds...`\",\n runtime=lambda_.Runtime.NODEJS_14_X,\n result_path=\"$.message\"\n)\n\npublish_message = tasks.SnsPublish(self, \"Publish message\",\n topic=sns.Topic(self, \"cool-topic\"),\n message=sfn.TaskInput.from_json_path_at(\"$.message\"),\n result_path=\"$.sns\"\n)\n\nwait = sfn.Wait(self, \"Wait\",\n time=sfn.WaitTime.seconds_path(\"$.waitSeconds\")\n)\n\nsfn.StateMachine(self, \"StateMachine\",\n definition=convert_to_seconds.next(create_message).next(publish_message).next(wait)\n)\n```\n\nThe `EvaluateExpression` supports a `runtime` prop to specify the Lambda\nruntime to use to evaluate the expression. 
Currently, only runtimes\nof the Node.js family are supported.\n\n## API Gateway\n\nStep Functions supports [API Gateway](https://docs.aws.amazon.com/step-functions/latest/dg/connect-api-gateway.html) through the service integration pattern.\n\nHTTP APIs are designed for low-latency, cost-effective integrations with AWS services, including AWS Lambda, and HTTP endpoints.\nHTTP APIs support OIDC and OAuth 2.0 authorization, and come with built-in support for CORS and automatic deployments.\nPrevious-generation REST APIs currently offer more features. More details can be found [here](https://docs.aws.amazon.com/apigateway/latest/developerguide/http-api-vs-rest.html).\n\n### Call REST API Endpoint\n\nThe `CallApiGatewayRestApiEndpoint` calls the REST API endpoint.\n\n```python\nimport aws_cdk.aws_apigateway as apigateway\n\nrest_api = apigateway.RestApi(self, \"MyRestApi\")\n\ninvoke_task = tasks.CallApiGatewayRestApiEndpoint(self, \"Call REST API\",\n api=rest_api,\n stage_name=\"prod\",\n method=tasks.HttpMethod.GET\n)\n```\n\nBe aware that the header values must be arrays. 
When passing the Task Token\nin the headers field `WAIT_FOR_TASK_TOKEN` integration, use\n`JsonPath.array()` to wrap the token in an array:\n\n```python\nimport aws_cdk.aws_apigateway as apigateway\n# api: apigateway.RestApi\n\n\ntasks.CallApiGatewayRestApiEndpoint(self, \"Endpoint\",\n api=api,\n stage_name=\"Stage\",\n method=tasks.HttpMethod.PUT,\n integration_pattern=sfn.IntegrationPattern.WAIT_FOR_TASK_TOKEN,\n headers=sfn.TaskInput.from_object({\n \"TaskToken\": sfn.JsonPath.array(sfn.JsonPath.task_token)\n })\n)\n```\n\n### Call HTTP API Endpoint\n\nThe `CallApiGatewayHttpApiEndpoint` calls the HTTP API endpoint.\n\n```python\nimport aws_cdk.aws_apigatewayv2 as apigatewayv2\n\nhttp_api = apigatewayv2.HttpApi(self, \"MyHttpApi\")\n\ninvoke_task = tasks.CallApiGatewayHttpApiEndpoint(self, \"Call HTTP API\",\n api_id=http_api.api_id,\n api_stack=Stack.of(http_api),\n method=tasks.HttpMethod.GET\n)\n```\n\n### AWS SDK\n\nStep Functions supports calling [AWS service's API actions](https://docs.aws.amazon.com/step-functions/latest/dg/supported-services-awssdk.html)\nthrough the service integration pattern.\n\nYou can use Step Functions' AWS SDK integrations to call any of the over two hundred AWS services\ndirectly from your state machine, giving you access to over nine thousand API actions.\n\n```python\n# my_bucket: s3.Bucket\n\nget_object = tasks.CallAwsService(self, \"GetObject\",\n service=\"s3\",\n action=\"getObject\",\n parameters={\n \"Bucket\": my_bucket.bucket_name,\n \"Key\": sfn.JsonPath.string_at(\"$.key\")\n },\n iam_resources=[my_bucket.arn_for_objects(\"*\")]\n)\n```\n\nUse camelCase for actions and PascalCase for parameter names.\n\nThe task automatically adds an IAM statement to the state machine role's policy based on the\nservice and action called. 
The resources for this statement must be specified in `iamResources`.\n\nUse the `iamAction` prop to manually specify the IAM action name when the IAM\naction name does not match the API service/action name:\n\n```python\nlist_buckets = tasks.CallAwsService(self, \"ListBuckets\",\n service=\"s3\",\n action=\"listBuckets\",\n iam_resources=[\"*\"],\n iam_action=\"s3:ListAllMyBuckets\"\n)\n```\n\n## Athena\n\nStep Functions supports [Athena](https://docs.aws.amazon.com/step-functions/latest/dg/connect-athena.html) through the service integration pattern.\n\n### StartQueryExecution\n\nThe [StartQueryExecution](https://docs.aws.amazon.com/athena/latest/APIReference/API_StartQueryExecution.html) API runs a SQL query statement.\n\n```python\nstart_query_execution_job = tasks.AthenaStartQueryExecution(self, \"Start Athena Query\",\n query_string=sfn.JsonPath.string_at(\"$.queryString\"),\n query_execution_context=tasks.QueryExecutionContext(\n database_name=\"mydatabase\"\n ),\n result_configuration=tasks.ResultConfiguration(\n encryption_configuration=tasks.EncryptionConfiguration(\n encryption_option=tasks.EncryptionOption.S3_MANAGED\n ),\n output_location=s3.Location(\n bucket_name=\"query-results-bucket\",\n object_key=\"folder\"\n )\n )\n)\n```\n\n### GetQueryExecution\n\nThe [GetQueryExecution](https://docs.aws.amazon.com/athena/latest/APIReference/API_GetQueryExecution.html) API gets information about a single execution of a query.\n\n```python\nget_query_execution_job = tasks.AthenaGetQueryExecution(self, \"Get Query Execution\",\n query_execution_id=sfn.JsonPath.string_at(\"$.QueryExecutionId\")\n)\n```\n\n### GetQueryResults\n\nThe [GetQueryResults](https://docs.aws.amazon.com/athena/latest/APIReference/API_GetQueryResults.html) API streams the results of a single query execution, specified by `QueryExecutionId`, from S3.\n\n```python\nget_query_results_job = tasks.AthenaGetQueryResults(self, \"Get Query Results\",\n 
query_execution_id=sfn.JsonPath.string_at(\"$.QueryExecutionId\")\n)\n```\n\n### StopQueryExecution\n\nThe [StopQueryExecution](https://docs.aws.amazon.com/athena/latest/APIReference/API_StopQueryExecution.html) API stops a query execution.\n\n```python\nstop_query_execution_job = tasks.AthenaStopQueryExecution(self, \"Stop Query Execution\",\n query_execution_id=sfn.JsonPath.string_at(\"$.QueryExecutionId\")\n)\n```\n\n## Batch\n\nStep Functions supports [AWS Batch](https://docs.aws.amazon.com/step-functions/latest/dg/connect-batch.html) through the service integration pattern.\n\n### SubmitJob\n\nThe [SubmitJob](https://docs.aws.amazon.com/batch/latest/APIReference/API_SubmitJob.html) API submits an AWS Batch job from a job definition.\n\n```python\nimport aws_cdk.aws_batch as batch\n# batch_job_definition: batch.JobDefinition\n# batch_queue: batch.JobQueue\n\n\ntask = tasks.BatchSubmitJob(self, \"Submit Job\",\n job_definition_arn=batch_job_definition.job_definition_arn,\n job_name=\"MyJob\",\n job_queue_arn=batch_queue.job_queue_arn\n)\n```\n\n## CodeBuild\n\nStep Functions supports [CodeBuild](https://docs.aws.amazon.com/step-functions/latest/dg/connect-codebuild.html) through the service integration pattern.\n\n### StartBuild\n\n[StartBuild](https://docs.aws.amazon.com/codebuild/latest/APIReference/API_StartBuild.html) starts a build of the specified CodeBuild project.\n\n```python\nimport aws_cdk.aws_codebuild as codebuild\n\n\ncodebuild_project = codebuild.Project(self, \"Project\",\n project_name=\"MyTestProject\",\n build_spec=codebuild.BuildSpec.from_object({\n \"version\": \"0.2\",\n \"phases\": {\n \"build\": {\n \"commands\": [\"echo \\\"Hello, CodeBuild!\\\"\"\n ]\n }\n }\n })\n)\n\ntask = tasks.CodeBuildStartBuild(self, \"Task\",\n project=codebuild_project,\n integration_pattern=sfn.IntegrationPattern.RUN_JOB,\n environment_variables_override={\n \"ZONE\": codebuild.BuildEnvironmentVariable(\n 
type=codebuild.BuildEnvironmentVariableType.PLAINTEXT,\n 
value=sfn.JsonPath.string_at(\"$.envVariables.zone\")\n )\n }\n)\n```\n\n## DynamoDB\n\nYou can call DynamoDB APIs from a `Task` state.\nRead more about [calling DynamoDB APIs from Step Functions](https://docs.aws.amazon.com/step-functions/latest/dg/connect-ddb.html).\n\n### GetItem\n\nThe [GetItem](https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_GetItem.html) operation returns a set of attributes for the item with the given primary key.\n\n```python\n# my_table: dynamodb.Table\n\ntasks.DynamoGetItem(self, \"Get Item\",\n key={\"MessageId\": tasks.DynamoAttributeValue.from_string(\"message-007\")},\n table=my_table\n)\n```\n\n### PutItem\n\nThe [PutItem](https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_PutItem.html) operation creates a new item, or replaces an old item with a new item.\n\n```python\n# my_table: dynamodb.Table\n\ntasks.DynamoPutItem(self, \"PutItem\",\n item={\n \"MessageId\": tasks.DynamoAttributeValue.from_string(\"message-007\"),\n \"Text\": tasks.DynamoAttributeValue.from_string(sfn.JsonPath.string_at(\"$.bar\")),\n \"TotalCount\": tasks.DynamoAttributeValue.from_number(10)\n },\n table=my_table\n)\n```\n\n### DeleteItem\n\nThe [DeleteItem](https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_DeleteItem.html) operation deletes a single item in a table by primary key.\n\n```python\n# my_table: dynamodb.Table\n\ntasks.DynamoDeleteItem(self, \"DeleteItem\",\n key={\"MessageId\": tasks.DynamoAttributeValue.from_string(\"message-007\")},\n table=my_table,\n result_path=sfn.JsonPath.DISCARD\n)\n```\n\n### UpdateItem\n\nThe [UpdateItem](https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_UpdateItem.html) operation edits an existing item's attributes, or adds a new item\nto the table if it does not already exist.\n\n```python\n# my_table: dynamodb.Table\n\ntasks.DynamoUpdateItem(self, \"UpdateItem\",\n key={\n \"MessageId\": 
tasks.DynamoAttributeValue.from_string(\"message-007\")\n },\n 
table=my_table,\n expression_attribute_values={\n \":val\": tasks.DynamoAttributeValue.number_from_string(sfn.JsonPath.string_at(\"$.Item.TotalCount.N\")),\n \":rand\": tasks.DynamoAttributeValue.from_number(20)\n },\n update_expression=\"SET TotalCount = :val + :rand\"\n)\n```\n\n## ECS\n\nStep Functions supports [ECS/Fargate](https://docs.aws.amazon.com/step-functions/latest/dg/connect-ecs.html) through the service integration pattern.\n\n### RunTask\n\n[RunTask](https://docs.aws.amazon.com/step-functions/latest/dg/connect-ecs.html) starts a new task using the specified task definition.\n\n#### EC2\n\nThe EC2 launch type allows you to run your containerized applications on a cluster\nof Amazon EC2 instances that you manage.\n\nWhen a task that uses the EC2 launch type is launched, Amazon ECS must determine where\nto place the task based on the requirements specified in the task definition, such as\nCPU and memory. Similarly, when you scale down the task count, Amazon ECS must determine\nwhich tasks to terminate. You can apply task placement strategies and constraints to\ncustomize how Amazon ECS places and terminates tasks. 
Learn more about [task placement](https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-placement.html).\n\nThe latest ACTIVE revision of the passed task definition is used for running the task.\n\nThe following example runs a job from a task definition on EC2:\n\n```python\nvpc = ec2.Vpc.from_lookup(self, \"Vpc\",\n is_default=True\n)\n\ncluster = ecs.Cluster(self, \"Ec2Cluster\", vpc=vpc)\ncluster.add_capacity(\"DefaultAutoScalingGroup\",\n instance_type=ec2.InstanceType(\"t2.micro\"),\n vpc_subnets=ec2.SubnetSelection(subnet_type=ec2.SubnetType.PUBLIC)\n)\n\ntask_definition = ecs.TaskDefinition(self, \"TD\",\n compatibility=ecs.Compatibility.EC2\n)\n\ntask_definition.add_container(\"TheContainer\",\n image=ecs.ContainerImage.from_registry(\"foo/bar\"),\n memory_limit_mi_b=256\n)\n\nrun_task = tasks.EcsRunTask(self, \"Run\",\n integration_pattern=sfn.IntegrationPattern.RUN_JOB,\n cluster=cluster,\n task_definition=task_definition,\n launch_target=tasks.EcsEc2LaunchTarget(\n placement_strategies=[\n ecs.PlacementStrategy.spread_across_instances(),\n ecs.PlacementStrategy.packed_by_cpu(),\n ecs.PlacementStrategy.randomly()\n ],\n placement_constraints=[\n # cluster query language expression: only place tasks on t2 instances\n ecs.PlacementConstraint.member_of(\"attribute:ecs.instance-type =~ t2.*\")\n ]\n )\n)\n```\n\n#### Fargate\n\nAWS Fargate is a serverless compute engine for containers that works with Amazon\nElastic Container Service (ECS). Fargate makes it easy for you to focus on building\nyour applications. Fargate removes the need to provision and manage servers, lets you\nspecify and pay for resources per application, and improves security through application\nisolation by design. Learn more about [Fargate](https://aws.amazon.com/fargate/).\n\nThe Fargate launch type allows you to run your containerized applications without the need\nto provision and manage the backend infrastructure. 
Just register your task definition and\nFargate launches the container for you. 
The latest ACTIVE revision of the passed\ntask definition is used for running the task. Learn more about\n[Fargate Versioning](https://docs.aws.amazon.com/AmazonECS/latest/APIReference/API_DescribeTaskDefinition.html).\n\nThe following example runs a job from a task definition on Fargate:\n\n```python\nvpc = ec2.Vpc.from_lookup(self, \"Vpc\",\n is_default=True\n)\n\ncluster = ecs.Cluster(self, \"FargateCluster\", vpc=vpc)\n\ntask_definition = ecs.TaskDefinition(self, \"TD\",\n memory_mi_b=\"512\",\n cpu=\"256\",\n compatibility=ecs.Compatibility.FARGATE\n)\n\ncontainer_definition = task_definition.add_container(\"TheContainer\",\n image=ecs.ContainerImage.from_registry(\"foo/bar\"),\n memory_limit_mi_b=256\n)\n\nrun_task = tasks.EcsRunTask(self, \"RunFargate\",\n integration_pattern=sfn.IntegrationPattern.RUN_JOB,\n cluster=cluster,\n task_definition=task_definition,\n assign_public_ip=True,\n container_overrides=[tasks.ContainerOverride(\n container_definition=container_definition,\n environment=[tasks.TaskEnvironmentVariable(name=\"SOME_KEY\", value=sfn.JsonPath.string_at(\"$.SomeKey\"))]\n )],\n launch_target=tasks.EcsFargateLaunchTarget()\n)\n```\n\n## EMR\n\nStep Functions supports Amazon EMR through the service integration pattern.\nThe service integration APIs correspond to Amazon EMR APIs but differ in the\nparameters that are used.\n\n[Read more](https://docs.aws.amazon.com/step-functions/latest/dg/connect-emr.html) about the differences when using these service integrations.\n\n### Create Cluster\n\nCreates and starts running a cluster (job flow).\nCorresponds to the [`runJobFlow`](https://docs.aws.amazon.com/emr/latest/APIReference/API_RunJobFlow.html) API in EMR.\n\n```python\ncluster_role = iam.Role(self, \"ClusterRole\",\n assumed_by=iam.ServicePrincipal(\"ec2.amazonaws.com\")\n)\n\nservice_role = iam.Role(self, \"ServiceRole\",\n assumed_by=iam.ServicePrincipal(\"elasticmapreduce.amazonaws.com\")\n)\n\nauto_scaling_role = 
iam.Role(self, 
\"AutoScalingRole\",\n assumed_by=iam.ServicePrincipal(\"elasticmapreduce.amazonaws.com\")\n)\n\nauto_scaling_role.assume_role_policy.add_statements(\n iam.PolicyStatement(\n effect=iam.Effect.ALLOW,\n principals=[\n iam.ServicePrincipal(\"application-autoscaling.amazonaws.com\")\n ],\n actions=[\"sts:AssumeRole\"\n ]\n ))\n\ntasks.EmrCreateCluster(self, \"Create Cluster\",\n instances=tasks.EmrCreateCluster.InstancesConfigProperty(),\n cluster_role=cluster_role,\n name=sfn.TaskInput.from_json_path_at(\"$.ClusterName\").value,\n service_role=service_role,\n auto_scaling_role=auto_scaling_role\n)\n```\n\nIf you want to run multiple steps in [parallel](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-concurrent-steps.html),\nyou can specify the `stepConcurrencyLevel` property. The concurrency range is between 1\nand 256 inclusive, where the default concurrency of 1 means no step concurrency is allowed.\n`stepConcurrencyLevel` requires the EMR release label to be 5.28.0 or above.\n\n```python\ntasks.EmrCreateCluster(self, \"Create Cluster\",\n instances=tasks.EmrCreateCluster.InstancesConfigProperty(),\n name=sfn.TaskInput.from_json_path_at(\"$.ClusterName\").value,\n release_label=\"emr-5.28.0\",\n step_concurrency_level=10\n)\n```\n\n### Termination Protection\n\nSets termination protection on a cluster (job flow). When protection is enabled, the EC2 instances in the cluster cannot be\nterminated by user intervention, an API call, or a job-flow error.\n\nCorresponds to the [`setTerminationProtection`](https://docs.aws.amazon.com/emr/latest/APIReference/API_SetTerminationProtection.html) API in EMR.\n\n```python\ntasks.EmrSetClusterTerminationProtection(self, \"Task\",\n cluster_id=\"ClusterId\",\n termination_protected=False\n)\n```\n\n### Terminate Cluster\n\nShuts down a cluster (job flow).\nCorresponds to the [`terminateJobFlows`](https://docs.aws.amazon.com/emr/latest/APIReference/API_TerminateJobFlows.html) API in EMR.\n\n```python\ntasks.EmrTerminateCluster(self, 
\"Task\",\n cluster_id=\"ClusterId\"\n)\n```\n\n### Add Step\n\nAdds a new step to a 
running cluster.\nCorresponds to the [`addJobFlowSteps`](https://docs.aws.amazon.com/emr/latest/APIReference/API_AddJobFlowSteps.html) API in EMR.\n\n```python\ntasks.EmrAddStep(self, \"Task\",\n cluster_id=\"ClusterId\",\n name=\"StepName\",\n jar=\"Jar\",\n action_on_failure=tasks.ActionOnFailure.CONTINUE\n)\n```\n\n### Cancel Step\n\nCancels a pending step in a running cluster.\nCorresponds to the [`cancelSteps`](https://docs.aws.amazon.com/emr/latest/APIReference/API_CancelSteps.html) API in EMR.\n\n```python\ntasks.EmrCancelStep(self, \"Task\",\n cluster_id=\"ClusterId\",\n step_id=\"StepId\"\n)\n```\n\n### Modify Instance Fleet\n\nModifies the target On-Demand and target Spot capacities for the instance\nfleet with the specified InstanceFleetName.\n\nCorresponds to the [`modifyInstanceFleet`](https://docs.aws.amazon.com/emr/latest/APIReference/API_ModifyInstanceFleet.html) API in EMR.\n\n```python\ntasks.EmrModifyInstanceFleetByName(self, \"Task\",\n cluster_id=\"ClusterId\",\n instance_fleet_name=\"InstanceFleetName\",\n target_on_demand_capacity=2,\n target_spot_capacity=0\n)\n```\n\n### Modify Instance Group\n\nModifies the number of nodes and configuration settings of an instance group.\n\nCorresponds to the [`modifyInstanceGroups`](https://docs.aws.amazon.com/emr/latest/APIReference/API_ModifyInstanceGroups.html) API in EMR.\n\n```python\ntasks.EmrModifyInstanceGroupByName(self, \"Task\",\n cluster_id=\"ClusterId\",\n instance_group_name=sfn.JsonPath.string_at(\"$.InstanceGroupName\"),\n instance_group=tasks.EmrModifyInstanceGroupByName.InstanceGroupModifyConfigProperty(\n instance_count=1\n )\n)\n```\n\n## EMR on EKS\n\nStep Functions supports Amazon EMR on EKS through the service integration pattern.\nThe service integration APIs correspond to Amazon EMR on EKS APIs, but differ in the parameters that are used.\n\n[Read more](https://docs.aws.amazon.com/step-functions/latest/dg/connect-emr-eks.html) about the differences when using these service 
integrations.\n\nYou must first [set up](https://docs.aws.amazon.com/emr/latest/EMR-on-EKS-DevelopmentGuide/setting-up.html) the EKS cluster for Amazon EMR on EKS.\n\n### Create Virtual Cluster\n\nThe [CreateVirtualCluster](https://docs.aws.amazon.com/emr-on-eks/latest/APIReference/API_CreateVirtualCluster.html) API creates a single virtual cluster that's mapped to a single Kubernetes namespace.\n\nThe EKS cluster containing the Kubernetes namespace that the virtual cluster maps to can be passed in from the task input.\n\n```python\ntasks.EmrContainersCreateVirtualCluster(self, \"Create a Virtual Cluster\",\n eks_cluster=tasks.EksClusterInput.from_task_input(sfn.TaskInput.from_text(\"clusterId\"))\n)\n```\n\nThe EKS cluster can also be passed in directly.\n\n```python\nimport aws_cdk.aws_eks as eks\n\n# eks_cluster: eks.Cluster\n\n\ntasks.EmrContainersCreateVirtualCluster(self, \"Create a Virtual Cluster\",\n eks_cluster=tasks.EksClusterInput.from_cluster(eks_cluster)\n)\n```\n\nBy default, a virtual cluster maps to the \"default\" Kubernetes namespace, but you can select a specific namespace within the EKS cluster.\n\n```python\ntasks.EmrContainersCreateVirtualCluster(self, \"Create a Virtual Cluster\",\n eks_cluster=tasks.EksClusterInput.from_task_input(sfn.TaskInput.from_text(\"clusterId\")),\n eks_namespace=\"specified-namespace\"\n)\n```\n\n### Delete Virtual Cluster\n\nThe [DeleteVirtualCluster](https://docs.aws.amazon.com/emr-on-eks/latest/APIReference/API_DeleteVirtualCluster.html) API deletes a virtual cluster.\n\n```python\ntasks.EmrContainersDeleteVirtualCluster(self, \"Delete a Virtual Cluster\",\n virtual_cluster_id=sfn.TaskInput.from_json_path_at(\"$.virtualCluster\")\n)\n```\n\n### Start Job Run\n\nThe [StartJobRun](https://docs.aws.amazon.com/emr-on-eks/latest/APIReference/API_StartJobRun.html) API starts a job run. A job is a unit of work that you submit to Amazon EMR on EKS for execution. 
The work performed by the job can be defined by a Spark jar, PySpark script, or SparkSQL query. A job run is an execution of the job on the virtual cluster.\n\nRequired setup:\n\n* If not done already, follow the [steps](https://docs.aws.amazon.com/emr/latest/EMR-on-EKS-DevelopmentGuide/setting-up.html) to set up EMR on EKS and [create an EKS cluster](https://docs.aws.amazon.com/cdk/api/latest/docs/aws-eks-readme.html#quick-start).\n* Enable [cluster access](https://docs.aws.amazon.com/emr/latest/EMR-on-EKS-DevelopmentGuide/setting-up-cluster-access.html).\n* Enable [IAM role access](https://docs.aws.amazon.com/emr/latest/EMR-on-EKS-DevelopmentGuide/setting-up-enable-IAM.html).\n\nIf the virtual cluster ID is supplied from the task input, you must perform the following actions yourself. If it is supplied statically in the state machine definition, these actions are performed automatically.\n\n* Create an [IAM role](https://docs.aws.amazon.com/cdk/api/latest/docs/@aws-cdk_aws-iam.Role.html).\n* Update the [role trust policy](https://docs.aws.amazon.com/emr/latest/EMR-on-EKS-DevelopmentGuide/setting-up-trust-policy.html) of the job execution role.\n\nThe job can be configured with Spark submit parameters:\n\n```python\ntasks.EmrContainersStartJobRun(self, \"EMR Containers Start Job Run\",\n virtual_cluster=tasks.VirtualClusterInput.from_virtual_cluster_id(\"de92jdei2910fwedz\"),\n release_label=tasks.ReleaseLabel.EMR_6_2_0,\n job_driver=tasks.JobDriver(\n spark_submit_job_driver=tasks.SparkSubmitJobDriver(\n entry_point=sfn.TaskInput.from_text(\"local:///usr/lib/spark/examples/src/main/python/pi.py\"),\n spark_submit_parameters=\"--conf spark.executor.instances=2 --conf spark.executor.memory=2G --conf spark.executor.cores=2 --conf spark.driver.cores=1\"\n )\n )\n)\n```\n\nThe job can also be configured through application configuration:\n\n```python\ntasks.EmrContainersStartJobRun(self, \"EMR Containers Start Job Run\",\n 
virtual_cluster=tasks.VirtualClusterInput.from_virtual_cluster_id(\"de92jdei2910fwedz\"),\n release_label=tasks.ReleaseLabel.EMR_6_2_0,\n job_name=\"EMR-Containers-Job\",\n job_driver=tasks.JobDriver(\n spark_submit_job_driver=tasks.SparkSubmitJobDriver(\n entry_point=sfn.TaskInput.from_text(\"local:///usr/lib/spark/examples/src/main/python/pi.py\")\n )\n ),\n application_config=[tasks.ApplicationConfiguration(\n classification=tasks.Classification.SPARK_DEFAULTS,\n properties={\n \"spark.executor.instances\": \"1\",\n \"spark.executor.memory\": \"512M\"\n }\n )]\n)\n```\n\nJob monitoring can be enabled by setting `monitoring.logging` to `true`. This automatically creates an S3 bucket and a CloudWatch Logs log group.\n\n```python\ntasks.EmrContainersStartJobRun(self, \"EMR Containers Start Job Run\",\n virtual_cluster=tasks.VirtualClusterInput.from_virtual_cluster_id(\"de92jdei2910fwedz\"),\n release_label=tasks.ReleaseLabel.EMR_6_2_0,\n job_driver=tasks.JobDriver(\n spark_submit_job_driver=tasks.SparkSubmitJobDriver(\n entry_point=sfn.TaskInput.from_text(\"local:///usr/lib/spark/examples/src/main/python/pi.py\"),\n spark_submit_parameters=\"--conf spark.executor.instances=2 --conf spark.executor.memory=2G --conf spark.executor.cores=2 --conf spark.driver.cores=1\"\n )\n ),\n monitoring=tasks.Monitoring(\n logging=True\n )\n)\n```\n\nAlternatively, you can enable monitoring with an existing log group and log bucket.\n\n```python\nimport aws_cdk.aws_logs as logs\n\n\nlog_group = logs.LogGroup(self, \"Log Group\")\nlog_bucket = s3.Bucket(self, \"S3 Bucket\")\n\ntasks.EmrContainersStartJobRun(self, \"EMR Containers Start Job Run\",\n virtual_cluster=tasks.VirtualClusterInput.from_virtual_cluster_id(\"de92jdei2910fwedz\"),\n release_label=tasks.ReleaseLabel.EMR_6_2_0,\n job_driver=tasks.JobDriver(\n spark_submit_job_driver=tasks.SparkSubmitJobDriver(\n 
entry_point=sfn.TaskInput.from_text(\"local:///usr/lib/spark/examples/src/main/python/pi.py\"),\n 
spark_submit_parameters=\"--conf spark.executor.instances=2 --conf spark.executor.memory=2G --conf spark.executor.cores=2 --conf spark.driver.cores=1\"\n )\n ),\n monitoring=tasks.Monitoring(\n log_group=log_group,\n log_bucket=log_bucket\n )\n)\n```\n\nYou can also provide an existing job execution role.\n\n```python\ntasks.EmrContainersStartJobRun(self, \"EMR Containers Start Job Run\",\n virtual_cluster=tasks.VirtualClusterInput.from_task_input(sfn.TaskInput.from_json_path_at(\"$.VirtualClusterId\")),\n release_label=tasks.ReleaseLabel.EMR_6_2_0,\n job_name=\"EMR-Containers-Job\",\n execution_role=iam.Role.from_role_arn(self, \"Job-Execution-Role\", \"arn:aws:iam::xxxxxxxxxxxx:role/JobExecutionRole\"),\n job_driver=tasks.JobDriver(\n spark_submit_job_driver=tasks.SparkSubmitJobDriver(\n entry_point=sfn.TaskInput.from_text(\"local:///usr/lib/spark/examples/src/main/python/pi.py\"),\n spark_submit_parameters=\"--conf spark.executor.instances=2 --conf spark.executor.memory=2G --conf spark.executor.cores=2 --conf spark.driver.cores=1\"\n )\n )\n)\n```\n\n## EKS\n\nStep Functions supports Amazon EKS through the service integration pattern.\nThe service integration APIs correspond to Amazon EKS APIs.\n\n[Read more](https://docs.aws.amazon.com/step-functions/latest/dg/connect-eks.html) about the differences when using these service integrations.\n\n### Call\n\nRead and write Kubernetes resource objects via a Kubernetes API endpoint.\nCorresponds to the [`call`](https://docs.aws.amazon.com/step-functions/latest/dg/connect-eks.html) API in Step Functions Connector.\n\nThe following code snippet includes a Task state that uses `eks:call` to list the pods.\n\n```python\nimport aws_cdk.aws_eks as eks\n\n\nmy_eks_cluster = eks.Cluster(self, \"my sample cluster\",\n version=eks.KubernetesVersion.V1_18,\n cluster_name=\"myEksCluster\"\n)\n\ntasks.EksCall(self, \"Call an EKS Endpoint\",\n cluster=my_eks_cluster,\n http_method=tasks.HttpMethods.GET,\n 
http_path=\"/api/v1/namespaces/default/pods\"\n)\n```\n\n## EventBridge\n\nStep Functions supports Amazon EventBridge through the service integration pattern.\nThe service integration APIs correspond to Amazon EventBridge APIs.\n\n[Read more](https://docs.aws.amazon.com/step-functions/latest/dg/connect-eventbridge.html) about the differences when using these service integrations.\n\n### Put Events\n\nSend events to an EventBridge bus.\nCorresponds to the [`put-events`](https://docs.aws.amazon.com/step-functions/latest/dg/connect-eventbridge.html) API in Step Functions Connector.\n\nThe following code snippet includes a Task state that uses `events:putEvents` to send an event to a custom event bus.\n\n```python\nimport aws_cdk.aws_events as events\n\n\nmy_event_bus = events.EventBus(self, \"EventBus\",\n event_bus_name=\"MyEventBus1\"\n)\n\ntasks.EventBridgePutEvents(self, \"Send an event to EventBridge\",\n entries=[tasks.EventBridgePutEventsEntry(\n detail=sfn.TaskInput.from_object({\n \"Message\": \"Hello from Step Functions!\"\n }),\n event_bus=my_event_bus,\n detail_type=\"MessageFromStepFunctions\",\n source=\"step.functions\"\n )]\n)\n```\n\n## Glue\n\nStep Functions supports [AWS Glue](https://docs.aws.amazon.com/step-functions/latest/dg/connect-glue.html) through the service integration pattern.\n\nYou can call the [`StartJobRun`](https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-jobs-runs.html#aws-glue-api-jobs-runs-StartJobRun) API from a `Task` state.\n\n```python\ntasks.GlueStartJobRun(self, \"Task\",\n glue_job_name=\"my-glue-job\",\n arguments=sfn.TaskInput.from_object({\n \"key\": \"value\"\n }),\n timeout=Duration.minutes(30),\n notify_delay_after=Duration.minutes(5)\n)\n```\n\n## Glue DataBrew\n\nStep Functions supports [AWS Glue DataBrew](https://docs.aws.amazon.com/step-functions/latest/dg/connect-databrew.html) through the service integration pattern.\n\nYou can call the 
[`StartJobRun`](https://docs.aws.amazon.com/databrew/latest/dg/API_StartJobRun.html) API from a `Task` state.\n\n```python\ntasks.GlueDataBrewStartJobRun(self, \"Task\",\n name=\"databrew-job\"\n)\n```\n\n## Lambda\n\n[Invoke](https://docs.aws.amazon.com/lambda/latest/dg/API_Invoke.html) a Lambda function.\n\nYou can specify the input to your Lambda function through the `payload` attribute.\nBy default, Step Functions invokes the Lambda function with the state input (JSON path '$')\nas the input.\n\nThe following snippet invokes a Lambda function with the state input as the payload\nby referencing the `$` path.\n\n```python\n# fn: lambda.Function\n\ntasks.LambdaInvoke(self, \"Invoke with state input\",\n lambda_function=fn\n)\n```\n\nWhen a function is invoked, the Lambda service sends [these response\nelements](https://docs.aws.amazon.com/lambda/latest/dg/API_Invoke.html#API_Invoke_ResponseElements)\nback.\n\n\u26a0\ufe0f The response from the Lambda function is in an attribute called `Payload`.\n\nThe following snippet shows two invocations: the first passes an empty object as the payload, and the second uses the `$.Payload` path\nto pass the output of a Lambda function executed before it.\n\n```python\n# fn: lambda.Function\n\ntasks.LambdaInvoke(self, \"Invoke with empty object as payload\",\n lambda_function=fn,\n payload=sfn.TaskInput.from_object({})\n)\n\n# use the output of fn as input\ntasks.LambdaInvoke(self, \"Invoke with payload field in the state input\",\n lambda_function=fn,\n payload=sfn.TaskInput.from_json_path_at(\"$.Payload\")\n)\n```\n\nThe following snippet invokes a Lambda function and sets the task output to only include\nthe Lambda function response.\n\n```python\n# fn: lambda.Function\n\ntasks.LambdaInvoke(self, \"Invoke and set function response as task output\",\n lambda_function=fn,\n output_path=\"$.Payload\"\n)\n```\n\nIf you want to combine the input and the Lambda function response, you can use\nthe `payloadResponseOnly` property and 
specify the `resultPath`. This will put the\n
This will put the\nLambda function ARN directly in the \"Resource\" string, but it conflicts with the\nintegrationPattern, invocationType, clientContext, and qualifier properties.\n\n```python\n# fn: lambda.Function\n\ntasks.LambdaInvoke(self, \"Invoke and combine function response with task input\",\n lambda_function=fn,\n payload_response_only=True,\n result_path=\"$.fn\"\n)\n```\n\nYou can have Step Functions pause a task, and wait for an external process to\nreturn a task token. Read more about the [callback pattern](https://docs.aws.amazon.com/step-functions/latest/dg/callback-task-sample-sqs.html#call-back-lambda-example)\n\nTo use the callback pattern, set the `token` property on the task. Call the Step\nFunctions `SendTaskSuccess` or `SendTaskFailure` APIs with the token to\nindicate that the task has completed and the state machine should resume execution.\n\nThe following snippet invokes a Lambda with the task token as part of the input\nto the Lambda.\n\n```python\n# fn: lambda.Function\n\ntasks.LambdaInvoke(self, \"Invoke with callback\",\n lambda_function=fn,\n integration_pattern=sfn.IntegrationPattern.WAIT_FOR_TASK_TOKEN,\n payload=sfn.TaskInput.from_object({\n \"token\": sfn.JsonPath.task_token,\n \"input\": sfn.JsonPath.string_at(\"$.someField\")\n })\n)\n```\n\n\u26a0\ufe0f The task will pause until it receives that task token back with a `SendTaskSuccess` or `SendTaskFailure`\ncall. Learn more about [Callback with the Task\nToken](https://docs.aws.amazon.com/step-functions/latest/dg/connect-to-resource.html#connect-wait-token).\n\nAWS Lambda can occasionally experience transient service errors. In this case, invoking Lambda\nresults in a 500 error, such as `ServiceException`, `AWSLambdaException`, or `SdkClientException`.\nAs a best practice, the `LambdaInvoke` task will retry on those errors with an interval of 2 seconds,\na back-off rate of 2 and 6 maximum attempts. 
Set the `retryOnServiceExceptions` prop to `false` to\ndisable this behavior.\n\n## SageMaker\n\nStep Functions supports [Amazon SageMaker](https://docs.aws.amazon.com/step-functions/latest/dg/connect-sagemaker.html) through the service integration pattern.\n\nIf your training job or model uses resources from AWS Marketplace,\n[network isolation is required](https://docs.aws.amazon.com/sagemaker/latest/dg/mkt-algo-model-internet-free.html).\nTo enable it, set the `enableNetworkIsolation` property to `true` for `SageMakerCreateModel` or `SageMakerCreateTrainingJob`.\n\nTo set environment variables for the Docker container, use the `environment` property.\n\n### Create Training Job\n\nYou can call the [`CreateTrainingJob`](https://docs.aws.amazon.com/sagemaker/latest/dg/API_CreateTrainingJob.html) API from a `Task` state.\n\n```python\ntasks.SageMakerCreateTrainingJob(self, \"TrainSagemaker\",\n training_job_name=sfn.JsonPath.string_at(\"$.JobName\"),\n algorithm_specification=tasks.AlgorithmSpecification(\n algorithm_name=\"BlazingText\",\n training_input_mode=tasks.InputMode.FILE\n ),\n input_data_config=[tasks.Channel(\n channel_name=\"train\",\n data_source=tasks.DataSource(\n s3_data_source=tasks.S3DataSource(\n s3_data_type=tasks.S3DataType.S3_PREFIX,\n s3_location=tasks.S3Location.from_json_expression(\"$.S3Bucket\")\n )\n )\n )],\n output_data_config=tasks.OutputDataConfig(\n s3_output_location=tasks.S3Location.from_bucket(s3.Bucket.from_bucket_name(self, \"Bucket\", \"mybucket\"), \"myoutputpath\")\n ),\n resource_config=tasks.ResourceConfig(\n instance_count=1,\n instance_type=ec2.InstanceType(sfn.JsonPath.string_at(\"$.InstanceType\")),\n volume_size=Size.gibibytes(50)\n ), # optional: default is 1 instance of EC2 `M4.XLarge` with `10GB` volume\n stopping_condition=tasks.StoppingCondition(\n max_runtime=Duration.hours(2)\n )\n)\n```\n\n### Create Transform Job\n\nYou can call the 
[`CreateTransformJob`](https://docs.aws.amazon.com/sagemaker/latest/dg/API_CreateTransformJob.html) API from a `Task` state.\n\n```python\ntasks.SageMakerCreateTransformJob(self, \"Batch Inference\",\n transform_job_name=\"MyTransformJob\",\n model_name=\"MyModelName\",\n model_client_options=tasks.ModelClientOptions(\n invocations_max_retries=3, # default is 0\n invocations_timeout=Duration.minutes(5)\n ),\n transform_input=tasks.TransformInput(\n transform_data_source=tasks.TransformDataSource(\n s3_data_source=tasks.TransformS3DataSource(\n s3_uri=\"s3://inputbucket/train\",\n s3_data_type=tasks.S3DataType.S3_PREFIX\n )\n )\n ),\n transform_output=tasks.TransformOutput(\n s3_output_path=\"s3://outputbucket/TransformJobOutputPath\"\n ),\n transform_resources=tasks.TransformResources(\n instance_count=1,\n instance_type=ec2.InstanceType.of(ec2.InstanceClass.M4, ec2.InstanceSize.XLARGE)\n )\n)\n```\n\n### Create Endpoint\n\nYou can call the [`CreateEndpoint`](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_CreateEndpoint.html) API from a `Task` state.\n\n```python\ntasks.SageMakerCreateEndpoint(self, \"SagemakerEndpoint\",\n endpoint_name=sfn.JsonPath.string_at(\"$.EndpointName\"),\n endpoint_config_name=sfn.JsonPath.string_at(\"$.EndpointConfigName\")\n)\n```\n\n### Create Endpoint Config\n\nYou can call the [`CreateEndpointConfig`](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_CreateEndpointConfig.html) API from a `Task` state.\n\n```python\ntasks.SageMakerCreateEndpointConfig(self, \"SagemakerEndpointConfig\",\n endpoint_config_name=\"MyEndpointConfig\",\n production_variants=[tasks.ProductionVariant(\n initial_instance_count=2,\n instance_type=ec2.InstanceType.of(ec2.InstanceClass.M5, ec2.InstanceSize.XLARGE),\n model_name=\"MyModel\",\n variant_name=\"awesome-variant\"\n )]\n)\n```\n\n### Create Model\n\nYou can call the [`CreateModel`](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_CreateModel.html) API from a 
`Task` state.\n\n```python\ntasks.SageMakerCreateModel(self, \"Sagemaker\",\n model_name=\"MyModel\",\n primary_container=tasks.ContainerDefinition(\n image=tasks.DockerImage.from_json_expression(sfn.JsonPath.string_at(\"$.Model.imageName\")),\n mode=tasks.Mode.SINGLE_MODEL,\n model_s3_location=tasks.S3Location.from_json_expression(\"$.TrainingJob.ModelArtifacts.S3ModelArtifacts\")\n )\n)\n```\n\n### Update Endpoint\n\nYou can call the [`UpdateEndpoint`](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_UpdateEndpoint.html) API from a `Task` state.\n\n```python\ntasks.SageMakerUpdateEndpoint(self, \"SagemakerEndpoint\",\n endpoint_name=sfn.JsonPath.string_at(\"$.Endpoint.Name\"),\n endpoint_config_name=sfn.JsonPath.string_at(\"$.Endpoint.EndpointConfig\")\n)\n```\n\n## SNS\n\nStep Functions supports [Amazon SNS](https://docs.aws.amazon.com/step-functions/latest/dg/connect-sns.html) through the service integration pattern.\n\nYou can call the [`Publish`](https://docs.aws.amazon.com/sns/latest/api/API_Publish.html) API from a `Task` state to publish to an SNS topic.\n\n```python\ntopic = sns.Topic(self, \"Topic\")\n\n# Use a field from the execution data as the message.\ntask1 = tasks.SnsPublish(self, \"Publish1\",\n topic=topic,\n integration_pattern=sfn.IntegrationPattern.REQUEST_RESPONSE,\n message=sfn.TaskInput.from_json_path_at(\"$.state.message\"),\n message_attributes={\n \"place\": tasks.MessageAttribute(\n value=sfn.JsonPath.string_at(\"$.place\")\n ),\n \"pic\": tasks.MessageAttribute(\n # BINARY must be explicitly set\n data_type=tasks.MessageAttributeDataType.BINARY,\n value=sfn.JsonPath.string_at(\"$.pic\")\n ),\n \"people\": tasks.MessageAttribute(\n value=4\n ),\n \"handles\": tasks.MessageAttribute(\n value=[\"@kslater\", \"@jjf\", None, \"@mfanning\"]\n )\n }\n)\n\n# Combine a field from the execution data with\n# a literal object.\ntask2 = tasks.SnsPublish(self, \"Publish2\",\n topic=topic,\n message=sfn.TaskInput.from_object({\n \"field1\": 
\"somedata\",\n \"field2\": sfn.JsonPath.string_at(\"$.field2\")\n })\n)\n```\n\n## Step Functions\n\n### Start Execution\n\nYou can manage [AWS Step Functions](https://docs.aws.amazon.com/step-functions/latest/dg/connect-stepfunctions.html) executions.\n\nAWS Step Functions supports its own [`StartExecution`](https://docs.aws.amazon.com/step-functions/latest/apireference/API_StartExecution.html) API as a service integration.\n\n```python\n# Define a state machine with one Pass state\nchild = sfn.StateMachine(self, \"ChildStateMachine\",\n definition=sfn.Chain.start(sfn.Pass(self, \"PassState\"))\n)\n\n# Include the state machine in a Task state with callback pattern\ntask = tasks.StepFunctionsStartExecution(self, \"ChildTask\",\n state_machine=child,\n integration_pattern=sfn.IntegrationPattern.WAIT_FOR_TASK_TOKEN,\n input=sfn.TaskInput.from_object({\n \"token\": sfn.JsonPath.task_token,\n \"foo\": \"bar\"\n }),\n name=\"MyExecutionName\"\n)\n\n# Define a second state machine with the Task state above\nsfn.StateMachine(self, \"ParentStateMachine\",\n definition=task\n)\n```\n\nYou can use [Associate Workflow Executions](https://docs.aws.amazon.com/step-functions/latest/dg/concepts-nested-workflows.html#nested-execution-startid)\nvia the `associateWithParent` property. This allows the Step Functions UI to link child\nexecutions from parent executions, making it easier to trace execution flow across state machines.\n\n```python\n# child: sfn.StateMachine\n\ntask = tasks.StepFunctionsStartExecution(self, \"ChildTask\",\n state_machine=child,\n associate_with_parent=True\n)\n```\n\nThis will add the payload `AWS_STEP_FUNCTIONS_STARTED_BY_EXECUTION_ID.$: $$.Execution.Id` to the\n`input` property for you, which will pass the execution ID from the context object to the\nexecution input. 
It requires `input` to be an object or not be set at all.\n\n### Invoke Activity\n\nYou can invoke a [Step Functions Activity](https://docs.aws.amazon.com/step-functions/latest/dg/concepts-activities.html), which lets you have\na task in your state machine whose work is performed by a *worker* that can\nbe hosted on Amazon EC2, Amazon ECS, AWS Lambda, or virtually anywhere else. Activities\nare a way to associate code running somewhere (known as an activity worker) with\na specific task in a state machine.\n\nWhen Step Functions reaches an activity task state, the workflow waits for an\nactivity worker to poll for a task. An activity worker polls Step Functions by\ncalling `GetActivityTask` with the ARN of the related activity.\n\nAfter the activity worker completes its work, it can report its\nsuccess or failure by using `SendTaskSuccess` or `SendTaskFailure`. These two\ncalls use the `taskToken` returned by `GetActivityTask` to associate the result\nwith that task.\n\nThe following example creates an activity and a task that invokes it.\n\n```python\nsubmit_job_activity = sfn.Activity(self, \"SubmitJob\")\n\ntasks.StepFunctionsInvokeActivity(self, \"Submit Job\",\n activity=submit_job_activity\n)\n```\n\n## SQS\n\nStep Functions supports [Amazon SQS](https://docs.aws.amazon.com/step-functions/latest/dg/connect-sqs.html) through the service integration pattern.\n\nYou can call the [`SendMessage`](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SendMessage.html) API from a `Task` state\nto send a message to an SQS queue.\n\n```python\nqueue = sqs.Queue(self, \"Queue\")\n\n# Use a field from the execution data as the message.\ntask1 = tasks.SqsSendMessage(self, \"Send1\",\n queue=queue,\n message_body=sfn.TaskInput.from_json_path_at(\"$.message\")\n)\n\n# Combine a field from the execution data with\n# a literal object.\ntask2 = tasks.SqsSendMessage(self, \"Send2\",\n queue=queue,\n message_body=sfn.TaskInput.from_object({\n \"field1\": \"somedata\",\n 
\"field2\": sfn.JsonPath.string_at(\"$.field2\")\n })\n)\n```\n",
"bugtrack_url": null,
"license": "Apache-2.0",
"summary": "Task integrations for AWS StepFunctions",
"version": "1.204.0",
"project_urls": {
"Homepage": "https://github.com/aws/aws-cdk",
"Source": "https://github.com/aws/aws-cdk.git"
},
"split_keywords": [],
"urls": [
{
"comment_text": "",
"digests": {
"blake2b_256": "2580259df8f7c61d7af8a8d50b00ad1b463c06b7d4e42298688181de506cb4f7",
"md5": "0321c97abf493dc8a182f74c4f24fe13",
"sha256": "cadd01ab7b419a08f9b81ab90c32b8848f31deb26e411998ae93334bc46bff78"
},
"downloads": -1,
"filename": "aws_cdk.aws_stepfunctions_tasks-1.204.0-py3-none-any.whl",
"has_sig": false,
"md5_digest": "0321c97abf493dc8a182f74c4f24fe13",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": "~=3.7",
"size": 722142,
"upload_time": "2023-06-19T21:01:43",
"upload_time_iso_8601": "2023-06-19T21:01:43.158383Z",
"url": "https://files.pythonhosted.org/packages/25/80/259df8f7c61d7af8a8d50b00ad1b463c06b7d4e42298688181de506cb4f7/aws_cdk.aws_stepfunctions_tasks-1.204.0-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": "",
"digests": {
"blake2b_256": "7be986e60931abcb9f2b30248117bcddb6277ed2beffa7377d6e27507066cc0c",
"md5": "efefb3f7493bb5f7b992e3b6a92141c3",
"sha256": "1afd611c00b8bec2c5d3a047fcadf085664c404dc323a2b94b2c2e22dc7dd34b"
},
"downloads": -1,
"filename": "aws-cdk.aws-stepfunctions-tasks-1.204.0.tar.gz",
"has_sig": false,
"md5_digest": "efefb3f7493bb5f7b992e3b6a92141c3",
"packagetype": "sdist",
"python_version": "source",
"requires_python": "~=3.7",
"size": 746067,
"upload_time": "2023-06-19T21:07:49",
"upload_time_iso_8601": "2023-06-19T21:07:49.991423Z",
"url": "https://files.pythonhosted.org/packages/7b/e9/86e60931abcb9f2b30248117bcddb6277ed2beffa7377d6e27507066cc0c/aws-cdk.aws-stepfunctions-tasks-1.204.0.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2023-06-19 21:07:49",
"github": true,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"github_user": "aws",
"github_project": "aws-cdk",
"travis_ci": false,
"coveralls": false,
"github_actions": true,
"lcname": "aws-cdk.aws-stepfunctions-tasks"
}