message-db-py


Namemessage-db-py JSON
Version 0.2.0 PyPI version JSON
download
home_pageNone
SummaryThe Python interface to the MessageDB Event Store and Message Store
upload_time2024-04-24 03:56:31
maintainerNone
docs_urlNone
authorSubhash Bhushan
requires_python<4.0,>=3.9
licenseMIT
keywords message-db event-sourcing event-store messaging cqrs command-query-responsibility-segregation events streaming database postgresql python async microservices distributed-systems message-queue event-driven real-time
VCS
bugtrack_url
requirements No requirements were recorded.
Travis-CI No Travis.
coveralls test coverage No coveralls.
            # message-db-py

Message DB is a fully-featured event store and message store implemented in
PostgreSQL for Pub/Sub, Event Sourcing, Messaging, and Evented Microservices
applications.

`message-db-py` is a Python interface to the Message DB event store and message 
store, designed for easy integration into Python applications.

[![Build Status](https://github.com/subhashb/message-db-py/actions/workflows/ci.yml/badge.svg?branch=main)](https://github.com/subhashb/message-db-py/actions)
[![Code Coverage](https://codecov.io/gh/subhashb/message-db-py/graph/badge.svg?token=QMNUSLN2OM)](https://codecov.io/gh/subhashb/message-db-py)
[![Python Version](https://img.shields.io/pypi/pyversions/message-db-py.svg)](https://pypi.org/project/message-db-py/)
[![PyPI version](https://badge.fury.io/py/message-db-py.svg)](https://pypi.org/project/message-db-py/)
[![License](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)

## Installation

Use pip to install:

```shell
$ pip install message-db-py
```

## Setting up Message DB database

Clone the Message DB repository to set up the database:

```shell
$ git clone git@github.com:message-db/message-db.git
```

More detailed instructions are in the [Installation](https://github.com/message-db/message-db?tab=readme-ov-file#installation)
section of Message DB repo.

Running the database installation script creates the database, schema, table,
indexes, functions, views, types, a user role, and limit the user's privileges
to the message store's public interface.

The installation script is in the database directory of the cloned Message DB
repo. Change directory to the message-db directory where you cloned the repo,
and run the script:

```shell
$ database/install.sh
```

Make sure that your default Postgres user has administrative privileges.

### Database Name

By default, the database creation tool will create a database named
`message_store`.

If you prefer either a different database name, you can override the name
using the `DATABASE_NAME` environment variable.

```shell
$ DATABASE_NAME=some_other_database database/install.sh
```

### Uninstalling the Database

If you need to drop the database (for example, on a local dev machine):

``` bash
$ database/uninstall.sh
```

If you're upgrading a previous version of the database:

``` bash
$ database/update.sh
```

## Docker Image

You can optionally use a Docker image with Message DB pre-installed and ready
to go. This is especially helpful to run test cases locally.

The docker image is available in [Docker Hub](https://hub.docker.com/r/ethangarofolo/message-db).
The source is in [Gitlab](https://gitlab.com/such-software/message-db-docker)

## Usage

The complete user guide for Message DB is available at 
[http://docs.eventide-project.org/user-guide/message-db/]
(http://docs.eventide-project.org/user-guide/message-db/).

Below is documentation for methods exposed through the Python API.

### Quickstart

Here's a quick example of how to publish and read messages using Message-DB-py:

```python
from message_db import MessageDB

# Initialize the database connection
store = MessageDB(CONNECTION_URL)

# Write a message
store.write("user_stream", "register", {"name": "John Doe"})

# Read a message
message = store.read_last_message("user_stream")
print(message)
```

## Primary APIs

- [Write Messages](#write)
- [Read Messages](#read-messages-from-a-stream-or-category)
- [Read Last Message from stream](#read-last-message-from-stream)

### Write messages

The `write` method is used to append a new message to a specified stream within
the message database. This method ensures that the message is written with the
appropriate type, data, and metadata, and optionally, at a specific expected
version of the stream.

```python
def write(
    self,
    stream_name: str,
    message_type: str,
    data: Dict,
    metadata: Dict | None = None,
    expected_version: int | None = None,
) -> int:
    """Write a message to a stream."""
```

#### Parameters

- `stream_name` (`str`): The name of the stream to which the message will be
written. This identifies the logical series of messages.
- `message_type` (`str`): The type of message being written. Typically, this
reflects the nature of the event or data change the message represents.
- `data` (`Dict`): The data payload of the message. This should be a dictionary
containing the actual information the message carries.
- `metadata` (`Dict` | `None`): Optional. Metadata about the message, provided as a
dictionary. Metadata can include any additional information that is not part of
the - main data payload, such as sender information or timestamps.
Defaults to None.
- `expected_version` (`int` | `None`): Optional. The version of the stream where the
client expects to write the message. This is used for concurrency control and
ensuring the integrity of the stream's order. Defaults to `None`.

#### Returns

- `position` (`int`): The position (or version number) of the message in the
stream after it has been successfully written.

```python
message_db = MessageDB(connection_pool=my_pool)
stream_name = "user_updates"
message_type = "UserCreated"
data = {"user_id": 123, "username": "example"}
metadata = {"source": "web_app"}

position = message_db.write(stream_name, message_type, data, metadata)

print("Message written at position:", position)
```

---

### Read messages from a stream or category

The `read` method retrieves messages from a specified stream or category. This
method supports flexible query options through a direct SQL parameter or by
determining the SQL based on the stream name and its context
(stream vs. category vs. all messages).

```python
def read(
    self,
    stream_name: str,
    sql: str | None = None,
    position: int = 0,
    no_of_messages: int = 1000,
) -> List[Dict[str, Any]]:
    """Read messages from a stream or category.

    Returns a list of messages from the stream or category starting from the given position.
    """
```

#### Parameters

- `stream_name` (`str`): The identifier for the stream or category from which
messages are to be retrieved. Special names like "$all" can be used to fetch
messages across all streams.
- `sql` (`str` | `None`, optional): An optional SQL query string that if
provided, overrides the default SQL generation based on the stream_name.
If None, the SQL is automatically generated based on the stream_name value.
Defaults to None.
- `position` (`int`, optional): The starting position in the stream or category
from which to begin reading messages. Defaults to 0.
- `no_of_messages` (`int`, optional): The maximum number of messages to
retrieve. Defaults to 1000.

#### Returns

- List[Dict[str, Any]]: A list of messages, where each message is
represented as a dictionary containing details such as the message ID,
stream name, type, position, global position, data, metadata, and timestamp.

```python
message_db = MessageDB(connection_pool=my_pool)
stream_name = "user-updates"
position = 10
no_of_messages = 50

# Reading from a specific stream
messages = message_db.read(stream_name, position=position, no_of_messages=no_of_messages)

# Custom SQL query
custom_sql = "SELECT * FROM get_stream_messages(%(stream_name)s, %(position)s, %(batch_size)s);"
messages = message_db.read(stream_name, sql=custom_sql, position=position, no_of_messages=no_of_messages)

for message in messages:
    print(message)
```

---

### Read Last Message from stream

The `read_last_message` method retrieves the most recent message from a
specified stream. This method is useful when you need the latest state or
event in a stream without querying the entire message history.

```python
def read_last_message(self, stream_name: str) -> Dict[str, Any] | None:
    """Read the last message from a stream."""
```

#### Parameters

- `stream_name` (`str`): The name of the stream from which the last message is to be
retrieved.

#### Returns

- `Dict`[`str`, `Any`] | `None`: A dictionary representing the last message 
in the specified stream. If the stream is empty or the message does not exist,
`None` is returned.

```python
message_db = MessageDB(connection_pool=my_pool)
stream_name = "user_updates"

# Reading the last message from a stream
last_message = message_db.read_last_message(stream_name)

if last_message:
    print("Last message data:", last_message)
else:
    print("No messages found in the stream.")
```

---

## Utility APIs

- [Read Stream](#read-stream)
- [Read Category](#read-category)
- [Write Batch](#write-batch)

### Read Stream

The `read_stream` method retrieves a sequence of messages from a specified stream
within the message database. This method is specifically designed to fetch
messages from a well-defined stream based on a starting position and a
specified number of messages.

```python
def read_stream(
    self, stream_name: str, position: int = 0, no_of_messages: int = 1000
) -> List[Dict[str, Any]]:
    """Read messages from a stream.

    Returns a list of messages from the stream starting from the given position.
    """
```

#### Parameters

- `stream_name` (`str`): The name of the stream from which messages are to be
retrieved. This name must include a hyphen (-) to be recognized as a valid
stream identifier.
- `position` (`int`, optional): The zero-based index position from which to start
reading messages. Defaults to 0, which starts reading from the beginning of
the stream.
- `no_of_messages` (`int`, optional): The maximum number of messages to retrieve
from the stream. Defaults to 1000.

#### Returns

- `List`[`Dict`[`str`, `Any`]]: A list of dictionaries, each representing a message
retrieved from the stream. Each dictionary contains the message details
structured in key-value pairs.

#### Exceptions

- `ValueError`: Raised if the provided stream_name does not contain a hyphen
(-), which is required to validate the name as a stream identifier.

```python
message_db = MessageDB(connection_pool=my_pool)
stream_name = "user-updates-2023"
position = 0
no_of_messages = 100

messages = message_db.read_stream(stream_name, position, no_of_messages)

for message in messages:
    print(message)
```

---

### Read Category

The `read_category` method retrieves a sequence of messages from a specified
category within the message database. It is designed to fetch messages based
on a category identifier, starting from a specific position, and up to a
defined limit of messages.

```python
def read_category(
    self, category_name: str, position: int = 0, no_of_messages: int = 1000
) -> List[Dict[str, Any]]:
    """Read messages from a category.

    Returns a list of messages from the category starting from the given position.
    """
```

#### Parameters

- `category_name` (`str`): The name of the category from which messages are to be
retrieved. This identifier should not include a hyphen (-) to validate it as
a category name.
- `position` (`int`, optional): The zero-based index position from which to start
reading messages within the category. Defaults to 0.
- `no_of_messages` (`int`, optional): The maximum number of messages to retrieve
from the category. Defaults to 1000.

#### Returns

- List[Dict[str, Any]]: A list of dictionaries, each representing a message.
Each dictionary includes details about the message such as the message ID,
stream name, type, position, global position, data, metadata, and time of
creation.

#### Exceptions

- `ValueError`: Raised if the provided category_name contains a hyphen (-),
which is not allowed for category identifiers and implies a misunderstanding
between streams and categories.

```python
message_db = MessageDB(connection_pool=my_pool)
category_name = "user_updates"
position = 0
no_of_messages = 100

# Reading messages from a category
messages = message_db.read_category(category_name, position, no_of_messages)

for message in messages:
    print(message)
```

---

### Write Batch

The `write_batch` method is designed to write a series of messages to a
specified stream in a batch operation. It ensures atomicity in writing
operations, where all messages are written in sequence, and each subsequent
message can optionally depend on the position of the last message written.
This method is useful when multiple messages need to be written as a part of a
single transactional context.

```python
def write_batch(
    self, stream_name, data, expected_version: int | None = None
) -> int:
    """Write a batch of messages to a stream."""
```

#### Parameters

- `stream_name` (`str`): The name of the stream to which the batch of messages
will be written.
- `data` (`List`[`Tuple`[`str`, `Dict`, `Dict` | `None`]]): A list of tuples,
where each tuple represents a message. The tuple format is (message_type, data,
metadata), with metadata being optional.
- `expected_version` (`int` | `None`, optional): The version of the stream
where the batch operation expects to start writing. This can be used for
concurrency control to ensure messages are written in the expected order.
Defaults to None.

#### Returns

- `position` (`int`): The position (or version number) of the last message
written in the stream as a result of the batch operation.

```python
message_db = MessageDB(connection_pool=my_pool)
stream_name = "order_events"
data = [
    ("OrderCreated", {"order_id": 123, "product_id": 456}, None),
    ("OrderShipped",
        {"order_id": 123, "shipment_id": 789},
        {"priority": "high"}
    ),
    ("OrderDelivered", {"order_id": 123, "delivery_date": "2024-04-23"}, None)
]

# Writing a batch of messages to a stream
last_position = message_db.write_batch(stream_name, data)

print(f"Last message written at position: {last_position}")
```

---

## License

[MIT](https://github.com/subhashb/message-db-py/blob/main/LICENSE)
            

Raw data

            {
    "_id": null,
    "home_page": null,
    "name": "message-db-py",
    "maintainer": null,
    "docs_url": null,
    "requires_python": "<4.0,>=3.9",
    "maintainer_email": null,
    "keywords": "message-db, event-sourcing, event-store, messaging, cqrs, command-query-responsibility-segregation, events, streaming, database, postgresql, python, async, microservices, distributed-systems, message-queue, event-driven, real-time",
    "author": "Subhash Bhushan",
    "author_email": "subhash.bhushan@gmail.com",
    "download_url": "https://files.pythonhosted.org/packages/b3/9a/5e5afcdf4e0db611323b568ce13b3eb02925046cab041ccd056e5a413795/message_db_py-0.2.0.tar.gz",
    "platform": null,
    "description": "# message-db-py\n\nMessage DB is a fully-featured event store and message store implemented in\nPostgreSQL for Pub/Sub, Event Sourcing, Messaging, and Evented Microservices\napplications.\n\n`message-db-py` is a Python interface to the Message DB event store and message \nstore, designed for easy integration into Python applications.\n\n[![Build Status](https://github.com/subhashb/message-db-py/actions/workflows/ci.yml/badge.svg?branch=main)](https://github.com/subhashb/message-db-py/actions)\n[![Code Coverage](https://codecov.io/gh/subhashb/message-db-py/graph/badge.svg?token=QMNUSLN2OM)](https://codecov.io/gh/subhashb/message-db-py)\n[![Python Version](https://img.shields.io/pypi/pyversions/message-db-py.svg)](https://pypi.org/project/message-db-py/)\n[![PyPI version](https://badge.fury.io/py/message-db-py.svg)](https://pypi.org/project/message-db-py/)\n[![License](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)\n\n## Installation\n\nUse pip to install:\n\n```shell\n$ pip install message-db-py\n```\n\n## Setting up Message DB database\n\nClone the Message DB repository to set up the database:\n\n```shell\n$ git clone git@github.com:message-db/message-db.git\n```\n\nMore detailed instructions are in the [Installation](https://github.com/message-db/message-db?tab=readme-ov-file#installation)\nsection of Message DB repo.\n\nRunning the database installation script creates the database, schema, table,\nindexes, functions, views, types, a user role, and limit the user's privileges\nto the message store's public interface.\n\nThe installation script is in the database directory of the cloned Message DB\nrepo. Change directory to the message-db directory where you cloned the repo,\nand run the script:\n\n```shell\n$ database/install.sh\n```\n\nMake sure that your default Postgres user has administrative privileges.\n\n### Database Name\n\nBy default, the database creation tool will create a database named\n`message_store`.\n\nIf you prefer either a different database name, you can override the name\nusing the `DATABASE_NAME` environment variable.\n\n```shell\n$ DATABASE_NAME=some_other_database database/install.sh\n```\n\n### Uninstalling the Database\n\nIf you need to drop the database (for example, on a local dev machine):\n\n``` bash\n$ database/uninstall.sh\n```\n\nIf you're upgrading a previous version of the database:\n\n``` bash\n$ database/update.sh\n```\n\n## Docker Image\n\nYou can optionally use a Docker image with Message DB pre-installed and ready\nto go. This is especially helpful to run test cases locally.\n\nThe docker image is available in [Docker Hub](https://hub.docker.com/r/ethangarofolo/message-db).\nThe source is in [Gitlab](https://gitlab.com/such-software/message-db-docker)\n\n## Usage\n\nThe complete user guide for Message DB is available at \n[http://docs.eventide-project.org/user-guide/message-db/]\n(http://docs.eventide-project.org/user-guide/message-db/).\n\nBelow is documentation for methods exposed through the Python API.\n\n### Quickstart\n\nHere's a quick example of how to publish and read messages using Message-DB-py:\n\n```python\nfrom message_db import MessageDB\n\n# Initialize the database connection\nstore = MessageDB(CONNECTION_URL)\n\n# Write a message\nstore.write(\"user_stream\", \"register\", {\"name\": \"John Doe\"})\n\n# Read a message\nmessage = store.read_last_message(\"user_stream\")\nprint(message)\n```\n\n## Primary APIs\n\n- [Write Messages](#write)\n- [Read Messages](#read-messages-from-a-stream-or-category)\n- [Read Last Message from stream](#read-last-message-from-stream)\n\n### Write messages\n\nThe `write` method is used to append a new message to a specified stream within\nthe message database. This method ensures that the message is written with the\nappropriate type, data, and metadata, and optionally, at a specific expected\nversion of the stream.\n\n```python\ndef write(\n    self,\n    stream_name: str,\n    message_type: str,\n    data: Dict,\n    metadata: Dict | None = None,\n    expected_version: int | None = None,\n) -> int:\n    \"\"\"Write a message to a stream.\"\"\"\n```\n\n#### Parameters\n\n- `stream_name` (`str`): The name of the stream to which the message will be\nwritten. This identifies the logical series of messages.\n- `message_type` (`str`): The type of message being written. Typically, this\nreflects the nature of the event or data change the message represents.\n- `data` (`Dict`): The data payload of the message. This should be a dictionary\ncontaining the actual information the message carries.\n- `metadata` (`Dict` | `None`): Optional. Metadata about the message, provided as a\ndictionary. Metadata can include any additional information that is not part of\nthe - main data payload, such as sender information or timestamps.\nDefaults to None.\n- `expected_version` (`int` | `None`): Optional. The version of the stream where the\nclient expects to write the message. This is used for concurrency control and\nensuring the integrity of the stream's order. Defaults to `None`.\n\n#### Returns\n\n- `position` (`int`): The position (or version number) of the message in the\nstream after it has been successfully written.\n\n```python\nmessage_db = MessageDB(connection_pool=my_pool)\nstream_name = \"user_updates\"\nmessage_type = \"UserCreated\"\ndata = {\"user_id\": 123, \"username\": \"example\"}\nmetadata = {\"source\": \"web_app\"}\n\nposition = message_db.write(stream_name, message_type, data, metadata)\n\nprint(\"Message written at position:\", position)\n```\n\n---\n\n### Read messages from a stream or category\n\nThe `read` method retrieves messages from a specified stream or category. This\nmethod supports flexible query options through a direct SQL parameter or by\ndetermining the SQL based on the stream name and its context\n(stream vs. category vs. all messages).\n\n```python\ndef read(\n    self,\n    stream_name: str,\n    sql: str | None = None,\n    position: int = 0,\n    no_of_messages: int = 1000,\n) -> List[Dict[str, Any]]:\n    \"\"\"Read messages from a stream or category.\n\n    Returns a list of messages from the stream or category starting from the given position.\n    \"\"\"\n```\n\n#### Parameters\n\n- `stream_name` (`str`): The identifier for the stream or category from which\nmessages are to be retrieved. Special names like \"$all\" can be used to fetch\nmessages across all streams.\n- `sql` (`str` | `None`, optional): An optional SQL query string that if\nprovided, overrides the default SQL generation based on the stream_name.\nIf None, the SQL is automatically generated based on the stream_name value.\nDefaults to None.\n- `position` (`int`, optional): The starting position in the stream or category\nfrom which to begin reading messages. Defaults to 0.\n- `no_of_messages` (`int`, optional): The maximum number of messages to\nretrieve. Defaults to 1000.\n\n#### Returns\n\n- List[Dict[str, Any]]: A list of messages, where each message is\nrepresented as a dictionary containing details such as the message ID,\nstream name, type, position, global position, data, metadata, and timestamp.\n\n```python\nmessage_db = MessageDB(connection_pool=my_pool)\nstream_name = \"user-updates\"\nposition = 10\nno_of_messages = 50\n\n# Reading from a specific stream\nmessages = message_db.read(stream_name, position=position, no_of_messages=no_of_messages)\n\n# Custom SQL query\ncustom_sql = \"SELECT * FROM get_stream_messages(%(stream_name)s, %(position)s, %(batch_size)s);\"\nmessages = message_db.read(stream_name, sql=custom_sql, position=position, no_of_messages=no_of_messages)\n\nfor message in messages:\n    print(message)\n```\n\n---\n\n### Read Last Message from stream\n\nThe `read_last_message` method retrieves the most recent message from a\nspecified stream. This method is useful when you need the latest state or\nevent in a stream without querying the entire message history.\n\n```python\ndef read_last_message(self, stream_name: str) -> Dict[str, Any] | None:\n    \"\"\"Read the last message from a stream.\"\"\"\n```\n\n#### Parameters\n\n- `stream_name` (`str`): The name of the stream from which the last message is to be\nretrieved.\n\n#### Returns\n\n- `Dict`[`str`, `Any`] | `None`: A dictionary representing the last message \nin the specified stream. If the stream is empty or the message does not exist,\n`None` is returned.\n\n```python\nmessage_db = MessageDB(connection_pool=my_pool)\nstream_name = \"user_updates\"\n\n# Reading the last message from a stream\nlast_message = message_db.read_last_message(stream_name)\n\nif last_message:\n    print(\"Last message data:\", last_message)\nelse:\n    print(\"No messages found in the stream.\")\n```\n\n---\n\n## Utility APIs\n\n- [Read Stream](#read-stream)\n- [Read Category](#read-category)\n- [Write Batch](#write-batch)\n\n### Read Stream\n\nThe `read_stream` method retrieves a sequence of messages from a specified stream\nwithin the message database. This method is specifically designed to fetch\nmessages from a well-defined stream based on a starting position and a\nspecified number of messages.\n\n```python\ndef read_stream(\n    self, stream_name: str, position: int = 0, no_of_messages: int = 1000\n) -> List[Dict[str, Any]]:\n    \"\"\"Read messages from a stream.\n\n    Returns a list of messages from the stream starting from the given position.\n    \"\"\"\n```\n\n#### Parameters\n\n- `stream_name` (`str`): The name of the stream from which messages are to be\nretrieved. This name must include a hyphen (-) to be recognized as a valid\nstream identifier.\n- `position` (`int`, optional): The zero-based index position from which to start\nreading messages. Defaults to 0, which starts reading from the beginning of\nthe stream.\n- `no_of_messages` (`int`, optional): The maximum number of messages to retrieve\nfrom the stream. Defaults to 1000.\n\n#### Returns\n\n- `List`[`Dict`[`str`, `Any`]]: A list of dictionaries, each representing a message\nretrieved from the stream. Each dictionary contains the message details\nstructured in key-value pairs.\n\n#### Exceptions\n\n- `ValueError`: Raised if the provided stream_name does not contain a hyphen\n(-), which is required to validate the name as a stream identifier.\n\n```python\nmessage_db = MessageDB(connection_pool=my_pool)\nstream_name = \"user-updates-2023\"\nposition = 0\nno_of_messages = 100\n\nmessages = message_db.read_stream(stream_name, position, no_of_messages)\n\nfor message in messages:\n    print(message)\n```\n\n---\n\n### Read Category\n\nThe `read_category` method retrieves a sequence of messages from a specified\ncategory within the message database. It is designed to fetch messages based\non a category identifier, starting from a specific position, and up to a\ndefined limit of messages.\n\n```python\ndef read_category(\n    self, category_name: str, position: int = 0, no_of_messages: int = 1000\n) -> List[Dict[str, Any]]:\n    \"\"\"Read messages from a category.\n\n    Returns a list of messages from the category starting from the given position.\n    \"\"\"\n```\n\n#### Parameters\n\n- `category_name` (`str`): The name of the category from which messages are to be\nretrieved. This identifier should not include a hyphen (-) to validate it as\na category name.\n- `position` (`int`, optional): The zero-based index position from which to start\nreading messages within the category. Defaults to 0.\n- `no_of_messages` (`int`, optional): The maximum number of messages to retrieve\nfrom the category. Defaults to 1000.\n\n#### Returns\n\n- List[Dict[str, Any]]: A list of dictionaries, each representing a message.\nEach dictionary includes details about the message such as the message ID,\nstream name, type, position, global position, data, metadata, and time of\ncreation.\n\n#### Exceptions\n\n- `ValueError`: Raised if the provided category_name contains a hyphen (-),\nwhich is not allowed for category identifiers and implies a misunderstanding\nbetween streams and categories.\n\n```python\nmessage_db = MessageDB(connection_pool=my_pool)\ncategory_name = \"user_updates\"\nposition = 0\nno_of_messages = 100\n\n# Reading messages from a category\nmessages = message_db.read_category(category_name, position, no_of_messages)\n\nfor message in messages:\n    print(message)\n```\n\n---\n\n### Write Batch\n\nThe `write_batch` method is designed to write a series of messages to a\nspecified stream in a batch operation. It ensures atomicity in writing\noperations, where all messages are written in sequence, and each subsequent\nmessage can optionally depend on the position of the last message written.\nThis method is useful when multiple messages need to be written as a part of a\nsingle transactional context.\n\n```python\ndef write_batch(\n    self, stream_name, data, expected_version: int | None = None\n) -> int:\n    \"\"\"Write a batch of messages to a stream.\"\"\"\n```\n\n#### Parameters\n\n- `stream_name` (`str`): The name of the stream to which the batch of messages\nwill be written.\n- `data` (`List`[`Tuple`[`str`, `Dict`, `Dict` | `None`]]): A list of tuples,\nwhere each tuple represents a message. The tuple format is (message_type, data,\nmetadata), with metadata being optional.\n- `expected_version` (`int` | `None`, optional): The version of the stream\nwhere the batch operation expects to start writing. This can be used for\nconcurrency control to ensure messages are written in the expected order.\nDefaults to None.\n\n#### Returns\n\n- `position` (`int`): The position (or version number) of the last message\nwritten in the stream as a result of the batch operation.\n\n```python\nmessage_db = MessageDB(connection_pool=my_pool)\nstream_name = \"order_events\"\ndata = [\n    (\"OrderCreated\", {\"order_id\": 123, \"product_id\": 456}, None),\n    (\"OrderShipped\",\n        {\"order_id\": 123, \"shipment_id\": 789},\n        {\"priority\": \"high\"}\n    ),\n    (\"OrderDelivered\", {\"order_id\": 123, \"delivery_date\": \"2024-04-23\"}, None)\n]\n\n# Writing a batch of messages to a stream\nlast_position = message_db.write_batch(stream_name, data)\n\nprint(f\"Last message written at position: {last_position}\")\n```\n\n---\n\n## License\n\n[MIT](https://github.com/subhashb/message-db-py/blob/main/LICENSE)",
    "bugtrack_url": null,
    "license": "MIT",
    "summary": "The Python interface to the MessageDB Event Store and Message Store",
    "version": "0.2.0",
    "project_urls": null,
    "split_keywords": [
        "message-db",
        " event-sourcing",
        " event-store",
        " messaging",
        " cqrs",
        " command-query-responsibility-segregation",
        " events",
        " streaming",
        " database",
        " postgresql",
        " python",
        " async",
        " microservices",
        " distributed-systems",
        " message-queue",
        " event-driven",
        " real-time"
    ],
    "urls": [
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "4a28e06b6778b469778fafdf08537f44ec18afeb14255e6a56717ffae6459c35",
                "md5": "d8a1bb34dcee25d737cd458540dbadd7",
                "sha256": "8caa5e5c3a4083e7d2a5b9b0bd98074e1e86a5f82a49641376b06f18453e9912"
            },
            "downloads": -1,
            "filename": "message_db_py-0.2.0-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "d8a1bb34dcee25d737cd458540dbadd7",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": "<4.0,>=3.9",
            "size": 9341,
            "upload_time": "2024-04-24T03:56:30",
            "upload_time_iso_8601": "2024-04-24T03:56:30.110825Z",
            "url": "https://files.pythonhosted.org/packages/4a/28/e06b6778b469778fafdf08537f44ec18afeb14255e6a56717ffae6459c35/message_db_py-0.2.0-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "b39a5e5afcdf4e0db611323b568ce13b3eb02925046cab041ccd056e5a413795",
                "md5": "4e2c3d111fe38496d7183d582d19c5cc",
                "sha256": "fbeb24c8eb28c55cf080865e8e21478e1e64292aa87491ce4026fd2f71d7b1bc"
            },
            "downloads": -1,
            "filename": "message_db_py-0.2.0.tar.gz",
            "has_sig": false,
            "md5_digest": "4e2c3d111fe38496d7183d582d19c5cc",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": "<4.0,>=3.9",
            "size": 11817,
            "upload_time": "2024-04-24T03:56:31",
            "upload_time_iso_8601": "2024-04-24T03:56:31.967183Z",
            "url": "https://files.pythonhosted.org/packages/b3/9a/5e5afcdf4e0db611323b568ce13b3eb02925046cab041ccd056e5a413795/message_db_py-0.2.0.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2024-04-24 03:56:31",
    "github": false,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "lcname": "message-db-py"
}
        
Elapsed time: 0.20842s