<a href="https://memphis.dev"><img src="https://github.com/memphisdev/memphis-terraform/assets/107035359/a5fe5d0f-22e1-4445-957d-5ce4464e61b1" alt="Memphis.dev"></a>
<p align="center">
<a href="https://memphis.dev/discord"><img src="https://img.shields.io/discord/963333392844328961?color=6557ff&label=discord" alt="Discord"></a>
<a href="https://github.com/memphisdev/memphis/issues?q=is%3Aissue+is%3Aclosed"><img src="https://img.shields.io/github/issues-closed/memphisdev/memphis?color=6557ff"></a>
<img src="https://img.shields.io/npm/dw/memphis-dev?color=ffc633&label=installations">
<a href="https://github.com/memphisdev/memphis/blob/master/CODE_OF_CONDUCT.md"><img src="https://img.shields.io/badge/Code%20of%20Conduct-v1.0-ff69b4.svg?color=ffc633" alt="Code Of Conduct"></a>
<img alt="GitHub release (latest by date)" src="https://img.shields.io/github/v/release/memphisdev/memphis?color=61dfc6">
<img src="https://img.shields.io/github/last-commit/memphisdev/memphis?color=61dfc6&label=last%20commit">
</p>
<div align="center">
<img width="177" alt="cloud_native 2 (5)" src="https://github.com/memphisdev/memphis/assets/107035359/a20ea11c-d509-42bb-a46c-e388c8424101">
<h4>
**[Memphis.dev](https://memphis.dev)** is a highly scalable, painless, and effortless data streaming platform.<br>
Made to enable developers and data teams to collaborate and build<br>
real-time and streaming apps fast.
</h4>
</div>
## Installation
```sh
$ pip3 install memphis-py
```
Note: if installation fails with an error about the `mmh3` package, install `python3-devel` and try again:
```sh
$ sudo yum install python3-devel
```
## Importing
```python
from memphis import Memphis, Headers
from memphis.types import Retention, Storage
import asyncio
```
### Connecting to Memphis
First, create a `Memphis` object, then connect to the broker with `memphis.connect`.
```python
async def main():
    memphis = Memphis()  # created before `try` so `finally` can always close it
    try:
        await memphis.connect(
            host="<memphis-host>",
            username="<application-type username>",
            account_id=<account_id>, # You can find it on the profile page in the Memphis UI. This field should be sent only on the cloud version of Memphis, otherwise it will be ignored
            connection_token="<broker-token>", # you will get it on application type user creation
            password="<string>", # depends on how Memphis is deployed - default is connection token-based authentication
            port=<port>, # defaults to 6666
            reconnect=True, # defaults to True
            max_reconnect=10, # defaults to -1 which means reconnect indefinitely
            reconnect_interval_ms=1500, # defaults to 1500
            timeout_ms=1500, # defaults to 1500
            # for TLS connection:
            key_file='<key-client.pem>',
            cert_file='<cert-client.pem>',
            ca_file='<rootCA.pem>'
        )
        ...
    except Exception as e:
        print(e)
    finally:
        await memphis.close()

if __name__ == '__main__':
    asyncio.run(main())
```
```python
async def connect(
    self,
    host: str,
    username: str,
    account_id: int = 1, # Cloud use only, ignored otherwise
    connection_token: str = "", # JWT token given when creating client accounts
    password: str = "", # For password-based connections
    port: int = 6666,
    reconnect: bool = True,
    max_reconnect: int = 10,
    reconnect_interval_ms: int = 1500,
    timeout_ms: int = 2000,
    # For TLS connections:
    cert_file: str = "",
    key_file: str = "",
    ca_file: str = "",
)
```
`Memphis.connect` opens a connection to a Memphis broker (cloud or open-source). You must connect before using any other functionality of the `Memphis` class; once connected, all of its features are available.
The arguments passed to `Memphis.connect` depend on the type of connection being made.
For details on deploying Memphis open-source with different types of connections, see the [docs](https://docs.memphis.dev/memphis/memphis-broker/concepts/security).
A password-based connection looks like this (using the default root login of Memphis open-source):
```python
# Imports hidden. See other examples
async def main():
    memphis = Memphis()
    try:
        await memphis.connect(
            host = "localhost",
            username = "root",
            password = "memphis",
            # port = 6666, default port
            # reconnect = True, default reconnect setting
            # max_reconnect = 10, default number of reconnect attempts
            # reconnect_interval_ms = 1500, default reconnect interval
            # timeout_ms = 2000, default duration of time for the connection to timeout
        )
    except Exception as e:
        print(e)
    finally:
        await memphis.close()

if __name__ == '__main__':
    asyncio.run(main())
```
To connect to Memphis cloud instead, add your account ID and change the host. The host and account ID can be found on the Overview page of the Memphis cloud UI, under your name at the top. Here is an example of connecting to a cloud broker located in US East:
```python
# Imports hidden. See other examples
async def main():
    memphis = Memphis()
    try:
        await memphis.connect(
            host = "aws-us-east-1.cloud.memphis.dev",
            username = "my_client_username",
            password = "my_client_password",
            account_id = 123456789, # an int, per the connect signature
            # port = 6666, default port
            # reconnect = True, default reconnect setting
            # max_reconnect = 10, default number of reconnect attempts
            # reconnect_interval_ms = 1500, default reconnect interval
            # timeout_ms = 2000, default duration of time for the connection to timeout
        )
    except Exception as e:
        print(e)
    finally:
        await memphis.close()

if __name__ == '__main__':
    asyncio.run(main())
```
You can also use a token-based connection to Memphis, where multiple users can share the same token. Here is an example of using `memphis.connect` with a token:
```python
# Imports hidden. See other examples
async def main():
    memphis = Memphis()
    try:
        await memphis.connect(
            host = "localhost",
            username = "user",
            connection_token = "token",
            # port = 6666, default port
            # reconnect = True, default reconnect setting
            # max_reconnect = 10, default number of reconnect attempts
            # reconnect_interval_ms = 1500, default reconnect interval
            # timeout_ms = 2000, default duration of time for the connection to timeout
        )
    except Exception as e:
        print(e)
    finally:
        await memphis.close()

if __name__ == '__main__':
    asyncio.run(main())
```
The token is presented when creating new users.
Memphis needs to be configured to use token-based connections. See the [docs](https://docs.memphis.dev/memphis/memphis-broker/concepts/security) for help doing this.
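Rather than hard-coding credentials, you may prefer to read them from the environment. The sketch below builds the keyword arguments for `memphis.connect` from environment variables; the variable names (`MEMPHIS_HOST`, `MEMPHIS_USERNAME`, and so on) and the helper `connect_kwargs_from_env` are hypothetical, not part of memphis-py. It chooses token-based authentication when a token is set and falls back to password-based authentication otherwise.

```python
import os
from typing import Any, Dict, Mapping, Optional


def connect_kwargs_from_env(env: Optional[Mapping[str, str]] = None) -> Dict[str, Any]:
    """Build keyword arguments for memphis.connect() from environment variables.

    The MEMPHIS_* variable names are hypothetical; rename them to suit your deployment.
    """
    env = os.environ if env is None else env
    kwargs: Dict[str, Any] = {
        "host": env.get("MEMPHIS_HOST", "localhost"),
        "username": env["MEMPHIS_USERNAME"],  # required: raises KeyError if missing
    }
    # Prefer token-based auth when a token is provided, otherwise use a password.
    if env.get("MEMPHIS_CONNECTION_TOKEN"):
        kwargs["connection_token"] = env["MEMPHIS_CONNECTION_TOKEN"]
    else:
        kwargs["password"] = env.get("MEMPHIS_PASSWORD", "")
    # account_id is only meaningful for Memphis cloud, and connect expects an int.
    if env.get("MEMPHIS_ACCOUNT_ID"):
        kwargs["account_id"] = int(env["MEMPHIS_ACCOUNT_ID"])
    return kwargs
```

With a connected client in scope, usage would then be `await memphis.connect(**connect_kwargs_from_env())`.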
> For brevity, the remaining examples omit the try-except statement and the `asyncio.run` call, and assume they run inside an `async` function.
A TLS-based connection looks like this:
```python
# Imports hidden. See other examples
memphis = Memphis()
await memphis.connect(
    host = "localhost",
    username = "user",
    key_file = "~/tls_file_path.key",
    cert_file = "~/tls_cert_file_path.crt",
    ca_file = "~/tls_ca_file_path.crt",
    # port = 6666, default port
    # reconnect = True, default reconnect setting
    # max_reconnect = 10, default number of reconnect attempts
    # reconnect_interval_ms = 1500, default reconnect interval
    # timeout_ms = 2000, default duration of time for the connection to timeout
)
```
Memphis needs to be configured for TLS connections. To configure Memphis to use TLS, see the [docs](https://docs.memphis.dev/memphis/open-source-installation/kubernetes/production-best-practices#memphis-metadata-tls-connection-configuration).
### Disconnecting from Memphis
To disconnect from Memphis, call `close()` on the memphis object.
```python
await memphis.close()
```
### Creating a Station
Stations are distributed units that store messages. Producers add messages to stations and Consumers take messages from them. Each station stores messages until its retention policy causes them to be either deleted or moved to [remote storage](https://docs.memphis.dev/memphis/integrations-center/storage/s3-compatible).
**If no station with the given name exists, one is created automatically when a producer or consumer is used with that name.**<br><br>
_If a station with the given name already exists when this function is called, the existing station is left unchanged._
```python
async def station(
    self,
    name: str,
    retention_type: Retention = Retention.MAX_MESSAGE_AGE_SECONDS, # MAX_MESSAGE_AGE_SECONDS/MESSAGES/BYTES/ACK_BASED (cloud only). Defaults to MAX_MESSAGE_AGE_SECONDS
    retention_value: int = 3600, # defaults to 3600
    storage_type: Storage = Storage.DISK, # Storage.DISK/Storage.MEMORY. Defaults to DISK
    replicas: int = 1,
    idempotency_window_ms: int = 120000, # defaults to 2 minutes
    schema_name: str = "", # defaults to "" (no schema)
    send_poison_msg_to_dls: bool = True, # defaults to True
    send_schema_failed_msg_to_dls: bool = True, # defaults to True
    tiered_storage_enabled: bool = False, # defaults to False
    partitions_number: int = 1, # defaults to 1
    dls_station: str = "", # defaults to "" (no DLS station). If given, both poison and schema failed events will be sent to the DLS
)
```
The `station` function is used to create a station. Using its different arguments, one can programmatically create many different types of stations. The Memphis UI can also be used to create stations to the same effect.
Creating a station with only a name creates a station with that name and the default options shown above:
```python
memphis = Memphis()
await memphis.connect(...)
await memphis.station(
    name = "my_station"
)
```
### Stations with Retention
To change the criteria the station uses to decide whether a message should be retained, change the retention type. The different types of retention are documented [here](https://github.com/memphisdev/memphis.py#retention-types) in the Python README.
The unit of the retention value varies depending on the `retention_type`. The [previous link](https://github.com/memphisdev/memphis.py#retention-types) also describes which units are used.
Here is an example of a station which will only hold up to 10 messages:
```python
memphis = Memphis()
await memphis.connect(...)
await memphis.station(
    name = "my_station",
    retention_type = Retention.MESSAGES,
    retention_value = 10
)
```
### Station storage types
Memphis stations can store messages either on disk or in memory. A comparison of these storage types can be found [here](https://docs.memphis.dev/memphis/memphis-broker/concepts/storage-and-redundancy#tier-1-local-storage).
Here is an example of how to create a station that uses memory as its storage type:
```python
memphis = Memphis()
await memphis.connect(...)
await memphis.station(
    name = "my_station",
    storage_type = Storage.MEMORY
)
```
### Station Replicas
In order to make a station more redundant, replicas can be used. Read more about replicas [here](https://docs.memphis.dev/memphis/memphis-broker/concepts/storage-and-redundancy#replicas-mirroring). Note that replicas are only available in cluster mode. Cluster mode can be enabled in the [Helm settings](https://docs.memphis.dev/memphis/open-source-installation/kubernetes/1-installation#appendix-b-helm-deployment-options) when deploying Memphis with Kubernetes.
Here is an example of creating a station with 3 replicas:
```python
memphis = Memphis()
await memphis.connect(...)
await memphis.station(
    name = "my_station",
    replicas = 3
)
```
### Station idempotency
Idempotency defines how Memphis prevents duplicate messages from being stored or consumed. `idempotency_window_ms` sets how long message IDs are stored in the station. If the environment Memphis is deployed in has an unreliable connection and/or high latency, increasing this value may be desirable. The default duration is two minutes. Read more about idempotency [here](https://docs.memphis.dev/memphis/memphis-broker/concepts/idempotency).
Here is an example of changing the idempotency window to 3 minutes:
```python
memphis = Memphis()
await memphis.connect(...)
await memphis.station(
    name = "my_station",
    idempotency_window_ms = 180000 # 3 minutes
)
```
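To build intuition for `idempotency_window_ms`, here is a toy, in-memory model of deduplication by message ID within a time window. It is not how the Memphis broker implements idempotency; the class `IdempotencyWindow` is hypothetical and only illustrates why a longer window catches retries that arrive later.

```python
from typing import Dict


class IdempotencyWindow:
    """Toy model: reject a message ID already accepted within the last `window_ms` ms."""

    def __init__(self, window_ms: int) -> None:
        self.window_ms = window_ms
        self._seen: Dict[str, int] = {}  # msg_id -> time (ms) it was first accepted

    def accept(self, msg_id: str, now_ms: int) -> bool:
        # Forget IDs whose window has expired.
        expired = [m for m, t in self._seen.items() if now_ms - t >= self.window_ms]
        for m in expired:
            del self._seen[m]
        if msg_id in self._seen:
            return False  # duplicate inside the window: dropped
        self._seen[msg_id] = now_ms
        return True
```

With the default 2-minute window (120000 ms), a retry of message `"42"` that arrives 60 seconds later is dropped, while one that arrives after 2 minutes is accepted again.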
### Enforcing a schema
The `schema_name` parameter sets the schema the station enforces. The default value of `""` means no schema is enforced. Here is an example that enforces a schema named "sensor_logs", defined in Schemaverse:
```python
memphis = Memphis()
await memphis.connect(...)
await memphis.station(
    name = "my_station",
    schema_name = "sensor_logs"
)
```
### Dead Letter Stations
There are two parameters for sending messages to the [dead-letter station (DLS)](https://docs.memphis.dev/memphis/memphis-broker/concepts/dead-letter#terminology): `send_poison_msg_to_dls` and `send_schema_failed_msg_to_dls`.
Here is an example of sending poison messages to the DLS, but not messages that fail to conform to the given schema:
```python
memphis = Memphis()
await memphis.connect(...)
await memphis.station(
    name = "my_station",
    schema_name = "sensor_logs",
    send_poison_msg_to_dls = True,
    send_schema_failed_msg_to_dls = False
)
```
When either of the DLS flags is set to True, you can also choose a station of your own to handle these events. To set the station to which schema-failed or poison messages will be sent, use the `dls_station` parameter:
```python
memphis = Memphis()
await memphis.connect(...)
await memphis.station(
    name = "my_station",
    schema_name = "sensor_logs",
    send_poison_msg_to_dls = True,
    send_schema_failed_msg_to_dls = False,
    dls_station = "bad_sensor_messages_station"
)
```
### Station Tiered Storage
When the retention value is met, Memphis deletes old messages by default. If tiered storage is set up, Memphis can instead move messages to tier 2 storage. Read more about tiered storage [here](https://docs.memphis.dev/memphis/memphis-broker/concepts/storage-and-redundancy#storage-tiering). Enable this setting with the respective flag:
```python
memphis = Memphis()
await memphis.connect(...)
await memphis.station(
    name = "my_station",
    tiered_storage_enabled = True
)
```
### Station Partitions
[Partitioning](https://docs.memphis.dev/memphis/memphis-broker/concepts/station#partitions) might be useful for a station. To partition a station, set `partitions_number` to the desired number of partitions:
```python
memphis = Memphis()
await memphis.connect(...)
await memphis.station(
    name = "my_station",
    partitions_number = 3
)
```
### Retention types
Retention types define the methodology behind how a station behaves with its messages. Memphis currently supports the following retention types:
```python
memphis.types.Retention.MAX_MESSAGE_AGE_SECONDS
```
When the retention type is set to MAX_MESSAGE_AGE_SECONDS, messages will persist in the station for the number of seconds specified in the retention_value.
```python
memphis.types.Retention.MESSAGES
```
When the retention type is set to MESSAGES, the station will only hold up to retention_value messages. The station deletes the oldest messages to keep at most retention_value messages.
```python
memphis.types.Retention.BYTES
```
When the retention type is set to BYTES, the station will only hold up to retention_value bytes. The oldest messages are deleted to keep at most retention_value bytes in the station.
```python
memphis.types.Retention.ACK_BASED # for cloud users only
```
When the retention type is set to ACK_BASED, messages in the station will be deleted after they are acked by all subscribed consumer groups.
### Retention Values
The unit of the `retention_value` changes depending on the `retention_type` specified.
All retention values are of type `int`. The following units are used based on the respective retention type:
`memphis.types.Retention.MAX_MESSAGE_AGE_SECONDS` is **in seconds**,<br>
`memphis.types.Retention.MESSAGES` is a **number of messages**,<br>
`memphis.types.Retention.BYTES` is a **number of bytes**, <br>
With `memphis.types.Retention.ACK_BASED`, the `retention_value` is ignored.
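The following toy model shows how `MESSAGES` and `BYTES` retention trim a station by evicting the oldest messages first, as described above. It is an in-memory illustration only; the `ToyStation` class is hypothetical and does not reflect how the broker actually stores data.

```python
from collections import deque
from typing import Deque, List


class ToyStation:
    """Keep at most `max_messages` messages and `max_bytes` bytes, dropping the oldest first."""

    def __init__(self, max_messages: int = 0, max_bytes: int = 0) -> None:
        self.max_messages = max_messages  # 0 disables this limit
        self.max_bytes = max_bytes  # 0 disables this limit
        self._msgs: Deque[bytes] = deque()
        self._size = 0

    def append(self, payload: bytes) -> None:
        self._msgs.append(payload)
        self._size += len(payload)
        # Evict the oldest messages until both limits are satisfied.
        while self._msgs and (
            (self.max_messages and len(self._msgs) > self.max_messages)
            or (self.max_bytes and self._size > self.max_bytes)
        ):
            self._size -= len(self._msgs.popleft())

    def contents(self) -> List[bytes]:
        return list(self._msgs)
```

For example, appending 12 messages to `ToyStation(max_messages=10)` leaves the 10 newest, matching the earlier `Retention.MESSAGES` example with `retention_value = 10`.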
### Storage types
Memphis currently supports the following types of message storage:
```python
memphis.types.Storage.DISK
```
When storage is set to DISK, messages are stored on disk.
```python
memphis.types.Storage.MEMORY
```
When storage is set to MEMORY, messages are stored in the system memory.
### Destroying a Station
Destroying a station removes all its resources (including producers and consumers):
```python
await station.destroy()
```
### Creating a New Schema
If a schema with the given name already exists, a new version of it is created.
```python
await memphis.create_schema("<schema-name>", "<schema-type>", "<schema-file-path>")
```
Currently available schema types: Protobuf, JSON schema, GraphQL schema, and Avro.
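As an illustration of what `<schema-file-path>` might point to for a JSON schema, the sketch below writes a small JSON Schema document to disk using only the standard library. The `sensor_logs` fields, the draft-07 `$schema` URI, and the helper `write_schema_file` are hypothetical; check which JSON Schema drafts your Memphis version accepts before relying on this.

```python
import json
import os
import tempfile

# Hypothetical JSON Schema for the "sensor_logs" examples in this README.
SENSOR_LOGS_SCHEMA = {
    "$schema": "http://json-schema.org/draft-07/schema#",
    "type": "object",
    "properties": {
        "sensor_id": {"type": "string"},
        "temperature": {"type": "number"},
    },
    "required": ["sensor_id", "temperature"],
}


def write_schema_file(schema: dict, directory: str) -> str:
    """Write `schema` as JSON and return the path to pass to create_schema."""
    path = os.path.join(directory, "sensor_logs.json")
    with open(path, "w", encoding="utf-8") as f:
        json.dump(schema, f, indent=2)
    return path
```

The returned path would then be the third argument to `memphis.create_schema`.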
### Enforcing a Schema on an Existing Station
```python
async def enforce_schema(self, name, station_name)
```
To add a schema to an already created station, use `enforce_schema`. Here is an example using `enforce_schema` to add a schema to a station:
```python
memphis = Memphis()
await memphis.connect(...)
await memphis.enforce_schema(
    name = "my_schema",
    station_name = "my_station"
)
```
### Deprecated - Attaching a Schema, use enforce_schema instead
```python
await memphis.attach_schema("<schema-name>", "<station-name>")
```
### Detaching a Schema from Station
```python
async def detach_schema(self, station_name)
```
To remove a schema from an already created station, use `detach_schema`. Here is an example of removing a schema from a station:
```python
memphis = Memphis()
await memphis.connect(...)
await memphis.detach_schema(
    station_name = "my_station"
)
```
### Produce and Consume messages
The most common client operations are using `produce` to send messages and `consume` to receive messages.
Messages are published to a station with a Producer and consumed from it by a Consumer.
Consumers are poll-based and consume all the messages in a station. Consumers can also be grouped into consumer groups. When consuming with a consumer group, messages are spread across the members of the group, so each message is delivered to only one consumer in that group, while every other consumer group still receives every message.
Memphis messages are payload agnostic. Payloads are always `bytearray`s.
In order to stop getting messages, you have to call `consumer.destroy()`. Destroy will terminate the consumer even if messages are currently being sent to the consumer.
If a station is created with more than one partition, producing to and consuming from the station will happen in a round robin fashion.
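The consumer-group delivery rule can be modeled in a few lines: every consumer group receives every message, while within a group each message goes to only one member. This sketch uses round-robin assignment inside each group purely for illustration; the broker's actual distribution strategy may differ, and `deliver` is a hypothetical helper.

```python
from itertools import cycle
from typing import Dict, List


def deliver(messages: List[str], groups: Dict[str, List[str]]) -> Dict[str, List[str]]:
    """Return the messages each consumer receives.

    `groups` maps a consumer group name to its member consumer names
    (consumer names are assumed unique across groups in this toy).
    """
    received: Dict[str, List[str]] = {c: [] for members in groups.values() for c in members}
    for members in groups.values():
        next_member = cycle(members)  # round robin within the group (illustrative)
        for msg in messages:
            received[next(next_member)].append(msg)  # one member per group gets each message
    return received
```

With groups `{"g1": ["a", "b"], "g2": ["c"]}`, consumers `a` and `b` split the messages between them, while `c` alone receives all of them.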
### Creating a Producer
```python
async def producer(
    self,
    station_name: str,
    producer_name: str,
    generate_random_suffix: bool = False, # Deprecated
)
```
Use the Memphis producer function to create a producer. Here is an example of creating a producer for a given station:
```python
memphis = Memphis()
await memphis.connect(...)
producer = await memphis.producer(
    station_name = "my_station",
    producer_name = "new_producer"
)
```
### Producing a message
```python
async def produce(
    self,
    message,
    ack_wait_sec: int = 15,
    headers: Union[Headers, None] = None,
    async_produce: Union[bool, None] = None,
    nonblocking: bool = False,
    msg_id: Union[str, None] = None,
    concurrent_task_limit: Union[int, None] = None,
    producer_partition_key: Union[str, None] = None,
    producer_partition_number: int = -1
):
```
Both producers and connections can use the produce function. To produce a message from a connection, simply call `memphis.produce`. This function will create a producer if none with the given name exists, otherwise it will pull the producer from a cache and use it to produce the message.
```python
await memphis.produce(
    station_name='test_station_py',
    producer_name='prod_py',
    message='bytearray/protobuf class/dict/string/graphql.language.ast.DocumentNode', # bytearray / protobuf class (schema validated station - protobuf) or bytearray/dict (schema validated station - json schema) or string/bytearray/graphql.language.ast.DocumentNode (schema validated station - graphql schema) or bytearray/dict (schema validated station - avro schema)
    ack_wait_sec=15, # defaults to 15
    headers=headers, # a Headers object, defaults to None
    nonblocking=False, # defaults to False
    msg_id="123",
    producer_partition_key="key" # defaults to None
)
```
Creating a producer and calling produce on it will increase the performance of producing messages as it removes the overhead of pulling created producers from the cache.
```python
await producer.produce(
    message='bytearray/protobuf class/dict/string/graphql.language.ast.DocumentNode', # bytearray / protobuf class (schema validated station - protobuf) or bytearray/dict (schema validated station - json schema) or string/bytearray/graphql.language.ast.DocumentNode (schema validated station - graphql schema) or bytearray/dict (schema validated station - avro schema)
    ack_wait_sec=15) # defaults to 15
```
Here is an example of a produce call that waits up to 30 seconds for an acknowledgment from Memphis and does so in a non-blocking manner:
```python
memphis = Memphis()
await memphis.connect(...)
await memphis.produce(
    station_name = "some_station",
    producer_name = "temp_producer",
    message = {'some':'message'},
    ack_wait_sec = 30,
    nonblocking = True
)
```
### Producing with idempotency
As discussed in the station section, idempotency is an important feature of Memphis. To achieve idempotency, an ID must be assigned to each message being produced. Use the `msg_id` parameter for this purpose.
```python
memphis = Memphis()
await memphis.connect(...)
await memphis.produce(
    station_name = "some_station",
    producer_name = "temp_producer",
    message = {'some':'message'},
    msg_id = '42'
)
```
### Producing with headers
To add headers to a message, use the `headers` parameter. Headers can help with observability when using certain third-party tools to monitor the behavior of Memphis. See [here](https://docs.memphis.dev/memphis/memphis-broker/comparisons/aws-sqs-vs-memphis#observability) for more details.
```python
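memphis = Memphis()
await memphis.connect(...)
headers = Headers() # the headers parameter expects a Headers object (see the produce signature)
headers.add("trace_header", "track_me_123")
await memphis.produce(
    station_name = "some_station",
    producer_name = "temp_producer",
    message = {'some':'message'},
    headers = headers
)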
```
### Producing to a partition
Lastly, Memphis can produce to a specific partition in a station. To do so, use the `producer_partition_key` parameter:
```python
memphis = Memphis()
await memphis.connect(...)
await memphis.produce(
    station_name = "some_station",
    producer_name = "temp_producer",
    message = {'some':'message'},
    producer_partition_key = "2nd_partition"
)
```
Or, alternatively, use the producer_partition_number parameter:
```python
memphis = Memphis()
await memphis.connect(...)
await memphis.produce(
    station_name = "some_station",
    producer_name = "temp_producer",
    message = {'some':'message'},
    producer_partition_number = 2
)
```
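One way to picture how a partition key maps to a partition is a stable hash of the key modulo the number of partitions, with round robin when neither a key nor a partition number is given. The sketch below uses `zlib.crc32` as a stand-in hash and numbers partitions from 1; the actual memphis-py client may use a different hash function and numbering, so treat `PartitionPicker` as hypothetical.

```python
import zlib
from itertools import count
from typing import Iterator, Optional


class PartitionPicker:
    """Toy partition selection for a station with `num_partitions` partitions (numbered from 1)."""

    def __init__(self, num_partitions: int) -> None:
        self.num_partitions = num_partitions
        self._rr: Iterator[int] = count()  # round-robin counter

    def pick(self, key: Optional[str] = None, number: int = -1) -> int:
        if number != -1:
            if not 1 <= number <= self.num_partitions:
                raise ValueError(f"partition {number} does not exist")
            return number  # an explicit partition number wins
        if key is not None:
            # The same key always maps to the same partition.
            return zlib.crc32(key.encode()) % self.num_partitions + 1
        return next(self._rr) % self.num_partitions + 1  # no key: round robin
```

The key property this illustrates is that all messages sharing a key land in the same partition, which preserves their relative order.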
### Non-blocking Produce with Task Limits
For better performance, with `nonblocking=True` the client doesn't block while waiting for an acknowledgment.
If you produce a large number of messages very quickly, you may see timeout errors; in that case, limit the number of concurrent tasks:
```python
await producer.produce(
    message='bytearray/protobuf class/dict/string/graphql.language.ast.DocumentNode', # bytearray / protobuf class (schema validated station - protobuf) or bytearray/dict (schema validated station - json schema) or string/bytearray/graphql.language.ast.DocumentNode (schema validated station - graphql schema)
    nonblocking=True, concurrent_task_limit=500)
```
You may read more about this [here](https://memphis.dev/blog/producing-messages-at-warp-speed-best-practices-for-optimizing-your-producers/) on the memphis.dev blog.
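The idea behind `concurrent_task_limit` can be illustrated with plain `asyncio`: cap how many send operations are in flight at once with a semaphore. This is a generic sketch, not memphis-py's internal code; `fake_send` is a hypothetical stand-in for a real produce call.

```python
import asyncio
from typing import List


async def produce_all(payloads: List[bytes], limit: int) -> int:
    """Send every payload with at most `limit` sends in flight; return the observed peak."""
    semaphore = asyncio.Semaphore(limit)
    in_flight = 0
    peak = 0

    async def fake_send(payload: bytes) -> None:
        nonlocal in_flight, peak
        async with semaphore:  # waits here once `limit` sends are running
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.001)  # stand-in for waiting on the broker's ack
            in_flight -= 1

    await asyncio.gather(*(fake_send(p) for p in payloads))
    return peak
```

Running `asyncio.run(produce_all([b"x"] * 50, limit=5))` starts 50 sends but never lets more than 5 wait on an acknowledgment at the same time.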
### Produce to multiple stations
Producing to multiple stations can be done by creating a producer with multiple stations and then calling produce on that producer.
```python
memphis = Memphis()
await memphis.connect(...)
producer = await memphis.producer(
    station_name = ["station_1", "station_2"],
    producer_name = "new_producer"
)
await producer.produce(
    message = "some message"
)
```
Alternatively, it is also possible to produce to multiple stations using the connection:
```python
memphis = Memphis()
await memphis.connect(...)
await memphis.produce(
    station_name = ["station_1", "station_2"],
    producer_name = "new_producer",
    message = "some message"
)
```
### Destroying a Producer
```python
await producer.destroy()
```
### Creating a Consumer
```python
consumer = await memphis.consumer(
    station_name="<station-name>",
    consumer_name="<consumer-name>",
    consumer_group="<group-name>", # defaults to the consumer name
    pull_interval_ms=1000, # defaults to 1000
    batch_size=10, # defaults to 10
    batch_max_time_to_wait_ms=100, # defaults to 100
    max_ack_time_ms=30000, # defaults to 30000
    max_msg_deliveries=2, # defaults to 2
    start_consume_from_sequence=1, # start consuming from a specific sequence. defaults to 1
    last_messages=-1 # consume the last N messages, defaults to -1 (all messages in the station)
)
```
Consumers are used to pull messages from a station. Here is how to create a consumer with all of the default parameters:
```python
memphis = Memphis()
await memphis.connect(...)
consumer = await memphis.consumer(
    station_name = "my_station",
    consumer_name = "new_consumer",
)
```
To create a consumer in a consumer group, add the consumer_group parameter:
```python
memphis = Memphis()
await memphis.connect(...)
consumer = await memphis.consumer(
    station_name = "my_station",
    consumer_name = "new_consumer",
    consumer_group = "consumer_group_1"
)
```
When using Consumer.consume, the consumer will continue to consume in an infinite loop. To change the rate at which the consumer polls the station for new messages, change the pull_interval_ms parameter:
```python
memphis = Memphis()
await memphis.connect(...)
consumer = await memphis.consumer(
    station_name = "my_station",
    consumer_name = "new_consumer",
    pull_interval_ms = 2000
)
```
Every time the consumer pulls from the station, the consumer will try to take batch_size number of elements from the station. However, sometimes there are not enough messages in the station for the consumer to consume a full batch. In this case, the consumer will continue to wait until either batch_size messages are gathered or the time in milliseconds specified by batch_max_time_to_wait_ms is reached.
Here is an example of a consumer that will try to poll 100 messages every 10 seconds while waiting up to 15 seconds for all messages to reach the consumer.
```python
memphis = Memphis()
await memphis.connect(...)
consumer = await memphis.consumer(
    station_name = "my_station",
    consumer_name = "new_consumer",
    pull_interval_ms = 10000,
    batch_size = 100,
    batch_max_time_to_wait_ms = 15000
)
```
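The interaction between `batch_size` and `batch_max_time_to_wait_ms` resembles the following `asyncio` pattern: collect items until the batch is full or the deadline passes, whichever comes first. This is a hypothetical local model (`collect_batch` is not a memphis-py function) meant to illustrate the behavior, not the client's implementation.

```python
import asyncio
from typing import List


async def collect_batch(queue: "asyncio.Queue[bytes]", batch_size: int, max_wait_ms: int) -> List[bytes]:
    """Return up to `batch_size` items, waiting at most `max_wait_ms` in total."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + max_wait_ms / 1000
    batch: List[bytes] = []
    while len(batch) < batch_size:
        remaining = deadline - loop.time()
        if remaining <= 0:
            break  # deadline reached: return a partial batch
        try:
            batch.append(await asyncio.wait_for(queue.get(), timeout=remaining))
        except asyncio.TimeoutError:
            break  # nothing more arrived before the deadline
    return batch
```

If only 3 messages are available and `batch_size` is 100, the call returns those 3 once the wait time runs out instead of blocking until 100 arrive.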
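The `max_msg_deliveries` parameter sets how many times a message can be delivered to the consumer group without being acknowledged, and `max_ack_time_ms` sets how long the broker waits for an acknowledgment before resending a message. A message that reaches `max_msg_deliveries` without an acknowledgment is considered a poison message and, if `send_poison_msg_to_dls` is enabled on the station, is sent to the dead-letter station. Here is an example where each unacknowledged message is delivered at most twice: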
```python
memphis = Memphis()
await memphis.connect(...)
consumer = await memphis.consumer(
    station_name = "my_station",
    consumer_name = "new_consumer",
    pull_interval_ms = 10000,
    batch_size = 100,
    batch_max_time_to_wait_ms = 100,
    max_msg_deliveries = 2
)
```
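The redelivery rule can be modeled as follows: each unacknowledged delivery counts toward `max_msg_deliveries`, and a message that is never acknowledged within that many deliveries is treated as a poison message (sent to the dead-letter station when `send_poison_msg_to_dls` is enabled). This toy loop, `simulate_deliveries`, is hypothetical and ignores timing such as `max_ack_time_ms`; it only counts deliveries.

```python
from typing import Callable, List, Tuple


def simulate_deliveries(
    messages: List[str],
    handler_acks: Callable[[str, int], bool],
    max_msg_deliveries: int = 2,
) -> Tuple[List[str], List[str]]:
    """Return (acked, dead_lettered) messages under a simple redelivery model.

    `handler_acks(msg, attempt)` returns True if the consumer acks on that attempt.
    """
    acked: List[str] = []
    dead_lettered: List[str] = []
    for msg in messages:
        for attempt in range(1, max_msg_deliveries + 1):
            if handler_acks(msg, attempt):
                acked.append(msg)
                break
        else:
            dead_lettered.append(msg)  # never acked: poison message goes to the DLS
    return acked, dead_lettered
```

A message that fails on the first delivery but is acknowledged on the second still counts as processed; only messages that exhaust every delivery end up in the DLS.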
### Consume using a partition key
The key will be used to consume from a specific partition:
```python
consumer.consume(msg_handler,
    consumer_partition_key = "key" # consume from a specific partition
)
```
### Consume using a partition number
The number will be used to consume from a specific partition:
```python
consumer.consume(msg_handler,
    consumer_partition_number = -1 # consume from a specific partition
)
```
### Setting a context for message handler function
```python
context = {"key": "value"}
consumer.set_context(context)
```
### Processing messages
To process messages with a consumer, use the `consume` function. It makes the consumer poll a station for new messages, as discussed in previous sections. The consumer stops polling the station once all the messages in the station have been consumed, and `msg_handler` receives a `Memphis: TimeoutError`.
```python
async def msg_handler(msgs, error, context):
    for msg in msgs:
        print("message: ", msg.get_data())
        await msg.ack()
    if error:
        print(error)

consumer.consume(msg_handler)
```
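A message handler can use the context set with `consumer.set_context` to keep state across calls. The sketch below counts processed messages in the context dict. `StubMessage` is a hypothetical stand-in for a Memphis message so the handler can be exercised without a broker; it mimics only the `get_data()` and `ack()` calls used above.

```python
import asyncio
from typing import Any, Dict, List, Optional


class StubMessage:
    """Hypothetical stand-in for a Memphis message (only get_data/ack)."""

    def __init__(self, data: bytes) -> None:
        self._data = data
        self.acked = False

    def get_data(self) -> bytes:
        return self._data

    async def ack(self) -> None:
        self.acked = True


async def msg_handler(msgs: List[Any], error: Optional[Exception], context: Dict[str, Any]) -> None:
    for msg in msgs:
        # Shared state lives in the context dict passed to every call.
        context["processed"] = context.get("processed", 0) + 1
        print("message: ", msg.get_data())
        await msg.ack()
    if error:
        print(error)
```

With a real consumer, you would call `consumer.set_context({})` before `consumer.consume(msg_handler)`.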
#### Processing schema deserialized messages
To get messages deserialized, use `msg.get_data_deserialized()`.
```python
async def msg_handler(msgs, error, context):
    for msg in msgs:
        print("message: ", await msg.get_data_deserialized())
        await msg.ack()
    if error:
        print(error)

consumer.consume(msg_handler)
```
There may be some instances where you apply a schema *after* a station has received some messages. In order to consume those messages get_data_deserialized may be used to consume the messages without trying to apply the schema to them. As an example, if you produced a string to a station and then attached a protobuf schema, using get_data_deserialized will not try to deserialize the string as a protobuf-formatted message.
### Fetch a single batch of messages
Using `fetch_messages` or `fetch` allows the user to retrieve a specific number of messages from a given station. This can be useful when the user does not want a consumer to actively poll a station indefinitely.
```python
msgs = await memphis.fetch_messages(
    station_name="<station-name>",
    consumer_name="<consumer-name>",
    consumer_group="<group-name>", # defaults to the consumer name
    batch_size=10, # defaults to 10
    batch_max_time_to_wait_ms=100, # defaults to 100
    max_ack_time_ms=30000, # defaults to 30000
    max_msg_deliveries=2, # defaults to 2
    start_consume_from_sequence=1, # start consuming from a specific sequence. defaults to 1
    last_messages=-1, # consume the last N messages, defaults to -1 (all messages in the station)
    consumer_partition_key="key", # used to consume from a specific partition, defaults to None
    consumer_partition_number=-1 # used to consume from a specific partition, defaults to -1
)
```
### Fetch a single batch of messages after creating a consumer
```python
msgs = await consumer.fetch(batch_size=10) # defaults to 10
```
`prefetch=True` prefetches the next batch of messages and saves it in memory for a future `fetch()` request:<br>
```python
msgs = await consumer.fetch(batch_size=10, prefetch=True) # defaults to False
```
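Conceptually, `prefetch=True` overlaps the next round trip to the broker with your processing of the current batch. The following sketch shows that idea with a hypothetical `PrefetchingFetcher` wrapped around any async fetch function; it is not how memphis-py implements prefetching.

```python
import asyncio
from typing import Awaitable, Callable, List, Optional


class PrefetchingFetcher:
    """Wrap an async fetch function so the next batch is requested in the background."""

    def __init__(self, fetch: Callable[[int], Awaitable[List[bytes]]], batch_size: int) -> None:
        self._fetch = fetch
        self._batch_size = batch_size
        self._next: Optional["asyncio.Task[List[bytes]]"] = None

    async def fetch(self) -> List[bytes]:
        # Use the batch prefetched by the previous call, if there is one.
        if self._next is not None:
            batch = await self._next
        else:
            batch = await self._fetch(self._batch_size)
        # Start fetching the following batch while the caller processes this one.
        self._next = asyncio.create_task(self._fetch(self._batch_size))
        return batch
```

The trade-off is memory: one extra batch is held in memory at all times, which is why prefetching is opt-in.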
### Acknowledge a message
Acknowledging a message tells the Memphis server not to resend the same message to the same consumer or consumer group.
```python
await message.ack()
```
### Nacking a Message
Mark the message as not acknowledged. The broker will resend the message immediately to the same consumer group, instead of waiting for the configured max ack time.
```python
await message.nack()
```
### Sending a message to the dead-letter
Send the message to the dead-letter station (DLS). The broker won't resend the message to the same consumer group and will place it in the DLS with the given reason.
The message will still be available to other consumer groups.
```python
await message.dead_letter("reason")
```
### Delay the message after a given duration
Delay the message and tell the Memphis server to resend it to the same consumer group after the given duration. The message is redelivered only if `consumer.max_msg_deliveries` has not been reached yet.
```python
await message.delay(delay_in_seconds)
```
### Get headers
Get headers per message
```python
headers = message.get_headers()
```
### Get message sequence number
Get message sequence number
```python
sequence_number = msg.get_sequence_number()
```
### Get message time sent
Get message time sent
```python
time_sent = msg.get_timesent()
```
### Destroying a Consumer
```python
await consumer.destroy()
```
### Check connection status
```python
memphis.is_connected()
```
need to create Memphis `object` and then connect with Memphis by using `memphis.connect`.\n\n```python\nasync def main():\n try:\n memphis = Memphis()\n await memphis.connect(\n host=\"<memphis-host>\",\n username=\"<application-type username>\",\n account_id=<account_id>, # You can find it on the profile page in the Memphis UI. This field should be sent only on the cloud version of Memphis, otherwise it will be ignored\n connection_token=\"<broker-token>\", # you will get it on application type user creation\n password=\"<string>\", # depends on how Memphis deployed - default is connection token-based authentication\n port=<port>, # defaults to 6666\n reconnect=True, # defaults to True\n max_reconnect=10, # defaults to -1 which means reconnect indefinitely\n reconnect_interval_ms=1500, # defaults to 1500\n timeout_ms=1500, # defaults to 1500\n # for TLS connection:\n key_file='<key-client.pem>', \n cert_file='<cert-client.pem>', \n ca_file='<rootCA.pem>'\n )\n ...\n except Exception as e:\n print(e)\n finally:\n await memphis.close()\n\nif __name__ == '__main__':\n asyncio.run(main())\n```\n\n```python\n async def connect(\n self,\n host: str,\n username: str,\n account_id: int = 1, # Cloud use only, ignored otherwise\n connection_token: str = \"\", # JWT token given when creating client accounts\n password: str = \"\", # For password-based connections\n port: int = 6666,\n reconnect: bool = True,\n max_reconnect: int = 10,\n reconnect_interval_ms: int = 1500,\n timeout_ms: int = 2000,\n # For TLS connections: \n cert_file: str = \"\", \n key_file: str = \"\",\n ca_file: str = \"\",\n )\n```\n\nThe connect function in the Memphis class allows for the connection to Memphis. Connecting to Memphis (cloud or open-source) will be needed in order to use any of the other functionality of the Memphis class. 
Upon connection, all of Memphis' features are available.\n\nWhat arguments are used with the Memphis.connect function change depending on the type of connection being made.\n\nFor details on deploying memphis open-source with different types of connections see the [docs](https://docs.memphis.dev/memphis/memphis-broker/concepts/security).\n \nA password-based connection would look like this (using the defualt root memphis login with Memphis open-source):\n\n```python\n # Imports hidden. See other examples\nasync def main():\n try:\n memphis = Memphis()\n await memphis.connect(\n host = \"localhost\",\n username = \"root\",\n password = \"memphis\",\n # port = 6666, default port\n # reconnect = True, default reconnect setting\n # max_reconnect = 10, default number of reconnect attempts\n # reconnect_interval_ms = 1500, default reconnect interval\n # timeout_ms = 2000, default duration of time for the connection to timeout\n )\n except Exception as e:\n print(e)\n finally:\n await memphis.close()\n\nif __name__ == '__main__':\n asyncio.run(main()) \n```\n\nIf you wanted to connect to Memphis cloud instead, simply add your account ID and change the host. The host and account_id can be found on the Overview page in the Memphis cloud UI under your name at the top. Here is an example to connecting to a cloud broker that is located in US East: \n\n```python\n # Imports hidden. 
See other examples\nasync def main():\n try:\n memphis = Memphis()\n await memphis.connect(\n host = \"aws-us-east-1.cloud.memphis.dev\",\n username = \"my_client_username\",\n password = \"my_client_password\",\n account_id = \"123456789\"\n # port = 6666, default port\n # reconnect = True, default reconnect setting\n # max_reconnect = 10, default number of reconnect attempts\n # reconnect_interval_ms = 1500, default reconnect interval\n # timeout_ms = 2000, default duration of time for the connection to timeout\n )\n except Exception as e:\n print(e)\n finally:\n await memphis.close()\n\nif __name__ == '__main__':\n asyncio.run(main()) \n```\n\nIt is possible to use a token-based connection to memphis as well, where multiple users can share the same token to connect to memphis. Here is an example of using memphis.connect with a token:\n\n```python\n # Imports hidden. See other examples\nasync def main():\n try:\n memphis = Memphis()\n await memphis.connect(\n host = \"localhost\",\n username = \"user\",\n connection_token = \"token\",\n # port = 6666, default port\n # reconnect = True, default reconnect setting\n # max_reconnect = 10, default number of reconnect attempts\n # reconnect_interval_ms = 1500, default reconnect interval\n # timeout_ms = 2000, default duration of time for the connection to timeout\n )\n except Exception as e:\n print(e)\n finally:\n await memphis.close()\n\nif __name__ == '__main__':\n asyncio.run(main()) \n```\n\nThe token will be presented when creating new users. \n\nMemphis needs to be configured to use a token based connection. See the [docs](https://docs.memphis.dev/memphis/memphis-broker/concepts/security) for help doing this.\n\n> For the rest of the examples, the try-except statement and the asyncio runtime call will be withheld to assist with the succinctness of the examples. \n\nA TLS based connection would look like this:\n\n```python\n # Imports hidden. 
See other examples\n\n try:\n memphis = Memphis()\n await memphis.connect(\n host = \"localhost\",\n username = \"user\",\n key_file = \"~/tls_file_path.key\",\n cert_file = \"~/tls_cert_file_path.crt\",\n ca_file = \"~/tls_ca_file_path.crt\",\n # port = 6666, default port\n # reconnect = True, default reconnect setting\n # max_reconnect = 10, default number of reconnect attempts\n # reconnect_interval_ms = 1500, default reconnect interval\n # timeout_ms = 2000, default duration of time for the connection to timeout\n )\n except Exception as e:\n print(e)\n finally:\n```\nMemphis needs to configured for these use cases. To configure memphis to use TLS see the [docs](https://docs.memphis.dev/memphis/open-source-installation/kubernetes/production-best-practices#memphis-metadata-tls-connection-configuration).\n\n### Disconnecting from Memphis\n\nTo disconnect from Memphis, call `close()` on the memphis object.\n\n```python\nawait memphis.close()\n```\n\n### Creating a Station\n\nStations are distributed units that store messages. Producers add messages to stations and Consumers take messages from them. Each station stores messages until their retention policy causes them to either delete the messages or move them to [remote storage](https://docs.memphis.dev/memphis/integrations-center/storage/s3-compatible). \n\n**A station will be automatically created for the user when a consumer or producer is used if no stations with the given station name exist.**<br><br>\n_If the station trying to be created exists when this function is called, nothing will change with the exisitng station_\n\n```python\n async def station(\n self,\n name: str,\n retention_type: Retention = Retention.MAX_MESSAGE_AGE_SECONDS, # MAX_MESSAGE_AGE_SECONDS/MESSAGES/BYTES/ACK_BASED(cloud only). Defaults to MAX_MESSAGE_AGE_SECONDS\n retention_value: int = 3600, # defaults to 3600\n storage_type: Storage = Storage.DISK, # Storage.DISK/Storage.MEMORY. 
Defaults to DISK\n replicas: int = 1,\n idempotency_window_ms: int = 120000, # defaults to 2 minutes\n schema_name: str = \"\", # defaults to \"\" (no schema)\n send_poison_msg_to_dls: bool = True, # defaults to true\n send_schema_failed_msg_to_dls: bool = True, # defaults to true\n tiered_storage_enabled: bool = False, # defaults to false\n partitions_number: int = 1, # defaults to 1\n dls_station: str = \"\", # defaults to \"\" (no DLS station). If given, both poison and schema failed events will be sent to the DLS\n )\n```\n\nThe station function is used to create a station. Using the different arguemnts, one can programically create many different types of stations. The Memphis UI can also be used to create stations to the same effect. \n\nCreating a station with just a name name would create a station with that named and containing the default options provided above:\n\n```python\n memphis = Memphis()\n\n await memphis.connect(...)\n\n await memphis.station(\n name = \"my_station\"\n )\n```\n\n### Stations with Retention\n\nTo change what criteria the station uses to decide if a message should be retained in the station, change the retention type. The different types of retention are documented [here](https://github.com/memphisdev/memphis.py#retention-types) in the python README. \n\nThe unit of the rentention value will vary depending on the retention_type. The [previous link](https://github.com/memphisdev/memphis.py#retention-types) also describes what units will be used. \n\nHere is an example of a station which will only hold up to 10 messages:\n\n```python\n memphis = Memphis()\n\n await memphis.connect(...)\n \n await memphis.station(\n name = \"my_station\",\n retention_type = Retention.MESSAGES,\n retention_value = 10\n )\n```\n\n### Station storage types\n\nMemphis stations can either store Messages on disk or in memory. 
A comparison of those types of storage can be found [here](https://docs.memphis.dev/memphis/memphis-broker/concepts/storage-and-redundancy#tier-1-local-storage).\n\nHere is an example of how to create a station that uses Memory as its storage type:\n\n```python\n memphis = Memphis()\n\n await memphis.connect(...)\n\n await memphis.station(\n name = \"my_station\",\n storage_type = Storage.MEMORY\n )\n```\n\n### Station Replicas\n\nIn order to make a station more redundant, replicas can be used. Read more about replicas [here](https://docs.memphis.dev/memphis/memphis-broker/concepts/storage-and-redundancy#replicas-mirroring). Note that replicas are only available in cluster mode. Cluster mode can be enabled in the [Helm settings](https://docs.memphis.dev/memphis/open-source-installation/kubernetes/1-installation#appendix-b-helm-deployment-options) when deploying Memphis with Kubernetes.\n\nHere is an example of creating a station with 3 replicas:\n\n```python\n memphis = Memphis()\n\n await memphis.connect(...)\n\n await memphis.station(\n name = \"my_station\",\n replicas = 3\n )\n```\n\n### Station idempotency\n\nIdempotency defines how Memphis will prevent duplicate messages from being stored or consumed. The duration of time the message ID's will be stored in the station can be set with idempotency_window_ms. If the environment Memphis is deployed in has unreliably connection and/or a lot of latency, increasing this value might be desiriable. The default duration of time is set to two minutes. Read more about idempotency [here](https://docs.memphis.dev/memphis/memphis-broker/concepts/idempotency).\n\nHere is an example of changing the idempotency window to 3 seconds:\n\n```python\n memphis = Memphis()\n\n await memphis.connect(...)\n\n await memphis.station(\n name = \"my_station\",\n idempotency_window_ms = 180000\n )\n```\n\n### Enforcing a schema\n\nThe schema name is used to set a schema to be enforced by the station. 
The default value of \"\" ensures that no schema is enforced. Here is an example of changing the schema to a defined schema in schemaverse called \"sensor_logs\":\n\n```python\n memphis = Memphis()\n\n await memphis.connect(...)\n\n await memphis.station(\n name = \"my_station\",\n schema = \"sensor_logs\"\n )\n```\n\n### Dead Letter Stations\n\nThere are two parameters for sending messages to the [dead-letter station(DLS)](https://docs.memphis.dev/memphis/memphis-broker/concepts/dead-letter#terminology). These are send_poison_msg_to_dls and send_schema_failed_msg_to_dls. \n\nHere is an example of sending poison messages to the DLS but not messages which fail to conform to the given schema.\n\n```python\n memphis = Memphis()\n\n await memphis.connect(...)\n\n await memphis.station(\n name = \"my_station\",\n schema = \"sensor_logs\",\n send_poison_msg_to_dls = True,\n send_schema_failed_msg_to_dls = False\n )\n```\n\nWhen either of the DLS flags are set to True, a station can also be set to handle these events. To set a station as the station to where schema failed or poison messages will be set to, use the dls_station parameter:\n\n```python\n memphis = Memphis()\n\n await memphis.connect(...)\n\n await memphis.station(\n name = \"my_station\",\n schema = \"sensor_logs\",\n send_poison_msg_to_dls = True,\n send_schema_failed_msg_to_dls = False,\n dls_station = \"bad_sensor_messages_station\"\n )\n```\n\n### Station Tiered Storage\n\nWhen the retention value is met, Mempihs by default will delete old messages. If tiered storage is setup, Memphis can instead move messages to tier 2 storage. Read more about tiered storage [here](https://docs.memphis.dev/memphis/memphis-broker/concepts/storage-and-redundancy#storage-tiering). 
Enable this setting with the respective flag:\n\n```python\n memphis = Memphis()\n\n await memphis.connect(...)\n\n await memphis.station(\n name = \"my_station\",\n tiered_storage_enabled = True\n )\n```\n\n### Station Partitions\n\n[Partitioning](https://docs.memphis.dev/memphis/memphis-broker/concepts/station#partitions) might be useful for a station. To have a station partitioned, simply change the partitions number:\n\n```python\n memphis = Memphis()\n\n await memphis.connect(...)\n\n await memphis.station(\n name = \"my_station\",\n partitions_number = 3\n )\n```\n\n\n### Retention types\n\nRetention types define the methodology behind how a station behaves with its messages. Memphis currently supports the following retention types:\n\n```python\nmemphis.types.Retention.MAX_MESSAGE_AGE_SECONDS\n```\n\nWhen the retention type is set to MAX_MESSAGE_AGE_SECONDS, messages will persist in the station for the number of seconds specified in the retention_value. \n\n\n```python\nmemphis.types.Retention.MESSAGES\n```\n\nWhen the retention type is set to MESSAGES, the station will only hold up to retention_value messages. The station will delete the oldest messsages to maintain a retention_value number of messages.\n\n```python\nmemphis.types.Retention.BYTES\n```\n\nWhen the retention type is set to BYTES, the station will only hold up to retention_value BYTES. The oldest messages will be deleted in order to maintain at maximum retention_vlaue BYTES in the station.\n\n```python\nmemphis.types.Retention.ACK_BASED # for cloud users only\n```\n\nWhen the retention type is set to ACK_BASED, messages in the station will be deleted after they are acked by all subscribed consumer groups.\n\n### Retention Values\n\nThe unit of the `retention_value` changes depending on the `retention_type` specified. \n\nAll retention values are of type `int`. 
The following units are used based on the respective retention type:\n\n`memphis.types.Retention.MAX_MESSAGE_AGE_SECONDS` is **in seconds**,<br>\n`memphis.types.Retention.MESSAGES` is a **number of messages**,<br>\n`memphis.types.Retention.BYTES` is a **number of bytes**, <br>\nWith `memphis.ACK_BASED`, the `retention_type` is ignored \n\n### Storage types\n\nMemphis currently supports the following types of messages storage:\n\n```python\nmemphis.types.Storage.DISK\n```\nWhen storage is set to DISK, messages are stored on disk.\n\n```python\nmemphis.types.Storage.MEMORY\n```\nWhen storage is set to MEMORY, messages are stored in the system memory.\n\n### Destroying a Station\n\nDestroying a station will remove all its resources (including producers/consumers)\n\n```python\nstation.destroy()\n```\n\n### Creating a New Schema \nIn case schema is already exist a new version will be created\n```python\nawait memphis.create_schema(\"<schema-name>\", \"<schema-type>\", \"<schema-file-path>\")\n```\nCurrent available schema types - Protobuf / JSON schema / GraphQL schema / Avro\n\n### Enforcing a Schema on an Existing Station\n\n```python\nasync def enforce_schema(self, name, station_name)\n```\n\nTo add a schema to an already created station, enforce_schema can be used. Here is an example using enforce_schmea to add a schema to a station:\n\n```python\n memphis = Memphis()\n\n await memphis.connect(...)\n\n await memphis.enforce_schmea(\n name = \"my_schmea\",\n station_name = \"my_station\"\n )\n```\n\n### Deprecated - Attaching a Schema, use enforce_schema instead\n\n```python\nawait memphis.attach_schema(\"<schema-name>\", \"<station-name>\")\n```\n\n### Detaching a Schema from Station\n\n```python\n async def detach_schema(self, station_name)\n```\n\nTo remove a schema from an already created station, detach_schema can be used. 
Here is an example of removing a schmea from a station:\n\n```python\n memphis = Memphis()\n \n await memphis.connect(...)\n\n await memphis.detach_schmea(\n station_name = \"my_station\"\n )\n```\n\n### Produce and Consume messages\n\nThe most common client operations are using `produce` to send messages and `consume` to\nreceive messages.\n\nMessages are published to a station with a Producer and consumed from it by a Consumer. \n\nConsumers are poll based and consume all the messages in a station. Consumers can also be grouped into consumer groups. When consuming with a consumer group, all consumers in the group will receive each message.\n\nMemphis messages are payload agnostic. Payloads are always `bytearray`s.\n\nIn order to stop getting messages, you have to call `consumer.destroy()`. Destroy will terminate the consumer even if messages are currently being sent to the consumer.\n\nIf a station is created with more than one partition, producing to and consuming from the station will happen in a round robin fashion. \n\n### Creating a Producer\n\n```python\n async def producer(\n self,\n station_name: str,\n producer_name: str,\n generate_random_suffix: bool = False, #Depreicated\n )\n```\n\nUse the Memphis producer function to create a producer. Here is an example of creating a producer for a given station:\n\n```python\n memphis = Memphis()\n \n await memphis.connect(...)\n\n producer = await memphis.producer(\n station_name = \"my_station\",\n producer_name = \"new_producer\"\n )\n```\n\n### Producing a message\n```python\nasync def produce(\n self,\n message,\n ack_wait_sec: int = 15,\n headers: Union[Headers, None] = None,\n async_produce: Union[bool, None] = None,\n nonblocking: bool = False,\n msg_id: Union[str, None] = None,\n concurrent_task_limit: Union[int, None] = None,\n producer_partition_key: Union[str, None] = None,\n producer_partition_number: Union[int, -1] = -1\n ):\n```\nBoth producers and connections can use the produce function. 
To produce a message from a connection, simply call `memphis.produce`. This function will create a producer if none with the given name exists, otherwise it will pull the producer from a cache and use it to produce the message.\n \n```python\nawait memphis.produce(station_name='test_station_py', producer_name='prod_py',\n message='bytearray/protobuf class/dict/string/graphql.language.ast.DocumentNode', # bytearray / protobuf class (schema validated station - protobuf) or bytearray/dict (schema validated station - json schema) or string/bytearray/graphql.language.ast.DocumentNode (schema validated station - graphql schema) or bytearray/dict (schema validated station - avro schema)\n ack_wait_sec=15, # defaults to 15\n headers=headers, # default to {}\n nonblocking=False, #defaults to false\n msg_id=\"123\",\n producer_partition_key=\"key\" #default to None\n)\n```\n\nCreating a producer and calling produce on it will increase the performance of producing messages as it removes the overhead of pulling created producers from the cache.\n\n```python\nawait producer.produce(\n message='bytearray/protobuf class/dict/string/graphql.language.ast.DocumentNode', # bytearray / protobuf class (schema validated station - protobuf) or bytearray/dict (schema validated station - json schema) or string/bytearray/graphql.language.ast.DocumentNode (schema validated station - graphql schema) or or bytearray/dict (schema validated station - avro schema)\n ack_wait_sec=15) # defaults to 15\n```\n\nHere is an example of a produce function call that waits up to 30 seconds for an acknowledgement from memphis and does so in an nonblocking manner:\n\n```python\n memphis = Memphis()\n\n await memphis.connect(...)\n \n await memphis.produce(\n station_name = \"some_station\",\n producer_name = \"temp_producer\",\n message = {'some':'message'},\n ack_wait_sec = 30,\n nonblocking = True\n )\n```\n\n### Producing with idempotency\n\nAs discussed before in the station section, idempotency is an 
important feature of memphis. To achieve idempotency, an id must be assigned to messages that are being produced. Use the msg_id parameter for this purpose.\n\n```python\n memphis = Memphis()\n\n await memphis.connect(...)\n \n await memphis.produce(\n station_name = \"some_station\",\n producer_name = \"temp_producer\",\n message = {'some':'message'},\n msg_id = '42'\n )\n```\n\n### Producing with headers\n\nTo add message headers to the message, use the headers parameter. Headers can help with observability when using certain 3rd party to help monitor the behavior of memphis. See [here](https://docs.memphis.dev/memphis/memphis-broker/comparisons/aws-sqs-vs-memphis#observability) for more details.\n\n```python\n memphis = Memphis()\n\n await memphis.connect(...)\n \n await memphis.produce(\n station_name = \"some_station\",\n producer_name = \"temp_producer\",\n message = {'some':'message'},\n headers = {\n 'trace_header': 'track_me_123'\n }\n )\n```\n\n### Producing to a partition\n\nLastly, memphis can produce to a specific partition in a station. 
To do so, use the producer_partition_key parameter:\n\n```python\n memphis = Memphis()\n\n await memphis.connect(...)\n \n await memphis.produce(\n station_name = \"some_station\",\n producer_name = \"temp_producer\",\n message = {'some':'message'},\n producer_partition_key = \"2nd_partition\"\n )\n```\n\nOr, alternatively, use the producer_partition_number parameter:\n```python\n memphis = Memphis()\n\n await memphis.connect(...)\n \n await memphis.produce(\n station_name = \"some_station\",\n producer_name = \"temp_producer\",\n message = {'some':'message'},\n producer_partition_number = 2\n )\n```\n\n### Non-blocking Produce with Task Limits\n\nFor better performance, the client won't block requests while waiting for an acknowledgment.\nIf you are producing a large number of messages very quickly, there maybe some timeout errors, then you may need to limit the number of concurrent tasks to get around this:\n\n```python\nawait producer.produce(\n message='bytearray/protobuf class/dict/string/graphql.language.ast.DocumentNode', # bytearray / protobuf class (schema validated station - protobuf) or bytearray/dict (schema validated station - json schema) or string/bytearray/graphql.language.ast.DocumentNode (schema validated station - graphql schema)\n headers={}, nonblocking=True, limit_concurrent_tasks=500)\n```\n\nYou may read more about this [here](https://memphis.dev/blog/producing-messages-at-warp-speed-best-practices-for-optimizing-your-producers/) on the memphis.dev blog.\n\n### Produce to multiple stations\n\nProducing to multiple stations can be done by creating a producer with multiple stations and then calling produce on that producer.\n\n```python\nmemphis = Memphis()\n\nawait memphis.connect(...)\n\nproducer = await memphis.producer(\n station_name = [\"station_1\", \"station_2\"],\n producer_name = \"new_producer\"\n)\n\nawait producer.produce(\n message = \"some message\"\n)\n```\n\nAlternatively, it also possible to produce to multiple stations using 
the connection:\n\n```python\nmemphis = Memphis()\n\nawait memphis.connect(...)\n\nawait memphis.produce(\n station_name = [\"station_1\", \"station_2\"],\n producer_name = \"new_producer\",\n message = \"some message\"\n)\n```\n\n### Destroying a Producer\n\n```python\nproducer.destroy()\n```\n\n### Creating a Consumer\n\n```python\nconsumer = await memphis.consumer(\n station_name=\"<station-name>\",\n consumer_name=\"<consumer-name>\",\n consumer_group=\"<group-name>\", # defaults to the consumer name\n pull_interval_ms=1000, # defaults to 1000\n batch_size=10, # defaults to 10\n batch_max_time_to_wait_ms=100, # defaults to 100\n max_ack_time_ms=30000, # defaults to 30000\n max_msg_deliveries=2, # defaults to 2\n start_consume_from_sequence=1, # start consuming from a specific sequence. defaults to 1\n last_messages=-1 # consume the last N messages, defaults to -1 (all messages in the station)\n)\n```\n\nConsumers are used to pull messages from a station. Here is how to create a consumer with all of the default parameters:\n\n```python\n memphis = Memphis()\n\n await memphis.connect(...)\n\n consumer = await Memphis.consumer(\n station_name = \"my_station\",\n consumer_name: \"new_consumer\",\n )\n```\n\nTo create a consumer in a consumer group, add the consumer_group parameter:\n\n```python\n memphis = Memphis()\n\n await memphis.connect(...)\n\n consumer = await Memphis.consumer(\n station_name = \"my_station\",\n consumer_name: \"new_consumer\",\n consumer_group: \"consumer_group_1\"\n )\n```\n\nWhen using Consumer.consume, the consumer will continue to consume in an infinite loop. 
To change the rate at which the consumer polls the station for new messages, change the pull_interval_ms parameter:\n\n```python\n memphis = Memphis()\n\n await memphis.connect(...)\n\n consumer = await Memphis.consumer(\n station_name = \"my_station\",\n consumer_name = \"new_consumer\",\n pull_interval_ms = 2000\n )\n```\n\nEvery time the consumer pulls from the station, the consumer will try to take batch_size number of elements from the station. However, sometimes there are not enough messages in the station for the consumer to consume a full batch. In this case, the consumer will continue to wait until either batch_size messages are gathered or the time in milliseconds specified by batch_max_time_to_wait_ms is reached. \n\nHere is an example of a consumer that will try to poll 100 messages every 10 seconds while waiting up to 15 seconds for all messages to reach the consumer.\n\n```python\n memphis = Memphis()\n\n await memphis.connect(...)\n\n consumer = await Memphis.consumer(\n station_name = \"my_station\",\n consumer_name = \"new_consumer\",\n pull_interval_ms = 10000,\n batch_size = 100,\n batch_max_time_to_wait_ms = 100\n )\n```\n\nThe max_msg_deliveries parameter allows the user how many messages the consumer is able to consume before consuming more. 
The max_ack_time_ms Here is an example where the consumer will only hold up to one batch of messages at a time:\n\n```python\n memphis = Memphis()\n\n await memphis.connect(...)\n \n consumer = await Memphis.consumer(\n station_name = \"my_station\",\n consumer_name = \"new_consumer\",\n pull_interval_ms = 10000,\n batch_size = 100,\n batch_max_time_to_wait_ms = 100,\n max_msg_deliveries = 2\n )\n```\n### Consume using a partition key\nThe key will be used to consume from a specific partition\n\n```python\nconsumer.consume(msg_handler,\n consumer_partition_key = \"key\" #consume from a specific partition\n )\n```\n\n### Consume using a partition number\nThe number will be used to consume from a specific partition\n\n```python\nconsumer.consume(msg_handler,\n consumer_partition_number = -1 #consume from a specific partition\n )\n```\n\n### Setting a context for message handler function\n\n```python\ncontext = {\"key\": \"value\"}\nconsumer.set_context(context)\n```\n\n### Processing messages\n\nTo use a consumer to process messages, use the consume function. The consume function will have a consumer poll a station for new messages as discussed in previous sections. The consumer will stop polling the statoin once all the messages in the station were consumed, and the msg_handler will receive a `Memphis: TimeoutError`.\n\n```python\nasync def msg_handler(msgs, error, context):\n for msg in msgs:\n print(\"message: \", msg.get_data())\n await msg.ack()\n if error:\n print(error)\nconsumer.consume(msg_handler)\n```\n\n#### Processing schema deserialized messages\nTo get messages deserialized, use `msg.get_data_deserialized()`. \n\n```python\nasync def msg_handler(msgs, error, context):\n for msg in msgs:\n print(\"message: \", await msg.get_data_deserialized())\n await msg.ack()\n if error:\n print(error)\nconsumer.consume(msg_handler)\n```\n\nThere may be some instances where you apply a schema *after* a station has received some messages. 
In order to consume those messages get_data_deserialized may be used to consume the messages without trying to apply the schema to them. As an example, if you produced a string to a station and then attached a protobuf schema, using get_data_deserialized will not try to deserialize the string as a protobuf-formatted message.\n\n### Fetch a single batch of messages\n\nUsing fetch_messages or fetch will allow the user to remove a specific number of messages from a given station. This behavior could be beneficial if the user does not want to have a consumer actively poll from a station indefinetly.\n\n```python\nmsgs = await memphis.fetch_messages(\n station_name=\"<station-name>\",\n consumer_name=\"<consumer-name>\",\n consumer_group=\"<group-name>\", # defaults to the consumer name\n batch_size=10, # defaults to 10\n batch_max_time_to_wait_ms=100, # defaults to 100\n max_ack_time_ms=30000, # defaults to 30000\n max_msg_deliveries=2, # defaults to 2\n start_consume_from_sequence=1, # start consuming from a specific sequence. 
defaults to 1\n last_messages=-1, # consume the last N messages, defaults to -1 (all messages in the station))\n consumer_partition_key=\"key\", # used to consume from a specific partition, default to None \n consumer_partition_number=-1 # used to consume from a specific partition, default to -1 \n)\n```\n\n### Fetch a single batch of messages after creating a consumer\n\n```python\nmsgs = await consumer.fetch(batch_size=10) # defaults to 10\n```\n\n`prefetch = true` will prefetch next batch of messages and save it in memory for future fetch() request<br>\n\n```python\nmsgs = await consumer.fetch(batch_size=10, prefetch=True) # defaults to False\n```\n\n### Acknowledge a message\n\nAcknowledge a message indicates the Memphis server to not re-send the same message again to the same consumer / consumers group\n\n```python\nawait message.ack()\n```\n\n### Nacking a Message\n\nMark the message as not acknowledged - the broker will resend the message immediately to the same consumers group, instead of waiting to the max ack time configured.\n\n```python\nawait message.nack();\n```\n\n### Sending a message to the dead-letter\n\nSending the message to the dead-letter station (DLS) - the broker won't resend the message again to the same consumers group and will place the message inside the dead-letter station (DLS) with the given reason.\nThe message will still be available to other consumer groups\n\n```python\nawait message.dead_letter(\"reason\");\n```\n\n### Delay the message after a given duration\n\nDelay the message and tell Memphis server to re-send the same message again to the same consumer group. 
The message will be redelivered only if `consumer.max_msg_deliveries` has not been reached yet.\n\n```python\nawait message.delay(delay_in_seconds)\n```\n\n### Get headers\n\nGet the headers of a message.\n\n```python\nheaders = message.get_headers()\n```\n\n### Get message sequence number\n\nGet the sequence number of a message.\n\n```python\nsequence_number = message.get_sequence_number()\n```\n\n### Get message time sent\n\nGet the time at which a message was sent.\n\n```python\ntime_sent = message.get_timesent()\n```\n\n### Destroying a Consumer\n\n```python\nawait consumer.destroy()\n```\n\n### Check connection status\n\n```python\nmemphis.is_connected()\n```",
"bugtrack_url": null,
"license": "Apache-2.0",
"summary": "A powerful messaging platform for modern developers",
"version": "1.1.21",
"project_urls": {
"Download": "https://github.com/memphisdev/memphis.py/archive/refs/tags/1.1.21.tar.gz",
"Homepage": "https://github.com/memphisdev/memphis.py"
},
"split_keywords": [
"message broker",
"devtool",
"streaming",
"data"
],
"urls": [
{
"comment_text": "",
"digests": {
"blake2b_256": "a088bda8c3ca7adf25fa5284004c442d22ec3bf0a7405fb7292ade81482b7f5c",
"md5": "a9c28a9ec3baa7b91c3f603920b036ab",
"sha256": "320b744048c6e934a214cc9207abbaefc8f8e50441a42c0b66685e1344c691a3"
},
"downloads": -1,
"filename": "memphis-py-beta-1.1.21.tar.gz",
"has_sig": false,
"md5_digest": "a9c28a9ec3baa7b91c3f603920b036ab",
"packagetype": "sdist",
"python_version": "source",
"requires_python": null,
"size": 49973,
"upload_time": "2024-01-21T18:19:32",
"upload_time_iso_8601": "2024-01-21T18:19:32.276244Z",
"url": "https://files.pythonhosted.org/packages/a0/88/bda8c3ca7adf25fa5284004c442d22ec3bf0a7405fb7292ade81482b7f5c/memphis-py-beta-1.1.21.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2024-01-21 18:19:32",
"github": true,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"github_user": "memphisdev",
"github_project": "memphis.py",
"travis_ci": false,
"coveralls": false,
"github_actions": true,
"lcname": "memphis-py-beta"
}