cumulus-message-adapter-python


| Field | Value |
| --- | --- |
| Name | cumulus-message-adapter-python |
| Version | 2.3.0 |
| home_page | https://github.com/cumulus-nasa/cumulus-message-adapter-python |
| Summary | A handler library for cumulus tasks written in python |
| upload_time | 2024-09-18 19:22:48 |
| maintainer | None |
| docs_url | None |
| author | Cumulus Authors |
| requires_python | None |
| license | None |
| keywords | nasa cumulus |
| requirements | cumulus-message-adapter |
| Travis-CI | No Travis. |
| coveralls test coverage | No coveralls. |

# cumulus-message-adapter-python

[![CircleCI]](https://circleci.com/gh/nasa/cumulus-message-adapter-python)
[![PyPI version]](https://badge.fury.io/py/cumulus-message-adapter-python)

## What is Cumulus?

Cumulus is a cloud-based data ingest, archive, distribution and management
prototype for NASA's future Earth science data streams.

Read the [Cumulus Documentation].

## What is the Cumulus Message Adapter?

The Cumulus Message Adapter is a library that adapts incoming messages in the
Cumulus protocol to a format more easily consumable by Cumulus tasks, invokes
the tasks, and then adapts their response back to the Cumulus message protocol
to be sent to the next task.

## Installation

```plain
pip install cumulus-message-adapter-python
```

## Task definition

To use the Cumulus Message Adapter, you need to create two functions in your
task module: a handler function and a business logic function.

The handler function is a standard Lambda handler function which takes two
parameters (as specified by AWS): `event` and `context`.

The business logic function is where the actual work of your task occurs. It
should take two parameters: `event` and `context`.

The `event` object contains two keys:

* `input` - the task's input, typically the `payload` of the message, produced
  at runtime
* `config` - the task's configuration, with any templated variables resolved

The `context` parameter is the standard Lambda context as passed by AWS.

The return value of the business logic function will be placed in the
`payload` of the resulting Cumulus message.

Expectations for input, config, and return values are all defined by the task,
and should be well documented. Tasks should thoughtfully consider their inputs
and return values, as breaking changes may have cascading effects on tasks
throughout a workflow. Configuration changes are slightly less impactful, but
must be communicated to those using the task.
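
As a rough illustration of the contract described above, the sketch below shows
a business logic function that reads `event["input"]` and `event["config"]` and
returns the new payload. The function name `add_bucket` and the `granules` and
`bucket` keys are hypothetical and exist only for this example; a real task
defines its own input and config formats.

```python
def add_bucket(event, context):
    """Hypothetical business logic: tag every granule with a configured bucket."""
    granules = event["input"].get("granules", [])  # task input (message payload)
    bucket = event["config"]["bucket"]             # resolved task configuration

    # The return value becomes the `payload` of the resulting Cumulus message.
    return {"granules": [dict(granule, bucket=bucket) for granule in granules]}


# Call the function directly with a hand-built event; the context is unused here.
sample_event = {
    "input": {"granules": [{"granuleId": "G1"}]},
    "config": {"bucket": "example-bucket"},
}
result = add_bucket(sample_event, None)
```

Because the function only depends on `input` and `config`, it can be exercised
without constructing a full Cumulus message.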

## Cumulus Message Adapter interface

The Cumulus Message Adapter for Python provides one function:
`run_cumulus_task`. It takes the following parameters:

* `task_function` - the function containing your business logic (as described
  above)
* `cumulus_message` - the event passed by Lambda. It should be a Cumulus
  Message *or* a CMA parameter encapsulated message (see [Cumulus Workflow
  Documentation](https://nasa.github.io/cumulus/docs/workflows/input_output)):

  ```json
  {
     "cma": {
        "event": "<cumulus message object>",
        "SomeCMAConfigKey": "<CMA configuration object>"
     }
  }
  ```

* `context` - the Lambda context
* `schemas` - optional: a dict with `input`, `config`, and `output` properties.
  Each should be a string set to the filepath of the corresponding JSON schema
  file. All three properties of this dict are optional. If omitted, the message
  adapter will look in `/<task_root>/schemas/<schema_type>.json`, and if no
  schema file is found there, that schema type is ignored.
* `taskargs` - optional: additional keyword arguments for the `task_function`.
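
The optional parameters can be sketched as follows. This is an illustration
rather than a reference: it assumes `schemas` can be passed by keyword under
the name used in the list above, the schema file paths are hypothetical, and
`multiplier` is a made-up keyword argument standing in for `taskargs`. The
library import sits inside the handler only so that the sketch loads where the
package is not installed.

```python
# Hypothetical schema locations; each key is optional.
SCHEMAS = {
    "input": "schemas/input.json",
    "config": "schemas/config.json",
    "output": "schemas/output.json",
}


def task(event, context, multiplier=1):
    """Hypothetical business logic that accepts one extra keyword argument."""
    return {"value": event["input"]["value"] * multiplier}


def handler(event, context):
    # Imported here so this sketch can be loaded without the library installed.
    from run_cumulus_task import run_cumulus_task

    # Extra keyword arguments (here `multiplier`) are meant for `task`.
    return run_cumulus_task(task, event, context, schemas=SCHEMAS, multiplier=2)
```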

## Example

Simple example of using this package's `run_cumulus_task` function as a wrapper
around another function:

```python
>>> from run_cumulus_task import run_cumulus_task

# simple task that returns the event
>>> def task(event, context):
...     return event

# handler that is provided to aws lambda
>>> def handler(event, context):
...     return run_cumulus_task(task, event, context)

```

For a full example see the [example folder](./example).

## Creating a deployment package

Tasks that use this library are just standard AWS Lambda tasks. See
[creating release packages].

## Usage in a Cumulus Deployment

For documentation on how to use this package in a Cumulus Deployment, view the
[Cumulus Workflow Documentation](https://nasa.github.io/cumulus/docs/workflows/input_output).

## Development

### Dependency Installation

```plain
$ pip install -r requirements-dev.txt
$ pip install -r requirements.txt
```

### Logging with `CumulusLogger`

This package includes the `cumulus_logger` module, which contains a logging
class, `CumulusLogger`, that standardizes the log format for Cumulus. It
provides methods to log at the error, fatal, warning, debug, info, and trace
levels.

**Import the `CumulusLogger` class:**

```python
>>> from cumulus_logger import CumulusLogger

```

**Instantiate the logger inside the task definition (name and level are
optional):**

```python
>>> import logging
>>> logger = CumulusLogger("event_name", logging.ERROR)

```

**Use the logging methods for different levels:**

```python
>>> logger.trace('<your message>')

>>> logger.debug('<your message>')

>>> logger.info('<your message>')

>>> logger.warn('<your message>')

>>> logger.error('<your message>')

>>> logger.fatal('<your message>')

```

**Each logging method also accepts additional positional and keyword
arguments, as with the standard Python `Logger`.**

`msg` is the message format string; `args` and `kwargs` are the arguments used
for string formatting.

If `exc_info` in `kwargs` is not `False`, the exception information from
`exc_info` or, failing that, from `sys.exc_info()` is added to the message.

```python
>>> logger.debug(msg, *args, **kwargs)

```
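
To make the formatting behaviour above concrete, here is a stand-in sketch
written with the standard library only. `format_log_message` is a hypothetical
helper, not part of `cumulus_logger`, and the default of `exc_info=False` is an
assumption; it only mimics the described behaviour of formatting `msg` with
`args` and `kwargs` and appending exception details when `exc_info` is set.

```python
import sys
import traceback


def format_log_message(msg, *args, **kwargs):
    """Hypothetical stand-in that mimics the behaviour described above."""
    exc_info = kwargs.pop("exc_info", False)  # assumed default
    text = msg.format(*args, **kwargs)
    if exc_info:
        # Use an explicit (type, value, traceback) tuple, else the active exception.
        info = exc_info if isinstance(exc_info, tuple) else sys.exc_info()
        if info[0] is not None:
            text += "\n" + "".join(traceback.format_exception(*info))
    return text


try:
    1 / 0
except ZeroDivisionError:
    formatted = format_log_message("task failed for {}", "granule-1", exc_info=True)
```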

**Example usage:**

```python
>>> import os
>>> import sys

>>> from run_cumulus_task import run_cumulus_task
>>> from cumulus_logger import CumulusLogger

# instantiate CumulusLogger
>>> logger = CumulusLogger()

>>> def task(event, context):
...     logger.info('task executed')
... 
...     # log error when an exception is caught
...     logger.error("task formatted message {} exc_info ", "bar", exc_info=True)
... 
...     # return the output of the task
...     return { "example": "output" }

>>> def handler(event, context):
...     # make sure event & context metadata is set in the logger
...     logger.setMetadata(event, context)
...     return run_cumulus_task(task, event, context)

```

### Running Tests

Running tests requires [localstack](https://github.com/localstack/localstack).

The tests only need localstack to run S3, which can be started with the
following command:

```plain
$ SERVICES=s3 localstack start
```

Then check that the tests pass with the following nose2 command:

```plain
$ CUMULUS_ENV=testing nose2
```

### Linting

```plain
$ pylint run_cumulus_task.py
```

## Why?

This approach has a few major advantages:

1. It explicitly prevents tasks from making assumptions about data structures
   like `meta` and `cumulus_meta` that are owned internally and may therefore
   be broken in future updates. To gain access to fields in these structures,
   tasks must be passed the data explicitly in the workflow configuration.
1. It provides clearer ownership of the various data structures. Operators own
   `meta`. Cumulus owns `cumulus_meta`. Tasks define their own `config`,
   `input`, and `output` formats.
1. The Cumulus Message Adapter greatly simplifies running Lambda functions not
   explicitly created for Cumulus.
1. The approach greatly simplifies testing for tasks, as tasks don't need to
   set up cumbersome structures to emulate the message protocol and can just
   test their business function.
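
The last point can be sketched as a plain test function. The `task` function
and its `granule_count` output are hypothetical; the point is only that the
test builds a small dict with `input` and `config` instead of a full Cumulus
message.

```python
def task(event, context):
    """Hypothetical business function: count the granules in the input."""
    return {"granule_count": len(event["input"]["granules"])}


def test_task_counts_granules():
    # Only the `input` and `config` keys are needed, not a full Cumulus message.
    event = {
        "input": {"granules": [{"granuleId": "A"}, {"granuleId": "B"}]},
        "config": {},
    }
    assert task(event, None) == {"granule_count": 2}


test_task_counts_granules()
```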

## License

[Apache 2.0](LICENSE)

[circleci]:
  https://circleci.com/gh/nasa/cumulus-message-adapter-python.svg?style=svg
[pypi version]:
  https://badge.fury.io/py/cumulus-message-adapter-python.svg
[Cumulus Documentation]:
  https://nasa.github.io/cumulus/
[creating release packages]:
  https://docs.aws.amazon.com/lambda/latest/dg/gettingstarted-package.html
[cumulus workflow documenation]:
  https://nasa.github.io/cumulus/docs/workflows/input_output

            

Raw data

{
    "_id": null,
    "home_page": "https://github.com/cumulus-nasa/cumulus-message-adapter-python",
    "name": "cumulus-message-adapter-python",
    "maintainer": null,
    "docs_url": null,
    "requires_python": null,
    "maintainer_email": null,
    "keywords": "nasa cumulus",
    "author": "Cumulus Authors",
    "author_email": "info@developmentseed.org",
    "download_url": "https://files.pythonhosted.org/packages/08/56/0f6be3b511b630e25cbd2214f15dab17e46439ee52118a4577a9fbce4b5a/cumulus_message_adapter_python-2.3.0.tar.gz",
    "platform": null,
    "bugtrack_url": null,
    "license": null,
    "summary": "A handler library for cumulus tasks written in python",
    "version": "2.3.0",
    "project_urls": {
        "Homepage": "https://github.com/cumulus-nasa/cumulus-message-adapter-python"
    },
    "split_keywords": [
        "nasa",
        "cumulus"
    ],
    "urls": [
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "08560f6be3b511b630e25cbd2214f15dab17e46439ee52118a4577a9fbce4b5a",
                "md5": "c0db7428eaff56f1b5630c0df373c1c0",
                "sha256": "ff55768fae327db5aad1239b2186685f4085a2d6a6adb5ed29cda32f79b43abc"
            },
            "downloads": -1,
            "filename": "cumulus_message_adapter_python-2.3.0.tar.gz",
            "has_sig": false,
            "md5_digest": "c0db7428eaff56f1b5630c0df373c1c0",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": null,
            "size": 14953,
            "upload_time": "2024-09-18T19:22:48",
            "upload_time_iso_8601": "2024-09-18T19:22:48.683186Z",
            "url": "https://files.pythonhosted.org/packages/08/56/0f6be3b511b630e25cbd2214f15dab17e46439ee52118a4577a9fbce4b5a/cumulus_message_adapter_python-2.3.0.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2024-09-18 19:22:48",
    "github": true,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "github_user": "cumulus-nasa",
    "github_project": "cumulus-message-adapter-python",
    "travis_ci": false,
    "coveralls": false,
    "github_actions": false,
    "circle": true,
    "requirements": [
        {
            "name": "cumulus-message-adapter",
            "specs": [
                [
                    "~=",
                    "2.0.4"
                ]
            ]
        }
    ],
    "lcname": "cumulus-message-adapter-python"
}
        