## Step-in-line

[AWS Step Functions](https://aws.amazon.com/step-functions/) is awesome. It is a fully managed, serverless AWS offering, so there is no upkeep or maintenance required. Unfortunately, programmatically creating workflows of Lambda functions with Terraform requires writing complex JSON definitions. `step-in-line` generates these JSON definitions automatically from Python decorators. In addition, it generates the Terraform files needed for deploying the Step Functions, Lambdas, and EventBridge events.
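
For a sense of what is being automated: even a minimal hand-written definition for a two-step Lambda chain, in Amazon States Language, looks roughly like this (an illustrative sketch with placeholder function names, not output produced by `step-in-line`):

```json
{
  "StartAt": "preprocess",
  "States": {
    "preprocess": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "Parameters": { "FunctionName": "preprocess", "Payload.$": "$" },
      "Next": "train"
    },
    "train": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "Parameters": { "FunctionName": "train", "Payload.$": "$" },
      "End": true
    }
  }
}
```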

The API is intentionally similar to the SageMaker Pipeline API.

## Usage

### Install

Full install (recommended):

`pip install 'step-in-line[terraform]'`

Minimal install, without the Terraform-related dependencies:

`pip install step-in-line`

### Example Pipeline
```python
from step_in_line.step import step
from step_in_line.pipeline import Pipeline
from step_in_line.tf import StepInLine, rename_tf_output
from pathlib import Path

lambda_iam_policy = "" # put an IAM policy here, formatted as JSON

@step(
    # Function names must be unique; alternatively, pass a name
    # to the step to ensure uniqueness.
    name = "preprocess_unique",
    python_runtime = "python3.9", # defaults to 3.10
    memory_size = 128, # defaults to 512
    layers = ["arn:aws:lambda:us-east-2:123456789012:layer:example-layer"],
    # By default, the Lambda gets only bare-bones permissions; to
    # interact with additional AWS services, provide IAM policies
    # here. They are automatically attached to the Lambda's role.
    policies = [lambda_iam_policy] 
)
def preprocess(arg1: str) -> str:
    # Do stuff here, e.g., run some SQL against Snowflake.
    # Make sure to "import snowflake" within this function, and
    # pass a "layer" containing the Snowflake dependencies.
    # Must run in <15 minutes.
    return "hello"

@step
def preprocess_2(arg1: str) -> str:
    # Do stuff here, e.g., run some SQL against Snowflake.
    # Make sure to "import snowflake" within this function, and
    # pass a "layer" containing the Snowflake dependencies.
    # Must run in <15 minutes.
    return "hello"

@step
def preprocess_3(arg1: str) -> str:
    # Do stuff here, e.g., run some SQL against Snowflake.
    # Make sure to "import snowflake" within this function, and
    # pass a "layer" containing the Snowflake dependencies.
    # Must run in <15 minutes.
    return "hello"

@step
def train(arg1: str, arg2: str, arg3: str) -> str:
    # Do stuff here, e.g., run some SQL against Snowflake.
    # Make sure to "import snowflake" within this function, and
    # pass a "layer" containing the Snowflake dependencies.
    # Must run in <15 minutes.
    return "goodbye"

step_process_result = preprocess("hi")
# Typically, steps pass small bits of metadata between jobs;
# the Lambdas also pass data to each other via JSON inputs.
step_process_result_2 = preprocess_2(step_process_result)
step_process_result_3 = preprocess_3(step_process_result)
step_train_result = train(
    step_process_result, step_process_result_2, step_process_result_3
)
# This creates a pipeline including all the dependent steps.
# "schedule" is optional and can be cron- or rate-based
# (see the cron example after this block).
pipe = Pipeline("mytest", steps=[step_train_result], schedule="rate(2 minutes)")

# To run locally:
print(pipe.local_run()) # prints the output of each step

# to extract the step function definition
print(pipe.generate_step_functions())

# to extract the step function definition as a string
import json
print(json.dumps(pipe.generate_step_functions()))

# Generate Terraform JSON, including the Step Function definition and Lambdas.
# Requires the optional extra: `pip install 'step-in-line[terraform]'`
from cdktf import App
app = App(hcl_output=True)
instance_name = "aws_instance"
stack = StepInLine(app, instance_name, pipe, "us-east-1")
# write the terraform json for use by `terraform apply`
tf_path = Path(app.outdir, "stacks", instance_name)
app.synth()
# Terraform Python SDK does not add ".json" extension; this function
# renames the generated Terraform file and copies it to the project root.
rename_tf_output(tf_path)

```
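
As noted in the comments above, `schedule` also accepts EventBridge cron expressions. A cron-based variant of the pipeline above might look like this (the cron syntax is standard EventBridge; the rest mirrors the example):

```python
# Same pipeline as above, but triggered at 12:00 UTC every day using an
# EventBridge cron expression instead of a rate expression.
pipe = Pipeline("mytest", steps=[step_train_result], schedule="cron(0 12 * * ? *)")
```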

```bash
export AWS_ACCESS_KEY_ID=your_aws_access_key
export AWS_SECRET_ACCESS_KEY=your_aws_secret_access_key
terraform init
terraform apply
```
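
Because the deployment is plain Terraform, the generated resources can be removed with the standard workflow:

```bash
# Tear down the Step Function, Lambdas, and EventBridge resources
terraform destroy
```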

### Custom Lambda template

The default Lambda template is the [following](./step_in_line/template_lambda.py):

```python
import pickle

"{{PUT_FUNCTION_HERE}}"


def combine_payload(event):
    """Extract the payload from the event.

    If the previous "step" was a Parallel state, the event is an array
    of payloads, one per step in the Parallel state. In that case, the
    outputs are combined into one large payload.

    Args:
        event (dict or list): object passed to the Lambda
    """
    payload = {}
    if isinstance(event, dict):
        if "Payload" in event:
            payload = event["Payload"]
    else:  # then event is a list, and contains "multiple" payloads
        for ev in event:
            if "Payload" in ev:
                payload = {**payload, **ev["Payload"]}
    return payload


# Load the step's arguments and name, run the user's function, and
# return its result merged into the incoming payload.
def lambda_handler(event, context):
    with open("args.pickle", "rb") as f:
        args = pickle.load(f)

    with open("name.pickle", "rb") as f:
        name = pickle.load(f)

    arg_values = []
    payload = combine_payload(event)
    for arg in args:
        if arg in payload:
            # extract the output from a previous Lambda
            arg_values.append(payload[arg])
        else:
            # just use the hardcoded argument
            arg_values.append(arg)

    result = "{{PUT_FUNCTION_NAME_HERE}}"(*arg_values)
    # All outputs from all Lambdas are stored in the payload and
    # passed on to the Lambda(s) in the next step. This mirrors
    # local_run from the `Pipeline` class. With each subsequent
    # step the payload grows; at the final step it includes the
    # output of all intermediary steps.
    return {name: result, **payload}

```
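
To make the merging behavior of `combine_payload` concrete: after a Parallel state, the event is a list with one entry per branch, and the branch payloads are folded into a single dict (hypothetical payload values, using the function defined above):

```python
# Hypothetical event shape after a Parallel state with two branches.
event = [
    {"Payload": {"preprocess_2": "hello"}},
    {"Payload": {"preprocess_3": "hello"}},
]

# The two branch payloads are merged into one dict:
# {"preprocess_2": "hello", "preprocess_3": "hello"}
print(combine_payload(event))
```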

You can supply a custom template like so:

```python
stack = StepInLine(app, instance_name, pipe, "us-east-1", template_file="/path/to/your/custom/template.py")
```

The `"{{PUT_FUNCTION_HERE}}"` and `"{{PUT_FUNCTION_NAME_HERE}}"` will automatically be replaced by the code of your function defined inside the `@step` decorator and the name of the function, respectively.  

### Limitations

Only Lambda steps are supported. For other types of steps, including SageMaker jobs, [SageMaker Pipelines](https://docs.aws.amazon.com/sagemaker/latest/dg/pipelines-step-decorator-create-pipeline.html) are likely a better option.

All limitations of Lambdas apply; in particular, each step can run for at most 15 minutes before timing out.



## API Docs

https://danielhstahl.github.io/step-in-line/index.html