[![Github (6)](https://github.com/memphisdev/memphis/assets/107035359/bc2feafc-946c-4569-ab8d-836bc0181890)](https://www.functions.memphis.dev/)
<p align="center">
<a href="https://memphis.dev/discord"><img src="https://img.shields.io/discord/963333392844328961?color=6557ff&label=discord" alt="Discord"></a>
<a href="https://github.com/memphisdev/memphis/issues?q=is%3Aissue+is%3Aclosed"><img src="https://img.shields.io/github/issues-closed/memphisdev/memphis?color=6557ff"></a>
<img src="https://img.shields.io/npm/dw/memphis-dev?color=ffc633&label=installations">
<a href="https://github.com/memphisdev/memphis/blob/master/CODE_OF_CONDUCT.md"><img src="https://img.shields.io/badge/Code%20of%20Conduct-v1.0-ff69b4.svg?color=ffc633" alt="Code Of Conduct"></a>
<img alt="GitHub release (latest by date)" src="https://img.shields.io/github/v/release/memphisdev/memphis?color=61dfc6">
<img src="https://img.shields.io/github/last-commit/memphisdev/memphis?color=61dfc6&label=last%20commit">
</p>
<div align="center">
<img width="177" alt="cloud_native 2 (5)" src="https://github.com/memphisdev/memphis/assets/107035359/a20ea11c-d509-42bb-a46c-e388c8424101">
</div>
<b><p align="center">
<a href="https://memphis.dev/pricing/">Cloud</a> - <a href="https://memphis.dev/docs/">Docs</a> - <a href="https://twitter.com/Memphis_Dev">X</a> - <a href="https://www.youtube.com/channel/UCVdMDLCSxXOqtgrBaRUHKKg">YouTube</a>
</p></b>
<div align="center">
<h4>
**[Memphis.dev](https://memphis.dev)** is more than a broker. It's a new streaming stack.<br>
Memphis.dev is a highly scalable event streaming and processing engine.<br>
</h4>
</div>
## ![20](https://user-images.githubusercontent.com/70286779/220196529-abb958d2-5c58-4c33-b5e0-40f5446515ad.png) About
Before Memphis came along, handling ingestion and processing of events on a large scale took months to adopt and was a capability reserved for the top 20% of mega-companies. Now, Memphis opens the door for the other 80% to unleash their event and data streaming superpowers quickly, easily, and with great cost-effectiveness.
**This repository is responsible for the Memphis Functions Python SDK**
## Installation
```sh
$ pip3 install memphis-functions
```
## Importing
```python
from memphis import create_function
```
### Creating a Memphis function
Memphis provides a create_function utility for more easily creatin Memphis Functions.
The user created `event_handler` will be called for every message in the given batch of events. The user's `event_handler` will take in a `msg_payload` as bytes, `msg_headers` as a dict and `inputs` as a dict, and should return a modified version of the payload and headers in the same data types.
The user function should raise an exception if the message processing has failed. If any exception is raised (deliberately or by a failed operation) the message will be sent to the dead letter station.
If the returned modified version of the `msg_payload` or `msg_headers` are returned as `None`, then the message will be skipped and will not be sent to the station or dead letter station.
> Make sure to encode the modified `msg_payload` bytes object with utf-8 encoding!
This example function takes the bytes object `msg_payload` and encodes it into a string so that it may be parsed as JSON.
```python
import json
import base64
from memphis import create_function
def handler(event, context): # The name of this file and this function should match the handler field in the memphis.yaml file in the following format <file name>.<function name>
return create_function(event, event_handler = event_handler)
def event_handler(msg_payload, msg_headers, inputs):
payload = str(msg_payload, 'utf-8')
as_json = json.loads(payload)
as_json['modified'] = True
return bytes(json.dumps(as_json), encoding='utf-8'), msg_headers
```
Instead of taking `msg_payload` as a bytes object, the as_dict flag can be used to have the JSON parsed to a dictionary.
```python
import json
import base64
from memphis import create_function
def handler(event, context): # The name of this file and this function should match the handler field in the memphis.yaml file in the following format <file name>.<function name>
return create_function(event, event_handler = event_handler, as_dict=True)
def event_handler(msg_payload, msg_headers, inputs):
msg_payload['modified'] = True
return msg_payload, msg_headers
```
Memphis Functions support using Async functions through asyncio. When functions are async, set the use_async parameter to true.
```python
import json
import base64
import asyncio
from memphis import create_function
def handler(event, context): # The name of this file and this function should match the handler field in the memphis.yaml file in the following format <file name>.<function name>
return create_function(event, event_handler = event_handler, use_async = True)
async def event_handler(msg_payload, msg_headers, inputs):
payload = str(msg_payload, 'utf-8')
as_json = json.loads(payload)
as_json['modified'] = True
asyncio.sleep(1)
return bytes(json.dumps(as_json), encoding='utf-8'), msg_headers
```
If the user would want to have a message that they would want to validate and send to the dead letter station if the validation fails then the user can raise an exception. In the following example, the field `check` is simply a boolean. The following function will send any messages which fail the `check` to the dead letter station.
```python
import json
import base64
from memphis import create_function
def handler(event, context): # The name of this file and this function should match the handler field in the memphis.yaml file in the following format <file name>.<function name>
return create_function(event, event_handler = event_handler)
def event_handler(msg_payload, msg_headers, inputs):
payload = str(msg_payload, 'utf-8')
as_json = json.loads(payload)
if as_json['check'] == False:
raise Exception("Validation Failed!")
return bytes(json.dumps(as_json), encoding='utf-8'), msg_headers
```
If a user would rather just skip the message and not have it be sent to the station or dead letter station, the cuser could instead return `None`, `None`:
```python
import json
import base64
from memphis import create_function
def handler(event, context): # The name of this file and this function should match the handler field in the memphis.yaml file in the following format <file name>.<function name>
return create_function(event, event_handler = event_handler)
def event_handler(msg_payload, msg_headers, inputs):
payload = str(msg_payload, 'utf-8')
as_json = json.loads(payload)
if as_json['check'] == False:
return None, None
return bytes(json.dumps(as_json), encoding='utf-8'), msg_headers
```
Lastly, if the user is using another data format like Protocol Buffers, the user may simply decode the `msg_payload` into that format instead of JSON. Assuming we have a .proto definition like this:
```proto
syntax = "proto3";
package protobuf_example;
message Message{
string data_field = 1;
}
```
We can decode this and get the data_field out like this:
```python
import json
import base64
from memphis import create_function
import message_pb2
def handler(event, context): # The name of this file and this function should match the handler field in the memphis.yaml file in the following format <file name>.<function name>
return create_function(event, event_handler = event_handler)
def event_handler(msg_payload, msg_headers, inputs):
message = message_pb2.Message()
message.ParseFromString(base64.b64decode(encoded_str))
# Arbitrarily changing the data_field
message.data_field = "my new data"
# SerializeToString returns bytes, which is the type we want
return message.SerializeToString(), msg_headers
```
Raw data
{
"_id": null,
"home_page": "https://github.com/memphisdev/memphis-functions.py",
"name": "memphis-functions",
"maintainer": "",
"docs_url": null,
"requires_python": "",
"maintainer_email": "",
"keywords": "message broker,devtool,streaming,data",
"author": "Memphis.dev",
"author_email": "team@memphis.dev",
"download_url": "https://files.pythonhosted.org/packages/ec/47/1df8345968110b5711242744e34385630e7ffd9ee145b9e433a25418ce9f/memphis-functions-1.0.2.tar.gz",
"platform": null,
"description": "[![Github (6)](https://github.com/memphisdev/memphis/assets/107035359/bc2feafc-946c-4569-ab8d-836bc0181890)](https://www.functions.memphis.dev/)\n<p align=\"center\">\n<a href=\"https://memphis.dev/discord\"><img src=\"https://img.shields.io/discord/963333392844328961?color=6557ff&label=discord\" alt=\"Discord\"></a>\n<a href=\"https://github.com/memphisdev/memphis/issues?q=is%3Aissue+is%3Aclosed\"><img src=\"https://img.shields.io/github/issues-closed/memphisdev/memphis?color=6557ff\"></a> \n <img src=\"https://img.shields.io/npm/dw/memphis-dev?color=ffc633&label=installations\">\n<a href=\"https://github.com/memphisdev/memphis/blob/master/CODE_OF_CONDUCT.md\"><img src=\"https://img.shields.io/badge/Code%20of%20Conduct-v1.0-ff69b4.svg?color=ffc633\" alt=\"Code Of Conduct\"></a> \n<img alt=\"GitHub release (latest by date)\" src=\"https://img.shields.io/github/v/release/memphisdev/memphis?color=61dfc6\">\n<img src=\"https://img.shields.io/github/last-commit/memphisdev/memphis?color=61dfc6&label=last%20commit\">\n</p>\n\n<div align=\"center\">\n \n<img width=\"177\" alt=\"cloud_native 2 (5)\" src=\"https://github.com/memphisdev/memphis/assets/107035359/a20ea11c-d509-42bb-a46c-e388c8424101\">\n \n</div>\n \n <b><p align=\"center\">\n <a href=\"https://memphis.dev/pricing/\">Cloud</a> - <a href=\"https://memphis.dev/docs/\">Docs</a> - <a href=\"https://twitter.com/Memphis_Dev\">X</a> - <a href=\"https://www.youtube.com/channel/UCVdMDLCSxXOqtgrBaRUHKKg\">YouTube</a>\n</p></b>\n\n<div align=\"center\">\n\n <h4>\n\n**[Memphis.dev](https://memphis.dev)** is more than a broker. It's a new streaming stack.<br>\nMemphis.dev is a highly scalable event streaming and processing engine.<br>\n\n </h4>\n \n</div>\n\n## ![20](https://user-images.githubusercontent.com/70286779/220196529-abb958d2-5c58-4c33-b5e0-40f5446515ad.png) About\n\nBefore Memphis came along, handling ingestion and processing of events on a large scale took months to adopt and was a capability reserved for the top 20% of mega-companies. Now, Memphis opens the door for the other 80% to unleash their event and data streaming superpowers quickly, easily, and with great cost-effectiveness.\n\n**This repository is responsible for the Memphis Functions Python SDK**\n\n## Installation\n\n```sh\n$ pip3 install memphis-functions\n```\n\n## Importing\n\n```python\nfrom memphis import create_function\n```\n\n### Creating a Memphis function\nMemphis provides a create_function utility for more easily creatin Memphis Functions.\n\nThe user created `event_handler` will be called for every message in the given batch of events. The user's `event_handler` will take in a `msg_payload` as bytes, `msg_headers` as a dict and `inputs` as a dict, and should return a modified version of the payload and headers in the same data types.\n\nThe user function should raise an exception if the message processing has failed. If any exception is raised (deliberately or by a failed operation) the message will be sent to the dead letter station.\n\nIf the returned modified version of the `msg_payload` or `msg_headers` are returned as `None`, then the message will be skipped and will not be sent to the station or dead letter station.\n\n> Make sure to encode the modified `msg_payload` bytes object with utf-8 encoding!\n\nThis example function takes the bytes object `msg_payload` and encodes it into a string so that it may be parsed as JSON. \n\n```python\nimport json\nimport base64\nfrom memphis import create_function\n\ndef handler(event, context): # The name of this file and this function should match the handler field in the memphis.yaml file in the following format <file name>.<function name>\n return create_function(event, event_handler = event_handler)\n\ndef event_handler(msg_payload, msg_headers, inputs):\n payload = str(msg_payload, 'utf-8')\n as_json = json.loads(payload)\n as_json['modified'] = True\n\n return bytes(json.dumps(as_json), encoding='utf-8'), msg_headers\n```\n\nInstead of taking `msg_payload` as a bytes object, the as_dict flag can be used to have the JSON parsed to a dictionary.\n\n```python\nimport json\nimport base64\nfrom memphis import create_function\n\ndef handler(event, context): # The name of this file and this function should match the handler field in the memphis.yaml file in the following format <file name>.<function name>\n return create_function(event, event_handler = event_handler, as_dict=True)\n\ndef event_handler(msg_payload, msg_headers, inputs):\n msg_payload['modified'] = True\n\n return msg_payload, msg_headers\n```\n\nMemphis Functions support using Async functions through asyncio. When functions are async, set the use_async parameter to true.\n```python\nimport json\nimport base64\nimport asyncio\nfrom memphis import create_function\n\ndef handler(event, context): # The name of this file and this function should match the handler field in the memphis.yaml file in the following format <file name>.<function name>\n return create_function(event, event_handler = event_handler, use_async = True)\n\nasync def event_handler(msg_payload, msg_headers, inputs):\n payload = str(msg_payload, 'utf-8')\n as_json = json.loads(payload)\n as_json['modified'] = True\n asyncio.sleep(1)\n\n return bytes(json.dumps(as_json), encoding='utf-8'), msg_headers\n```\n\nIf the user would want to have a message that they would want to validate and send to the dead letter station if the validation fails then the user can raise an exception. In the following example, the field `check` is simply a boolean. The following function will send any messages which fail the `check` to the dead letter station.\n\n```python\nimport json\nimport base64\nfrom memphis import create_function\n\ndef handler(event, context): # The name of this file and this function should match the handler field in the memphis.yaml file in the following format <file name>.<function name>\n return create_function(event, event_handler = event_handler)\n\ndef event_handler(msg_payload, msg_headers, inputs):\n payload = str(msg_payload, 'utf-8')\n as_json = json.loads(payload)\n if as_json['check'] == False:\n raise Exception(\"Validation Failed!\")\n\n return bytes(json.dumps(as_json), encoding='utf-8'), msg_headers\n```\n\nIf a user would rather just skip the message and not have it be sent to the station or dead letter station, the cuser could instead return `None`, `None`:\n\n```python\nimport json\nimport base64\nfrom memphis import create_function\n\ndef handler(event, context): # The name of this file and this function should match the handler field in the memphis.yaml file in the following format <file name>.<function name>\n return create_function(event, event_handler = event_handler)\n\ndef event_handler(msg_payload, msg_headers, inputs):\n payload = str(msg_payload, 'utf-8')\n as_json = json.loads(payload)\n if as_json['check'] == False:\n return None, None\n\n return bytes(json.dumps(as_json), encoding='utf-8'), msg_headers\n```\n\nLastly, if the user is using another data format like Protocol Buffers, the user may simply decode the `msg_payload` into that format instead of JSON. Assuming we have a .proto definition like this: \n```proto\nsyntax = \"proto3\";\npackage protobuf_example;\n\nmessage Message{\n string data_field = 1;\n}\n```\n\nWe can decode this and get the data_field out like this:\n\n```python\nimport json\nimport base64\nfrom memphis import create_function\nimport message_pb2\n\ndef handler(event, context): # The name of this file and this function should match the handler field in the memphis.yaml file in the following format <file name>.<function name>\n return create_function(event, event_handler = event_handler)\n\ndef event_handler(msg_payload, msg_headers, inputs):\n message = message_pb2.Message()\n message.ParseFromString(base64.b64decode(encoded_str))\n\n # Arbitrarily changing the data_field\n message.data_field = \"my new data\"\n\n # SerializeToString returns bytes, which is the type we want\n return message.SerializeToString(), msg_headers\n```",
"bugtrack_url": null,
"license": "Apache-2.0",
"summary": "A powerful messaging platform for modern developers",
"version": "1.0.2",
"project_urls": {
"Download": "https://github.com/memphisdev/memphis-functions.py/archive/refs/tags/1.0.2.tar.gz",
"Homepage": "https://github.com/memphisdev/memphis-functions.py"
},
"split_keywords": [
"message broker",
"devtool",
"streaming",
"data"
],
"urls": [
{
"comment_text": "",
"digests": {
"blake2b_256": "ec471df8345968110b5711242744e34385630e7ffd9ee145b9e433a25418ce9f",
"md5": "6eb1b4a6bbb2ea7f9f739a47bbe64c35",
"sha256": "9a92b18c005c2c2ea9d166887037919d601bff57b806c9a4bd29149082e667c5"
},
"downloads": -1,
"filename": "memphis-functions-1.0.2.tar.gz",
"has_sig": false,
"md5_digest": "6eb1b4a6bbb2ea7f9f739a47bbe64c35",
"packagetype": "sdist",
"python_version": "source",
"requires_python": null,
"size": 10761,
"upload_time": "2024-01-21T18:39:01",
"upload_time_iso_8601": "2024-01-21T18:39:01.797825Z",
"url": "https://files.pythonhosted.org/packages/ec/47/1df8345968110b5711242744e34385630e7ffd9ee145b9e433a25418ce9f/memphis-functions-1.0.2.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2024-01-21 18:39:01",
"github": true,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"github_user": "memphisdev",
"github_project": "memphis-functions.py",
"travis_ci": false,
"coveralls": false,
"github_actions": false,
"lcname": "memphis-functions"
}