# echostream-node

- Version: 0.4.5
- Summary: EchoStream library for implementing remote nodes
- Author: EchoStream
- Requires Python: >=3.12
- Upload time: 2024-10-29 00:09:01

EchoStream library for implementing remote nodes that can be used in the EchoStream system.

This package supports creating External Nodes and Managed Node Types,
and supports the following EchoStream use cases:
- An External Node in an External App or Cross Account App that is a stand-alone application or part of another application, using either `threading` or `asyncio`.
- An External Node in a Cross Account App that is an AWS Lambda function. This use case only supports `threading`.
- A Managed Node Type, using either `threading` or `asyncio`.

> NOTE: Versions >=0.4.0 require Python 3.12 or later.

## Installation

### Python

```bash
pip install echostream-node
```

### AWS Lambda

You may use the publicly provided layer instead of directly installing `echostream-node` in your Lambda package. This layer includes `echostream-node` and all of its Python dependencies *except* those built into the AWS Lambda environment for Python.

The Layer ARN is:
```
arn:aws:lambda:{region}:226390263822:layer:echostream-node-{version}:1
```
where `{version}` is the version of `echostream-node` that you want, with `.` replaced with `_`, and `{region}` is the AWS region that your Lambda will run in. Currently, `us-east-1`, `us-east-2`, `us-west-1` and `us-west-2` are supported.

For example, for `echostream-node==0.4.0` in the `us-east-1` region, the layer ARN would be:
```
arn:aws:lambda:us-east-1:226390263822:layer:echostream-node-0_4_0:1
```
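
The substitution rule above can be sketched as a small helper. This is only an illustration of the naming scheme described in this section; `layer_arn` and `SUPPORTED_REGIONS` are hypothetical names and are not part of `echostream-node`.

```python
# Regions listed as supported in the text above.
SUPPORTED_REGIONS = {"us-east-1", "us-east-2", "us-west-1", "us-west-2"}


def layer_arn(version: str, region: str) -> str:
    """Build the layer ARN for a package version and an AWS region."""
    if region not in SUPPORTED_REGIONS:
        raise ValueError(f"Unsupported region: {region}")
    # The layer name uses underscores where the package version uses dots.
    layer_version = version.replace(".", "_")
    return (
        f"arn:aws:lambda:{region}:226390263822:layer:"
        f"echostream-node-{layer_version}:1"
    )


print(layer_arn("0.4.0", "us-east-1"))
```

The helper rejects regions outside the four listed above, so a typo in the region fails early instead of producing an ARN that does not resolve.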

## Usage

### Configuration
To instantiate a Node, a number of variables are required. These can be provided either as environment variables or directly on Node creation:

| Parameter | Environment Variable | Description |
| --- | --- | --- |
| `appsync_endpoint` | `APPSYNC_ENDPOINT` | The URL to the EchoStream API endpoint. |
| `client_id` | `CLIENT_ID` | The Application Client ID for the App's Cognito Client Application. |
| `name` | `NODE` | The Node's name. |
| `password` | `PASSWORD` | The password for the App User for the Node's App. |
| `tenant` | `TENANT` | The name of the Tenant that the Node is a part of. |
| `username` | `USER_NAME` | The name of the App User for the Node's App. |
| `user_pool_id` | `USER_POOL_ID` | The User Pool Id for the App's Cognito User Pool. |

### Threading Application Node
```python
from signal import SIGHUP, SIGINT, SIGTERM, signal, strsignal

from echostream_node import Message
from echostream_node.threading import AppNode


class MyExternalNode(AppNode):

    def handle_received_message(self, *, message: Message, source: str) -> None:
        print(f"Got a message:\n{message.body}")
        self.audit_message(message, source=source)
        
    def signal_handler(self, signum: int, _: object) -> None:
        print(f"{strsignal(signum)} received, shutting down")
        self.stop()

    def start(self) -> None:
        super().start()
        signal(SIGHUP, self.signal_handler)
        signal(SIGINT, self.signal_handler)
        signal(SIGTERM, self.signal_handler)

try:
    my_external_node = MyExternalNode()
    my_external_node.start()
    for i in range(100):
        message = my_external_node.create_message(str(i))
        my_external_node.send_message(message)
        my_external_node.audit_message(message)
    my_external_node.join()
except Exception:
    print("Error running node")
```

### Asyncio Application Node
```python
import asyncio

import aiorun
from echostream_node import Message
from echostream_node.asyncio import Node

class MyExternalNode(Node):

    async def handle_received_message(self, *, message: Message, source: str) -> None:
        print(f"Got a message:\n{message.body}")
        self.audit_message(message, source=source)


async def main(node: Node) -> None:
    try:
        await node.start()
        for i in range(100):
            # Use the `node` argument; `my_external_node` is not defined in this scope.
            message = node.create_message(str(i))
            node.send_message(message)
            node.audit_message(message)
        await node.join()
    except asyncio.CancelledError:
        pass
    except Exception:
        print("Error running node")


if __name__ == "__main__":
    aiorun.run(main(MyExternalNode()), stop_on_unhandled_errors=True, use_uvloop=True)
```

### Cross Account Lambda Node
```python
from echostream_node import Message
from echostream_node.threading import LambdaNode

class MyExternalNode(LambdaNode):
    def handle_received_message(self, *, message: Message, source: str) -> None:
        print(f"Got a message:\n{message.body}")
        self.audit_message(message, source=source)
        
MY_EXTERNAL_NODE = MyExternalNode()

def lambda_handler(event, context):
    MY_EXTERNAL_NODE.handle_event(event)
```

## Concurrent vs Sequential Message Processing
By default, all Nodes created using the package will process messages sequentially.
This is normally the behavior that you want, as many messaging protocols require
guaranteed ordering and therefore sequential processing within your Nodes. If this is
the behavior that you require, nothing special is needed to gain it from `echostream-node`.

However, there are use cases where message ordering is not important but processing speed is.
In these cases, you may configure your Node upon creation to concurrently process the messages
that it receives.

### Making a Threading Application Node Concurrent
If your Node inherits from the `echostream_node.threading.AppNode` class you can achieve concurrency
using threading.

Pass a `ThreadPoolExecutor` to the AppNode on creation, and the Node will use it to
concurrently process received `Message`s. You can set the maximum number of workers to
less than 10, but there is no gain in setting it to more than 10, since Nodes will only
process up to 10 messages at a time.

```python
from concurrent.futures import ThreadPoolExecutor

from echostream_node import Message
from echostream_node.threading import AppNode

class MyExternalNode(AppNode):

    def __init__(self) -> None:
        super().__init__(executor=ThreadPoolExecutor(max_workers=10))

    def handle_received_message(self, *, message: Message, source: str) -> None:
        print(f"Got a message:\n{message.body}")
        self.audit_message(message, source=source)
```
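
If the worker count comes from configuration, the limit described above could be applied before the executor is built. This is a sketch under that assumption; `MAX_CONCURRENT_MESSAGES` and `worker_count` are hypothetical names, and the value 10 comes from the text of this section rather than from a library constant.

```python
from concurrent.futures import ThreadPoolExecutor

# Per the text above, a Node processes at most 10 messages at a time.
MAX_CONCURRENT_MESSAGES = 10


def worker_count(requested: int) -> int:
    """Clamp a requested worker count to the useful range of 1 to 10."""
    return max(1, min(requested, MAX_CONCURRENT_MESSAGES))


# An executor built this way could be passed as `executor=` when creating the AppNode.
executor = ThreadPoolExecutor(max_workers=worker_count(32))
executor.shutdown()
```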

### Making an Asyncio Application Node Concurrent
If your Node inherits from the `echostream_node.asyncio.Node` class you can set the Node to
process incoming `Message`s concurrently. There is no setting for the maximum number of tasks;
a task is created per received `Message`.

```python
import asyncio

from echostream_node import Message
from echostream_node.asyncio import Node

class MyExternalNode(Node):

    def __init__(self) -> None:
        super().__init__(concurrent_processing=True)

    async def handle_received_message(self, *, message: Message, source: str) -> None:
        print(f"Got a message:\n{message.body}")
        self.audit_message(message, source=source)
```
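
To make the difference between the two modes concrete, the following standard-library sketch contrasts awaiting each message in turn with creating one task per message. It illustrates the general pattern only and is not the library's implementation; `handle`, `process_sequentially`, and `process_concurrently` are hypothetical names.

```python
import asyncio


async def handle(message: str, results: list[str]) -> None:
    """Stand-in for a message handler; records the message it handled."""
    await asyncio.sleep(0)  # yield to the event loop, as real I/O would
    results.append(message)


async def process_sequentially(messages: list[str]) -> list[str]:
    results: list[str] = []
    for message in messages:
        # Each message is fully handled before the next one starts.
        await handle(message, results)
    return results


async def process_concurrently(messages: list[str]) -> list[str]:
    results: list[str] = []
    # One task per message, with no upper bound on the number of tasks.
    tasks = [asyncio.create_task(handle(message, results)) for message in messages]
    await asyncio.gather(*tasks)
    return results


messages = [str(i) for i in range(5)]
print(asyncio.run(process_sequentially(messages)))
print(asyncio.run(process_concurrently(messages)))
```

The sequential version always preserves input order. The concurrent version handles every message but makes no ordering guarantee once handlers await real I/O, which is why it only suits workloads where ordering is not important.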

### Making a Lambda Node Concurrent
The AWS Lambda platform does not support shared memory, and therefore will only support concurrency
via threading. Setting `concurrent_processing` creates a LambdaNode that uses a `ThreadPoolExecutor`,
sized to your Lambda function's resources, to concurrently process received `Message`s.

```python
from echostream_node import Message
from echostream_node.threading import LambdaNode

class MyExternalNode(LambdaNode):

    def __init__(self) -> None:
        super().__init__(concurrent_processing=True)

    def handle_received_message(self, *, message: Message, source: str) -> None:
        print(f"Got a message:\n{message.body}")
        self.audit_message(message, source=source)
```

## Lambda Nodes and Partial Success Reporting
When you connect an Edge's SQS Queue to the AWS Lambda function implementing your
Lambda Node, you can choose to Report Batch Item Failures. This allows your Lambda Node
to report partial success back to the SQS Queue, but it does require that your Lambda Node
operate differently.

If you wish to take advantage of this, set `report_batch_item_failures` when you create your
Lambda Node. This can be set even if your Node is *not* processing concurrently.

```python
from echostream_node import Message
from echostream_node.threading import LambdaNode

class MyExternalNode(LambdaNode):

    def __init__(self) -> None:
        super().__init__(report_batch_item_failures=True)

    def handle_received_message(self, *, message: Message, source: str) -> None:
        print(f"Got a message:\n{message.body}")
        self.audit_message(message, source=source)
```
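
For background, the response shape that AWS Lambda expects when Report Batch Item Failures is enabled on an SQS event source can be sketched with a plain function. This shows the AWS contract only, not how `echostream-node` builds the response internally; `process_batch` and `fail_on_bad` are hypothetical names.

```python
from typing import Any, Callable


def process_batch(
    event: dict[str, Any], handler: Callable[[str], None]
) -> dict[str, Any]:
    """Run handler on each SQS record body and report only the failed records."""
    failures = []
    for record in event.get("Records", []):
        try:
            handler(record["body"])
        except Exception:
            # Only this record is returned to the queue for redelivery.
            failures.append({"itemIdentifier": record["messageId"]})
    return {"batchItemFailures": failures}


def fail_on_bad(body: str) -> None:
    """Example handler that rejects one specific message body."""
    if body == "bad":
        raise ValueError("cannot process message")


event = {
    "Records": [
        {"messageId": "1", "body": "ok"},
        {"messageId": "2", "body": "bad"},
    ]
}
print(process_batch(event, fail_on_bad))
```

An empty `batchItemFailures` list tells Lambda that the whole batch succeeded. Without this reporting mode, one failed message would cause the entire batch to be retried.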

Full documentation may be found at https://docs.echostream-node.echo.stream.

            
