# esdb-py
[![PyPI version](https://badge.fury.io/py/esdb.svg)](https://pypi.org/project/esdb/)
[![codecov](https://codecov.io/gh/andriykohut/esdb-py/branch/main/graph/badge.svg?token=YVDPTDBPFB)](https://codecov.io/gh/andriykohut/esdb-py)
## EventStoreDB Python gRPC client
> NOTE: This project is still a work in progress.
<!-- TOC -->
* [Completed features](#completed-features)
* [Installation](#installation)
* [Development](#development)
* [Usage](#usage)
  * [Connection string examples](#connection-string-examples)
  * [Discovery and node preferences](#discovery-and-node-preferences)
  * [Connection configuration](#connection-configuration)
  * [Append, Read, Catch-up subscriptions](#append-read-catch-up-subscriptions)
  * [Batch append](#batch-append)
  * [Catch-up subscription to all events with filtering](#catch-up-subscription-to-all-events-with-filtering)
  * [Persistent subscriptions](#persistent-subscriptions)
<!-- TOC -->
## Completed features
* [x] secure connection
* [x] basic auth
* [x] connection string parsing
* [x] streams
  * [x] append
  * [x] batch append (v21.10+)
  * [x] delete
  * [x] read stream
  * [x] read all with stream/event type filters (v21.10+)
  * [x] catch-up subscriptions
  * [x] tombstone
  * [x] filtering
* [x] persistent subscriptions
  * [x] create
  * [x] read stream
  * [x] read all with filter (v21.10+)
  * [x] update
  * [x] delete
  * [x] list
  * [x] info
  * [ ] replay parked events
* [ ] CRUD for projections
* [ ] users
## Installation
Using pip:
```sh
pip install esdb
```
Using poetry:
```sh
poetry add esdb
```
## Development
1. Install [poetry](https://python-poetry.org/docs/#installation)
2. Create a virtualenv (e.g. using pyenv):

   ```sh
   pyenv install 3.12.0
   pyenv virtualenv 3.12.0 esdb-py
   pyenv local esdb-py
   ```

3. Install deps with `poetry install`
4. Start eventstore in docker: `make run-esdb`
5. Run the tests: `pytest tests`
## Usage
Have a look at [tests](https://github.com/andriykohut/esdb-py/tree/main/tests) for more examples.
### Connection string examples
DNS discovery with credentials, discovery configuration, node preference and CA file path:
```
esdb+discover://admin:changeit@localhost:2111?discoveryInterval=0&maxDiscoverAttempts=3&tlsCafile=certs/ca/ca.crt&nodePreference=follower
```
Single-node insecure connection
```
esdb://localhost:2111?tls=false
```
Supported parameters:
- `discoveryInterval`
- `gossipTimeout`
- `maxDiscoverAttempts`
- `nodePreference`
- `keepAliveInterval`
- `keepAliveTimeout`
- `tls`
- `tlsCafile`
- `tlsVerifyCert`
- `defaultDeadline`
A connection string can be generated [here](https://developers.eventstore.com/clients/grpc/#connection-details).
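To see how the pieces of a connection string fit together, here is a minimal sketch that splits one apart using only the standard library. It is an illustration, not the parser `esdb` uses internally; the helper name `describe_connection_string` and the shape of the returned dict are assumptions made for this example.

```python
from urllib.parse import parse_qs, urlsplit


def describe_connection_string(conn_str: str) -> dict:
    """Split an esdb:// connection string into its visible parts (illustration only)."""
    parts = urlsplit(conn_str)
    # parse_qs returns a list per key; each parameter appears once here, so keep the last value
    params = {key: values[-1] for key, values in parse_qs(parts.query).items()}
    return {
        "discover": parts.scheme == "esdb+discover",
        "username": parts.username,
        "password": parts.password,
        "host": parts.hostname,
        "port": parts.port,
        "params": params,
    }


info = describe_connection_string(
    "esdb+discover://admin:changeit@localhost:2111"
    "?discoveryInterval=0&maxDiscoverAttempts=3&nodePreference=follower"
)
print(info)
```

All parameter values come back as strings; converting them (for example `tls=false` to a boolean) is left to the client.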
### Discovery and node preferences
```py
from esdb import ESClient
client = ESClient("esdb+discover://admin:changeit@localhost:2111?nodePreference=follower")
```
### Connection configuration
```py
from esdb import ESClient
# Connect without TLS
client = ESClient("esdb://localhost:2111?tls=false")
# Secure connection with basic auth and keepalive
client = ESClient("esdb://admin:changeit@localhost:2111?tlsCafile=certs/ca/ca.crt&keepAliveInterval=5&keepAliveTimeout=5")
```
### Append, Read, Catch-up subscriptions
```py
import asyncio
import datetime
import uuid

from esdb import ESClient


client = ESClient("esdb+discover://admin:changeit@localhost:2111")
stream = f"test-{uuid.uuid4()}"


async def streams():
    async with client.connect() as conn:
        # Append 10 events to the stream
        for i in range(10):
            append_result = await conn.streams.append(
                stream=stream,
                event_type="test_event",
                data={"i": i, "ts": datetime.datetime.now(datetime.timezone.utc).isoformat()},
            )

        # Read up to 10 events
        async for result in conn.streams.read(stream=stream, count=10):
            print(result.data)

        # Read up to 10 events, backwards
        async for result in conn.streams.read(stream=stream, count=10, backwards=True):
            print(result.data)

        # Read up to 10 events, starting from revision 5
        async for result in conn.streams.read(stream=stream, count=10, revision=5):
            print(result.data)

        # Read up to 10 events backwards, starting from revision 5
        async for result in conn.streams.read(stream=stream, count=10, backwards=True, revision=5):
            print(result.data)

        # Create a catch-up subscription to a stream (waits for new events until cancelled)
        async for result in conn.streams.read(stream=stream, subscribe=True):
            print(result.data)


asyncio.run(streams())
```
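The read variants above differ only in starting point and direction. The following sketch models them over a plain Python list so the selection can be checked without a server. It is a simplified model, not `esdb` code: the function `read` below is hypothetical, it assumes stream revisions are zero-based and inclusive, and it assumes a backwards read with no `revision` starts from the end of the stream.

```python
def read(events, count, backwards=False, revision=None):
    """Model of a bounded stream read over an in-memory list (illustration only)."""
    if not events:
        return []
    if revision is None:
        # Assumed default: start of the stream forwards, end of the stream backwards
        revision = len(events) - 1 if backwards else 0
    if backwards:
        # Walk from the given revision down to revision 0
        selected = events[revision::-1]
    else:
        # Walk from the given revision up to the end of the stream
        selected = events[revision:]
    return selected[:count]


stream = list(range(10))  # the list index plays the role of the revision: 0..9
print(read(stream, count=10))
print(read(stream, count=10, backwards=True))
print(read(stream, count=10, revision=5))
print(read(stream, count=10, backwards=True, revision=5))
```

In this model `revision=5` forwards yields five events (revisions 5 to 9), while the same revision backwards yields six (revisions 5 down to 0).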
### Batch append
```py
import asyncio
import uuid

from esdb import ESClient
from esdb.streams import Message


async def batch_append():
    # Append multiple events as a single batch
    # Batch append is not supported on EventStore < v21.10
    stream = str(uuid.uuid4())
    messages: list[Message] = [
        Message(event_type="one", data={"item": 1}),
        Message(event_type="one", data={"item": 2}),
        Message(event_type="one", data={"item": 3}),
        Message(event_type="two", data={"item": 1}),
        Message(event_type="two", data={"item": 2}),
        Message(event_type="two", data={"item": 3}),
    ]
    async with ESClient("esdb+discover://admin:changeit@localhost:2111").connect() as conn:
        response = await conn.streams.batch_append(stream=stream, messages=messages)
        # Revisions are zero-based, so six events end at revision 5
        assert response.current_revision == 5
        events = [e async for e in conn.streams.read(stream=stream, count=50)]
        assert len(events) == 6


asyncio.run(batch_append())
```
### Catch-up subscription to all events with filtering
```py
import asyncio
import uuid

from esdb import ESClient
from esdb.shared import Filter


async def filters():
    async with ESClient("esdb+discover://admin:changeit@localhost:2111").connect() as conn:
        # Append 10 events whose types share a prefix, each to a random stream
        for i in range(10):
            await conn.streams.append(stream=str(uuid.uuid4()), event_type=f"prefix-{i}", data=b"")
        # Subscribe to events from all streams, filtering by event type
        async for event in conn.streams.read_all(
            subscribe=True,  # subscribe will wait for events, use count=<n> to read <n> events and stop
            filter_by=Filter(
                kind=Filter.Kind.EVENT_TYPE,
                regex="^prefix-",
                # Checkpoint is only required when subscribe=True; it's not needed when using count=<int>
                checkpoint_interval_multiplier=1000,
            ),
        ):
            print(event)


asyncio.run(filters())
```
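The filter itself is applied by the server, but a pattern can be sanity-checked locally before subscribing. This sketch uses Python's `re` module on a made-up list of event types; for a simple anchored prefix like this one the result should match what the server selects, though regex dialects can differ for more advanced patterns.

```python
import re

# Same pattern as passed to Filter(regex=...) above
pattern = re.compile(r"^prefix-")

# Hypothetical event types, invented for this example
event_types = ["prefix-0", "prefix-9", "other-prefix-1", "test_event", "$metadata"]

# Keep only the types the pattern would select
matching = [event_type for event_type in event_types if pattern.search(event_type)]
print(matching)
```

Because the pattern is anchored with `^`, `other-prefix-1` is not selected even though it contains the text `prefix-`.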
### Persistent subscriptions
```python
import asyncio

from esdb import ESClient
from esdb.shared import Filter
from esdb.subscriptions import SubscriptionSettings, NackAction

client = ESClient("esdb+discover://admin:changeit@localhost:2111")

stream = "stream-foo"
group = "group-bar"


async def persistent():
    async with client.connect() as conn:
        # Emit some events to the same stream
        for i in range(50):
            await conn.streams.append(stream, "foobar", {"i": i})

        # Create a stream subscription
        await conn.subscriptions.create_stream_subscription(
            stream=stream,
            group_name=group,
            settings=SubscriptionSettings(
                max_subscriber_count=50,
                read_batch_size=5,
                live_buffer_size=10,
                history_buffer_size=10,
                consumer_strategy=SubscriptionSettings.ConsumerStrategy.ROUND_ROBIN,
                checkpoint_ms=10000,
            ),
        )

        # Create a subscription to all events with filtering
        # Only supported on EventStore v21.10+
        await conn.subscriptions.create_all_subscription(
            group_name="subscription_group",
            filter_by=Filter(kind=Filter.Kind.EVENT_TYPE, regex="^some_type$", checkpoint_interval_multiplier=200),
            settings=SubscriptionSettings(
                read_batch_size=50,
                live_buffer_size=100,
                history_buffer_size=100,
                max_retry_count=2,
                checkpoint_ms=20000,
            ),
        )

    # Read from a subscription
    async with client.connect() as conn:
        sub = conn.subscriptions.subscribe(stream=stream, group_name=group, buffer_size=5)
        async for event in sub:
            try:
                # Do work with the event, then acknowledge it
                print(event)
                await sub.ack([event])
            except Exception as err:
                # Ask the server to redeliver the event
                await sub.nack([event], NackAction.RETRY, reason=str(err))

        # Get subscription info
        info = await conn.subscriptions.get_info(group, stream)
        assert info.group_name == group

        # Delete subscription
        await conn.subscriptions.delete(group, stream)

        # List subscriptions
        subs = await conn.subscriptions.list()
        for sub in subs:
            print(sub.total_items)


asyncio.run(persistent())
```
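The ack/nack loop above can be exercised without a running EventStoreDB by swapping in a stub. The sketch below is a rough illustration only: `FakeSubscription`, `handle` and `consume` are hypothetical names that are not part of `esdb`, and the string `"retry"` stands in for `NackAction.RETRY`.

```python
import asyncio


class FakeSubscription:
    """Hypothetical in-memory stand-in for a persistent subscription (not part of esdb)."""

    def __init__(self, events):
        self._events = list(events)
        self.acked = []
        self.nacked = []

    def __aiter__(self):
        return self._iterate()

    async def _iterate(self):
        for event in self._events:
            yield event

    async def ack(self, events):
        self.acked.extend(events)

    async def nack(self, events, action, reason=None):
        self.nacked.extend((event, action, reason) for event in events)


def handle(event):
    # Hypothetical handler: rejects odd payloads to trigger the nack path
    if event["i"] % 2:
        raise ValueError(f"cannot process event {event['i']}")


async def consume(sub):
    # Same shape as the loop in the example above: ack on success, nack on failure
    async for event in sub:
        try:
            handle(event)
            await sub.ack([event])
        except Exception as err:
            await sub.nack([event], "retry", reason=str(err))


sub = FakeSubscription({"i": i} for i in range(4))
asyncio.run(consume(sub))
print(sub.acked)
print(sub.nacked)
```

Keeping the handler separate from the ack/nack loop makes the failure path testable on its own, which is harder to do against a live subscription.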
Raw data
{
"_id": null,
"home_page": "https://github.com/andriykohut/esdb-py",
"name": "esdb",
"maintainer": "",
"docs_url": null,
"requires_python": ">=3.9,<4.0",
"maintainer_email": "",
"keywords": "eventstore,esdb,event sourcing,cqrs,event-sourcing,grpcio,grpc",
"author": "Andrii Kohut",
"author_email": "kogut.andriy@gmail.com",
"download_url": "https://files.pythonhosted.org/packages/a8/36/28dc6153021725623c2232aca6785ed31c2bea8a7188ac6746009b1ccc63/esdb-0.3.5.tar.gz",
"platform": null,
"bugtrack_url": null,
"license": "MIT",
"summary": "gRPC client for EventStore DB",
"version": "0.3.5",
"project_urls": {
"Homepage": "https://github.com/andriykohut/esdb-py",
"Repository": "https://github.com/andriykohut/esdb-py"
},
"split_keywords": [
"eventstore",
"esdb",
"event sourcing",
"cqrs",
"event-sourcing",
"grpcio",
"grpc"
],
"urls": [
{
"comment_text": "",
"digests": {
"blake2b_256": "9c3ff72fae47146573ed2006410f9a06ac2da8c7d4ad6b7aa8376302bbc68369",
"md5": "18ea152edd5a0ee18801343108b4e3f2",
"sha256": "866fc108d933becc23ea7e20f74e0c1f01337c17fd4705c4381ab2e2d7af0e42"
},
"downloads": -1,
"filename": "esdb-0.3.5-py3-none-any.whl",
"has_sig": false,
"md5_digest": "18ea152edd5a0ee18801343108b4e3f2",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": ">=3.9,<4.0",
"size": 80360,
"upload_time": "2023-10-05T20:31:55",
"upload_time_iso_8601": "2023-10-05T20:31:55.094310Z",
"url": "https://files.pythonhosted.org/packages/9c/3f/f72fae47146573ed2006410f9a06ac2da8c7d4ad6b7aa8376302bbc68369/esdb-0.3.5-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": "",
"digests": {
"blake2b_256": "a83628dc6153021725623c2232aca6785ed31c2bea8a7188ac6746009b1ccc63",
"md5": "1f163120a7d42f6d719d20e04cf8dd95",
"sha256": "0311dabd56ce0b42b5c55e13b7085e74276270327e08a6164929cad739366ce6"
},
"downloads": -1,
"filename": "esdb-0.3.5.tar.gz",
"has_sig": false,
"md5_digest": "1f163120a7d42f6d719d20e04cf8dd95",
"packagetype": "sdist",
"python_version": "source",
"requires_python": ">=3.9,<4.0",
"size": 63856,
"upload_time": "2023-10-05T20:31:57",
"upload_time_iso_8601": "2023-10-05T20:31:57.269575Z",
"url": "https://files.pythonhosted.org/packages/a8/36/28dc6153021725623c2232aca6785ed31c2bea8a7188ac6746009b1ccc63/esdb-0.3.5.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2023-10-05 20:31:57",
"github": true,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"github_user": "andriykohut",
"github_project": "esdb-py",
"travis_ci": false,
"coveralls": false,
"github_actions": true,
"lcname": "esdb"
}