esdb


Nameesdb JSON
Version 0.3.5 PyPI version JSON
download
home_pagehttps://github.com/andriykohut/esdb-py
SummarygRPC client for EventStore DB
upload_time2023-10-05 20:31:57
maintainer
docs_urlNone
authorAndrii Kohut
requires_python>=3.9,<4.0
licenseMIT
keywords eventstore esdb event sourcing cqrs event-sourcing grpcio grpc
VCS
bugtrack_url
requirements No requirements were recorded.
Travis-CI No Travis.
coveralls test coverage No coveralls.
            # esdb-py

[![PyPI version](https://badge.fury.io/py/esdb.svg)](https://pypi.org/project/esdb/)
[![codecov](https://codecov.io/gh/andriykohut/esdb-py/branch/main/graph/badge.svg?token=YVDPTDBPFB)](https://codecov.io/gh/andriykohut/esdb-py)

## EventStoreDB Python gRPC client

> NOTE: This project is still work in progress

<!-- TOC -->
* [Completed features](#completed-features)
* [Installation](#installation)
* [Development](#development)
* [Usage](#usage)
  * [Connection string](#connection-string)
  * [Discovery and node preferences](#discovery-and-node-preferences)
  * [Connection configuration](#connection-configuration)
  * [Append, Read, Catch-up subscriptions](#append-read-catch-up-subscriptions)
  * [Batch append](#batch-append)
  * [Catch-up subscription to all events with filtering](#catch-up-subscription-to-all-events-with-filtering)
  * [Persistent subscriptions](#persistent-subscriptions)
<!-- TOC -->

## Completed features

* [x] secure connection
* [x] basic auth
* [x] connection string parsing
* [x] streams
  * [x] append
  * [x] batch append (v21.10+)
  * [x] delete
  * [x] read stream
  * [x] read all with stream/event type filters (v21.10+)
  * [x] catch-up subscriptions
  * [x] tombstone
  * [x] filtering
* [x] persistent subscriptions
  * [x] create
  * [x] read stream
  * [x] read all with filter (v21.10+)
  * [x] update
  * [x] delete
  * [x] list
  * [x] info
  * [ ] reply parked events
* [ ] CRUD for projections
* [ ] users

## Installation

Using pip:

```sh
pip install esdb
```

Using poetry:

```sh
poetry add esdb
```

## Development

1. Install [poetry](https://python-poetry.org/docs/#installation)
2. Create virtualenv (i.e. using pyenv):

    ```sh
    pyenv install 3.12.0
    pyenv virtualenv 3.12.0 esdb-py
    pyenv local esdb-py
    ```

3. Install deps with `poetry install`
4. Start eventstore in docker: `make run-esdb`
5. Run the tests: `pytest tests`

## Usage

Have a look at [tests](https://github.com/andriykohut/esdb-py/tree/main/tests) for more examples.

### Connection string examples

DNS discovery with credentials, discovery configuration, node preference and ca file path
```
esdb+discover://admin:changeit@localhost:2111?discoveryInterval=0&maxDiscoverAttempts=3&tlsCafile=certs/ca/ca.crt&nodePreference=follower
```

Single-node insecure connection
```
esdb://localhost:2111?tls=false
```

Supported parameters:
 - `discoveryInterval`
 - `gossipTimeout`
 - `maxDiscoverAttempts`
 - `nodePreference`
 - `keepAliveInterval`
 - `keepAliveTimeout`
 - `tls`
 - `tlsCafile`
 - `tlsVerifyCert`
 - `defaultDeadline`


Connection string can be generated [here](https://developers.eventstore.com/clients/grpc/#connection-details).

### Discovery and node preferences

```py
from esdb import ESClient

client = ESClient("esdb+discover://admin:changeit@localhost:2111?nodePreference=follower")

```

### Connection configuration

```py
from esdb import ESClient

# Connect without TLS
client = ESClient("esdb://localhost:2111?tls=false")

# Secure connection with basic auth and keepalive
client = ESClient("esdb://admin:changeit@localhost:2111?tlsCafile=certs/ca/ca.crt&keepAliveInterval=5&keepAliveTimeout=5")
```

### Append, Read, Catch-up subscriptions

```py
import asyncio
import datetime
import uuid

from esdb import ESClient


client = ESClient("esdb+discover://admin:changeit@localhost:2111")
stream = f"test-{str(uuid.uuid4())}"


async def streams():
    async with client.connect() as conn:
        # Appending to stream
        for i in range(10):
            append_result = await conn.streams.append(
                stream=stream,
                event_type="test_event",
                data={"i": i, "ts": datetime.datetime.utcnow().isoformat()},
            )

        # Read up to 10 events
        async for result in conn.streams.read(stream=stream, count=10):
            print(result.data)

        # Read up to 10 events, backwards
        async for result in conn.streams.read(stream=stream, count=10, backwards=True):
            print(result.data)

        # Read up to 10 events, starting from 5th event
        async for result in conn.streams.read(stream=stream, count=10, revision=5):
            print(result.data)

        # Read up to 10 events backwards, starting from 5th event
        async for result in conn.streams.read(stream=stream, count=10, backwards=True, revision=5):
            print(result.data)

        # Create a catch-up subscription to a stream
        async for result in conn.streams.read(stream=stream, subscribe=True):
            print(result.data)


asyncio.run(streams())
```

### Batch append

```py
import asyncio
import uuid

from esdb import ESClient
from esdb.streams import Message


async def batch_append():
    # Append multiple events in as a single batch
    # Batch append is not supported on EventStore < v21.10
    stream = str(uuid.uuid4())
    messages: list[Message] = [
        Message(event_type="one", data={"item": 1}),
        Message(event_type="one", data={"item": 2}),
        Message(event_type="one", data={"item": 3}),
        Message(event_type="two", data={"item": 1}),
        Message(event_type="two", data={"item": 2}),
        Message(event_type="two", data={"item": 3}),
    ]
    async with ESClient("esdb+discover://admin:changeit@localhost:2111").connect() as conn:
        response = await conn.streams.batch_append(stream=stream, messages=messages)
        assert response.current_revision == 5
        events = [e async for e in conn.streams.read(stream=stream, count=50)]
        assert len(events) == 6


asyncio.run(batch_append())
```

### Catch-up subscription to all events with filtering

```py
import uuid
import asyncio

from esdb import ESClient
from esdb.shared import Filter


async def filters():
    async with ESClient("esdb+discover://admin:changeit@localhost:2111").connect() as conn:
        # Append 10 events with the same prefix to random streams
        for i in range(10):
            await conn.streams.append(stream=str(uuid.uuid4()), event_type=f"prefix-{i}", data=b"")
        # subscribe to events from all streams, filtering by event type
        async for event in conn.streams.read_all(
                subscribe=True,  # subscribe will wait for events, use count=<n> to read <n> events and stop
                filter_by=Filter(
                    kind=Filter.Kind.EVENT_TYPE,
                    regex="^prefix-",
                    # Checkpoint only required when subscribe=True, it's not needed when using count=<int>
                    checkpoint_interval_multiplier=1000,
                ),
        ):
            print(event)


asyncio.run(filters())
```

### Persistent subscriptions

```python
import asyncio
from esdb import ESClient
from esdb.shared import Filter
from esdb.subscriptions import SubscriptionSettings, NackAction

client = ESClient("esdb+discover://admin:changeit@localhost:2111")

stream = "stream-foo"
group = "group-bar"


async def persistent():
    async with client.connect() as conn:
        # emit some events to the same stream
        for i in range(50):
            await conn.streams.append(stream, "foobar", {"i": i})

        # create a stream subscription
        await conn.subscriptions.create_stream_subscription(
            stream=stream,
            group_name=group,
            settings=SubscriptionSettings(
                max_subscriber_count=50,
                read_batch_size=5,
                live_buffer_size=10,
                history_buffer_size=10,
                consumer_strategy=SubscriptionSettings.ConsumerStrategy.ROUND_ROBIN,
                checkpoint_ms=10000,
            ),
        )

        # create subscription to all events with filtering
        # Only supported on EventStore v21.10+
        await conn.subscriptions.create_all_subscription(
            group_name="subscription_group",
            filter_by=Filter(kind=Filter.Kind.EVENT_TYPE, regex="^some_type$", checkpoint_interval_multiplier=200),
            settings=SubscriptionSettings(
                read_batch_size=50,
                live_buffer_size=100,
                history_buffer_size=100,
                max_retry_count=2,
                checkpoint_ms=20000,
            ),
        )

    # read from a subscription
    async with client.connect() as conn:
        sub = conn.subscriptions.subscribe(stream=stream, group_name=group, buffer_size=5)
        async for event in sub:
            try:
                # do work with event
                print(event)
                await sub.ack([event])
            except Exception as err:
                await sub.nack([event], NackAction.RETRY, reason=str(err))

        # get subscription info
        info = await conn.subscriptions.get_info(group, stream)
        assert info.group_name == group

        # delete subscription
        await conn.subscriptions.delete(group, stream)
        
        # list subscriptions
        subs = await conn.subscriptions.list()
        for sub in subs:
            print(sub.total_items)


asyncio.run(persistent())
```

            

Raw data

            {
    "_id": null,
    "home_page": "https://github.com/andriykohut/esdb-py",
    "name": "esdb",
    "maintainer": "",
    "docs_url": null,
    "requires_python": ">=3.9,<4.0",
    "maintainer_email": "",
    "keywords": "eventstore,esdb,event sourcing,cqrs,event-sourcing,grpcio,grpc",
    "author": "Andrii Kohut",
    "author_email": "kogut.andriy@gmail.com",
    "download_url": "https://files.pythonhosted.org/packages/a8/36/28dc6153021725623c2232aca6785ed31c2bea8a7188ac6746009b1ccc63/esdb-0.3.5.tar.gz",
    "platform": null,
    "description": "# esdb-py\n\n[![PyPI version](https://badge.fury.io/py/esdb.svg)](https://pypi.org/project/esdb/)\n[![codecov](https://codecov.io/gh/andriykohut/esdb-py/branch/main/graph/badge.svg?token=YVDPTDBPFB)](https://codecov.io/gh/andriykohut/esdb-py)\n\n## EventStoreDB Python gRPC client\n\n> NOTE: This project is still work in progress\n\n<!-- TOC -->\n* [Completed features](#completed-features)\n* [Installation](#installation)\n* [Development](#development)\n* [Usage](#usage)\n  * [Connection string](#connection-string)\n  * [Discovery and node preferences](#discovery-and-node-preferences)\n  * [Connection configuration](#connection-configuration)\n  * [Append, Read, Catch-up subscriptions](#append-read-catch-up-subscriptions)\n  * [Batch append](#batch-append)\n  * [Catch-up subscription to all events with filtering](#catch-up-subscription-to-all-events-with-filtering)\n  * [Persistent subscriptions](#persistent-subscriptions)\n<!-- TOC -->\n\n## Completed features\n\n* [x] secure connection\n* [x] basic auth\n* [x] connection string parsing\n* [x] streams\n  * [x] append\n  * [x] batch append (v21.10+)\n  * [x] delete\n  * [x] read stream\n  * [x] read all with stream/event type filters (v21.10+)\n  * [x] catch-up subscriptions\n  * [x] tombstone\n  * [x] filtering\n* [x] persistent subscriptions\n  * [x] create\n  * [x] read stream\n  * [x] read all with filter (v21.10+)\n  * [x] update\n  * [x] delete\n  * [x] list\n  * [x] info\n  * [ ] reply parked events\n* [ ] CRUD for projections\n* [ ] users\n\n## Installation\n\nUsing pip:\n\n```sh\npip install esdb\n```\n\nUsing poetry:\n\n```sh\npoetry add esdb\n```\n\n## Development\n\n1. Install [poetry](https://python-poetry.org/docs/#installation)\n2. Create virtualenv (i.e. using pyenv):\n\n    ```sh\n    pyenv install 3.12.0\n    pyenv virtualenv 3.12.0 esdb-py\n    pyenv local esdb-py\n    ```\n\n3. Install deps with `poetry install`\n4. Start eventstore in docker: `make run-esdb`\n5. Run the tests: `pytest tests`\n\n## Usage\n\nHave a look at [tests](https://github.com/andriykohut/esdb-py/tree/main/tests) for more examples.\n\n### Connection string examples\n\nDNS discovery with credentials, discovery configuration, node preference and ca file path\n```\nesdb+discover://admin:changeit@localhost:2111?discoveryInterval=0&maxDiscoverAttempts=3&tlsCafile=certs/ca/ca.crt&nodePreference=follower\n```\n\nSingle-node insecure connection\n```\nesdb://localhost:2111?tls=false\n```\n\nSupported parameters:\n - `discoveryInterval`\n - `gossipTimeout`\n - `maxDiscoverAttempts`\n - `nodePreference`\n - `keepAliveInterval`\n - `keepAliveTimeout`\n - `tls`\n - `tlsCafile`\n - `tlsVerifyCert`\n - `defaultDeadline`\n\n\nConnection string can be generated [here](https://developers.eventstore.com/clients/grpc/#connection-details).\n\n### Discovery and node preferences\n\n```py\nfrom esdb import ESClient\n\nclient = ESClient(\"esdb+discover://admin:changeit@localhost:2111?nodePreference=follower\")\n\n```\n\n### Connection configuration\n\n```py\nfrom esdb import ESClient\n\n# Connect without TLS\nclient = ESClient(\"esdb://localhost:2111?tls=false\")\n\n# Secure connection with basic auth and keepalive\nclient = ESClient(\"esdb://admin:changeit@localhost:2111?tlsCafile=certs/ca/ca.crt&keepAliveInterval=5&keepAliveTimeout=5\")\n```\n\n### Append, Read, Catch-up subscriptions\n\n```py\nimport asyncio\nimport datetime\nimport uuid\n\nfrom esdb import ESClient\n\n\nclient = ESClient(\"esdb+discover://admin:changeit@localhost:2111\")\nstream = f\"test-{str(uuid.uuid4())}\"\n\n\nasync def streams():\n    async with client.connect() as conn:\n        # Appending to stream\n        for i in range(10):\n            append_result = await conn.streams.append(\n                stream=stream,\n                event_type=\"test_event\",\n                data={\"i\": i, \"ts\": datetime.datetime.utcnow().isoformat()},\n            )\n\n        # Read up to 10 events\n        async for result in conn.streams.read(stream=stream, count=10):\n            print(result.data)\n\n        # Read up to 10 events, backwards\n        async for result in conn.streams.read(stream=stream, count=10, backwards=True):\n            print(result.data)\n\n        # Read up to 10 events, starting from 5th event\n        async for result in conn.streams.read(stream=stream, count=10, revision=5):\n            print(result.data)\n\n        # Read up to 10 events backwards, starting from 5th event\n        async for result in conn.streams.read(stream=stream, count=10, backwards=True, revision=5):\n            print(result.data)\n\n        # Create a catch-up subscription to a stream\n        async for result in conn.streams.read(stream=stream, subscribe=True):\n            print(result.data)\n\n\nasyncio.run(streams())\n```\n\n### Batch append\n\n```py\nimport asyncio\nimport uuid\n\nfrom esdb import ESClient\nfrom esdb.streams import Message\n\n\nasync def batch_append():\n    # Append multiple events in as a single batch\n    # Batch append is not supported on EventStore < v21.10\n    stream = str(uuid.uuid4())\n    messages: list[Message] = [\n        Message(event_type=\"one\", data={\"item\": 1}),\n        Message(event_type=\"one\", data={\"item\": 2}),\n        Message(event_type=\"one\", data={\"item\": 3}),\n        Message(event_type=\"two\", data={\"item\": 1}),\n        Message(event_type=\"two\", data={\"item\": 2}),\n        Message(event_type=\"two\", data={\"item\": 3}),\n    ]\n    async with ESClient(\"esdb+discover://admin:changeit@localhost:2111\").connect() as conn:\n        response = await conn.streams.batch_append(stream=stream, messages=messages)\n        assert response.current_revision == 5\n        events = [e async for e in conn.streams.read(stream=stream, count=50)]\n        assert len(events) == 6\n\n\nasyncio.run(batch_append())\n```\n\n### Catch-up subscription to all events with filtering\n\n```py\nimport uuid\nimport asyncio\n\nfrom esdb import ESClient\nfrom esdb.shared import Filter\n\n\nasync def filters():\n    async with ESClient(\"esdb+discover://admin:changeit@localhost:2111\").connect() as conn:\n        # Append 10 events with the same prefix to random streams\n        for i in range(10):\n            await conn.streams.append(stream=str(uuid.uuid4()), event_type=f\"prefix-{i}\", data=b\"\")\n        # subscribe to events from all streams, filtering by event type\n        async for event in conn.streams.read_all(\n                subscribe=True,  # subscribe will wait for events, use count=<n> to read <n> events and stop\n                filter_by=Filter(\n                    kind=Filter.Kind.EVENT_TYPE,\n                    regex=\"^prefix-\",\n                    # Checkpoint only required when subscribe=True, it's not needed when using count=<int>\n                    checkpoint_interval_multiplier=1000,\n                ),\n        ):\n            print(event)\n\n\nasyncio.run(filters())\n```\n\n### Persistent subscriptions\n\n```python\nimport asyncio\nfrom esdb import ESClient\nfrom esdb.shared import Filter\nfrom esdb.subscriptions import SubscriptionSettings, NackAction\n\nclient = ESClient(\"esdb+discover://admin:changeit@localhost:2111\")\n\nstream = \"stream-foo\"\ngroup = \"group-bar\"\n\n\nasync def persistent():\n    async with client.connect() as conn:\n        # emit some events to the same stream\n        for i in range(50):\n            await conn.streams.append(stream, \"foobar\", {\"i\": i})\n\n        # create a stream subscription\n        await conn.subscriptions.create_stream_subscription(\n            stream=stream,\n            group_name=group,\n            settings=SubscriptionSettings(\n                max_subscriber_count=50,\n                read_batch_size=5,\n                live_buffer_size=10,\n                history_buffer_size=10,\n                consumer_strategy=SubscriptionSettings.ConsumerStrategy.ROUND_ROBIN,\n                checkpoint_ms=10000,\n            ),\n        )\n\n        # create subscription to all events with filtering\n        # Only supported on EventStore v21.10+\n        await conn.subscriptions.create_all_subscription(\n            group_name=\"subscription_group\",\n            filter_by=Filter(kind=Filter.Kind.EVENT_TYPE, regex=\"^some_type$\", checkpoint_interval_multiplier=200),\n            settings=SubscriptionSettings(\n                read_batch_size=50,\n                live_buffer_size=100,\n                history_buffer_size=100,\n                max_retry_count=2,\n                checkpoint_ms=20000,\n            ),\n        )\n\n    # read from a subscription\n    async with client.connect() as conn:\n        sub = conn.subscriptions.subscribe(stream=stream, group_name=group, buffer_size=5)\n        async for event in sub:\n            try:\n                # do work with event\n                print(event)\n                await sub.ack([event])\n            except Exception as err:\n                await sub.nack([event], NackAction.RETRY, reason=str(err))\n\n        # get subscription info\n        info = await conn.subscriptions.get_info(group, stream)\n        assert info.group_name == group\n\n        # delete subscription\n        await conn.subscriptions.delete(group, stream)\n        \n        # list subscriptions\n        subs = await conn.subscriptions.list()\n        for sub in subs:\n            print(sub.total_items)\n\n\nasyncio.run(persistent())\n```\n",
    "bugtrack_url": null,
    "license": "MIT",
    "summary": "gRPC client for EventStore DB",
    "version": "0.3.5",
    "project_urls": {
        "Homepage": "https://github.com/andriykohut/esdb-py",
        "Repository": "https://github.com/andriykohut/esdb-py"
    },
    "split_keywords": [
        "eventstore",
        "esdb",
        "event sourcing",
        "cqrs",
        "event-sourcing",
        "grpcio",
        "grpc"
    ],
    "urls": [
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "9c3ff72fae47146573ed2006410f9a06ac2da8c7d4ad6b7aa8376302bbc68369",
                "md5": "18ea152edd5a0ee18801343108b4e3f2",
                "sha256": "866fc108d933becc23ea7e20f74e0c1f01337c17fd4705c4381ab2e2d7af0e42"
            },
            "downloads": -1,
            "filename": "esdb-0.3.5-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "18ea152edd5a0ee18801343108b4e3f2",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": ">=3.9,<4.0",
            "size": 80360,
            "upload_time": "2023-10-05T20:31:55",
            "upload_time_iso_8601": "2023-10-05T20:31:55.094310Z",
            "url": "https://files.pythonhosted.org/packages/9c/3f/f72fae47146573ed2006410f9a06ac2da8c7d4ad6b7aa8376302bbc68369/esdb-0.3.5-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "a83628dc6153021725623c2232aca6785ed31c2bea8a7188ac6746009b1ccc63",
                "md5": "1f163120a7d42f6d719d20e04cf8dd95",
                "sha256": "0311dabd56ce0b42b5c55e13b7085e74276270327e08a6164929cad739366ce6"
            },
            "downloads": -1,
            "filename": "esdb-0.3.5.tar.gz",
            "has_sig": false,
            "md5_digest": "1f163120a7d42f6d719d20e04cf8dd95",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": ">=3.9,<4.0",
            "size": 63856,
            "upload_time": "2023-10-05T20:31:57",
            "upload_time_iso_8601": "2023-10-05T20:31:57.269575Z",
            "url": "https://files.pythonhosted.org/packages/a8/36/28dc6153021725623c2232aca6785ed31c2bea8a7188ac6746009b1ccc63/esdb-0.3.5.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2023-10-05 20:31:57",
    "github": true,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "github_user": "andriykohut",
    "github_project": "esdb-py",
    "travis_ci": false,
    "coveralls": false,
    "github_actions": true,
    "lcname": "esdb"
}
        
Elapsed time: 0.11759s