# Python gRPC Client for EventStoreDB

This [Python package](https://pypi.org/project/esdbclient/) provides multithreaded and asyncio Python
clients for the [EventStoreDB](https://www.eventstore.com/) database.

The multithreaded `EventStoreDBClient` is described in detail below. Please scroll
down for <a href="#asyncio-client">information</a> about `AsyncEventStoreDBClient`.

These clients have been developed and are being maintained in a collaboration
with the EventStoreDB team, and are officially supported by Event Store Ltd.
Although not all aspects of the EventStoreDB gRPC API are implemented, many
of the most useful features are presented in an easy-to-use interface.

These clients have been tested to work with EventStoreDB LTS versions 22.10 and 23.10,
and release candidates 24.2 and 24.6, with and without SSL/TLS, with both single-server
and cluster modes, and with Python versions 3.8, 3.9, 3.10, 3.11 and 3.12.

The test suite has 100% line and branch coverage. The code has typing annotations
checked strictly with mypy. The code is formatted with black and isort, and checked
with flake8. Poetry is used for package management during development, and for
building and publishing distributions to [PyPI](https://pypi.org/project/esdbclient/).

For an example of usage, see the [eventsourcing-eventstoredb](
https://github.com/pyeventsourcing/eventsourcing-eventstoredb) package.


<!-- TOC -->
* [Synopsis](#synopsis)
* [Install package](#install-package)
  * [From PyPI](#from-pypi)
  * [With Poetry](#with-poetry)
* [EventStoreDB server](#eventstoredb-server)
  * [Run container](#run-container)
  * [Stop container](#stop-container)
* [EventStoreDB client](#eventstoredb-client)
  * [Import class](#import-class)
  * [Construct client](#construct-client)
* [Connection strings](#connection-strings)
  * [Two schemes](#two-schemes)
  * [User info string](#user-info-string)
  * [Query string](#query-string)
  * [Examples](#examples)
* [Event objects](#event-objects)
  * [New events](#new-events)
  * [Recorded events](#recorded-events)
* [Streams](#streams)
  * [Append events](#append-events)
  * [Idempotent append operations](#idempotent-append-operations)
  * [Read stream events](#read-stream-events)
  * [Get current version](#get-current-version)
  * [How to implement snapshotting with EventStoreDB](#how-to-implement-snapshotting-with-eventstoredb)
  * [Read all events](#read-all-events)
  * [Get commit position](#get-commit-position)
  * [Get stream metadata](#get-stream-metadata)
  * [Set stream metadata](#set-stream-metadata)
  * [Delete stream](#delete-stream)
  * [Tombstone stream](#tombstone-stream)
* [Catch-up subscriptions](#catch-up-subscriptions)
  * [Subscribe to all events](#subscribe-to-all-events)
  * [Subscribe to stream events](#subscribe-to-stream-events)
  * [How to implement exactly-once event processing](#how-to-implement-exactly-once-event-processing)
* [Persistent subscriptions](#persistent-subscriptions)
  * [Create subscription to all](#create-subscription-to-all)
  * [Read subscription to all](#read-subscription-to-all)
  * [How to write a persistent subscription consumer](#how-to-write-a-persistent-subscription-consumer)
  * [Update subscription to all](#update-subscription-to-all)
  * [Create subscription to stream](#create-subscription-to-stream)
  * [Read subscription to stream](#read-subscription-to-stream)
  * [Update subscription to stream](#update-subscription-to-stream)
  * [Replay parked events](#replay-parked-events)
  * [Get subscription info](#get-subscription-info)
  * [List subscriptions](#list-subscriptions)
  * [List subscriptions to stream](#list-subscriptions-to-stream)
  * [Delete subscription](#delete-subscription)
* [Projections](#projections)
  * [Create projection](#create-projection)
  * [Get projection state](#get-projection-state)
  * [Get projection statistics](#get-projection-statistics)
  * [Update projection](#update-projection)
  * [Enable projection](#enable-projection)
  * [Disable projection](#disable-projection)
  * [Reset projection](#reset-projection)
  * [Delete projection](#delete-projection)
  * [Restart projections subsystem](#restart-projections-subsystem)
* [Call credentials](#call-credentials)
  * [Construct call credentials](#construct-call-credentials)
* [Connection](#connection)
  * [Reconnect](#reconnect)
  * [Close](#close)
* [Asyncio client](#asyncio-client)
  * [Synopsis](#synopsis-1)
  * [FastAPI](#fastapi)
* [Notes](#notes)
  * [Regular expression filters](#regular-expression-filters)
  * [Reconnect and retry method decorators](#reconnect-and-retry-method-decorators)
* [Instrumentation](#instrumentation)
  * [OpenTelemetry](#open-telemetry)
* [Communities](#communities)
* [Contributors](#contributors)
  * [Install Poetry](#install-poetry)
  * [Setup for PyCharm users](#setup-for-pycharm-users)
  * [Setup from command line](#setup-from-command-line)
  * [Project Makefile commands](#project-makefile-commands)
<!-- TOC -->

## Synopsis<a id="synopsis"></a>

The `EventStoreDBClient` class can be imported from the `esdbclient` package.

Probably the three most useful methods of `EventStoreDBClient` are:

* `append_to_stream()` This method can be used to record new events in a particular
"stream". This is useful, for example, when executing a command in an application
that mutates an aggregate. This method is "atomic" in that either all or none of
the events will be recorded.

* `get_stream()` This method can be used to retrieve all the recorded
events in a "stream". This is useful, for example, when reconstructing
an aggregate from recorded events before executing a command in an
application that creates new events.

* `subscribe_to_all()` This method can be used to receive all recorded events in
the database. This is useful, for example, in event-processing components because
it supports processing events with "exactly-once" semantics.

The example below uses an "insecure" EventStoreDB server running locally on port 2113.

```python
import uuid

from esdbclient import EventStoreDBClient, NewEvent, StreamState


# Construct EventStoreDBClient with an EventStoreDB URI. The
# connection string URI specifies that the client should
# connect to an "insecure" server running on port 2113.

client = EventStoreDBClient(
    uri="esdb://localhost:2113?Tls=false"
)


# Generate new events. Typically, domain events of different
# types are generated in a domain model, and then serialized
# into NewEvent objects. An aggregate ID may be used as the
# name of a stream in EventStoreDB.

stream_name1 = str(uuid.uuid4())
event1 = NewEvent(
    type='OrderCreated',
    data=b'{"order_number": "123456"}'
)
event2 = NewEvent(
    type='OrderSubmitted',
    data=b'{}'
)
event3 = NewEvent(
    type='OrderCancelled',
    data=b'{}'
)


# Append new events to a new stream. The value returned
# from the append_to_stream() method is the overall
# "commit position" in the database of the last new event
# recorded by this operation. The returned "commit position"
# may be used in a user interface to poll an eventually
# consistent event-processing component until it can
# present an up-to-date materialized view. New events are
# each allocated a "stream position", which is the next
# available position in the stream, starting from 0.

commit_position1 = client.append_to_stream(
    stream_name=stream_name1,
    current_version=StreamState.NO_STREAM,
    events=[event1, event2],
)

# Append events to an existing stream. The "current version"
# is the "stream position" of the last recorded event in a
# stream. We have recorded two new events, so the "current
# version" is 1. The exception 'WrongCurrentVersion' will be
# raised if an incorrect value is given.

commit_position2 = client.append_to_stream(
    stream_name=stream_name1,
    current_version=1,
    events=[event3],
)

# - allocated commit positions increase monotonically
assert commit_position2 > commit_position1


# Get events recorded in a stream. This method returns
# a sequence of recorded event objects. The recorded
# event objects may be deserialized to domain event
# objects of different types and used to reconstruct
# an aggregate in a domain model.

recorded_events = client.get_stream(
    stream_name=stream_name1
)

# - stream 'stream_name1' now has three events
assert len(recorded_events) == 3

# - allocated stream positions are zero-based and gapless
assert recorded_events[0].stream_position == 0
assert recorded_events[1].stream_position == 1
assert recorded_events[2].stream_position == 2

# - event attribute values are recorded faithfully
assert recorded_events[0].type == "OrderCreated"
assert recorded_events[0].data == b'{"order_number": "123456"}'
assert recorded_events[0].id == event1.id

assert recorded_events[1].type == "OrderSubmitted"
assert recorded_events[1].data == b'{}'
assert recorded_events[1].id == event2.id

assert recorded_events[2].type == "OrderCancelled"
assert recorded_events[2].data == b'{}'
assert recorded_events[2].id == event3.id


# Start a catch-up subscription from last recorded position.
# This method returns a "catch-up subscription" object,
# which can be iterated over to obtain recorded events.
# The iterator will not stop when there are no more recorded
# events to be returned, but instead will block, and then continue
# when further events are recorded. It can be used as a context
# manager so that the underlying streaming gRPC call to the database
# can be cancelled cleanly in case of any error.

received_events = []
with client.subscribe_to_all(commit_position=0) as subscription:

    # Iterate over the catch-up subscription. Process each recorded
    # event in turn. Within an atomic database transaction, record
    # the event's "commit position" along with any new state generated
    # by processing the event. Use the component's last recorded commit
    # position when restarting the catch-up subscription.

    for event in subscription:
        received_events.append(event)

        if event.commit_position == commit_position2:
            # Break so we can continue with the example.
            break


# - events are received in the order they were recorded
assert received_events[-3].type == "OrderCreated"
assert received_events[-3].data == b'{"order_number": "123456"}'
assert received_events[-3].id == event1.id

assert received_events[-2].type == "OrderSubmitted"
assert received_events[-2].data == b'{}'
assert received_events[-2].id == event2.id

assert received_events[-1].type == "OrderCancelled"
assert received_events[-1].data == b'{}'
assert received_events[-1].id == event3.id


# Close the client's gRPC connection.

client.close()
```


## Install package<a id="install-package"></a>

It is recommended to install Python packages into a Python virtual environment.

### From PyPI<a id="from-pypi"></a>

You can use pip to install this package directly from
[the Python Package Index](https://pypi.org/project/esdbclient/).

    $ pip install esdbclient

### With Poetry<a id="with-poetry"></a>

You can use Poetry to add this package to your pyproject.toml and install it.

    $ poetry add esdbclient

## EventStoreDB server<a id="eventstoredb-server"></a>

The EventStoreDB server can be run locally using the official Docker container image.

### Run container<a id="run-container"></a>

For development, you can run a "secure" EventStoreDB server using the following command.

    $ docker run -d --name eventstoredb-secure -it -p 2113:2113 --env "HOME=/tmp" docker.eventstore.com/eventstore-ce/eventstoredb-ce:23.10.0-jammy --dev

As we will see, your client will need an EventStoreDB connection string URI as the value
of its `uri` constructor argument. The connection string for this "secure" EventStoreDB
server would be:

    esdb://admin:changeit@localhost:2113

To connect to a "secure" server, you will usually need to include a "username"
and a "password" in the connection string, so that the server can authenticate the
client. With EventStoreDB, the default username is "admin" and the default password
is "changeit".

When connecting to a "secure" server, you may also need to provide an SSL/TLS certificate
as the value of the `root_certificates` constructor argument. If the server certificate
is publicly signed, the root certificates of the certificate authority may be installed
locally and picked up by the grpc package from a default location. The client uses the
root SSL/TLS certificate to authenticate the server. For development, you can either
use the SSL/TLS certificate of the self-signed certificate authority that was used to
create the server's certificate, or, when using a single-node cluster, simply use the
server certificate itself. The server certificate can be obtained with the following
Python code.

```python
import ssl

server_certificate = ssl.get_server_certificate(addr=('localhost', 2113))
```

Alternatively, you can start an "insecure" server using the following command.

    $ docker run -d --name eventstoredb-insecure -it -p 2113:2113 docker.eventstore.com/eventstore-ce/eventstoredb-ce:23.10.0-jammy --insecure

The connection string URI for this "insecure" server would be:

    esdb://localhost:2113?Tls=false

As we will see, when connecting to an "insecure" server, there is no need to include
a "username" and a "password" in the connection string. If you do, these values will
be ignored by the client, so that they are not sent over an insecure channel.

Please note, the "insecure" connection string uses a query string with the field-value
`Tls=false`. The value of this field is by default `true`.

### Stop container<a id="stop-container"></a>

To stop and remove the "secure" container, use the following Docker commands.

    $ docker stop eventstoredb-secure
    $ docker rm eventstoredb-secure

To stop and remove the "insecure" container, use the following Docker commands.

    $ docker stop eventstoredb-insecure
    $ docker rm eventstoredb-insecure


## EventStoreDB client<a id="eventstoredb-client"></a>

This EventStoreDB client is implemented in the `esdbclient` package with
the `EventStoreDBClient` class.

### Import class<a id="import-class"></a>

The `EventStoreDBClient` class can be imported from the `esdbclient` package.

```python
from esdbclient import EventStoreDBClient
```

### Construct client<a id="construct-client"></a>

The `EventStoreDBClient` class has one required constructor argument, `uri`, and three
optional constructor arguments, `root_certificates`, `private_key`, and `certificate_chain`.

The `uri` argument is expected to be an EventStoreDB connection string URI that
conforms with the standard EventStoreDB "esdb" or "esdb+discover" URI schemes.

The client must be configured to create a "secure" connection to a "secure" server,
or alternatively an "insecure" connection to an "insecure" server. By default, the
client will attempt to create a "secure" connection. And so, when connecting to an
"insecure" server, the connection string must specify that the client should attempt
to make an "insecure" connection by using the URI query string field-value `Tls=false`.

The optional `root_certificates` argument can be either a Python `str` or a Python `bytes`
object containing PEM encoded SSL/TLS certificate(s), and is used to authenticate the
server to the client. When connecting to an "insecure" server, the value of this
argument will be ignored. When connecting to a "secure" server, it may be necessary to
set this argument. Typically, the value of this argument would be the public certificate
of the certificate authority that was responsible for generating the certificate used by
the EventStoreDB server. It is unnecessary to set this value in this case if certificate
authority certificates are installed locally, such that the Python grpc library can pick
them up from a default location. Alternatively, for development, you can use the server's
certificate itself. The value of this argument is passed directly to `grpc.ssl_channel_credentials()`.

An alternative way to supply the `root_certificates` argument is through the `tlsCaFile` field-value of the connection string URI query string (see below). If the `tlsCaFile` field-value is specified, the `root_certificates` argument will be ignored.

The optional `private_key` and `certificate_chain` arguments are both either a Python
`str` or a Python `bytes` object. These arguments may be used to authenticate the client
to the server. It is necessary to provide correct values for these arguments when connecting
to a "secure" server that is running the commercial edition of EventStoreDB with the
User Certificates plugin enabled. The value of `private_key` should be the X.509 user
certificate's private key in PEM format. The value of `certificate_chain` should be the
X.509 user certificate itself in PEM format. The values of these arguments are passed
directly to `grpc.ssl_channel_credentials()`. When connecting to an "insecure" server,
the values of these arguments will be ignored. Please note, an alternative way of
supplying the client with a user certificate and private key is to use the `UserCertFile`
and `UserKeyFile` field-values of the connection string URI query string (see below).
If the `UserCertFile` field-value is specified, the `certificate_chain` argument will be
ignored. If the `UserKeyFile` field-value is specified, the `private_key` argument will be
ignored.

In the example below, constructor argument values for `uri` and `root_certificates` are
taken from the operating system environment.

```python
import os

client = EventStoreDBClient(
    uri=os.getenv("ESDB_URI"),
    root_certificates=os.getenv("ESDB_ROOT_CERTIFICATES"),
)
```

## Connection strings<a id="connection-strings"></a>

An EventStoreDB connection string is a URI that conforms with one of two possible
schemes: either the "esdb" scheme, or the "esdb+discover" scheme.

The syntax and semantics of the EventStoreDB URI schemes are described below. The
syntax is defined using [EBNF](https://en.wikipedia.org/wiki/Extended_Backus–Naur_form).

### Two schemes<a id="two-schemes"></a>

The "esdb" URI scheme can be defined in the following way.

    esdb-uri = "esdb://" , [ user-info , "@" ] , grpc-target, { "," , grpc-target } , [ "?" , query-string ] ;

In the "esdb" URI scheme, after the optional user info string, there must be at least
one gRPC target. If there are several gRPC targets, they must be separated from each
other with the "," character.

Each gRPC target should indicate an EventStoreDB gRPC server socket (all in the same
EventStoreDB cluster), by specifying a host and a port number separated with the ":"
character. The host may be a hostname that can be resolved to an IP address, or an IP
address.

    grpc-target = ( hostname | ip-address ) , ":" , port-number ;

If there is one gRPC target, the client will simply attempt to connect to this
server, and it will use this connection when recording and retrieving events.

If there are two or more gRPC targets, the client will attempt to connect to the
Gossip API of each in turn, attempting to obtain information about the cluster from
it, until information about the cluster is obtained. A member of the cluster is then
selected by the client according to the "node preference" specified by the connection
string URI. The client will then close its connection and connect to the selected node
directly, without the 'round robin' load balancing strategy. If the "node preference"
is "leader" and, after connecting to a leader, the leader becomes a follower, the
client will reconnect to the new leader.


The "esdb+discover" URI scheme can be defined in the following way.

    esdb-discover-uri = "esdb+discover://" , [ user-info, "@" ] , cluster-domainname, [ ":" , port-number ] , [ "?" , query-string ] ;

In the "esdb+discover" URI scheme, after the optional user info string, there should be
a domain name which identifies a cluster of EventStoreDB servers. Individual nodes in
the cluster should be declared with DNS 'A' records.

The client will use the cluster domain name with the gRPC library's 'round robin' load
balancing strategy to call the Gossip APIs of addresses discovered from DNS 'A' records.
Information about the EventStoreDB cluster is obtained from the Gossip API. A member of
the cluster is then selected by the client according to the "node preference" option.
The client will then close its connection and connect to the selected node without the
'round robin' load balancing strategy. If the "node preference" is "leader" and,
after connecting to a leader, the leader becomes a follower, the client will
reconnect to the new leader.

### User info string<a id="user-info-string"></a>

In both the "esdb" and "esdb+discover" schemes, the URI may include a user info string.
If it exists in the URI, the user info string must be separated from the rest of the URI
with the "@" character. The user info string must include a username and a password,
separated with the ":" character.

    user-info = username , ":" , password ;

The user info is sent by the client in a "basic auth" authorization header in each gRPC
call to a "secure" server. This authorization header is used by the server to authenticate
the client. The Python gRPC library does not allow call credentials to be transferred to
"insecure" servers.

### Query string<a id="query-string"></a>

In both the "esdb" and "esdb+discover" schemes, the optional query string must be one
or more field-value arguments, separated from each other with the "&" character.

    query-string = field-value, { "&", field-value } ;

Each field-value argument must be one of the supported fields, and an
appropriate value, separated with the "=" character.

    field-value = ( "Tls", "=" , "true" | "false" )
                | ( "TlsVerifyCert", "=" , "true" | "false" )
                | ( "ConnectionName", "=" , string )
                | ( "NodePreference", "=" , "leader" | "follower" | "readonlyreplica" | "random" )
                | ( "DefaultDeadline", "=" , integer )
                | ( "GossipTimeout", "=" , integer )
                | ( "MaxDiscoverAttempts", "=" , integer )
                | ( "DiscoveryInterval", "=" , integer )
                | ( "KeepAliveInterval", "=" , integer )
                | ( "KeepAliveTimeout", "=" , integer ) ;
                | ( "TlsCaFile", "=" , string ) ;
                | ( "UserCertFile", "=" , string ) ;
                | ( "UserKeyFile", "=" , string ) ;

The table below describes the query string field-values supported by this client.

| Field               | Value                                                                 | Description                                                                                                                                                       |
|---------------------|-----------------------------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| Tls                 | "true", "false" (default: "true")                                     | If "true" the client will create a "secure" gRPC channel. If "false" the client will create an "insecure" gRPC channel. This must match the server configuration. |
| TlsVerifyCert       | "true", "false" (default: "true")                                     | This value is currently ignored.                                                                                                                                  |
| ConnectionName      | string (default: auto-generated version-4 UUID)                       | Sent in call metadata for every call, to identify the client to the cluster.                                                                                      |
| NodePreference      | "leader", "follower", "readonlyreplica", "random" (default: "leader") | The node state preferred by the client. The client will select a node from the cluster info received from the Gossip API according to this preference.            |
| DefaultDeadline     | integer (default: `None`)                                             | The default value (in seconds) of the `timeout` argument of client "write" methods such as `append_to_stream()`.                                                  |
| GossipTimeout       | integer (default: 5)                                                  | The default value (in seconds) of the `timeout` argument of gossip read methods, such as `read_gossip()`.                                                         |
| MaxDiscoverAttempts | integer (default: 10)                                                 | The number of attempts to read gossip when connecting or reconnecting to a cluster member.                                                                        |
| DiscoveryInterval   | integer (default: 100)                                                | How long to wait (in milliseconds) between gossip retries.                                                                                                        |
| KeepAliveInterval   | integer (default: `None`)                                             | The value (in milliseconds) of the "grpc.keepalive_ms" gRPC channel option.                                                                                       |
| KeepAliveTimeout    | integer (default: `None`)                                             | The value (in milliseconds) of the "grpc.keepalive_timeout_ms" gRPC channel option.                                                                               |
| TlsCaFile           | string (default: `None`)                                              | Absolute filesystem path to file containing the CA certificate in PEM format. This will be used to verify the server's certificate.                               |
| UserCertFile        | string (default: `None`)                                              | Absolute filesystem path to file containing the X.509 user certificate in PEM format.                                                                             |
| UserKeyFile         | string (default: `None`)                                              | Absolute filesystem path to file containing the X.509 user certificate's private key in PEM format.                                                               |


Please note, the client is insensitive to the case of fields and values. If fields are
repeated in the query string, the query string will be parsed without error. However,
the connection options used by the client will use the value of the first field. All
the other field-values in the query string with the same field name will be ignored.
Fields without values will also be ignored.

If the client's node preference is "follower" and there are no follower
nodes in the cluster, then the client will raise an exception. Similarly, if the
client's node preference is "readonlyreplica" and there are no read-only replica
nodes in the cluster, then the client will also raise an exception.

The gRPC channel option "grpc.max_receive_message_length" is automatically
configured to the value `17 * 1024 * 1024`. This value cannot be configured.


### Examples<a id="examples"></a>

Here are some examples of EventStoreDB connection string URIs.

The following URI will cause the client to make an "insecure" connection to
gRPC target `'127.0.0.1:2113'`. Because the client's node preference is "follower",
methods that can be called on a follower should complete successfully, whilst methods
that require a leader will raise a `NodeIsNotLeader` exception.

    esdb://127.0.0.1:2113?Tls=false&NodePreference=follower

The following URI will cause the client to make an "insecure" connection to
gRPC target `'127.0.0.1:2113'`. Because the client's node preference is "leader",
if this node is not a leader, then a `NodeIsNotLeader` exception will be raised by
all methods.

    esdb://127.0.0.1:2113?Tls=false&NodePreference=leader

The following URI will cause the client to make a "secure" connection to
gRPC target `'localhost:2113'` with username `'admin'` and password `'changeit'`
as the default call credentials when making calls to the EventStoreDB gRPC API.
Because the client's node preference is "leader", by default, if this node is not
a leader, then a `NodeIsNotLeader` exception will be raised by all methods.

    esdb://admin:changeit@localhost:2113

The following URI will cause the client to make "secure" connections, firstly to
get cluster info from either `'localhost:2111'`, or `'localhost:2112'`, or `'localhost:2113'`.
Because the client's node preference is "leader", the client will select the leader
node from the cluster info and reconnect to the leader. If the "leader" node becomes
a "follower" and another node becomes "leader", then the client will reconnect to the
new leader.

    esdb://admin:changeit@localhost:2111,localhost:2112,localhost:2113?NodePreference=leader


The following URI will cause the client to make "secure" connections, firstly to
get cluster info from either `'localhost:2111'`, or `'localhost:2112'`, or `'localhost:2113'`.
Because the client's node preference is "follower", the client will select a follower
node from the cluster info and reconnect to this follower. Please note, if the "follower"
node becomes the "leader", the client will not reconnect to a follower -- such behavior
may be implemented in a future version of the client and server.

    esdb://admin:changeit@localhost:2111,localhost:2112,localhost:2113?NodePreference=follower


The following URI will cause the client to make "secure" connections, firstly to get
cluster info from addresses in DNS 'A' records for `'cluster1.example.com'`, and then
to connect to a "leader" node. The client will use a default timeout
of 5 seconds when making calls to EventStore API "write" methods.

    esdb+discover://admin:changeit@cluster1.example.com?DefaultDeadline=5


The following URI will cause the client to make "secure" connections, firstly to get
cluster info from addresses in DNS 'A' records for `'cluster1.example.com'`, and then
to connect to a "leader" node. It will configure gRPC connections with a "keep alive
interval" and a "keep alive timeout".

    esdb+discover://admin:changeit@cluster1.example.com?KeepAliveInterval=10000&KeepAliveTimeout=10000


## Event objects<a id="event-objects"></a>

This package defines a `NewEvent` class and a `RecordedEvent` class. The
`NewEvent` class should be used when writing events to the database. The
`RecordedEvent` class is used when reading events from the database.

### New events<a id="new-events"></a>

The `NewEvent` class should be used when writing events to an EventStoreDB database.
You will need to construct new event objects before calling `append_to_stream()`.

The `NewEvent` class is a frozen Python dataclass. It has two required constructor
arguments (`type` and `data`) and three optional constructor arguments (`metadata`,
`content_type` and `id`).

The required `type` argument is a Python `str`, used to describe the type of
domain event that is being recorded.

The required `data` argument is a Python `bytes` object, used to state the
serialized data of the domain event that is being recorded.

The optional `metadata` argument is a Python `bytes` object, used to indicate any
metadata of the event that will be recorded. The default value is an empty `bytes`
object.

The optional `content_type` argument is a Python `str`, used to indicate the
kind of data that is being recorded. The default value is `'application/json'`,
which indicates that the `data` was serialized using JSON. An alternative value
for this argument is the more general indication `'application/octet-stream'`.

The optional `id` argument is a Python `UUID` object, used to specify the unique ID
of the event that will be recorded. If no value is provided, a new version-4 UUID
will be generated.

```python
new_event1 = NewEvent(
    type='OrderCreated',
    data=b'{"name": "Greg"}',
)
assert new_event1.type == 'OrderCreated'
assert new_event1.data == b'{"name": "Greg"}'
assert new_event1.metadata == b''
assert new_event1.content_type == 'application/json'
assert isinstance(new_event1.id, uuid.UUID)

event_id = uuid.uuid4()
new_event2 = NewEvent(
    type='ImageCreated',
    data=b'01010101010101',
    metadata=b'{"a": 1}',
    content_type='application/octet-stream',
    id=event_id,
)
assert new_event2.type == 'ImageCreated'
assert new_event2.data == b'01010101010101'
assert new_event2.metadata == b'{"a": 1}'
assert new_event2.content_type == 'application/octet-stream'
assert new_event2.id == event_id
```

### Recorded events<a id="recorded-events"></a>

The `RecordedEvent` class is used when reading events from an EventStoreDB
database. The client will return event objects of this type from all methods
that return recorded events, such as `get_stream()`, `subscribe_to_all()`,
and `read_subscription_to_all()`. You do not need to construct recorded event objects.

Like `NewEvent`, the `RecordedEvent` class is a frozen Python dataclass. It has all
the attributes that `NewEvent` has (`type`, `data`, `metadata`, `content_type`, `id`),
and some additional attributes that follow from the recording of an event
(`stream_name`, `stream_position`, `commit_position`, `recorded_at`). It also has a
`link` attribute, which is `None` unless the recorded event is a "link event" that
has been "resolved" to the linked event. And it has a `retry_count` attribute, which
has an integer value when receiving recorded events from persistent subscriptions,
and is otherwise `None`.

The `type` attribute is a Python `str`, used to indicate the type of an event
that was recorded.

The `data` attribute is a Python `bytes` object, used to indicate the data of an
event that was recorded.

The `metadata` attribute is a Python `bytes` object, used to indicate the metadata of
an event that was recorded.

The `content_type` attribute is a Python `str`, used to indicate the type of
data that was recorded for an event. It is usually `'application/json'`, indicating
that the data can be parsed as JSON. Alternatively, it is `'application/octet-stream'`.

The `id` attribute is a Python `UUID` object, used to indicate the unique ID of an
event that was recorded.

The `stream_name` attribute is a Python `str`, used to indicate the name of a
stream in which an event was recorded.

The `stream_position` attribute is a Python `int`, used to indicate the position in a
stream at which an event was recorded.

In EventStoreDB, a "stream position" is an integer representing the position of a
recorded event in a stream. Each recorded event is recorded at a position in a stream.
Each stream position is occupied by only one recorded event. New events are recorded at the
next unoccupied position. All sequences of stream positions are zero-based and gapless.

The `commit_position` attribute is a Python `int`, used to indicate the position in the
database at which an event was recorded.

In EventStoreDB, a "commit position" is an integer representing the position of a
recorded event in the database. Each recorded event is recorded at a position in the
database. Each commit position is occupied by only one recorded event. Commit positions
are zero-based and increase monotonically as new events are recorded. But, unlike stream
positions, the sequence of successive commit positions is not gapless. Indeed, there are
usually large differences between the commit positions of successively recorded events.

Please note, in EventStoreDB 21.10, the `commit_position` of all `RecordedEvent` objects
obtained from `read_stream()` is `None`, whereas those obtained from `read_all()` have
the actual commit position of the recorded event. This was changed in version 22.10, so
that event objects obtained from both `get_stream()` and `read_all()` have the actual
commit position. The `commit_position` attribute of the `RecordedEvent` class is
annotated with the type `Optional[int]` for this reason only.

The `recorded_at` attribute is a Python `datetime`, used to indicate when an event was
recorded by the database.

The `link` attribute is an optional `RecordedEvent` that carries information about
a "link event" that has been "resolved" to the linked event. This allows access to
the link event attributes when link events have been resolved, for example access
to the correct event ID to be used when acknowledging or negatively acknowledging
link events. Link events are "resolved" when the `resolve_links` argument is `True`
and when replaying parked events (negatively acknowledging an event received from
a persistent subscription with the `'park'` action will create a link event, and
when parked events are replayed they are received as resolved events). The
`ack_id` property helps with obtaining the correct event ID to use when acknowledging
or negatively acknowledging events received from persistent subscriptions.

The `retry_count` attribute is a Python `int`, used to indicate the number of times a
persistent subscription has retried sending the event to a consumer.


```python
from dataclasses import dataclass
from datetime import datetime
from typing import Optional
from uuid import UUID

@dataclass(frozen=True)
class RecordedEvent:
    """
    Encapsulates event data that has been recorded in EventStoreDB.
    """

    type: str
    data: bytes
    metadata: bytes
    content_type: str
    id: UUID
    stream_name: str
    stream_position: int
    commit_position: Optional[int]
    recorded_at: Optional[datetime] = None
    link: Optional["RecordedEvent"] = None
    retry_count: Optional[int] = None

    @property
    def ack_id(self) -> UUID:
        if self.link is not None:
            return self.link.id
        else:
            return self.id

    @property
    def is_system_event(self) -> bool:
        return self.type.startswith("$")

    @property
    def is_link_event(self) -> bool:
        return self.type == "$>"

    @property
    def is_resolved_event(self) -> bool:
        return self.link is not None

    @property
    def is_checkpoint(self) -> bool:
        return False
```

The property `ack_id` can be used to obtain the correct event ID to `ack()` or `nack()`
events received when reading persistent subscriptions. The returned value is either the
value of the `id` attribute of the `link` attribute, if `link` is not `None`, otherwise
it is the value of the `id` attribute.

The property `is_system_event` indicates whether the event is a "system event". System
events have a `type` value that starts with `'$'`.

The property `is_link_event` indicates whether the event is a "link event". Link
events have a `type` value of `'$>'`.

The property `is_resolved_event` indicates whether the event has been resolved from a
"link event". The returned value is `True` if `link` is not `None`.

The property `is_checkpoint` is `False`. This can be used to distinguish recorded
events from the `Checkpoint` instances that are received when subscribing with
`include_checkpoints=True`, as in the sketch below.



## Streams<a id="streams"></a>

In EventStoreDB, a "stream" is a sequence of recorded events that all have
the same "stream name". There will normally be many streams in a database,
each with many recorded events. Each recorded event has a position in its stream
(the "stream position"), and a position in the database (the "commit position").
Stream positions are zero-based and gapless. Commit positions are also zero-based,
but are not gapless.

The methods `append_to_stream()`, `get_stream()` and `read_all()` can
be used to record and read events in the database.

### Append events<a id="append-events"></a>

*requires leader*

The `append_to_stream()` method can be used atomically to record a sequence of new events.
If the operation is successful, it returns the commit position of the last event in the
sequence that has been recorded.

This method has three required arguments, `stream_name`, `current_version`
and `events`.

The required `stream_name` argument is a Python `str` that uniquely identifies a
stream to which a sequence of events will be appended.

The required `current_version` argument is expected to be either a Python `int`
that indicates the stream position of the last recorded event in the stream, or
`StreamState.NO_STREAM` if the stream does not yet exist or has been deleted. The
stream positions are zero-based and gapless, so that if a stream has two events, the
`current_version` should be 1. If an incorrect value is given, this method will raise a
`WrongCurrentVersion` exception. This behavior is designed to provide concurrency
control when recording new events. The correct value of `current_version` for any stream
can be obtained by calling `get_current_version()`. However, the typical approach is to
reconstruct an aggregate from the recorded events, so that the version of the aggregate
is the stream position of the last recorded event, then have the aggregate generate new
events, and then use the current version of the aggregate as the value of the
`current_version` argument when appending the new aggregate events. This ensures
the consistency of the recorded aggregate events, because operations that generate
new aggregate events can be retried with a freshly reconstructed aggregate if
a `WrongCurrentVersion` exception is encountered when recording new events. This
controlling behavior can be entirely disabled by setting the value of the `current_version`
argument to the constant `StreamState.ANY`. More selectively, this behaviour can be
disabled for existing streams by setting the value of the `current_version`
argument to the constant `StreamState.EXISTS`.
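
For instance, a minimal sketch of appending with concurrency control disabled
(the stream name and event type here are purely illustrative):

```python
# With StreamState.ANY, the append succeeds regardless of whether
# the stream exists or what its current version is.
client.append_to_stream(
    stream_name=str(uuid.uuid4()),
    current_version=StreamState.ANY,
    events=[NewEvent(type='SomethingHappened', data=b'{}')],
)
```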

The required `events` argument is expected to be a sequence of new event objects. The
`NewEvent` class should be used to construct new event objects. The `append_to_stream()`
operation is atomic, so that either all or none of the new events will be recorded. It
is not possible with EventStoreDB to record new events atomically in more than one stream.

This method has an optional `timeout` argument, which is a Python `float`
that sets a maximum duration, in seconds, for the completion of the gRPC operation.

This method has an optional `credentials` argument, which can be used to
override call credentials derived from the connection string URI.

In the example below, a new event, `event1`, is appended to a new stream. The stream
does not yet exist, so `current_version` is `StreamState.NO_STREAM`.

```python
# Construct a new event object.
event1 = NewEvent(type='OrderCreated', data=b'{}')

# Define a new stream name.
stream_name1 = str(uuid.uuid4())

# Append the new event to the new stream.
commit_position1 = client.append_to_stream(
    stream_name=stream_name1,
    current_version=StreamState.NO_STREAM,
    events=[event1],
)
```

In the example below, two subsequent events are appended to an existing
stream. The stream has one recorded event, so `current_version` is `0`.

```python
event2 = NewEvent(type='OrderUpdated', data=b'{}')
event3 = NewEvent(type='OrderDeleted', data=b'{}')

commit_position2 = client.append_to_stream(
    stream_name=stream_name1,
    current_version=0,
    events=[event2, event3],
)
```

The returned values, `commit_position1` and `commit_position2`, are the
commit positions in the database of the last events in the recorded sequences.
That is, `commit_position1` is the commit position of `event1` and
`commit_position2` is the commit position of `event3`.

Commit positions that are returned in this way can be used by a user interface to poll
a downstream component until it has processed all the newly recorded events. For example,
consider a user interface command that results in the recording of new events, and an
eventually consistent materialized view in a downstream component that is updated from
these events. If the new events have not yet been processed, the view might be stale,
or out-of-date. Instead of displaying a stale view, the user interface can poll the
downstream component until it has processed the newly recorded events, and then display
an up-to-date view to the user.
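
The sketch below illustrates this polling pattern. The `get_last_processed_position()`
helper is hypothetical, standing in for however the downstream component exposes its
progress:

```python
import time


def get_last_processed_position():
    # Hypothetical helper: return the commit position most recently
    # recorded by the downstream component (e.g. read from its database).
    raise NotImplementedError


def wait_until_processed(commit_position, poll_interval=0.1, max_attempts=50):
    # Poll the downstream component until it has processed all events
    # up to the given commit position, or give up after max_attempts.
    for _ in range(max_attempts):
        if get_last_processed_position() >= commit_position:
            return
        time.sleep(poll_interval)
    raise TimeoutError("downstream component did not catch up in time")
```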


### Idempotent append operations<a id="idempotent-append-operations"></a>

The `append_to_stream()` method is "idempotent" with respect to the `id` value of a
`NewEvent` object. That is to say, if `append_to_stream()` is called with events
whose `id` values are equal to those already recorded in the stream, then the
method call will successfully return, with the commit position of the last new event,
without making any changes to the database.

This is because it may sometimes happen, when calling `append_to_stream()`, that the
new events are successfully recorded, but something goes wrong before the method call
can return successfully to the caller. In this case, we cannot be sure that the events
have in fact been recorded, and so we may wish to retry.

If the events were in fact successfully recorded, it is convenient for the retried method call
to return successfully, without either raising an exception (when `current_version`
is either `StreamState.NO_STREAM` or an integer value) or creating further event records
(when `current_version` is `StreamState.ANY` or `StreamState.EXISTS`), as it would
if the `append_to_stream()` method were not idempotent.

If the method call initially failed and the new events were not in fact recorded, it
makes good sense, when the method call is retried, that the new events are recorded
and that the method call returns successfully. If the concurrency controls have not been disabled,
that is if `current_version` is either `StreamState.NO_STREAM` or an integer value, and
if a `WrongCurrentVersion` exception is raised when retrying the method call, then we can assume
both that the initial method call did not in fact successfully record the events, and also
that subsequent events have in the meantime been recorded by somebody else. In this case,
an application command which generated the new events may need to be executed again. And
the user of the application may need to be given an opportunity to decide if they still wish to
proceed with their original intention, by displaying a suitable error with an up-to-date view of
the recorded state. In the case where concurrency controls have been disabled, by using `StreamState.ANY` or
`StreamState.EXISTS` as the value of `current_version`, retrying a method call that failed to
return successfully will, more simply, just attempt to ensure the new events are recorded, regardless
of their resulting stream positions. In either case, when the method call does return successfully, we
can be sure the events have been recorded.

The example below shows the `append_to_stream()` method being called again with events
`event2` and `event3`, and with `current_version=0`. We can see that repeating the call
to `append_to_stream()` returns successfully without raising a `WrongCurrentVersion`
exception, as it would if the `append_to_stream()` operation were not idempotent.

```python
# Retry appending event2 and event3.
commit_position_retry = client.append_to_stream(
    stream_name=stream_name1,
    current_version=0,
    events=[event2, event3],
)
```

We can see that the same commit position is returned as above.

```python
assert commit_position_retry == commit_position2
```

The example below shows the `append_to_stream()` method being called again with events
`event2` and `event3`, and with `current_version=StreamState.ANY`.

```python
# Retry appending event2 and event3, without concurrency control.
commit_position_retry = client.append_to_stream(
    stream_name=stream_name1,
    current_version=StreamState.ANY,
    events=[event2, event3],
)
```

We can see that the same commit position is returned as above.

```python
assert commit_position_retry == commit_position2
```

By calling `get_stream()`, we can also see that the stream is unchanged.
That is, there are still only three events in the stream.

```python
events = client.get_stream(
    stream_name=stream_name1
)

assert len(events) == 3
```

This idempotent behaviour depends on the `id` attribute of the `NewEvent` class.
This attribute is, by default, assigned a new and unique version-4 UUID when an
instance of `NewEvent` is constructed. To set the `id` value of a `NewEvent`,
the optional `id` constructor argument can be used when constructing `NewEvent` objects.
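
The sketch below relies on this idempotency by constructing the stream name and
`NewEvent` once, and reusing them on every attempt. The broad `except Exception` is a
stand-in for whichever transient errors you decide are worth retrying:

```python
from esdbclient.exceptions import WrongCurrentVersion

# Construct the stream name and event once, so the event's id is
# stable across retries and repeated appends are idempotent.
retry_stream_name = str(uuid.uuid4())
retried_event = NewEvent(type='OrderCreated', data=b'{}')

for attempt in range(3):
    try:
        client.append_to_stream(
            stream_name=retry_stream_name,
            current_version=StreamState.NO_STREAM,
            events=[retried_event],
        )
        break  # success: the event is recorded exactly once
    except WrongCurrentVersion:
        # Somebody else recorded other events first, so the command
        # that generated retried_event may need to be executed again.
        raise
    except Exception:
        # Assumed transient failure: retrying is safe, because appending
        # an event with an already-recorded id makes no further changes.
        continue
```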


### Read stream events<a id="read-stream-events"></a>

The `read_stream()` method can be used to get events that have been appended
to a stream. This method returns a "read response" object.

A "read response" object is a Python iterator. Recorded events can be
obtained by iterating over the "read response" object. Recorded events are
streamed from the server to the client as the iteration proceeds. The iteration
will automatically stop when there are no more recorded events to be returned.
The streaming of events, and hence the iterator, can also be stopped by calling
the `stop()` method on the "read response" object.

The `get_stream()` method can be used to get events that have been appended
to a stream. This method returns a Python `tuple` of recorded event objects.
The recorded event objects are instances of the `RecordedEvent` class. It
calls `read_stream()` and passes the "read response" iterator into a Python
`tuple`, so that the streaming will complete before the method returns.

The `read_stream()` and `get_stream()` methods have one required argument, `stream_name`.

The required `stream_name` argument is a Python `str` that uniquely identifies a
stream from which recorded events will be returned.

The `read_stream()` and `get_stream()` methods also have six optional arguments,
`stream_position`, `backwards`, `resolve_links`, `limit`, `timeout`, and `credentials`.

The optional `stream_position` argument is a Python `int` that can be used to
indicate the position in the stream from which to start reading. The default value
of `stream_position` is `None`. When reading a stream from a specific position in the
stream, the recorded event at that position will be included, both when reading
forwards from that position, and when reading backwards.

The optional `backwards` argument is a Python `bool`. The default value of `backwards`
is `False`, which means the stream will be read forwards, so that events are returned
in the order they were recorded. If `backwards` is `True`, the events are returned in
reverse order.

If `backwards` is `False` and `stream_position` is `None`, the stream's events will be
returned in the order they were recorded, starting from the first recorded event. If
`backwards` is `True` and `stream_position` is `None`, the stream's events will be
returned in reverse order, starting from the last recorded event.

The optional `resolve_links` argument is a Python `bool`. The default value of `resolve_links`
is `False`, which means any event links will not be resolved, so that the events that are
returned may represent event links. If `resolve_links` is `True`, any event links will
be resolved, so that the linked events will be returned instead of the event links.

The optional `limit` argument is a Python `int` which restricts the number of events
that will be returned. The default value of `limit` is `sys.maxsize`.

The optional `timeout` argument is a Python `float` which sets a
maximum duration, in seconds, for the completion of the gRPC operation.

The optional `credentials` argument can be used to override call credentials derived
from the connection string URI. A suitable value for this argument can be constructed
by calling the client method `construct_call_credentials()`.

The example below shows the default behavior, which is to return all the recorded
events of a stream forwards, from the first recorded event to the last.

```python
events = client.get_stream(
    stream_name=stream_name1
)

assert len(events) == 3
assert events[0] == event1
assert events[1] == event2
assert events[2] == event3
```

The example below shows how to use the `stream_position` argument to read a stream
from a specific stream position to the end of the stream. Stream positions are
zero-based, and so `stream_position=1` corresponds to the second event that was
recorded in the stream, in this case `event2`.

```python
events = client.get_stream(
    stream_name=stream_name1,
    stream_position=1,
)

assert len(events) == 2
assert events[0] == event2
assert events[1] == event3
```

The example below shows how to use the `backwards` argument to read a stream backwards.

```python
events = client.get_stream(
    stream_name=stream_name1,
    backwards=True,
)

assert len(events) == 3
assert events[0] == event3
assert events[1] == event2
assert events[2] == event1
```

The example below shows how to use the `limit` argument to read a limited number of
events.

```python
events = client.get_stream(
    stream_name=stream_name1,
    limit=2,
)

assert len(events) == 2
assert events[0] == event1
assert events[1] == event2
```

The `read_stream()` and `get_stream()` methods will raise a `NotFound` exception if the
named stream has never existed or has been deleted.

```python
from esdbclient.exceptions import NotFound


try:
    client.get_stream('does-not-exist')
except NotFound:
    pass  # The stream does not exist.
else:
    raise Exception("Shouldn't get here")
```

Please note, the `get_stream()` method is decorated with the `@autoreconnect` and
`@retrygrpc` decorators, whilst the `read_stream()` method is not. This means that
all errors due to connection issues will be caught by the retry and reconnect decorators
when calling the `get_stream()` method, but not when calling `read_stream()`. The
`read_stream()` method has no such decorators because the streaming only starts
when iterating over the "read response" starts, which means that the method returns
before the streaming starts, and so there is no chance for any decorators to catch
any connection issues.

For the same reason, `read_stream()` will not raise a `NotFound` exception when
the stream does not exist, until iterating over the "read response" object begins.

If you are reading a very large stream, you might prefer to call `read_stream()`
and begin iterating over the recorded events whilst they are being streamed from
the server, rather than waiting for them all to accumulate in memory.
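
For example, a minimal sketch that iterates over a "read response" and stops the
streaming early:

```python
# Stream events from the server incrementally, instead of accumulating
# the whole stream in memory before processing begins.
read_response = client.read_stream(stream_name=stream_name1)

for event in read_response:
    if event.type == 'OrderDeleted':
        # Stop the underlying gRPC streaming call cleanly.
        read_response.stop()
        break
```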

### Get current version<a id="get-current-version"></a>

The `get_current_version()` method is a convenience method that essentially calls
`get_stream()` with `backwards=True` and `limit=1`. This method returns
the value of the `stream_position` attribute of the last recorded event in a
stream. If a stream does not exist, the returned value is `StreamState.NO_STREAM`.
The returned value is the correct value of `current_version` when appending events
to a stream, and when deleting or tombstoning a stream.

This method has one required argument, `stream_name`.

The required `stream_name` argument is a Python `str` that uniquely identifies a
stream from which a stream position will be returned.

This method has an optional `timeout` argument, which is a Python `float`
that sets a maximum duration, in seconds, for the completion of the gRPC operation.

This method has an optional `credentials` argument, which can be used to
override call credentials derived from the connection string URI.

In the example below, the last stream position of `stream_name1` is obtained.
Since three events have been appended to `stream_name1`, and because positions
in a stream are zero-based and gapless, the current version is `2`.

```python
current_version = client.get_current_version(
    stream_name=stream_name1
)

assert current_version == 2
```

If a stream has never existed or has been deleted, the returned value is
`StreamState.NO_STREAM`, which is the correct value of the `current_version`
argument both when appending the first event of a new stream, and also when
appending events to a stream that has been deleted.

```python
current_version = client.get_current_version(
    stream_name='does-not-exist'
)

assert current_version is StreamState.NO_STREAM
```

### How to implement snapshotting with EventStoreDB<a id="how-to-implement-snapshotting-with-eventstoredb"></a>

Snapshots can improve the performance of aggregates that would otherwise be
reconstructed from very long streams. However, it is generally recommended to design
aggregates to have a finite lifecycle, and so to have relatively short streams,
thereby avoiding the need for snapshotting. This "how to" section is intended merely
to show how snapshotting of aggregates can be implemented with EventStoreDB using
this Python client.

Event-sourced aggregates are typically reconstructed from recorded events by calling
a mutator function for each recorded event, evolving from an initial state
`None` to the current state of the aggregate. The function `get_aggregate()` shows
how this can be done. The aggregate ID is used as a stream name. The exception
`AggregateNotFound` is raised if the aggregate stream is not found.

```python
class AggregateNotFound(Exception):
    """Raised when an aggregate is not found."""


def get_aggregate(aggregate_id, mutator_func):
    stream_name = aggregate_id

    # Get recorded events.
    try:
        events = client.get_stream(
            stream_name=stream_name,
            stream_position=None
        )
    except NotFound as e:
        raise AggregateNotFound(aggregate_id) from e
    else:
        # Reconstruct aggregate from recorded events.
        aggregate = None
        for event in events:
            aggregate = mutator_func(aggregate, event)
        return aggregate
```

Snapshotting of aggregates can be implemented by recording the current state of
an aggregate as a new event.

If an aggregate object has a version number that corresponds to the stream position of
the last event that was used to reconstruct the aggregate, and this version number
is recorded in the snapshot metadata, then any events that are recorded after the
snapshot can be selected using this version number. The aggregate can then be
reconstructed from the last snapshot and any subsequent events, without having
to replay the entire history.

We will use a separate stream for an aggregate's snapshots that is named after the
stream used for recording its events. The name of the snapshot stream will be
constructed by prefixing the aggregate's stream name with `'snapshot-$'`.

```python
SNAPSHOT_STREAM_NAME_PREFIX = 'snapshot-$'

def make_snapshot_stream_name(stream_name):
    return f'{SNAPSHOT_STREAM_NAME_PREFIX}{stream_name}'


def remove_snapshot_stream_prefix(snapshot_stream_name):
    assert snapshot_stream_name.startswith(SNAPSHOT_STREAM_NAME_PREFIX)
    return snapshot_stream_name[len(SNAPSHOT_STREAM_NAME_PREFIX):]
```

Now, let's redefine the `get_aggregate()` function, so that it looks for a snapshot event,
then selects subsequent aggregate events, and then calls a mutator function for each
recorded event.

Notice that the aggregate events are read from a stream for serialized aggregate
events, whilst the snapshot is read from a separate stream for serialized aggregate
snapshots. We will use JSON to serialize and deserialize event data.


```python
import json


def get_aggregate(aggregate_id, mutator_func):
    stream_name = aggregate_id
    recorded_events = []

    # Look for a snapshot.
    try:
        snapshots = client.get_stream(
            stream_name=make_snapshot_stream_name(stream_name),
            backwards=True,
            limit=1
        )
    except NotFound:
        stream_position = None
    else:
        assert len(snapshots) == 1
        snapshot = snapshots[0]
        stream_position = deserialize(snapshot.metadata)['version'] + 1
        recorded_events.append(snapshot)

    # Get subsequent events.
    try:
        events = client.get_stream(
            stream_name=stream_name,
            stream_position=stream_position
        )
    except NotFound as e:
        raise AggregateNotFound(aggregate_id) from e
    else:
        recorded_events += events

    # Reconstruct aggregate from recorded events.
    aggregate = None
    for event in recorded_events:
        aggregate = mutator_func(aggregate, event)

    return aggregate


def serialize(d):
    return json.dumps(d).encode('utf8')


def deserialize(s):
    return json.loads(s.decode('utf8'))
```

To show how `get_aggregate()` can be used, let's define a `Dog` aggregate class, with
attributes `name` and `tricks`. The attributes `id` and `version` will indicate an
aggregate object's ID and version number. The attribute `is_from_snapshot` is added
here merely to demonstrate below when an aggregate object has been reconstructed using
a snapshot.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Aggregate:
    id: str
    version: int
    is_from_snapshot: bool


@dataclass(frozen=True)
class Dog(Aggregate):
    name: str
    tricks: list
```

Let's also define a mutator function `mutate_dog()` that evolves the state of a
`Dog` aggregate given various different types of events, `'DogRegistered'`,
`'DogLearnedTrick'`, and `'Snapshot'`.

```python
def mutate_dog(dog, event):
    data = deserialize(event.data)
    if event.type == 'DogRegistered':
        return Dog(
            id=event.stream_name,
            version=event.stream_position,
            is_from_snapshot=False,
            name=data['name'],
            tricks=[],
        )
    elif event.type == 'DogLearnedTrick':
        assert event.stream_position == dog.version + 1
        assert event.stream_name == dog.id, (event.stream_name, dog.id)
        return Dog(
            id=dog.id,
            version=event.stream_position,
            is_from_snapshot=dog.is_from_snapshot,
            name=dog.name,
            tricks=dog.tricks + [data['trick']],
        )
    elif event.type == 'Snapshot':
        return Dog(
            id=remove_snapshot_stream_prefix(event.stream_name),
            version=deserialize(event.metadata)['version'],
            is_from_snapshot=True,
            name=data['name'],
            tricks=data['tricks'],
        )
    else:
        raise Exception(f"Unknown event type: {event.type}")
```

For convenience, let's also define a `get_dog()` function that calls `get_aggregate()`
with the `mutate_dog()` function as the value of its `mutator_func` argument.

```python
def get_dog(dog_id):
    return get_aggregate(
        aggregate_id=dog_id,
        mutator_func=mutate_dog,
    )
```

We can also define some "command" functions that append new events to the
database. The `register_dog()` function appends a `DogRegistered` event. The
`record_trick_learned()` function appends a `DogLearnedTrick` event. The function
`snapshot_dog()` appends a `Snapshot` event. Notice that the
`record_trick_learned()` and `snapshot_dog()` functions use `get_dog()`.

Notice also that the `DogRegistered` and `DogLearnedTrick` events are appended to a
stream for aggregate events, whilst the `Snapshot` event is appended to a separate
stream for aggregate snapshots.

```python
def register_dog(name):
    dog_id = str(uuid.uuid4())
    event = NewEvent(
        type='DogRegistered',
        data=serialize({'name': name}),
    )
    client.append_to_stream(
        stream_name=dog_id,
        current_version=StreamState.NO_STREAM,
        events=event,
    )
    return dog_id


def record_trick_learned(dog_id, trick):
    dog = get_dog(dog_id)
    event = NewEvent(
        type='DogLearnedTrick',
        data=serialize({'trick': trick}),
    )
    client.append_to_stream(
        stream_name=dog_id,
        current_version=dog.version,
        events=event,
    )


def snapshot_dog(dog_id):
    dog = get_dog(dog_id)
    event = NewEvent(
        type='Snapshot',
        data=serialize({'name': dog.name, 'tricks': dog.tricks}),
        metadata=serialize({'version': dog.version}),
    )
    client.append_to_stream(
        stream_name=make_snapshot_stream_name(dog_id),
        current_version=StreamState.ANY,
        events=event,
    )
```

We can call `register_dog()` to register a new dog.

```python
# Register a new dog.
dog_id = register_dog('Fido')

dog = get_dog(dog_id)
assert dog.name == 'Fido'
assert dog.tricks == []
assert dog.version == 0
assert dog.is_from_snapshot is False

```

We can call `record_trick_learned()` to record that some tricks have been learned.

```python

# Record that 'Fido' learned a new trick.
record_trick_learned(dog_id, trick='roll over')

dog = get_dog(dog_id)
assert dog.name == 'Fido'
assert dog.tricks == ['roll over']
assert dog.version == 1
assert dog.is_from_snapshot is False


# Record that 'Fido' learned another new trick.
record_trick_learned(dog_id, trick='fetch ball')

dog = get_dog(dog_id)
assert dog.name == 'Fido'
assert dog.tricks == ['roll over', 'fetch ball']
assert dog.version == 2
assert dog.is_from_snapshot is False
```

We can call `snapshot_dog()` to record a snapshot of the current state of the `Dog`
aggregate. After we call `snapshot_dog()`, the `get_dog()` function will return a `Dog`
object that has been constructed using the `Snapshot` event.

```python
# Snapshot 'Fido'.
snapshot_dog(dog_id)

dog = get_dog(dog_id)
assert dog.name == 'Fido'
assert dog.tricks == ['roll over', 'fetch ball']
assert dog.version == 2
assert dog.is_from_snapshot is True
```

We can continue to evolve the state of the `Dog` aggregate, using
the snapshot both during the call to `record_trick_learned()` and
when calling `get_dog()` directly.

```python
record_trick_learned(dog_id, trick='sit')

dog = get_dog(dog_id)
assert dog.name == 'Fido'
assert dog.tricks == ['roll over', 'fetch ball', 'sit']
assert dog.version == 3
assert dog.is_from_snapshot is True
```

We can see from the `is_from_snapshot` attribute that the `Dog` object was indeed
reconstructed from the snapshot.

Snapshots can be created at fixed version number intervals, fixed time
periods, after a particular type of event, immediately after events are
appended, or as a background process.
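
As a minimal sketch of one such strategy, the hypothetical function below reuses
the `record_trick_learned()` and `snapshot_dog()` functions defined above to
snapshot a `Dog` aggregate at a fixed version interval. The interval value is
merely illustrative.

```python
SNAPSHOT_INTERVAL = 2  # Illustrative interval between snapshots.


def record_trick_learned_with_snapshotting(dog_id, trick):
    # Record the event as before.
    record_trick_learned(dog_id, trick)

    # Snapshot the aggregate at fixed version intervals.
    dog = get_dog(dog_id)
    if (dog.version + 1) % SNAPSHOT_INTERVAL == 0:
        snapshot_dog(dog_id)
```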


### Read all events<a id="read-all-events"></a>

The `read_all()` method can be used to get all recorded events
in the database in the order they were recorded. This method returns
a "read response" object, just like `read_stream()`.

A "read response" is an iterator, and not a sequence. Recorded events can be
obtained by iterating over the "read response" object. Recorded events are
streamed from the server to the client as the iteration proceeds. The iteration
will automatically stop when there are no more recorded events to be returned.
The streaming of events, and hence the iterator, can also be stopped by calling
the `stop()` method on the "read response" object. The recorded event objects
are instances of the `RecordedEvent` class.

This method has nine optional arguments, `commit_position`, `backwards`, `resolve_links`,
`filter_exclude`, `filter_include`, `filter_by_stream_name`, `limit`, `timeout`,
and `credentials`.

The optional `commit_position` argument is a Python `int` that can be used to
specify a commit position from which to start reading. The default value of
`commit_position` is `None`. Please note, if a commit position is specified,
it must be an actually existing commit position in the database. When reading
forwards, the event at the commit position may be included, depending upon the
filter. When reading backwards, the event at the commit position will not be
included.

The optional `backwards` argument is a Python `bool`. The default value of `backwards`
is `False`, which means events are returned in the order they were recorded. If
`backwards` is `True`, then events are returned in reverse order.

If `backwards` is `False` and `commit_position` is `None`, the database's events will
be returned in the order they were recorded, starting from the first recorded event.
This is the default behavior of `read_all()`. If `backwards` is `True` and
`commit_position` is `None`, the database's events will be returned in reverse order,
starting from the last recorded event.

The optional `resolve_links` argument is a Python `bool`. The default value of `resolve_links`
is `False`, which means any event links will not be resolved, so that the events that are
returned may represent event links. If `resolve_links` is `True`, any event links will
be resolved, so that the linked events will be returned instead of the event links.

The optional `filter_exclude` argument is a sequence of regular expressions that
specifies which recorded events should be returned. This argument is ignored
if `filter_include` is set to a non-empty sequence. The default value of this
argument matches the event types of EventStoreDB "system events", so that system
events will not normally be included. See the Notes section below for more
information about filter expressions.

The optional `filter_include` argument is a sequence of regular expressions
that specifies which recorded events should be returned. By default, this
argument is an empty tuple. If this argument is set to a non-empty sequence,
the `filter_exclude` argument is ignored.

The optional `filter_by_stream_name` argument is a Python `bool` that indicates
whether the filtering will apply to event types or stream names. By default, this
value is `False` and so the filtering will apply to the event type strings of
recorded events.

The optional `limit` argument is an integer which restricts the number of events that
will be returned. The default value is `sys.maxsize`.

The optional `timeout` argument is a Python `float` which sets a
maximum duration, in seconds, for the completion of the gRPC operation.

The optional `credentials` argument can be used to
override call credentials derived from the connection string URI.

The filtering of events is done on the EventStoreDB server. The
`limit` argument is applied on the server after filtering.
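
For example, the `filter_include` argument can be used to receive only events
whose type matches one of the given regular expressions. The sketch below selects
the `Dog` aggregate events recorded in the snapshotting section above.

```python
# Read only events whose type matches one of the regular expressions.
events = tuple(
    client.read_all(
        filter_include=['DogRegistered', 'DogLearnedTrick'],
    )
)

assert len(events) > 0
assert all(e.type in ('DogRegistered', 'DogLearnedTrick') for e in events)
```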

The example below shows how to get all the events we have recorded in the database
so far, in the order they were recorded. We can see the three events of `stream_name1`
(`event1`, `event2` and `event3`) are included, along with others.

```python
# Read all events (creates a streaming gRPC call).
read_response = client.read_all()

# Convert the iterator into a sequence of recorded events.
events = tuple(read_response)
assert len(events) > 3  # more than three

# Convert the sequence of recorded events into a set of event IDs.
event_ids = set(e.id for e in events)
assert event1.id in event_ids
assert event2.id in event_ids
assert event3.id in event_ids
```

The example below shows how to read all recorded events in the database from
a particular commit position, in this case `commit_position1`. When reading
forwards from a specific commit position, the event at the specified position
will be included. The value of `commit_position1` is the position we obtained
when appending `event1`. And so `event1` is the first recorded event we shall
receive, `event2` is the second, and `event3` is the third.

```python
# Read all events forwards from a commit position.
read_response = client.read_all(
    commit_position=commit_position1
)

# Step through the "read response" iterator.
assert next(read_response) == event1
assert next(read_response) == event2
assert next(read_response) == event3

# Stop the iterator.
read_response.stop()
```

The example below shows how to read all events recorded in the database in reverse
order. We can see that the first events we receive are the last events that were
recorded: the events of the `Dog` aggregate from the section about snapshotting
and the snapshot.

```python
# Read all events backwards from the end.
read_response = client.read_all(
    backwards=True
)

# Step through the "read response" iterator.
assert next(read_response).type == "DogLearnedTrick"
assert next(read_response).type == "Snapshot"
assert next(read_response).type == "DogLearnedTrick"
assert next(read_response).type == "DogLearnedTrick"
assert next(read_response).type == "DogRegistered"

# Stop the iterator.
read_response.stop()
```

The example below shows how to read a limited number of events
forwards from a specific commit position.

```python
events = tuple(
    client.read_all(
        commit_position=commit_position1,
        limit=1,
    )
)

assert len(events) == 1
assert events[0] == event1
```

The example below shows how to read a limited number of the recorded events
in the database backwards from the end. In this case, the limit is 1, and
so we receive the last recorded event.

```python
events = tuple(
    client.read_all(
        backwards=True,
        limit=1,
    )
)

assert len(events) == 1

assert events[0].type == 'DogLearnedTrick'
assert deserialize(events[0].data)['trick'] == 'sit'
```

Please note, like the `read_stream()` method, the `read_all()` method
is not decorated with retry and reconnect decorators, because the streaming of recorded
events from the server only starts when iterating over the "read response" starts, which
means that the method returns before the streaming starts, and so there is no chance for
any decorators to catch any connection issues.

### Get commit position<a id="get-commit-position"></a>

The `get_commit_position()` method can be used to get the commit position of the
last recorded event in the database. It simply calls `read_all()` with
`backwards=True` and `limit=1`, and returns the value of the `commit_position`
attribute of the last recorded event.

```python
commit_position = client.get_commit_position()
```

This method has five optional arguments, `filter_exclude`, `filter_include`,
`filter_by_stream_name`, `timeout` and `credentials`. These values are passed to
`read_all()`.

The optional `filter_exclude`, `filter_include` and `filter_by_stream_name` arguments
work in the same way as they do in the `read_all()` method.

The optional `timeout` argument is a Python `float` which sets a
maximum duration, in seconds, for the completion of the gRPC operation.

The optional `credentials` argument can be used to override call credentials
derived from the connection string URI.

This method might be used to measure the progress of a downstream component
that is processing all recorded events, by comparing the current commit
position with the commit position that the downstream component has recorded
for the last event it successfully processed. In this case, the values of the
`filter_exclude`, `filter_include` and `filter_by_stream_name` arguments should
equal those used by the downstream component to obtain recorded events.
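
The sketch below shows the general idea, assuming a hypothetical
`last_processed_position` value that a downstream component has recorded for
the last event it successfully processed.

```python
# Hypothetical commit position recorded by a downstream component.
last_processed_position = 0

# The difference indicates how far behind the downstream component is.
lag = client.get_commit_position() - last_processed_position
assert lag > 0
```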


### Get stream metadata<a id="get-stream-metadata"></a>

The `get_stream_metadata()` method returns the metadata for a stream, along
with the version of the stream metadata.

This method has one required argument, `stream_name`, which is a Python `str` that
uniquely identifies a stream for which stream metadata will be obtained.

This method has an optional `timeout` argument, which is a Python `float`
that sets a maximum duration, in seconds, for the completion of the gRPC operation.

This method has an optional `credentials` argument, which can be used to
override call credentials derived from the connection string URI.

In the example below, metadata for `stream_name1` is obtained.

```python
metadata, metadata_version = client.get_stream_metadata(stream_name=stream_name1)
```

The returned `metadata` value is a Python `dict`. The returned `metadata_version`
value is either an `int` if the stream exists, or `StreamState.NO_STREAM` if the stream
does not exist and no metadata has been set. These values can be used as the arguments
of `set_stream_metadata()`.

### Set stream metadata<a id="set-stream-metadata"></a>

*requires leader*

The method `set_stream_metadata()` sets metadata for a stream. Stream metadata
can be set before appending events to a stream.

This method has one required argument, `stream_name`, which is a Python `str` that
uniquely identifies a stream for which stream metadata will be set.

This method has an optional `timeout` argument, which is a Python `float`
that sets a maximum duration, in seconds, for the completion of the gRPC operation.

This method has an optional `credentials` argument, which can be used to
override call credentials derived from the connection string URI.

In the example below, metadata for `stream_name1` is set.


```python
metadata["foo"] = "bar"

client.set_stream_metadata(
    stream_name=stream_name1,
    metadata=metadata,
    current_version=metadata_version,
)
```

The `current_version` argument should be the current version of the stream metadata
obtained from `get_stream_metadata()`.
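
We can verify that the metadata has been set by getting the stream metadata again.

```python
metadata, metadata_version = client.get_stream_metadata(stream_name=stream_name1)
assert metadata["foo"] == "bar"
```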

Please refer to the EventStoreDB documentation for more information about stream
metadata.

### Delete stream<a id="delete-stream"></a>

*requires leader*

The method `delete_stream()` can be used to "delete" a stream.

This method has two required arguments, `stream_name` and `current_version`.

The required `stream_name` argument is a Python `str` that uniquely identifies
the stream to be deleted.

The required `current_version` argument is expected to be a Python `int`
that indicates the stream position of the last recorded event in the stream.

This method has an optional `timeout` argument, which is a Python `float`
that sets a maximum duration, in seconds, for the completion of the gRPC operation.

This method has an optional `credentials` argument, which can be used to
override call credentials derived from the connection string URI.

In the example below, `stream_name1` is deleted.

```python
commit_position = client.delete_stream(stream_name=stream_name1, current_version=2)
```

After deleting a stream, it's still possible to append new events. Reading from a
deleted stream will return only events that have been appended after it was
deleted.
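
For example, reading the deleted stream raises a `NotFound` exception, and the
correct value of `current_version` for appending new events is now
`StreamState.NO_STREAM`.

```python
# Reading from the deleted stream raises NotFound.
try:
    client.get_stream(stream_name1)
except NotFound:
    pass  # The stream has been deleted.

# The correct current_version for appending again is StreamState.NO_STREAM.
assert client.get_current_version(stream_name1) is StreamState.NO_STREAM
```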

### Tombstone stream<a id="tombstone-stream"></a>

*requires leader*

The method `tombstone_stream()` can be used to "tombstone" a stream.

This method has two required arguments, `stream_name` and `current_version`.

The required `stream_name` argument is a Python `str` that uniquely identifies
the stream to be tombstoned.

The required `current_version` argument is expected to be a Python `int`
that indicates the stream position of the last recorded event in the stream.

This method has an optional `timeout` argument, which is a Python `float`
that sets a maximum duration, in seconds, for the completion of the gRPC operation.

This method has an optional `credentials` argument, which can be used to
override call credentials derived from the connection string URI.

In the example below, `stream_name1` is tombstoned.

```python
commit_position = client.tombstone_stream(stream_name=stream_name1, current_version=2)
```

After tombstoning a stream, it's not possible to append new events.
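
For example, an attempt to append a new event to the tombstoned stream is expected
to fail. The sketch below assumes the client raises its `StreamIsDeleted` exception
in this case (see `esdbclient.exceptions`).

```python
from esdbclient.exceptions import StreamIsDeleted

# Appending to a tombstoned stream is expected to raise StreamIsDeleted.
try:
    client.append_to_stream(
        stream_name=stream_name1,
        current_version=StreamState.ANY,
        events=[NewEvent(type='OrderCreated', data=b'{}')],
    )
except StreamIsDeleted:
    pass  # The stream has been tombstoned.
```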


## Catch-up subscriptions<a id="catch-up-subscriptions"></a>

A "catch-up" subscription can be used to receive events that have already been
recorded and events that are recorded subsequently. A catch-up subscription can
be used by an event-processing component that processes recorded events with
"exactly-once" semantics.

The `subscribe_to_all()` method starts a catch-up subscription that can receive
all events in the database. The `subscribe_to_stream()` method starts a catch-up
subscription that can receive events from a specific stream. Both methods return a
"catch-up subscription" object, which is a Python iterator. Recorded events can be
obtained by iteration. Recorded event objects obtained in this way are instances
of the `RecordedEvent` class.

Before the "catch-up subscription" object is returned to the caller, the client will
firstly obtain a "confirmation" response from the server, which allows the client to
detect that both the gRPC connection and the streaming gRPC call are operational. For
this reason, the `subscribe_to_all()` and `subscribe_to_stream()` methods are both
usefully decorated with the reconnect and retry decorators. However, once the method
has returned, the decorators will have exited, and any exceptions that are raised
due to connection issues whilst iterating over the subscription object will have to
be handled by your code.
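
The minimal sketch below shows one way this might be handled: a consumer loop that
resubscribes from the last processed commit position when iteration fails. It
assumes `GrpcError` is the base class of the client's connection-related exceptions
(see `esdbclient.exceptions`).

```python
from esdbclient.exceptions import GrpcError


def consume_all_events(process, commit_position=None):
    # Resubscribe, resuming from the last processed position, on failure.
    while True:
        subscription = client.subscribe_to_all(commit_position=commit_position)
        try:
            for event in subscription:
                process(event)
                commit_position = event.commit_position
        except GrpcError:
            continue  # Connection issue: subscribe again and carry on.
```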

A "catch-up subscription" iterator will not automatically stop when there are no more
events to be returned, but instead the iteration will block until new events are
subsequently recorded in the database. Any subsequently recorded events will then be
immediately streamed to the client, and the iteration will then continue. The streaming
of events, and hence the iteration, can be stopped by calling the `stop()` method on the
"catch-up subscription" object.

### Subscribe to all events<a id="subscribe-to-all-events"></a>

The `subscribe_to_all()` method can be used to start a catch-up subscription
from which all events recorded in the database can be obtained in the order
they were recorded. This method returns a "catch-up subscription" iterator.

This method has ten optional arguments, `commit_position`, `from_end`, `resolve_links`,
`filter_exclude`, `filter_include`, `filter_by_stream_name`, `include_checkpoints`,
`include_caught_up`, `timeout` and `credentials`.

The optional `commit_position` argument specifies a commit position. The default
value of `commit_position` is `None`, which means the catch-up subscription will
start from the first recorded event in the database. If a commit position is given,
it must match an actually existing commit position in the database. Only events
recorded after that position will be obtained.

The optional `from_end` argument specifies whether or not the catch-up subscription
will start from the last recorded event in the database. By default, this argument
is `False`. If `from_end` is `True`, only events recorded after the subscription
is started will be obtained. This argument will be disregarded if `commit_position`
is not `None`.

The optional `resolve_links` argument is a Python `bool`. The default value of `resolve_links`
is `False`, which means any event links will not be resolved, so that the events that are
returned may represent event links. If `resolve_links` is `True`, any event links will
be resolved, so that the linked events will be returned instead of the event links.

The optional `filter_exclude` argument is a sequence of regular expressions that
specifies which recorded events should be returned. This argument is ignored
if `filter_include` is set to a non-empty sequence. The default value of this
argument matches the event types of EventStoreDB "system events", so that system
events will not normally be included. See the Notes section below for more
information about filter expressions.

The optional `filter_include` argument is a sequence of regular expressions
that specifies which recorded events should be returned. By default, this
argument is an empty tuple. If this argument is set to a non-empty sequence,
the `filter_exclude` argument is ignored.

The optional `filter_by_stream_name` argument is a Python `bool` that indicates
whether the filtering will apply to event types or stream names. By default, this
value is `False` and so the filtering will apply to the event type strings of
recorded events.

The optional `include_checkpoints` argument is a Python `bool` which indicates
whether "checkpoint" messages should be included when recorded events are received.
Checkpoints have a `commit_position` value that can be used by an event processing component to
update its recorded commit position value, so that, when lots of events are being
filtered out, the subscriber does not have to start from the same old position when
the event processing component is restarted.

The optional `include_caught_up` argument is a Python `bool` which indicates
whether "caught up" messages should be included when recorded events are
received. The default value of `include_caught_up` is `False`.

The optional `timeout` argument is a Python `float` which sets a
maximum duration, in seconds, for the completion of the gRPC operation.

The optional `credentials` argument can be used to
override call credentials derived from the connection string URI.

The example below shows how to start a catch-up subscription that starts
from the first recorded event in the database.

```python
# Subscribe from the first recorded event in the database.
catchup_subscription = client.subscribe_to_all()
```

The example below shows that catch-up subscriptions do not stop
automatically, but block when the last recorded event is received,
and then continue when subsequent events are recorded.

```python
from time import sleep
from threading import Thread


# Append a new event to a new stream.
stream_name2 = str(uuid.uuid4())
event4 = NewEvent(type='OrderCreated', data=b'{}')

client.append_to_stream(
    stream_name=stream_name2,
    current_version=StreamState.NO_STREAM,
    events=[event4],
)


# Receive events from the catch-up subscription in a different thread.
received_events = []

def receive_events():
    for event in catchup_subscription:
        received_events.append(event)


def wait_for_event(event):
    # Poll the received events, waiting up to ten seconds.
    for _ in range(100):
        if event in received_events:
            return
        sleep(0.1)
    raise AssertionError("Event wasn't received")


thread = Thread(target=receive_events, daemon=True)
thread.start()

# Wait to receive event4.
wait_for_event(event4)

# Append another event whilst the subscription is running.
event5 = NewEvent(type='OrderUpdated', data=b'{}')
client.append_to_stream(
    stream_name=stream_name2,
    current_version=0,
    events=[event5],
)

# Wait to receive event5.
wait_for_event(event5)

# Stop the subscription.
catchup_subscription.stop()
thread.join()
```

The example below shows how to subscribe to events recorded after a
particular commit position, in this case from the commit position of
the last recorded event that was received above. Then, another event is
recorded before the subscription is restarted. And three more events are
recorded whilst the subscription is running. These four events are
received in the order they were recorded.


```python

# Append another event.
event6 = NewEvent(type='OrderDeleted', data=b'{}')
client.append_to_stream(
    stream_name=stream_name2,
    current_version=1,
    events=[event6],
)

# Restart subscribing to all events after the
# commit position of the last received event.
catchup_subscription = client.subscribe_to_all(
    commit_position=received_events[-1].commit_position
)

thread = Thread(target=receive_events, daemon=True)
thread.start()

# Wait for event6.
wait_for_event(event6)

# Append three more events to a new stream.
stream_name3 = str(uuid.uuid4())
event7 = NewEvent(type='OrderCreated', data=b'{}')
event8 = NewEvent(type='OrderUpdated', data=b'{}')
event9 = NewEvent(type='OrderDeleted', data=b'{}')

client.append_to_stream(
    stream_name=stream_name3,
    current_version=StreamState.NO_STREAM,
    events=[event7, event8, event9],
)

# Wait for events 7, 8 and 9.
wait_for_event(event7)
wait_for_event(event8)
wait_for_event(event9)

# Stop the subscription.
catchup_subscription.stop()
thread.join()
```

The catch-up subscription call is ended as soon as the subscription object's
`stop()` method is called. This also happens automatically when the subscription
object goes out of scope, or when it is explicitly deleted from memory using the
Python `del` keyword.

### Subscribe to stream events<a id="subscribe-to-stream-events"></a>

The `subscribe_to_stream()` method can be used to start a catch-up subscription
from which events recorded in a single stream can be obtained. This method
returns a "catch-up subscription" iterator.

This method has a required `stream_name` argument, which specifies the name of the
stream from which recorded events will be received.

This method also has six optional arguments, `stream_position`, `from_end`,
`resolve_links`, `include_caught_up`, `timeout` and `credentials`.

The optional `stream_position` argument specifies a position in the stream from
which to start subscribing. The default value of `stream_position` is `None`,
which means that all events recorded in the stream will be obtained in the
order they were recorded, unless `from_end` is set to `True`. If a stream
position is given, then only events recorded after that position will be obtained.

The optional `from_end` argument specifies that the subscription will start
from the last position in the stream. The default value of `from_end` is `False`.
If `from_end` is `True`, then only events recorded after the subscription was
created will be obtained. This argument is ignored if `stream_position` is set.

The optional `resolve_links` argument is a Python `bool`. The default value of `resolve_links`
is `False`, which means any event links will not be resolved, so that the events that are
returned may represent event links. If `resolve_links` is `True`, any event links will
be resolved, so that the linked events will be returned instead of the event links.

The optional `include_caught_up` argument is a Python `bool` which indicates
whether "caught up" messages should be included when recorded events are
received. The default value of `include_caught_up` is `False`.

The optional `timeout` argument is a Python `float` which sets a
maximum duration, in seconds, for the completion of the gRPC operation.

The optional `credentials` argument can be used to
override call credentials derived from the connection string URI.

The example below shows how to start a catch-up subscription from
the first recorded event in a stream.

```python
# Subscribe from the start of 'stream2'.
subscription = client.subscribe_to_stream(stream_name=stream_name2)
```

The example below shows how to start a catch-up subscription from
a particular stream position.

```python
# Subscribe to stream2, from the second recorded event.
subscription = client.subscribe_to_stream(
    stream_name=stream_name2,
    stream_position=1,
)
```

### How to implement exactly-once event processing<a id="how-to-implement-exactly-once-event-processing"></a>

The commit positions of recorded events that are received and processed by a
downstream component are usefully recorded by the downstream component, so that
the commit position of the last processed event can be determined when processing is
resumed.

The last recorded commit position can be used to specify the commit position from which
to subscribe when processing is resumed. Since this commit position will represent the
position of the last successfully processed event in a downstream component, it
will be usual to want the next event after this position, because that is the next
event that has not yet been processed. For this reason, when subscribing for events
from a specific commit position using a catch-up subscription in EventStoreDB, the
recorded event at the specified commit position will NOT be included in the sequence
of recorded events that are received.

To accomplish "exactly-once" processing of recorded events in a downstream
component when using a catch-up subscription, the commit position of a recorded
event should be recorded atomically and uniquely along with the result of processing
recorded events, for example in the same database as materialised views when
implementing eventually-consistent CQRS, or in the same database as a downstream
analytics or reporting or archiving application. By recording the commit position
of recorded events atomically with the new state that results from processing
recorded events, "dual writing" in the consumption of recorded events can be
avoided. By also recording the commit position uniquely, the new state cannot be
recorded twice, and hence the recorded state of the downstream component will be
updated only once for any recorded event. By using the greatest recorded commit
position to resume a catch-up subscription, all recorded events will eventually
be processed. The combination of the "at-most-once" condition and the "at-least-once"
condition gives the "exactly-once" condition.

The danger with "dual writing" in the consumption of recorded events is that if a
recorded event is successfully processed and new state recorded atomically in one
transaction with the commit position recorded in a separate transaction, one may
happen and not the other. If the new state is recorded but the position is lost,
and then the processing is stopped and resumed, the recorded event may be processed
twice. On the other hand, if the commit position is recorded but the new state is
lost, the recorded event may effectively not be processed at all. By either
processing an event more than once, or by failing to process an event, the recorded
state of the downstream component might be inaccurate, or possibly inconsistent, and
perhaps catastrophically so. Such consequences may or may not matter in your situation.
But sometimes inconsistencies may halt processing until the issue is resolved. You can
avoid "dual writing" in the consumption of events by atomically recording the commit
position of a recorded event along with the new state that results from processing that
event in the same atomic transaction. By making the recording of the commit positions
unique, so that transactions will be rolled back when there is a conflict, you will
prevent the results of any duplicate processing of a recorded event being committed.
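
The sketch below illustrates this idea with SQLite, which is merely an arbitrary
choice of database here: the new state and the commit position are recorded in one
atomic transaction, and a uniqueness constraint on the position causes duplicate
processing to be rolled back.

```python
import sqlite3

db = sqlite3.connect(':memory:')
db.execute("CREATE TABLE notes (note TEXT)")
db.execute("CREATE TABLE tracking (position INTEGER UNIQUE)")


def process_event_exactly_once(event):
    # One atomic transaction records the new state and the position.
    with db:
        db.execute("INSERT INTO notes VALUES (?)", (f"Seen {event.type}",))
        # The UNIQUE constraint rolls the transaction back if this
        # commit position has already been recorded.
        db.execute("INSERT INTO tracking VALUES (?)", (event.commit_position,))
```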

Recorded events received from a catch-up subscription cannot be acknowledged back
to the EventStoreDB server. Acknowledging events, however, is an aspect of "persistent
subscriptions". Hoping to rely on acknowledging events to an upstream
component is an example of dual writing.


## Persistent subscriptions<a id="persistent-subscriptions"></a>

In EventStoreDB, "persistent" subscriptions are similar to catch-up subscriptions,
in that reading a persistent subscription will block when there are no more recorded
events to be received, and then continue when new events are subsequently recorded.

Persistent subscriptions can be consumed by a group of consumers operating with one
of the supported "consumer strategies".

The significant difference with persistent subscriptions is that the server will keep
track of the progress of the consumers. The consumer of a persistent subscription will
therefore need to "acknowledge" when a recorded event has been processed successfully,
and otherwise "negatively acknowledge" a recorded event that has been received but was
not successfully processed.

All of this means that for persistent subscriptions there are "create", "read",
"update", "delete", "ack", and "nack" operations to consider.

Whilst persistent subscriptions have some advantages, in particular the
concurrent processing of recorded events by a group of consumers, tracking on
the server the positions of events that have been processed raises the issue
of "dual writing" in the consumption of events. Reliable processing of recorded
events by a group of persistent subscription consumers will therefore rely on
their idempotent handling of duplicate messages, and on their resilience to
out-of-order delivery.


### Create subscription to all<a id="create-subscription-to-all"></a>

*requires leader*

The `create_subscription_to_all()` method can be used to create a "persistent subscription"
to all the recorded events in the database across all streams.

This method has a required `group_name` argument, which is the
name of a "group" of consumers of the subscription.

This method has nineteen optional arguments, `from_end`, `commit_position`, `resolve_links`,
`filter_exclude`, `filter_include`, `filter_by_stream_name`, `consumer_strategy`,
`message_timeout`, `max_retry_count`, `min_checkpoint_count`, `max_checkpoint_count`,
`checkpoint_after`, `max_subscriber_count`, `live_buffer_size`, `read_batch_size`,
`history_buffer_size`, `extra_statistics`, `timeout` and `credentials`.

The optional `from_end` argument can be used to specify that the group of consumers
of the subscription should only receive events that were recorded after the subscription
was created.

Alternatively, the optional `commit_position` argument can be used to specify a commit
position from which the group of consumers of the subscription should
receive events. Please note, the recorded event at the specified commit position might
be included in the recorded events received by the group of consumers.

If neither `from_end` nor `commit_position` are specified, the group of consumers
of the subscription will potentially receive all recorded events in the database.

The optional `resolve_links` argument is a Python `bool`. The default value of `resolve_links`
is `False`, which means any event links will not be resolved, so that the events that are
returned may represent event links. If `resolve_links` is `True`, any event links will
be resolved, so that the linked events will be returned instead of the event links.

The optional `filter_exclude` argument is a sequence of regular expressions that
specifies which recorded events should be returned. This argument is ignored
if `filter_include` is set to a non-empty sequence. The default value of this
argument matches the event types of EventStoreDB "system events", so that system
events will not normally be included. See the Notes section below for more
information about filter expressions.

The optional `filter_include` argument is a sequence of regular expressions
that specifies which recorded events should be returned. By default, this
argument is an empty tuple. If this argument is set to a non-empty sequence,
the `filter_exclude` argument is ignored.

The optional `filter_by_stream_name` argument is a Python `bool` that indicates
whether the filtering will apply to event types or stream names. By default, this
value is `False` and so the filtering will apply to the event type strings of
recorded events.

The optional `consumer_strategy` argument is a Python `str` that defines
the consumer strategy for this persistent subscription. The value of this argument
can be `'DispatchToSingle'`, `'RoundRobin'`, `'Pinned'`, or `'PinnedByCorrelation'`. The
default value is `'DispatchToSingle'`.

The optional `message_timeout` argument is a Python `float` which sets a maximum duration,
in seconds, from the server sending a recorded event to a consumer of the persistent
subscription until either an "acknowledgement" (ack) or a "negative acknowledgement"
(nack) is received by the server, after which the server will retry to send the event.
The default value of `message_timeout` is `30.0`.

The optional `max_retry_count` argument is a Python `int` which sets the number of times
the server will retry to send an event. The default value of `max_retry_count` is `10`.

The optional `min_checkpoint_count` argument is a Python `int` which sets the minimum
number of "acknowledgements" (acks) received by the server before the server may record
the acknowledgements. The default value of `min_checkpoint_count` is `10`.

The optional `max_checkpoint_count` argument is a Python `int` which sets the maximum
number of "acknowledgements" (acks) received by the server before the server must
record the acknowledgements. The default value of `max_checkpoint_count` is `1000`.

The optional `checkpoint_after` argument is a Python `float` which sets the maximum
duration in seconds between recording "acknowledgements" (acks). The default value of
`checkpoint_after` is `2.0`.

The optional `max_subscriber_count` argument is a Python `int` which sets the maximum
number of concurrent readers of the persistent subscription, beyond which attempts to
read the persistent subscription will raise a `MaximumSubscriptionsReached` error.

The optional `live_buffer_size` argument is a Python `int` which sets the size of the
buffer (in-memory) holding newly recorded events. The default value of `live_buffer_size`
is 500.

The optional `read_batch_size` argument is a Python `int` which sets the number of
recorded events read from disk when catching up. The default value of `read_batch_size`
is 200.

The optional `history_buffer_size` argument is a Python `int` which sets the number of
recorded events to cache in memory when catching up. The default value of `history_buffer_size`
is 500.

The optional `extra_statistics` argument is a Python `bool` which enables tracking of
extra statistics on this subscription. The default value of `extra_statistics` is `False`.

The optional `timeout` argument is a Python `float` which sets a
maximum duration, in seconds, for the completion of the gRPC operation.

The optional `credentials` argument can be used to
override call credentials derived from the connection string URI.

The method `create_subscription_to_all()` does not return a value. Recorded events are
obtained by calling the `read_subscription_to_all()` method.

In the example below, a persistent subscription is created to operate from the
first recorded non-system event in the database.

```python
# Create a persistent subscription.
group_name1 = f"group-{uuid.uuid4()}"
client.create_subscription_to_all(group_name=group_name1)
```
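
Settings can also be specified when the subscription is created. For example, the
sketch below creates another persistent subscription that receives only events
with type `'OrderCreated'`, and uses a shorter message timeout. The group name is
merely illustrative.

```python
# Create a persistent subscription with a filter and a shorter timeout.
client.create_subscription_to_all(
    group_name=f"group-{uuid.uuid4()}",
    filter_include=['OrderCreated'],
    message_timeout=10.0,
)
```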

### Read subscription to all<a id="read-subscription-to-all"></a>

*requires leader*

The `read_subscription_to_all()` method can be used by a group of consumers to receive
recorded events from a persistent subscription that has been created using
the `create_subscription_to_all()` method.

This method has a required `group_name` argument, which is
the name of a "group" of consumers of the subscription specified
when `create_subscription_to_all()` was called.

This method has an optional `timeout` argument, which is a Python `float`
that sets a maximum duration, in seconds, for the completion of the gRPC operation.

This method has an optional `credentials` argument, which can be used to
override call credentials derived from the connection string URI.

This method returns a `PersistentSubscription` object, which is an iterator
giving `RecordedEvent` objects. It also has `ack()`, `nack()` and `stop()`
methods.

```python
subscription = client.read_subscription_to_all(group_name=group_name1)
```

The `ack()` method should be used by a consumer to "acknowledge" to the server that
it has received and successfully processed a recorded event. This will prevent that
recorded event being received by another consumer in the same group. The `ack()`
method has an `item` argument which can be either a `RecordedEvent` or a `UUID`. If
you pass in a `RecordedEvent`, the value of its `ack_id` attribute will be used to
acknowledge the event to the server. If you pass in a `UUID`, use the value of the
`ack_id` attribute of the `RecordedEvent` that is being acknowledged, because the
event may have been resolved from a link event (which can happen both when the
persistent subscription setting `resolve_links` is `True`, and also when replaying
parked events, regardless of the `resolve_links` setting).

The example below iterates over the subscription object, and calls `ack()` with the
received `RecordedEvent` objects. The subscription's `stop()` method is called when
we have received `event9`, stopping the iteration, so that we can continue with the
examples below.

```python
received_events = []

for event in subscription:
    received_events.append(event)

    # Acknowledge the received event.
    subscription.ack(event)

    # Stop when 'event9' has been received.
    if event == event9:
        subscription.stop()
```

The `nack()` method should be used by a consumer to "negatively acknowledge" to the server that
it has received but not successfully processed a recorded event. The `nack()` method has
an `item` argument that works in the same way as `ack()`. Use the recorded event or its
`ack_id` attribute. The `nack()` method also has an `action` argument, which should be
a Python `str`: either `'unknown'`, `'park'`, `'retry'`, `'skip'` or `'stop'`.

The `stop()` method can be used to stop the gRPC streaming operation.

### How to write a persistent subscription consumer<a id="how-to-write-a-persistent-subscription-consumer"></a>

The reading of a persistent subscription can be encapsulated in a "consumer" that
calls a "policy" function when a recorded event is received, and then automatically
calls `ack()` if the policy function returns normally, or `nack()` if it raises an
exception, perhaps retrying the event a certain number of times before parking it.

The simple example below shows how this might be done. We can see that 'event9' is
acknowledged before 'event5' is finally parked.

The number of times a `RecordedEvent` has been retried is presented by its
`retry_count` attribute.

```python
acked_events = {}
nacked_events = {}


class ExampleConsumer:
    def __init__(self, subscription, max_retry_count, final_action):
        self.subscription = subscription
        self.max_retry_count = max_retry_count
        self.final_action = final_action
        self.error = None

    def run(self):
        with self.subscription:
            for event in self.subscription:
                try:
                    self.policy(event)
                except Exception:
                    if event.retry_count < self.max_retry_count:
                        action = "retry"
                    else:
                        action = self.final_action
                    self.subscription.nack(event, action)
                    self.after_nack(event, action)
                else:
                    self.subscription.ack(event)
                    self.after_ack(event)

    def stop(self):
        self.subscription.stop()

    def policy(self, event):
        # Raise an exception when we see "event5".
        if event == event5:
            raise Exception()

    def after_ack(self, event):
        # Track retry count of acked events.
        acked_events[event.id] = event.retry_count

    def after_nack(self, event, action):
        # Track retry count of nacked events.
        nacked_events[event.id] = event.retry_count

        if action == self.final_action:
            # Stop the consumer, so we can continue with the examples.
            self.stop()


# Create subscription.
group_name = f"group-{uuid.uuid4()}"
client.create_subscription_to_all(group_name, commit_position=commit_position1)

# Read subscription.
subscription = client.read_subscription_to_all(group_name)

# Construct consumer.
consumer = ExampleConsumer(
    subscription=subscription,
    max_retry_count=5,
    final_action="park",
)

# Run consumer.
consumer.run()

# Check 'event5' was nacked and never acked.
assert event5.id in nacked_events
assert event5.id not in acked_events
assert nacked_events[event5.id] == 5

# Check 'event9' was acked and never nacked.
assert event9.id in acked_events
assert event9.id not in nacked_events
```

### Update subscription to all<a id="update-subscription-to-all"></a>

*requires leader*

The `update_subscription_to_all()` method can be used to update a
"persistent subscription". Please note, the filter options and consumer
strategy cannot be adjusted.

This method has a required `group_name` argument, which is the
name of a "group" of consumers of the subscription.

This method also has fifteen optional arguments, `from_end`, `commit_position`,
`resolve_links`, `message_timeout`, `max_retry_count`, `min_checkpoint_count`,
`max_checkpoint_count`, `checkpoint_after`, `max_subscriber_count`,
`live_buffer_size`, `read_batch_size`, `history_buffer_size`, `extra_statistics`,
`timeout` and `credentials`.

The optional arguments `from_end`, `commit_position`, `resolve_links`,
`message_timeout`, `max_retry_count`, `min_checkpoint_count`,
`max_checkpoint_count`, `checkpoint_after`, `max_subscriber_count`,
`live_buffer_size`, `read_batch_size`, `history_buffer_size`,
and `extra_statistics` can be used to adjust the values set on previous calls to
`create_subscription_to_all()` and `update_subscription_to_all()`. If any of
these arguments are not mentioned in a call to `update_subscription_to_all()`,
the corresponding settings of the persistent subscription will be unchanged.

The optional `timeout` argument is a Python `float` which sets a
maximum duration, in seconds, for the completion of the gRPC operation.

The optional `credentials` argument can be used to
override call credentials derived from the connection string URI.

The method `update_subscription_to_all()` does not return a value.

In the example below, a persistent subscription is updated to run from the end of the
database.

```python
# Update the persistent subscription.
client.update_subscription_to_all(group_name=group_name1, from_end=True)
```

### Create subscription to stream<a id="create-subscription-to-stream"></a>

*requires leader*

The `create_subscription_to_stream()` method can be used to create a persistent
subscription to a stream.

This method has two required arguments, `group_name` and `stream_name`. The
`group_name` argument names the group of consumers that will receive events
from this subscription. The `stream_name` argument specifies which stream
the subscription will follow. The values of both these arguments are expected
to be Python `str` objects.

This method also has sixteen optional arguments, `stream_position`, `from_end`,
`resolve_links`, `consumer_strategy`, `message_timeout`, `max_retry_count`,
`min_checkpoint_count`, `max_checkpoint_count`, `checkpoint_after`,
`max_subscriber_count`, `live_buffer_size`, `read_batch_size`, `history_buffer_size`,
`extra_statistics`, `timeout` and `credentials`.

The optional `stream_position` argument specifies a stream position from
which to subscribe. The recorded event at this stream
position will be received when reading the subscription.

The optional `from_end` argument is a Python `bool`.
By default, the value of this argument is `False`. If this argument is set
to `True`, reading from the subscription will receive only events
recorded after the subscription was created. That is, it is not inclusive
of the current stream position.

The optional `resolve_links` argument is a Python `bool`. The default value of `resolve_links`
is `False`, which means any event links will not be resolved, so that the events that are
returned may represent event links. If `resolve_links` is `True`, any event links will
be resolved, so that the linked events will be returned instead of the event links.

The optional `consumer_strategy` argument is a Python `str` that defines
the consumer strategy for this persistent subscription. The value of this argument
can be `'DispatchToSingle'`, `'RoundRobin'`, `'Pinned'`, or `'PinnedByCorrelation'`. The
default value is `'DispatchToSingle'`.

The optional `message_timeout` argument is a Python `float` which sets a maximum duration,
in seconds, from the server sending a recorded event to a consumer of the persistent
subscription until either an "acknowledgement" (ack) or a "negative acknowledgement"
(nack) is received by the server, after which the server will retry to send the event.
The default value of `message_timeout` is `30.0`.

The optional `max_retry_count` argument is a Python `int` which sets the number of times
the server will retry to send an event. The default value of `max_retry_count` is `10`.

The optional `min_checkpoint_count` argument is a Python `int` which sets the minimum
number of "acknowledgements" (acks) received by the server before the server may record
the acknowledgements. The default value of `min_checkpoint_count` is `10`.

The optional `max_checkpoint_count` argument is a Python `int` which sets the maximum
number of "acknowledgements" (acks) received by the server before the server must
record the acknowledgements. The default value of `max_checkpoint_count` is `1000`.

The optional `checkpoint_after` argument is a Python `float` which sets the maximum
duration in seconds between recording "acknowledgements" (acks). The default value of
`checkpoint_after` is `2.0`.

The optional `max_subscriber_count` argument is a Python `int` which sets the maximum
number of concurrent readers of the persistent subscription, beyond which attempts to
read the persistent subscription will raise a `MaximumSubscriptionsReached` error.

The optional `live_buffer_size` argument is a Python `int` which sets the size of the
buffer (in-memory) holding newly recorded events. The default value of `live_buffer_size`
is 500.

The optional `read_batch_size` argument is a Python `int` which sets the number of
recorded events read from disk when catching up. The default value of `read_batch_size`
is 200.

The optional `history_buffer_size` argument is a Python `int` which sets the number of
recorded events to cache in memory when catching up. The default value of `history_buffer_size`
is 500.

The optional `extra_statistics` argument is a Python `bool` which enables tracking of
extra statistics on this subscription. The default value of `extra_statistics` is `False`.

The optional `timeout` argument is a Python `float` which sets a
maximum duration, in seconds, for the completion of the gRPC operation.

The optional `credentials` argument can be used to
override call credentials derived from the connection string URI.

This method does not return a value. Events can be received by calling
`read_subscription_to_stream()`.

The example below creates a persistent stream subscription from the start of the stream.

```python
# Create a persistent stream subscription from start of the stream.
group_name2 = f"group-{uuid.uuid4()}"
client.create_subscription_to_stream(
    group_name=group_name2,
    stream_name=stream_name2,
)
```
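
The optional settings described above can be specified when the subscription is
created. The example below is an illustrative sketch, not a recommendation of
particular values; `group_name3` is a new group created only for this illustration.

```python
# Create a persistent stream subscription with adjusted settings.
group_name3 = f"group-{uuid.uuid4()}"
client.create_subscription_to_stream(
    group_name=group_name3,
    stream_name=stream_name2,
    consumer_strategy="RoundRobin",
    message_timeout=60.0,
    max_retry_count=5,
    checkpoint_after=1.0,
)
```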

### Read subscription to stream<a id="read-subscription-to-stream"></a>

*requires leader*

The `read_subscription_to_stream()` method can be used to read a persistent
subscription to a stream.

This method has two required arguments, `group_name` and `stream_name`, which
should match the values of arguments used when calling `create_subscription_to_stream()`.

This method has an optional `timeout` argument, which is a Python `float`
that sets a maximum duration, in seconds, for the completion of the gRPC operation.

This method has an optional `credentials` argument, which can be used to
override call credentials derived from the connection string URI.

This method returns a `PersistentSubscription` object, which is an iterator
giving `RecordedEvent` objects, that also has `ack()`, `nack()` and `stop()`
methods.

```python
subscription = client.read_subscription_to_stream(
    group_name=group_name2,
    stream_name=stream_name2,
)
```

The example below iterates over the subscription object, and calls `ack()`.
The subscription's `stop()` method is called when we have received `event6`,
stopping the iteration, so that we can continue with the examples below.

```python
events = []
for event in subscription:
    events.append(event)

    # Acknowledge the received event.
    subscription.ack(event)

    # Stop when 'event6' has been received.
    if event == event6:
        subscription.stop()
```

We can check we received all the events that were appended to `stream_name2`
in the examples above.

```python
assert len(events) == 3
assert events[0] == event4
assert events[1] == event5
assert events[2] == event6
```

### Update subscription to stream<a id="update-subscription-to-stream"></a>

*requires leader*

The `update_subscription_to_stream()` method can be used to update a persistent
subscription to a stream. Please note, the consumer strategy cannot be adjusted.

This method has a required `group_name` argument, which is the
name of a "group" of consumers of the subscription, and a required
`stream_name` argument, which is the name of a stream.

This method also has sixteen optional arguments, `from_end`, `stream_position`,
`resolve_links`, `consumer_strategy`, `message_timeout`, `max_retry_count`,
`max_subscriber_count`, `live_buffer_size`, `read_batch_size`, `history_buffer_size`,
`extra_statistics`, `min_checkpoint_count`, `max_checkpoint_count`, `checkpoint_after`,
`timeout` and `credentials`.

Apart from `timeout` and `credentials`, the optional arguments can be used to adjust
the values set on previous calls to `create_subscription_to_stream()` and
`update_subscription_to_stream()` (with the exception, noted above, that the consumer
strategy cannot be adjusted). If any of these arguments are not mentioned in a call
to `update_subscription_to_stream()`, the corresponding settings of the persistent
subscription will be unchanged.

The optional `timeout` argument is a Python `float` which sets a
maximum duration, in seconds, for the completion of the gRPC operation.

The optional `credentials` argument can be used to
override call credentials derived from the connection string URI.

The `update_subscription_to_stream()` method does not return a value.

In the example below, a persistent subscription to a stream is updated to run from the
end of the stream.

```python
# Update the persistent subscription.
client.update_subscription_to_stream(
    group_name=group_name2,
    stream_name=stream_name2,
    from_end=True,
)
```

### Replay parked events<a id="replay-parked-events"></a>

*requires leader*

The `replay_parked_events()` method can be used to "replay" events that have
been "parked" (negatively acknowledged with the action `'park'`) when reading
a persistent subscription. Parked events will then be received again by consumers
reading from the persistent subscription.
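
For example, a consumer might negatively acknowledge an event that it failed to
process with the `'park'` action, so that it can be replayed later. The sketch below
assumes a `subscription` object returned by `read_subscription_to_stream()`, and a
hypothetical `process()` function.

```python
# Sketch: park events that fail processing, so they can be replayed later.
for event in subscription:
    try:
        process(event)  # hypothetical event-processing function
    except Exception:
        # Negatively acknowledge the event with the 'park' action.
        subscription.nack(event, action="park")
    else:
        subscription.ack(event)
```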

This method has a required `group_name` argument and an optional `stream_name`
argument. The values of these arguments should match those used when calling
`create_subscription_to_all()` or `create_subscription_to_stream()`.

This method has an optional `timeout` argument, which is a Python `float`
that sets a maximum duration, in seconds, for the completion of the gRPC operation.

This method has an optional `credentials` argument, which can be used to
override call credentials derived from the connection string URI.

The example below replays parked events for group `group_name1`.

```python
client.replay_parked_events(
    group_name=group_name1,
)
```

The example below replays parked events for group `group_name2`.

```python
client.replay_parked_events(
    group_name=group_name2,
    stream_name=stream_name2,
)
```

### Get subscription info<a id="get-subscription-info"></a>

*requires leader*

The `get_subscription_info()` method can be used to get information for a
persistent subscription.

This method has a required `group_name` argument and an optional `stream_name`
argument, which should match the values of arguments used when calling either
`create_subscription_to_all()` or `create_subscription_to_stream()`.

This method has an optional `timeout` argument, which is a Python `float`
that sets a maximum duration, in seconds, for the completion of the gRPC operation.

This method has an optional `credentials` argument, which can be used to
override call credentials derived from the connection string URI.

The example below gets information for the persistent subscription `group_name1` which
was created by calling `create_subscription_to_all()`.

```python
subscription_info = client.get_subscription_info(
    group_name=group_name1,
)
```

The example below gets information for the persistent subscription `group_name2` on
`stream_name2` which was created by calling `create_subscription_to_stream()`.

```python
subscription_info = client.get_subscription_info(
    group_name=group_name2,
    stream_name=stream_name2,
)
```

The returned value is a `SubscriptionInfo` object.

### List subscriptions<a id="list-subscriptions"></a>

*requires leader*

The `list_subscriptions()` method can be used to get information for all
existing persistent subscriptions, both "subscriptions to all" and
"subscriptions to stream".

This method has an optional `timeout` argument, which is a Python `float`
that sets a maximum duration, in seconds, for the completion of the gRPC operation.

This method has an optional `credentials` argument, which can be used to
override call credentials derived from the connection string URI.

The example below lists all the existing persistent subscriptions.

```python
subscriptions = client.list_subscriptions()
```

The returned value is a list of `SubscriptionInfo` objects.


### List subscriptions to stream<a id="list-subscriptions-to-stream"></a>

*requires leader*

The `list_subscriptions_to_stream()` method can be used to get information for all
the persistent subscriptions to a stream.

This method has one required argument, `stream_name`.

This method has an optional `timeout` argument, which is a Python `float`
that sets a maximum duration, in seconds, for the completion of the gRPC operation.

This method has an optional `credentials` argument, which can be used to
override call credentials derived from the connection string URI.

```python
subscriptions = client.list_subscriptions_to_stream(
    stream_name=stream_name2,
)
```

The returned value is a list of `SubscriptionInfo` objects.


### Delete subscription<a id="delete-subscription"></a>

*requires leader*

The `delete_subscription()` method can be used to delete a persistent
subscription.

This method has a required `group_name` argument and an optional `stream_name`
argument, which should match the values of arguments used when calling either
`create_subscription_to_all()` or `create_subscription_to_stream()`.

This method has an optional `timeout` argument, which is a Python `float`
that sets a maximum duration, in seconds, for the completion of the gRPC operation.

This method has an optional `credentials` argument, which can be used to
override call credentials derived from the connection string URI.

The example below deletes the persistent subscription `group_name1` which
was created by calling `create_subscription_to_all()`.

```python
client.delete_subscription(
    group_name=group_name1,
)
```

The example below deletes the persistent subscription `group_name2` on
`stream_name2` which was created by calling `create_subscription_to_stream()`.

```python
client.delete_subscription(
    group_name=group_name2,
    stream_name=stream_name2,
)
```


## Projections<a id="projections"></a>

Please refer to the [EventStoreDB documentation](https://developers.eventstore.com/server/v23.10/projections.html)
for more information on projections in EventStoreDB.

### Create projection<a id="create-projection"></a>

*requires leader*

The `create_projection()` method can be used to create a "continuous" projection.

This method has two required arguments, `name` and `query`.

The required `name` argument is a Python `str` that specifies the name of the projection.

The required `query` argument is a Python `str` that defines what the projection will do.

This method also has four optional arguments, `emit_enabled`,
`track_emitted_streams`, `timeout`, and `credentials`.

The optional `emit_enabled` argument is a Python `bool` which specifies whether a
projection will be able to emit events. If a `True` value is specified, the projection
will be able to emit events, otherwise the projection will not be able to emit events.
The default value of `emit_enabled` is `False`.

Please note, `emit_enabled` must be `True` if your projection query includes a call to
`emit()`, otherwise the projection will not run.

The optional `track_emitted_streams` argument is a Python `bool` which specifies whether
a projection will have its emitted streams tracked. If a `True` value is specified, the
projection will have its emitted streams tracked, otherwise the projection will not
have its emitted streams tracked. The default value of `track_emitted_streams` is `False`.

The purpose of tracking emitted streams is that they can optionally be deleted when
a projection is deleted (see the `delete_projection()` method for more details).

Please note, if you set `track_emitted_streams` to `True`, then you must also set
`emit_enabled` to `True`, otherwise an error will be raised by this method.
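
For example, the sketch below creates a projection whose query calls `emit()`, and so
sets both `emit_enabled` and `track_emitted_streams` to `True`. The query and the
emitted stream name are purely illustrative.

```python
# Sketch: a projection that emits events needs emit_enabled=True.
emitting_query = """fromStream('%s')
.when({
  OrderCreated: function(s,e){
    emit('order-copies', 'OrderCopied', e.data);
  }
})
""" % stream_name2

client.create_projection(
    name=str(uuid.uuid4()),
    query=emitting_query,
    emit_enabled=True,
    track_emitted_streams=True,
)
```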

The optional `timeout` argument is a Python `float` which sets a
maximum duration, in seconds, for the completion of the gRPC operation.

The optional `credentials` argument can be used to
override call credentials derived from the connection string URI.

In the example below, a projection is created that processes events appended to
`stream_name2`. The "state" of the projection is initialised to have a "count" that
is incremented once for each event.

```python
projection_name = str(uuid.uuid4())

projection_query = """fromStream('%s')
.when({
  $init: function(){
    return {
      count: 0
    };
  },
  OrderCreated: function(s,e){
    s.count += 1;
  },
  OrderUpdated: function(s,e){
    s.count += 1;
  },
  OrderDeleted: function(s,e){
    s.count += 1;
  }
})
.outputState()
"""  % stream_name2

client.create_projection(
    name=projection_name,
    query=projection_query,
)
```

Please note, the `outputState()` call is optional, and causes the state of the
projection to be persisted in a "result" stream. If `outputState()` is called, an
event representing the state of the projection will immediately be written to a
"result" stream.

The default name of the "result" stream for a projection with name `projection_name`
is `$projections-{projection_name}-result`. This stream name can be used to read from
and subscribe to the "result" stream, with the `get_stream()`, or `read_stream()`,
or `subscribe_to_stream()`, or `create_subscription_to_stream()` and
`read_subscription_to_stream()` methods.
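
For example, after allowing time for the projection to run and write its state, the
"result" stream might be read with a sketch like this:

```python
# Sketch: read the "result" stream of the projection.
result_stream_name = f"$projections-{projection_name}-result"
result_events = client.get_stream(result_stream_name)
```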

If your projection does not call `outputState()`, then you won't be able to read or
subscribe to a "result" stream, but you will still be able to get the projection
"state" using the `get_projection_state()` method.

The "type" string of events recorded in "result" streams is `'Result'`. You may want to
include this in a `filter_exclude` argument when filtering events by type whilst reading
or subscribing to "all" events recorded in the database (with `read_all()`,
`subscribe_to_all()`, etc).

Additionally, and in any case, from time to time the state of the projection will be
recorded in a "state" stream, and also the projection will write to a "checkpoint"
stream. The "state" stream, the "checkpoint" stream, and all "emitted" streams that
have been "tracked" (as a consequence of the `track_emitted_streams` argument having
been `True`) can optionally be deleted when the projection is deleted. See
`delete_projection()` for details.

Unlike the "result" and "emitted" streams, the "state" and the "checkpoint" streams
cannot be read or subscribed to by users, or viewed in the "stream browser" view of
EventStoreDB's Web interface.

### Get projection state<a id="get-projection-state"></a>

*requires leader*

The `get_projection_state()` method can be used to get a projection's "state".

This method has a required `name` argument, which is a Python `str` that
specifies the name of a projection.

This method also has two optional arguments, `timeout` and `credentials`.

The optional `timeout` argument is a Python `float` which sets a
maximum duration, in seconds, for the completion of the gRPC operation.

The optional `credentials` argument can be used to
override call credentials derived from the connection string URI.

In the example below, after sleeping for 1 second to allow the projection
to process all the recorded events, the projection "state" is obtained.
We can see that the projection has processed three events.

```python
sleep(1)  # allow time for projection to process recorded events

projection_state = client.get_projection_state(name=projection_name)

assert projection_state.value == {'count': 3}
```

### Get projection statistics<a id="get-projection-statistics"></a>

*requires leader*

The `get_projection_statistics()` method can be used to get projection statistics.

This method has a required `name` argument, which is a Python `str` that specifies the
name of a projection.

This method also has two optional arguments, `timeout` and `credentials`.

The optional `timeout` argument is a Python `float` which sets a
maximum duration, in seconds, for the completion of the gRPC operation.

The optional `credentials` argument can be used to
override call credentials derived from the connection string URI.

This method returns a `ProjectionStatistics` object that represents
the named projection.

```python
statistics = client.get_projection_statistics(name=projection_name)
```

The attributes of the returned `ProjectionStatistics` object describe the
progress of the projection.

### Update projection<a id="update-projection"></a>

*requires leader*

The `update_projection()` method can be used to update a projection.

This method has two required arguments, `name` and `query`.

The required `name` argument is a Python `str` which specifies the name of the projection
to be updated.

The required `query` argument is a Python `str` which defines what the projection will do.

This method also has three optional arguments, `emit_enabled`, `timeout`, and `credentials`.

The optional `emit_enabled` argument is a Python `bool` which specifies whether a
projection will be able to emit events. If a `True` value is specified, the projection
will be able to emit events. If a `False` value is specified, the projection will not
be able to emit events. The default value of `emit_enabled` is `False`.

Please note, `emit_enabled` must be `True` if your projection query includes a call
to `emit()`, otherwise the projection will not run.

Please note, it is not possible to update `track_emitted_streams` via the gRPC API.

The optional `timeout` argument is a Python `float` which sets a
maximum duration, in seconds, for the completion of the gRPC operation.

The optional `credentials` argument can be used to
override call credentials derived from the connection string URI.

```python
client.update_projection(name=projection_name, query=projection_query)
```

### Enable projection<a id="enable-projection"></a>

*requires leader*

The `enable_projection()` method can be used to enable (start running) a projection
that was previously disabled (stopped).

This method has a required `name` argument, which is a Python `str` that
specifies the name of the projection to be enabled.

This method also has two optional arguments, `timeout` and `credentials`.

The optional `timeout` argument is a Python `float` which sets a
maximum duration, in seconds, for the completion of the gRPC operation.

The optional `credentials` argument can be used to
override call credentials derived from the connection string URI.

```python
client.enable_projection(name=projection_name)
```

### Disable projection<a id="disable-projection"></a>

*requires leader*

The `disable_projection()` method can be used to disable (stop running) a projection.

This method has a required `name` argument, which is a Python `str` that
specifies the name of the projection to be disabled.

This method also has two optional arguments, `timeout` and `credentials`.

The optional `timeout` argument is a Python `float` which sets a
maximum duration, in seconds, for the completion of the gRPC operation.

The optional `credentials` argument can be used to
override call credentials derived from the connection string URI.

```python
client.disable_projection(name=projection_name)
```

### Reset projection<a id="reset-projection"></a>

*requires leader*

The `reset_projection()` method can be used to reset a projection.

This method has a required `name` argument, which is a Python `str` that
specifies the name of the projection to be reset.

This method also has two optional arguments, `timeout` and `credentials`.

The optional `timeout` argument is a Python `float` which sets a
maximum duration, in seconds, for the completion of the gRPC operation.

The optional `credentials` argument can be used to
override call credentials derived from the connection string URI.

```python
client.reset_projection(name=projection_name)
```

Please note, a projection must be disabled before it can be reset.


### Delete projection<a id="delete-projection"></a>

*requires leader*

The `delete_projection()` method can be used to delete a projection.

This method has a required `name` argument, which is a Python `str` that
specifies the name of the projection to be deleted.

This method also has five optional arguments, `delete_emitted_streams`,
`delete_state_stream`, `delete_checkpoint_stream`, `timeout`, and `credentials`.

The optional `delete_emitted_streams` argument is a Python `bool` which specifies
whether all "emitted" streams that have been tracked should also be deleted. For
emitted streams to be deleted, they must have been tracked (see the
`track_emitted_streams` argument of the `create_projection()` method).

The optional `delete_state_stream` argument is a Python `bool` which specifies whether
the projection's "state" stream should also be deleted. The "state" stream is like
the "result" stream, but events are written to the "state" stream occasionally, along
with events written to the "checkpoint" stream, rather than immediately in the way a
call to `outputState()` writes events to the "result" stream.

The optional `delete_checkpoint_stream` argument is a Python `bool` which specifies
whether the projection's "checkpoint" stream should also be deleted.

The optional `timeout` argument is a Python `float` which sets a
maximum duration, in seconds, for the completion of the gRPC operation.

The optional `credentials` argument can be used to
override call credentials derived from the connection string URI.

```python
client.delete_projection(name=projection_name)
```

Please note, a projection must be disabled before it can be deleted.

### Restart projections subsystem<a id="restart-projections-subsystem"></a>

*requires leader*

The `restart_projections_subsystem()` method can be used to restart the projections subsystem.

This method also has two optional arguments, `timeout` and `credentials`.

The optional `timeout` argument is a Python `float` which sets a
maximum duration, in seconds, for the completion of the gRPC operation.

The optional `credentials` argument can be used to
override call credentials derived from the connection string URI.

```python
client.restart_projections_subsystem()
```


## Call credentials<a id="call-credentials"></a>

Default call credentials are derived by the client from the user info part of the
connection string URI.

Many of the client methods described above have an optional `credentials` argument,
which can be used to set call credentials for an individual method call that override
those derived from the connection string URI.

Call credentials are sent to "secure" servers in a "basic auth" authorization header.
This authorization header is used by the server to authenticate the client. The
authorization header is not sent to "insecure" servers.


### Construct call credentials<a id="construct-call-credentials"></a>

The client method `construct_call_credentials()` can be used to construct a call
credentials object from a username and password.

```python
call_credentials = client.construct_call_credentials(
    username='admin', password='changeit'
)
```

The call credentials object can be used as the value of the `credentials`
argument in other client methods.
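
For example, assuming `get_commit_position()` accepts the `credentials` argument like
the other methods described above, the default call credentials can be overridden for
a single call.

```python
commit_position = client.get_commit_position(
    credentials=call_credentials,
)
```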

## Connection<a id="connection"></a>

### Reconnect<a id="reconnect"></a>

The `reconnect()` method can be used to manually reconnect the client to a
suitable EventStoreDB node. This method uses the same routine for reading the
cluster node states and then connecting to a suitable node according to the
client's node preference that is specified in the connection string URI when
the client is constructed. This method is thread-safe, in that when it is called
by several threads at the same time, only one reconnection will occur. Concurrent
attempts to reconnect will block until the client has reconnected successfully,
and then they will all return normally.

```python
client.reconnect()
```

An example of when it might be desirable to reconnect manually is when (for performance
reasons) the client's node preference is to be connected to a follower node in the
cluster, and, after a cluster leader election, the follower becomes the leader.
Reconnecting to a follower node in this case is currently beyond the capabilities of
this client, but this behavior might be implemented in a future release.

Reconnection will happen automatically in many cases, due to the `@autoreconnect`
decorator.

### Close<a id="close"></a>

The `close()` method can be used to cleanly close the client's gRPC connection.

```python
client.close()
```


## Asyncio client<a id="asyncio-client"></a>

The `esdbclient` package also provides an asynchronous I/O gRPC Python client for
EventStoreDB. It is functionally equivalent to the multithreaded client. It uses
the `grpc.aio` package and the `asyncio` module, instead of `grpc` and `threading`.

It supports both the "esdb" and the "esdb+discover" connection string URI schemes,
and can connect to both "secure" and "insecure" EventStoreDB servers.

The class `AsyncEventStoreDBClient` can be used to construct an instance of the
asynchronous I/O gRPC Python client. It can be imported from `esdbclient`. The
async method `connect()` should be called after constructing the client.

The asyncio client has exactly the same methods as the multithreaded `EventStoreDBClient`.
These methods are defined as `async def` methods, and so calls to these methods will
return Python "awaitables" that must be awaited to obtain the method return values.
The methods have the same behaviors, the same arguments and the same or equivalent
return values. The methods are similarly decorated with reconnect and retry decorators,
that selectively reconnect and retry when connection issues or server errors are
encountered.

When awaited, the methods `read_all()` and `read_stream()` return an `AsyncReadResponse`
object. The methods `subscribe_to_all()` and `subscribe_to_stream()` return an
`AsyncCatchupSubscription` object. The methods `read_subscription_to_all()` and
`read_subscription_to_stream()` return an `AsyncPersistentSubscription` object.
These objects are asyncio iterables, which you can iterate over with Python's `async for`
syntax to obtain `RecordedEvent` objects. They are also asyncio context managers,
supporting the `async with` syntax. They also have a `stop()` method which can be
used to terminate the iterator in a way that actively cancels the streaming gRPC call
to the server. When used as a context manager, the `stop()` method will be called when
the context manager exits.

The methods `read_subscription_to_all()` and `read_subscription_to_stream()` return
instances of the class `AsyncPersistentSubscription`, which has async methods `ack()`,
`nack()` that work in the same way as the methods on `PersistentSubscription`,
supporting the acknowledgement and negative acknowledgement of recorded events that
have been received from a persistent subscription. See above for details.
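
For example, below is a minimal sketch of an asyncio persistent subscription consumer,
using the `group_name2` subscription for `stream_name2` created in the examples above,
and relying on the context manager to call `stop()`.

```python
async def consume(client):
    subscription = await client.read_subscription_to_stream(
        group_name=group_name2,
        stream_name=stream_name2,
    )
    async with subscription:  # stop() is called when the context exits
        async for event in subscription:
            # Process the event here, then acknowledge it.
            await subscription.ack(event)
```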

### Synopsis<a id="synopsis-1"></a>

The example below demonstrates the async `append_to_stream()`, `get_stream()` and
`subscribe_to_all()` methods. These are the most useful methods for writing
an event-sourced application, allowing new aggregate events to be recorded, the
recorded events of an aggregate to be obtained so aggregates can be reconstructed,
and the state of an application to be propagated and processed with "exactly-once"
semantics.

```python
import asyncio
import os
import uuid

from esdbclient import AsyncEventStoreDBClient, NewEvent, StreamState


async def demonstrate_async_client():

    # Construct client.
    client = AsyncEventStoreDBClient(
        uri=os.getenv("ESDB_URI"),
        root_certificates=os.getenv("ESDB_ROOT_CERTIFICATES"),
    )

    # Connect to EventStoreDB.
    await client.connect()

    # Append events.
    stream_name = str(uuid.uuid4())
    event1 = NewEvent("OrderCreated", data=b'{}')
    event2 = NewEvent("OrderUpdated", data=b'{}')
    event3 = NewEvent("OrderDeleted", data=b'{}')

    commit_position = await client.append_to_stream(
        stream_name=stream_name,
        current_version=StreamState.NO_STREAM,
        events=[event1, event2, event3]
    )

    # Get stream events.
    recorded = await client.get_stream(stream_name)
    assert len(recorded) == 3
    assert recorded[0] == event1
    assert recorded[1] == event2
    assert recorded[2] == event3


    # Subscribe to all events.
    received = []
    async with await client.subscribe_to_all(commit_position=0) as subscription:
        async for event in subscription:
            received.append(event)
            if event.commit_position == commit_position:
                break
    assert received[-3] == event1
    assert received[-2] == event2
    assert received[-1] == event3


    # Close the client.
    await client.close()


# Run the demo.
asyncio.run(
    demonstrate_async_client()
)
```

### FastAPI example<a id="fastapi"></a>

The example below shows how to use `AsyncEventStoreDBClient` with [FastAPI](https://fastapi.tiangolo.com).

```python
from contextlib import asynccontextmanager

from fastapi import FastAPI

from esdbclient import AsyncEventStoreDBClient


client: AsyncEventStoreDBClient


@asynccontextmanager
async def lifespan(_: FastAPI):
    # Construct the client.
    global client
    client = AsyncEventStoreDBClient(
        uri="esdb+discover://localhost:2113?Tls=false",
    )
    await client.connect()

    yield

    # Close the client.
    await client.close()


app = FastAPI(lifespan=lifespan)


@app.get("/commit_position")
async def commit_position():
    commit_position = await client.get_commit_position()
    return {"commit_position": commit_position}
```

If you put this code in a file called `fastapi_example.py` and then run command
`uvicorn fastapi_example:app --host 0.0.0.0 --port 80`, then the FastAPI application
will return something like `{"commit_position":628917}` when a browser is pointed
to `http://localhost/commit_position`. Use Ctrl-c to exit the process.
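
Alternatively, the endpoint can be called from the command line.

    $ curl http://localhost/commit_position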

## Notes<a id="notes"></a>

### Regular expression filters<a id="regular-expression-filters"></a>

The `read_all()`, `subscribe_to_all()`, `create_subscription_to_all()`
and `get_commit_position()` methods have `filter_exclude` and `filter_include`
arguments. This section provides some more details about the values of these
arguments.

The first thing to note is that the values of these arguments should be sequences
of regular expressions.

Please note, they are concatenated together by the client as bracketed alternatives in a larger
regular expression that is anchored to the start and end of the strings being
matched. So there is no need to include the `'^'` and `'$'` anchor assertions.

You should use wildcards if you want to match substrings, for example `'.*Snapshot'`
to match all strings that end with `'Snapshot'`, or `'Order.*'` to match all strings
that start with `'Order'`.

System events generated by EventStoreDB have `type` strings that start with
the `$` sign. Persistence subscription events generated when manipulating
persistence subscriptions have `type` strings that start with `PersistentConfig`.

For example, to match the type of EventStoreDB system events, use the regular
expression string `r'\$.+'`. Please note, the constant `ESDB_SYSTEM_EVENTS_REGEX` is
set to this value. You can import this constant from `esdbclient` and use it when
building longer sequences of regular expressions.

Similarly, to match the type of EventStoreDB persistence subscription events, use the
regular expression `r'PersistentConfig\d+'`. The constant `ESDB_PERSISTENT_CONFIG_EVENTS_REGEX`
is set to this value. You can import this constant from `esdbclient` and use it when
building longer sequences of regular expressions.

The constant `DEFAULT_EXCLUDE_FILTER` is a sequence of regular expressions that includes
both `ESDB_SYSTEM_EVENTS_REGEX` and `ESDB_PERSISTENT_CONFIG_EVENTS_REGEX`. It is used
as the default value of `filter_exclude` so that the events generated internally by
EventStoreDB are excluded by default.

In all methods that have a `filter_exclude` argument, the default value of the argument
is the constant `DEFAULT_EXCLUDE_FILTER`, which is designed to match (and therefore
to exclude) both "system" and "persistence subscription config" event types, which
would otherwise be included.

This value can be extended. For example, if you want to exclude system events and
persistent subscription events and also events that have a type that ends with
`'Snapshot'`, then you can use `DEFAULT_EXCLUDE_FILTER + ['.*Snapshot']` as the
`filter_exclude` argument.
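
For example, the sketch below reads all events whilst also excluding events whose type
ends with `'Snapshot'` (assuming `DEFAULT_EXCLUDE_FILTER` is importable from
`esdbclient`, like the regular expression constants mentioned above).

```python
from esdbclient import DEFAULT_EXCLUDE_FILTER

# Read all events, excluding system events, persistence subscription
# config events, and events whose type ends with 'Snapshot'.
for event in client.read_all(
    filter_exclude=DEFAULT_EXCLUDE_FILTER + ['.*Snapshot'],
):
    pass  # process the events here
```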

The `filter_include` and `filter_exclude` arguments are designed to have exactly
the opposite effect from each other, so that a sequence of strings given to
`filter_include` will return exactly those events which would be excluded if
the same argument value were used with `filter_exclude`. And vice versa, so that
a sequence of strings given to `filter_exclude` will return exactly those events
that would not be included if the same argument value were used with `filter_include`.


### Reconnect and retry method decorators<a id="reconnect-and-retry-method-decorators"></a>

Please note, nearly all the client methods are decorated with the `@autoreconnect` and
the `@retrygrpc` decorators.

The `@autoreconnect` decorator will reconnect to a suitable node in the cluster when
the server to which the client has been connected has become unavailable, or when the
client's gRPC channel happens to have been closed. The client will also reconnect when
a method is called that requires a leader, and the client's node preference is to be
connected to a leader, but the node that the client has been connected to stops being
the leader. In this case, the client will reconnect to the current leader. After
reconnecting, the failed operation will be retried.

The `@retrygrpc` decorator selectively retries gRPC operations that have failed due to
a timeout, network error, or server error. It doesn't retry operations that fail due to
bad requests that will certainly fail again.

Please also note, the reconnect and retry behaviours do not cover iterating over
the objects returned by these methods. For example, consider the "read response"
iterator returned from the `read_all()` method. The `read_all()` method will have
returned, and the method decorators will therefore have exited, before iterating
over the "read response" begins. Therefore, if a connection issue occurs whilst
iterating over the "read response", it isn't possible for any decorator on the
`read_all()` method to trigger a reconnection.

With the "catch-up subscription" objects, there is an initial "confirmation" response
from the server which is received and checked by the client. And so, when a call is
made to `subscribe_to_all()` or `subscribe_to_stream()`, if the server is unavailable,
or if the channel has somehow been closed, or if the request fails for some other reason,
then the client will reconnect and retry. However, if an exception is raised when iterating over a
successfully returned "catch-up subscription" object, the catch-up subscription will
need to be restarted. Similarly, when reading persistent subscriptions, if there are
connection issues whilst iterating over a successfully received response, the consumer
will need to be restarted.
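
For example, a catch-up subscription consumer might be restarted with a sketch like
the one below, which resumes from the last commit position the consumer recorded.
The broad exception handling is purely illustrative.

```python
# Sketch: restart a catch-up subscription after connection issues.
last_commit_position = 0  # in practice, load this from your own records

while True:
    try:
        with client.subscribe_to_all(
            commit_position=last_commit_position
        ) as subscription:
            for event in subscription:
                # Process the event, then record its commit position.
                last_commit_position = event.commit_position
    except Exception:
        # A connection issue occurred whilst iterating: restart
        # from the last recorded commit position.
        continue
```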

## Instrumentation<a id="instrumentation"></a>

Instrumentation is the act of modifying software so that analysis can be performed on it.
Instrumentation helps enterprises reveal areas or features where users frequently
encounter errors or slowdowns in their software or platform.

Instrumentation helps you understand the inner state of your software systems.
Instrumented applications measure what code is doing when it responds to active
requests by collecting data such as metrics, events, logs, and traces.

Instrumentation provides immediate visibility into your application, often using
charts and graphs to illustrate what is going on “under the hood.”

This package supports instrumenting the EventStoreDB clients with OpenTelemetry.

### OpenTelemetry<a id="open-telemetry"></a>

The [OpenTelemetry](https://opentelemetry.io) project provides a collection of APIs,
SDKs, and tools for instrumenting, generating, collecting, and exporting telemetry data,
that can help you analyze your software’s performance and behavior. It is vendor-neutral,
100% Free and Open Source, and adopted and supported by industry leaders in the
observability space.

This package provides OpenTelemetry instrumentors for both the `EventStoreDBClient`
and the `AsyncEventStoreDBClient` clients. These instrumentors depend on various
OpenTelemetry Python packages, which you will need to install, preferably with this
project's "opentelemetry" package extra to ensure verified version compatibility.

For example, you can install the "opentelemetry" package extra with pip.

    $ pip install esdbclient[opentelemetry]

Or you can use Poetry to add it to your pyproject.toml file and install it.

    $ poetry add esdbclient[opentelemetry]


You can then use the OpenTelemetry instrumentor `EventStoreDBClientInstrumentor` to
instrument the `EventStoreDBClient`.

```python
from esdbclient.instrumentation.opentelemetry import EventStoreDBClientInstrumentor

# Activate instrumentation.
EventStoreDBClientInstrumentor().instrument()

# Deactivate instrumentation.
EventStoreDBClientInstrumentor().uninstrument()
```

You can also use the OpenTelemetry instrumentor `AsyncEventStoreDBClientInstrumentor`
to instrument the `AsyncEventStoreDBClient`.

```python
from esdbclient.instrumentation.opentelemetry import AsyncEventStoreDBClientInstrumentor

# Activate instrumentation.
AsyncEventStoreDBClientInstrumentor().instrument()

# Deactivate instrumentation.
AsyncEventStoreDBClientInstrumentor().uninstrument()
```

The instrumentors use a global OpenTelemetry "tracer provider", which you will need to
initialise in order to export telemetry data.

For example, to export data to the console you will need to install the Python
package `opentelemetry-sdk`, and use the classes `TracerProvider`, `BatchSpanProcessor`,
and `ConsoleSpanExporter` in the following way.

```python
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter
from opentelemetry.trace import set_tracer_provider

resource = Resource.create(
    attributes={
        SERVICE_NAME: "eventstoredb",
    }
)
provider = TracerProvider(resource=resource)
provider.add_span_processor(BatchSpanProcessor(ConsoleSpanExporter()))
set_tracer_provider(provider)
```

Or to export to an OpenTelemetry compatible data collector, such as
[Jaeger](https://www.jaegertracing.io), you will need to install the Python package
`opentelemetry-exporter-otlp-proto-http`, and then use the class `OTLPSpanExporter`
from the `opentelemetry.exporter.otlp.proto.http.trace_exporter` module, with an
appropriate `endpoint` argument for your collector.

```python
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.trace import set_tracer_provider

resource = Resource.create(
    attributes={
        SERVICE_NAME: "eventstoredb",
    }
)
provider = TracerProvider(resource=resource)
provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter(endpoint="http://localhost:4318/v1/traces")))
set_tracer_provider(provider)
```

You can start Jaeger locally by running the following command.

    $ docker run -d -p 4318:4318 -p 16686:16686 --name jaeger jaegertracing/all-in-one:latest

You can then navigate to `http://localhost:16686` to access the Jaeger UI. And telemetry
data can be sent by an OpenTelemetry tracer provider to `http://localhost:4318/v1/traces`.

At this time, the instrumented methods are `append_to_stream()`, `subscribe_to_stream()`,
`subscribe_to_all()`, `read_subscription_to_stream()`, and `read_subscription_to_all()`.

The `append_to_stream()` method is instrumented by spanning the method call with a
"producer" span kind. It also adds span context information to the new event metadata
so that consumers can associate "consumer" spans with the "producer" span.

The subscription methods are instrumented by instrumenting the response iterators,
creating a "consumer" span for each recorded event received. It extracts span
context information from the recorded event metadata and associates the "consumer"
spans with a "producer" span, by making the "consumer" span a child of the "producer"
span.


## Communities<a id="communities"></a>

- [Issues](https://github.com/pyeventsourcing/esdbclient/issues)
- [Discuss](https://discuss.eventstore.com/)
- [Discord (Event Store)](https://discord.gg/Phn9pmCw3t)


## Contributors<a id="contributors"></a>

### Install Poetry<a id="install-poetry"></a>

The first thing is to check you have Poetry installed.

    $ poetry --version

If you don't, then please [install Poetry](https://python-poetry.org/docs/#installing-with-the-official-installer).

    $ curl -sSL https://install.python-poetry.org | python3 -

It will help to make sure Poetry's bin directory is in your `PATH` environment variable.

But in any case, make sure you know the path to the `poetry` executable. The Poetry
installer tells you where it has been installed, and how to configure your shell.

Please refer to the [Poetry docs](https://python-poetry.org/docs/) for guidance on
using Poetry.

### Setup for PyCharm users<a id="setup-for-pycharm-users"></a>

You can easily obtain the project files using PyCharm (menu "Git > Clone...").
PyCharm will then usually prompt you to open the project.

Open the project in a new window. PyCharm will then usually prompt you to create
a new virtual environment.

Create a new Poetry virtual environment for the project. If PyCharm doesn't already
know where your `poetry` executable is, then set the path to your `poetry` executable
in the "New Poetry Environment" form input field labelled "Poetry executable". In the
"New Poetry Environment" form, you will also have the opportunity to select which
Python executable will be used by the virtual environment.

PyCharm will then create a new Poetry virtual environment for your project, using
a particular version of Python, and also install into this virtual environment the
project's package dependencies according to the project's `poetry.lock` file.

You can add different Poetry environments for different Python versions, and switch
between them using the "Python Interpreter" settings of PyCharm. If you want to use
a version of Python that isn't installed, either use your favourite package manager,
or install Python by downloading an installer for recent versions of Python directly
from the [Python website](https://www.python.org/downloads/).

Once project dependencies have been installed, you should be able to run tests
from within PyCharm (right-click on the `tests` folder and select the 'Run' option).

Because of a conflict between pytest, PyCharm's debugger, and the coverage tool,
you may need to add `--no-cov` as an option to the test runner template. Alternatively,
just use the Python Standard Library's `unittest` module.

You should also be able to open a terminal window in PyCharm, and run the project's
Makefile commands from the command line (see below).

### Setup from command line<a id="setup-from-command-line"></a>

Obtain the project files, using Git or suitable alternative.

In a terminal application, change your current working directory
to the root folder of the project files. There should be a Makefile
in this folder.

Use the Makefile to create a new Poetry virtual environment for the
project and install the project's package dependencies into it,
using the following command.

    $ make install-packages

It's also possible to install the project in 'editable mode'.

    $ make install

Please note, if you create the virtual environment in this way, and then try to
open the project in PyCharm and configure the project to use this virtual
environment as an "Existing Poetry Environment", PyCharm sometimes has
issues (for reasons unknown) which might be problematic. If you encounter such
issues, you can resolve them by deleting the virtual environment and creating
the Poetry virtual environment using PyCharm (see above).

### Project Makefile commands<a id="project-makefile-commands"></a>

You can start EventStoreDB using the following command.

    $ make start-eventstoredb

You can run tests using the following command (needs EventStoreDB to be running).

    $ make test

You can stop EventStoreDB using the following command.

    $ make stop-eventstoredb

You can check the formatting of the code using the following command.

    $ make lint

You can reformat the code using the following command.

    $ make fmt

Tests belong in `./tests`. Code-under-test belongs in `./esdbclient`.

Edit package dependencies in `pyproject.toml`. Update installed packages (and the
`poetry.lock` file) using the following command.

    $ make update-packages

Raw data

            {
    "_id": null,
    "home_page": "https://github.com/pyeventsourcing/esdbclient",
    "name": "esdbclient",
    "maintainer": null,
    "docs_url": null,
    "requires_python": "<4.0,>=3.8",
    "maintainer_email": null,
    "keywords": null,
    "author": "John Bywater",
    "author_email": "john.bywater@appropriatesoftware.net",
    "download_url": "https://files.pythonhosted.org/packages/16/ea/39338bea95be626ef583c88db91d4e7ff27e339e6754126db5a8e0326d49/esdbclient-1.1.3.tar.gz",
    "platform": null,
    "description": "# Python gRPC Client for EventStoreDB\n\nThis [Python package](https://pypi.org/project/esdbclient/) provides multithreaded and asyncio Python\nclients for the [EventStoreDB](https://www.eventstore.com/) database.\n\nThe multithreaded `EventStoreDBClient` is described in detail below. Please scroll\ndown for <a href=\"#asyncio-client\">information</a> about `AsyncEventStoreDBClient`.\n\nThese clients have been developed and are being maintained in a collaboration\nwith the EventStoreDB team, and are officially support by Event Store Ltd.\nAlthough not all aspects of the EventStoreDB gRPC API are implemented, many\nof the most useful features are presented in an easy-to-use interface.\n\nThese clients have been tested to work with EventStoreDB LTS versions 22.10 and 23.10,\nand release candidates 24.2 and 24.6, without and without SSL/TLS, with both single-server\nand cluster modes, and with Python versions 3.8, 3.9, 3.10, 3.11 and 3.12.\n\nThe test suite has 100% line and branch coverage. The code has typing annotations\nchecked strictly with mypy. The code is formatted with black and isort, and checked\nwith flake8. Poetry is used for package management during development, and for\nbuilding and publishing distributions to [PyPI](https://pypi.org/project/esdbclient/).\n\nFor an example of usage, see the [eventsourcing-eventstoredb](\nhttps://github.com/pyeventsourcing/eventsourcing-eventstoredb) package.\n\n\n<!-- TOC -->\n* [Synopsis](#synopsis)\n* [Install package](#install-package)\n  * [From PyPI](#from-pypi)\n  * [With Poetry](#with-poetry)\n* [EventStoreDB server](#eventstoredb-server)\n  * [Run container](#run-container)\n  * [Stop container](#stop-container)\n* [EventStoreDB client](#eventstoredb-client)\n  * [Import class](#import-class)\n  * [Construct client](#construct-client)\n* [Connection strings](#connection-strings)\n  * [Two schemes](#two-schemes)\n  * [User info string](#user-info-string)\n  * [Query string](#query-string)\n  * [Examples](#examples)\n* [Event objects](#event-objects)\n  * [New events](#new-events)\n  * [Recorded events](#recorded-events)\n* [Streams](#streams)\n  * [Append events](#append-events)\n  * [Idempotent append operations](#idempotent-append-operations)\n  * [Read stream events](#read-stream-events)\n  * [Get current version](#get-current-version)\n  * [How to implement snapshotting with EventStoreDB](#how-to-implement-snapshotting-with-eventstoredb)\n  * [Read all events](#read-all-events)\n  * [Get commit position](#get-commit-position)\n  * [Get stream metadata](#get-stream-metadata)\n  * [Set stream metadata](#set-stream-metadata)\n  * [Delete stream](#delete-stream)\n  * [Tombstone stream](#tombstone-stream)\n* [Catch-up subscriptions](#catch-up-subscriptions)\n  * [Subscribe to all events](#subscribe-to-all-events)\n  * [Subscribe to stream events](#subscribe-to-stream-events)\n  * [How to implement exactly-once event processing](#how-to-implement-exactly-once-event-processing)\n* [Persistent subscriptions](#persistent-subscriptions)\n  * [Create subscription to all](#create-subscription-to-all)\n  * [Read subscription to all](#read-subscription-to-all)\n  * [How to write a persistent subscription consumer](#how-to-write-a-persistent-subscription-consumer)\n  * [Update subscription to all](#update-subscription-to-all)\n  * [Create subscription to stream](#create-subscription-to-stream)\n  * [Read subscription to stream](#read-subscription-to-stream)\n  * [Update subscription to stream](#update-subscription-to-stream)\n  
* [Replay parked events](#replay-parked-events)\n  * [Get subscription info](#get-subscription-info)\n  * [List subscriptions](#list-subscriptions)\n  * [List subscriptions to stream](#list-subscriptions-to-stream)\n  * [Delete subscription](#delete-subscription)\n* [Projections](#projections)\n  * [Create projection](#create-projection)\n  * [Get projection state](#get-projection-state)\n  * [Get projection statistics](#get-projection-statistics)\n  * [Update projection](#update-projection)\n  * [Enable projection](#enable-projection)\n  * [Disable projection](#disable-projection)\n  * [Reset projection](#reset-projection)\n  * [Delete projection](#delete-projection)\n  * [Restart projections subsystem](#restart-projections-subsystem)\n* [Call credentials](#call-credentials)\n  * [Construct call credentials](#construct-call-credentials)\n* [Connection](#connection)\n  * [Reconnect](#reconnect)\n  * [Close](#close)\n* [Asyncio client](#asyncio-client)\n  * [Synopsis](#synopsis-1)\n  * [FastAPI](#fastapi)\n* [Notes](#notes)\n  * [Regular expression filters](#regular-expression-filters)\n  * [Reconnect and retry method decorators](#reconnect-and-retry-method-decorators)\n* [Instrumentation](#instrumentation)\n  * [OpenTelemetry](#open-telemetry)\n* [Communities](#communities)\n* [Contributors](#contributors)\n  * [Install Poetry](#install-poetry)\n  * [Setup for PyCharm users](#setup-for-pycharm-users)\n  * [Setup from command line](#setup-from-command-line)\n  * [Project Makefile commands](#project-makefile-commands)\n<!-- TOC -->\n\n## Synopsis<a id=\"synopsis\"></a>\n\nThe `EventStoreDBClient` class can be imported from the `esdbclient` package.\n\nProbably the three most useful methods of `EventStoreDBClient` are:\n\n* `append_to_stream()` This method can be used to record new events in a particular\n\"stream\". This is useful, for example, when executing a command in an application\nthat mutates an aggregate. This method is \"atomic\" in that either all or none of\nthe events will be recorded.\n\n* `get_stream()` This method can be used to retrieve all the recorded\nevents in a \"stream\". This is useful, for example, when reconstructing\nan aggregate from recorded events before executing a command in an\napplication that creates new events.\n\n* `subscribe_to_all()` This method can be used to receive all recorded events in\nthe database. This is useful, for example, in event-processing components because\nit supports processing events with \"exactly-once\" semantics.\n\nThe example below uses an \"insecure\" EventStoreDB server running locally on port 2113.\n\n```python\nimport uuid\n\nfrom esdbclient import EventStoreDBClient, NewEvent, StreamState\n\n\n# Construct EventStoreDBClient with an EventStoreDB URI. The\n# connection string URI specifies that the client should\n# connect to an \"insecure\" server running on port 2113.\n\nclient = EventStoreDBClient(\n    uri=\"esdb://localhost:2113?Tls=false\"\n)\n\n\n# Generate new events. Typically, domain events of different\n# types are generated in a domain model, and then serialized\n# into NewEvent objects. An aggregate ID may be used as the\n# name of a stream in EventStoreDB.\n\nstream_name1 = str(uuid.uuid4())\nevent1 = NewEvent(\n    type='OrderCreated',\n    data=b'{\"order_number\": \"123456\"}'\n)\nevent2 = NewEvent(\n    type='OrderSubmitted',\n    data=b'{}'\n)\nevent3 = NewEvent(\n    type='OrderCancelled',\n    data=b'{}'\n)\n\n\n# Append new events to a new stream. 
The value returned\n# from the append_to_stream() method is the overall\n# \"commit position\" in the database of the last new event\n# recorded by this operation. The returned \"commit position\"\n# may be used in a user interface to poll an eventually\n# consistent event-processing component until it can\n# present an up-to-date materialized view. New events are\n# each allocated a \"stream position\", which is the next\n# available position in the stream, starting from 0.\n\ncommit_position1 = client.append_to_stream(\n    stream_name=stream_name1,\n    current_version=StreamState.NO_STREAM,\n    events=[event1, event2],\n)\n\n# Append events to an existing stream. The \"current version\"\n# is the \"stream position\" of the last recorded event in a\n# stream. We have recorded two new events, so the \"current\n# version\" is 1. The exception 'WrongCurrentVersion' will be\n# raised if an incorrect value is given.\n\ncommit_position2 = client.append_to_stream(\n    stream_name=stream_name1,\n    current_version=1,\n    events=[event3],\n)\n\n# - allocated commit positions increase monotonically\nassert commit_position2 > commit_position1\n\n\n# Get events recorded in a stream. This method returns\n# a sequence of recorded event objects. The recorded\n# event objects may be deserialized to domain event\n# objects of different types and used to reconstruct\n# an aggregate in a domain model.\n\nrecorded_events = client.get_stream(\n    stream_name=stream_name1\n)\n\n# - stream 'stream_name1' now has three events\nassert len(recorded_events) == 3\n\n# - allocated stream positions are zero-based and gapless\nassert recorded_events[0].stream_position == 0\nassert recorded_events[1].stream_position == 1\nassert recorded_events[2].stream_position == 2\n\n# - event attribute values are recorded faithfully\nassert recorded_events[0].type == \"OrderCreated\"\nassert recorded_events[0].data == b'{\"order_number\": \"123456\"}'\nassert recorded_events[0].id == event1.id\n\nassert recorded_events[1].type == \"OrderSubmitted\"\nassert recorded_events[1].data == b'{}'\nassert recorded_events[1].id == event2.id\n\nassert recorded_events[2].type == \"OrderCancelled\"\nassert recorded_events[2].data == b'{}'\nassert recorded_events[2].id == event3.id\n\n\n# Start a catch-up subscription from last recorded position.\n# This method returns a \"catch-up subscription\" object,\n# which can be iterated over to obtain recorded events.\n# The iterator will not stop when there are no more recorded\n# events to be returned, but instead will block, and then continue\n# when further events are recorded. It can be used as a context\n# manager so that the underlying streaming gRPC call to the database\n# can be cancelled cleanly in case of any error.\n\nreceived_events = []\nwith client.subscribe_to_all(commit_position=0) as subscription:\n\n    # Iterate over the catch-up subscription. Process each recorded\n    # event in turn. Within an atomic database transaction, record\n    # the event's \"commit position\" along with any new state generated\n    # by processing the event. 
Use the component's last recorded commit\n    # position when restarting the catch-up subscription.\n\n    for event in subscription:\n        received_events.append(event)\n\n        if event.commit_position == commit_position2:\n            # Break so we can continue with the example.\n            break\n\n\n# - events are received in the order they were recorded\nassert received_events[-3].type == \"OrderCreated\"\nassert received_events[-3].data == b'{\"order_number\": \"123456\"}'\nassert received_events[-3].id == event1.id\n\nassert received_events[-2].type == \"OrderSubmitted\"\nassert received_events[-2].data == b'{}'\nassert received_events[-2].id == event2.id\n\nassert received_events[-1].type == \"OrderCancelled\"\nassert received_events[-1].data == b'{}'\nassert received_events[-1].id == event3.id\n\n\n# Close the client's gRPC connection.\n\nclient.close()\n```\n\n\n## Install package<a id=\"install-package\"></a>\n\nIt is recommended to install Python packages into a Python virtual environment.\n\n### From PyPI<a id=\"from-pypi\"></a>\n\nYou can use pip to install this package directly from\n[the Python Package Index](https://pypi.org/project/esdbclient/).\n\n    $ pip install esdbclient\n\n### With Poetry<a id=\"with-poetry\"></a>\n\nYou can use Poetry to add this package to your pyproject.toml and install it.\n\n    $ poetry add esdbclient\n\n## EventStoreDB server<a id=\"eventstoredb-server\"></a>\n\nThe EventStoreDB server can be run locally using the official Docker container image.\n\n### Run container<a id=\"run-container\"></a>\n\nFor development, you can run a \"secure\" EventStoreDB server using the following command.\n\n    $ docker run -d --name eventstoredb-secure -it -p 2113:2113 --env \"HOME=/tmp\" docker.eventstore.com/eventstore-ce/eventstoredb-ce:23.10.0-jammy --dev\n\nAs we will see, your client will need an EventStoreDB connection string URI as the value\nof its `uri` constructor argument. The connection string for this \"secure\" EventStoreDB\nserver would be:\n\n    esdb://admin:changeit@localhost:2113\n\nTo connect to a \"secure\" server, you will usually need to include a \"username\"\nand a \"password\" in the connection string, so that the server can authenticate the\nclient. With EventStoreDB, the default username is \"admin\" and the default password\nis \"changeit\".\n\nWhen connecting to a \"secure\" server, you may also need to provide an SSL/TLS certificate\nas the value of the `root_certificates` constructor argument. If the server certificate\nis publicly signed, the root certificates of the certificate authority may be installed\nlocally and picked up by the grpc package from a default location. The client uses the\nroot SSL/TLS certificate to authenticate the server. For development, you can either\nuse the SSL/TLS certificate of a self-signing certificate authority used to create the\nserver's certificate. 
Or, when using a single-node cluster, you can just use the server\ncertificate itself, getting the server certificate with the following Python code.\n\n```python\nimport ssl\n\nserver_certificate = ssl.get_server_certificate(addr=('localhost', 2113))\n```\n\nAlternatively, you can start an \"insecure\" server using the following command.\n\n    $ docker run -d --name eventstoredb-insecure -it -p 2113:2113 docker.eventstore.com/eventstore-ce/eventstoredb-ce:23.10.0-jammy --insecure\n\nThe connection string URI for this \"insecure\" server would be:\n\n    esdb://localhost:2113?Tls=false\n\nAs we will see, when connecting to an \"insecure\" server, there is no need to include\na \"username\" and a \"password\" in the connection string. If you do, these values will\nbe ignored by the client, so that they are not sent over an insecure channel.\n\nPlease note, the \"insecure\" connection string uses a query string with the field-value\n`Tls=false`. The value of this field is by default `true`.\n\n### Stop container<a id=\"stop-container\"></a>\n\nTo stop and remove the \"secure\" container, use the following Docker commands.\n\n    $ docker stop eventstoredb-secure\n    $ docker rm eventstoredb-secure\n\nTo stop and remove the \"insecure\" container, use the following Docker commands.\n\n    $ docker stop eventstoredb-insecure\n    $ docker rm eventstoredb-insecure\n\n\n## EventStoreDB client<a id=\"eventstoredb-client\"></a>\n\nThis EventStoreDB client is implemented in the `esdbclient` package with\nthe `EventStoreDBClient` class.\n\n### Import class<a id=\"import-class\"></a>\n\nThe `EventStoreDBClient` class can be imported from the `esdbclient` package.\n\n```python\nfrom esdbclient import EventStoreDBClient\n```\n\n### Construct client<a id=\"construct-client\"></a>\n\nThe `EventStoreDBClient` class has one required constructor argument, `uri`, and three\noptional constructor arguments, `root_certificates`, `private_key`, and `certificate_chain`.\n\nThe `uri` argument is expected to be an EventStoreDB connection string URI that\nconforms with the standard EventStoreDB \"esdb\" or \"esdb+discover\" URI schemes.\n\nThe client must be configured to create a \"secure\" connection to a \"secure\" server,\nor alternatively an \"insecure\" connection to an \"insecure\" server. By default, the\nclient will attempt to create a \"secure\" connection. And so, when connecting to an\n\"insecure\" server, the connection string must specify that the client should attempt\nto make an \"insecure\" connection by using the URI query string field-value `Tls=false`.\n\nThe optional `root_certificates` argument can be either a Python `str` or a Python `bytes`\nobject containing PEM encoded SSL/TLS certificate(s), and is used to authenticate the\nserver to the client. When connecting to an \"insecure\" server, the value of this\nargument will be ignored. When connecting to a \"secure\" server, it may be necessary to\nset this argument. Typically, the value of this argument would be the public certificate\nof the certificate authority that was responsible for generating the certificate used by\nthe EventStoreDB server. It is unnecessary to set this argument if the certificate\nauthority's certificates are installed locally, such that the Python grpc library can pick\nthem up from a default location. Alternatively, for development, you can use the server's\ncertificate itself. 
The value of this argument is passed directly to `grpc.ssl_channel_credentials()`.\n\nAn alternative way to supply the `root_certificates` argument is through the `TlsCaFile` field-value of the connection string URI query string (see below). If the `TlsCaFile` field-value is specified, the `root_certificates` argument will be ignored.\n\nThe optional `private_key` and `certificate_chain` arguments are both either a Python\n`str` or a Python `bytes` object. These arguments may be used to authenticate the client\nto the server. It is necessary to provide correct values for these arguments when connecting\nto a \"secure\" server that is running the commercial edition of EventStoreDB with the\nUser Certificates plugin enabled. The value of `private_key` should be the X.509 user\ncertificate's private key in PEM format. The value of `certificate_chain` should be the\nX.509 user certificate itself in PEM format. The values of these arguments are passed\ndirectly to `grpc.ssl_channel_credentials()`. When connecting to an \"insecure\" server,\nthe values of these arguments will be ignored. Please note, an alternative way of\nsupplying the client with a user certificate and private key is to use the `UserCertFile`\nand `UserKeyFile` field-values of the connection string URI query string (see below).\nIf the `UserCertFile` field-value is specified, the `certificate_chain` argument will be\nignored. If the `UserKeyFile` field-value is specified, the `private_key` argument will be\nignored.\n\nIn the example below, constructor argument values for `uri` and `root_certificates` are\ntaken from the operating system environment.\n\n```python\nimport os\n\nclient = EventStoreDBClient(\n    uri=os.getenv(\"ESDB_URI\"),\n    root_certificates=os.getenv(\"ESDB_ROOT_CERTIFICATES\"),\n)\n```\n\n## Connection strings<a id=\"connection-strings\"></a>\n\nAn EventStoreDB connection string is a URI that conforms with one of two possible\nschemes: either the \"esdb\" scheme, or the \"esdb+discover\" scheme.\n\nThe syntax and semantics of the EventStoreDB URI schemes are described below. The\nsyntax is defined using [EBNF](https://en.wikipedia.org/wiki/Extended_Backus\u2013Naur_form).\n\n### Two schemes<a id=\"two-schemes\"></a>\n\nThe \"esdb\" URI scheme can be defined in the following way.\n\n    esdb-uri = \"esdb://\" , [ user-info , \"@\" ] , grpc-target, { \",\" , grpc-target } , [ \"?\" , query-string ] ;\n\nIn the \"esdb\" URI scheme, after the optional user info string, there must be at least\none gRPC target. If there are several gRPC targets, they must be separated from each\nother with the \",\" character.\n\nEach gRPC target should indicate an EventStoreDB gRPC server socket, all in the same\nEventStoreDB cluster, by specifying a host and a port number separated with the \":\"\ncharacter. The host may be a hostname that can be resolved to an IP address, or an IP\naddress.\n\n    grpc-target = ( hostname | ip-address ) , \":\" , port-number ;\n\nIf there is one gRPC target, the client will simply attempt to connect to this\nserver, and it will use this connection when recording and retrieving events.\n\nIf there are two or more gRPC targets, the client will attempt to connect to the\nGossip API of each in turn, until information about the cluster is obtained. A member\nof the cluster is then selected by the client according to the \"node preference\"\nspecified by the connection string URI. 
The client will then close its initial connection and connect to the selected node.\nIf the \"node preference\" is \"leader\" and, after the client has connected to a leader,\nthe leader becomes a follower, the client will reconnect to the new leader.\n\n\nThe \"esdb+discover\" URI scheme can be defined in the following way.\n\n    esdb-discover-uri = \"esdb+discover://\" , [ user-info, \"@\" ] , cluster-domainname, [ \":\" , port-number ] , [ \"?\" , query-string ] ;\n\nIn the \"esdb+discover\" URI scheme, after the optional user info string, there should be\na domain name which identifies a cluster of EventStoreDB servers. Individual nodes in\nthe cluster should be declared with DNS 'A' records.\n\nThe client will use the cluster domain name with the gRPC library's 'round robin' load\nbalancing strategy to call the Gossip APIs of addresses discovered from DNS 'A' records.\nInformation about the EventStoreDB cluster is obtained from the Gossip API. A member of\nthe cluster is then selected by the client according to the \"node preference\" option.\nThe client will then close its connection and connect to the selected node, without the\n'round robin' load balancing strategy. If the \"node preference\" is \"leader\" and, after\nthe client has connected to a leader, the leader becomes a follower, the client will\nreconnect to the new leader.\n\n### User info string<a id=\"user-info-string\"></a>\n\nIn both the \"esdb\" and \"esdb+discover\" schemes, the URI may include a user info string.\nIf it exists in the URI, the user info string must be separated from the rest of the URI\nwith the \"@\" character. The user info string must include a username and a password,\nseparated with the \":\" character.\n\n    user-info = username , \":\" , password ;\n\nThe user info is sent by the client in a \"basic auth\" authorization header in each gRPC\ncall to a \"secure\" server. This authorization header is used by the server to authenticate\nthe client. The Python gRPC library does not allow call credentials to be transferred to\n\"insecure\" servers.
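\n\nAs a hedged sketch: the client method `construct_call_credentials()` (described further below) can be used to construct alternative call credentials, which can be passed to the optional `credentials` argument of individual client methods to override the default credentials derived from the user info string. The username, password, and positional signature shown here are assumptions for illustration.\n\n```python\n# A sketch only: construct per-call credentials from a username and\n# password, for use with the 'credentials' argument of client methods.\ncall_credentials = client.construct_call_credentials(\n    'admin', 'changeit'\n)\n```\n\n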
### Query string<a id=\"query-string\"></a>\n\nIn both the \"esdb\" and \"esdb+discover\" schemes, the optional query string must be one\nor more field-value arguments, separated from each other with the \"&\" character.\n\n    query-string = field-value, { \"&\", field-value } ;\n\nEach field-value argument must be one of the supported fields, and an\nappropriate value, separated with the \"=\" character.\n\n    field-value = ( \"Tls\", \"=\" , \"true\" | \"false\" )\n                | ( \"TlsVerifyCert\", \"=\" , \"true\" | \"false\" )\n                | ( \"ConnectionName\", \"=\" , string )\n                | ( \"NodePreference\", \"=\" , \"leader\" | \"follower\" | \"readonlyreplica\" | \"random\" )\n                | ( \"DefaultDeadline\", \"=\" , integer )\n                | ( \"GossipTimeout\", \"=\" , integer )\n                | ( \"MaxDiscoverAttempts\", \"=\" , integer )\n                | ( \"DiscoveryInterval\", \"=\" , integer )\n                | ( \"KeepAliveInterval\", \"=\" , integer )\n                | ( \"KeepAliveTimeout\", \"=\" , integer )\n                | ( \"TlsCaFile\", \"=\" , string )\n                | ( \"UserCertFile\", \"=\" , string )\n                | ( \"UserKeyFile\", \"=\" , string ) ;\n\nThe table below describes the query string field-values supported by this client.\n\n| Field | Value | Description |\n|-------|-------|-------------|\n| Tls | \"true\", \"false\" (default: \"true\") | If \"true\" the client will create a \"secure\" gRPC channel. If \"false\" the client will create an \"insecure\" gRPC channel. This must match the server configuration. |\n| TlsVerifyCert | \"true\", \"false\" (default: \"true\") | This value is currently ignored. |\n| ConnectionName | string (default: auto-generated version-4 UUID) | Sent in call metadata for every call, to identify the client to the cluster. |\n| NodePreference | \"leader\", \"follower\", \"readonlyreplica\", \"random\" (default: \"leader\") | The node state preferred by the client. The client will select a node from the cluster info received from the Gossip API according to this preference. |\n| DefaultDeadline | integer (default: `None`) | The default value (in seconds) of the `timeout` argument of client \"write\" methods such as `append_to_stream()`. |\n| GossipTimeout | integer (default: 5) | The default value (in seconds) of the `timeout` argument of gossip read methods, such as `read_gossip()`. |
\n| MaxDiscoverAttempts | integer (default: 10) | The number of attempts to read gossip when connecting or reconnecting to a cluster member. |\n| DiscoveryInterval | integer (default: 100) | How long to wait (in milliseconds) between gossip retries. |\n| KeepAliveInterval | integer (default: `None`) | The value (in milliseconds) of the \"grpc.keepalive_time_ms\" gRPC channel option. |\n| KeepAliveTimeout | integer (default: `None`) | The value (in milliseconds) of the \"grpc.keepalive_timeout_ms\" gRPC channel option. |\n| TlsCaFile | string (default: `None`) | Absolute filesystem path to file containing the CA certificate in PEM format. This will be used to verify the server's certificate. |\n| UserCertFile | string (default: `None`) | Absolute filesystem path to file containing the X.509 user certificate in PEM format. |\n| UserKeyFile | string (default: `None`) | Absolute filesystem path to file containing the X.509 user certificate's private key in PEM format. |\n\n\nPlease note, the client is insensitive to the case of fields and values. If fields are\nrepeated in the query string, the query string will be parsed without error. However,\nthe client will use the value of the first occurrence of a field. All the other\nfield-values in the query string with the same field name will be ignored. Fields\nwithout values will also be ignored.\n\nIf the client's node preference is \"follower\" and there are no follower\nnodes in the cluster, then the client will raise an exception. Similarly, if the\nclient's node preference is \"readonlyreplica\" and there are no read-only replica\nnodes in the cluster, then the client will also raise an exception.\n\nThe gRPC channel option \"grpc.max_receive_message_length\" is automatically\nset to the value `17 * 1024 * 1024`. This value cannot be changed.\n\n\n### Examples<a id=\"examples\"></a>\n\nHere are some examples of EventStoreDB connection string URIs.\n\nThe following URI will cause the client to make an \"insecure\" connection to\ngRPC target `'127.0.0.1:2113'`. Because the client's node preference is \"follower\",\nmethods that can be called on a follower should complete successfully, whilst methods\nthat require a leader will raise a `NodeIsNotLeader` exception.\n\n    esdb://127.0.0.1:2113?Tls=false&NodePreference=follower\n\nThe following URI will cause the client to make an \"insecure\" connection to\ngRPC target `'127.0.0.1:2113'`. 
Because the client's node preference is \"leader\",\nif this node is not a leader, then a `NodeIsNotLeader` exception will be raised by\nall methods.\n\n    esdb://127.0.0.1:2113?Tls=false&NodePreference=leader\n\nThe following URI will cause the client to make a \"secure\" connection to\ngRPC target `'localhost:2113'` with username `'admin'` and password `'changeit'`\nas the default call credentials when making calls to the EventStoreDB gRPC API.\nBecause the client's node preference is \"leader\", by default, if this node is not\na leader, then a `NodeIsNotLeader` exception will be raised by all methods.\n\n    esdb://admin:changeit@localhost:2113\n\nThe following URI will cause the client to make \"secure\" connections, firstly to\nget cluster info from either `'localhost:2111'`, or `'localhost:2112'`, or `'localhost:2113'`.\nBecause the client's node preference is \"leader\", the client will select the leader\nnode from the cluster info and reconnect to the leader. If the \"leader\" node becomes\na \"follower\" and another node becomes \"leader\", then the client will reconnect to the\nnew leader.\n\n    esdb://admin:changeit@localhost:2111,localhost:2112,localhost:2113?NodePreference=leader\n\n\nThe following URI will cause the client to make \"secure\" connections, firstly to\nget cluster info from either `'localhost:2111'`, or `'localhost:2112'`, or `'localhost:2113'`.\nBecause the client's node preference is \"follower\", the client will select a follower\nnode from the cluster info and reconnect to this follower. Please note, if the \"follower\"\nnode becomes the \"leader\", the client will not reconnect to a follower -- such behavior\nmay be implemented in a future version of the client and server.\n\n    esdb://admin:changeit@localhost:2111,localhost:2112,localhost:2113?NodePreference=follower\n\n\nThe following URI will cause the client to make \"secure\" connections, firstly to get\ncluster info from addresses in DNS 'A' records for `'cluster1.example.com'`, and then\nto connect to a \"leader\" node. The client will use a default timeout\nof 5 seconds when making calls to EventStoreDB \"write\" methods.\n\n    esdb+discover://admin:changeit@cluster1.example.com?DefaultDeadline=5\n\n\nThe following URI will cause the client to make \"secure\" connections, firstly to get\ncluster info from addresses in DNS 'A' records for `'cluster1.example.com'`, and then\nto connect to a \"leader\" node. It will configure gRPC connections with a \"keep alive\ninterval\" and a \"keep alive timeout\".\n\n    esdb+discover://admin:changeit@cluster1.example.com?KeepAliveInterval=10000&KeepAliveTimeout=10000
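\n\nAs a hedged sketch, any of these URIs can be passed as the `uri` constructor argument. The values below are illustrative, reusing the `server_certificate` obtained earlier for the local \"secure\" development server.\n\n```python\n# A sketch only: construct a client from one of the example URIs.\nexample_client = EventStoreDBClient(\n    uri='esdb://admin:changeit@localhost:2113?DefaultDeadline=5',\n    root_certificates=server_certificate,\n)\nexample_client.close()\n```\n\n\n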
## Event objects<a id=\"event-objects\"></a>\n\nThis package defines a `NewEvent` class and a `RecordedEvent` class. The\n`NewEvent` class should be used when writing events to the database. The\n`RecordedEvent` class is used when reading events from the database.\n\n### New events<a id=\"new-events\"></a>\n\nThe `NewEvent` class should be used when writing events to an EventStoreDB database.\nYou will need to construct new event objects before calling `append_to_stream()`.\n\nThe `NewEvent` class is a frozen Python dataclass. It has two required constructor\narguments (`type` and `data`) and three optional constructor arguments (`metadata`,\n`content_type` and `id`).\n\nThe required `type` argument is a Python `str`, used to describe the type of\ndomain event that is being recorded.\n\nThe required `data` argument is a Python `bytes` object, used to provide the\nserialized data of the domain event that is being recorded.\n\nThe optional `metadata` argument is a Python `bytes` object, used to indicate any\nmetadata of the event that will be recorded. The default value is an empty `bytes`\nobject.\n\nThe optional `content_type` argument is a Python `str`, used to indicate the\nkind of data that is being recorded. The default value is `'application/json'`,\nwhich indicates that the `data` was serialized using JSON. An alternative value\nfor this argument is the more general indication `'application/octet-stream'`.\n\nThe optional `id` argument is a Python `UUID` object, used to specify the unique ID\nof the event that will be recorded. If no value is provided, a new version-4 UUID\nwill be generated.\n\n```python\nnew_event1 = NewEvent(\n    type='OrderCreated',\n    data=b'{\"name\": \"Greg\"}',\n)\nassert new_event1.type == 'OrderCreated'\nassert new_event1.data == b'{\"name\": \"Greg\"}'\nassert new_event1.metadata == b''\nassert new_event1.content_type == 'application/json'\nassert isinstance(new_event1.id, uuid.UUID)\n\nevent_id = uuid.uuid4()\nnew_event2 = NewEvent(\n    type='ImageCreated',\n    data=b'01010101010101',\n    metadata=b'{\"a\": 1}',\n    content_type='application/octet-stream',\n    id=event_id,\n)\nassert new_event2.type == 'ImageCreated'\nassert new_event2.data == b'01010101010101'\nassert new_event2.metadata == b'{\"a\": 1}'\nassert new_event2.content_type == 'application/octet-stream'\nassert new_event2.id == event_id\n```\n\n### Recorded events<a id=\"recorded-events\"></a>\n\nThe `RecordedEvent` class is used when reading events from an EventStoreDB\ndatabase. The client will return event objects of this type from all methods\nthat return recorded events, such as `get_stream()`, `subscribe_to_all()`,\nand `read_subscription_to_all()`. You do not need to construct recorded event objects.\n\nLike `NewEvent`, the `RecordedEvent` class is a frozen Python dataclass. It has\nall the attributes that `NewEvent` has (`type`, `data`, `metadata`, `content_type`,\n`id`), and some additional attributes that follow from the recording of an event\n(`stream_name`, `stream_position`, `commit_position`, `recorded_at`). It also has a\n`link` attribute, which is `None` unless the recorded event is a \"link event\" that has\nbeen \"resolved\" to the linked event. And it has a `retry_count` attribute, which has an\ninteger value when recorded events are received from persistent subscriptions;\notherwise the value of `retry_count` is `None`.\n\nThe `type` attribute is a Python `str`, used to indicate the type of an event\nthat was recorded.\n\nThe `data` attribute is a Python `bytes` object, used to indicate the data of an\nevent that was recorded.\n\nThe `metadata` attribute is a Python `bytes` object, used to indicate the metadata of\nan event that was recorded.\n\nThe `content_type` attribute is a Python `str`, used to indicate the type of\ndata that was recorded for an event. It is usually `'application/json'`, indicating\nthat the data can be parsed as JSON. 
Alternatively, it is `'application/octet-stream'`.\n\nThe `id` attribute is a Python `UUID` object, used to indicate the unique ID of an\nevent that was recorded.\n\nThe `stream_name` attribute is a Python `str`, used to indicate the name of a\nstream in which an event was recorded.\n\nThe `stream_position` attribute is a Python `int`, used to indicate the position in a\nstream at which an event was recorded.\n\nIn EventStoreDB, a \"stream position\" is an integer representing the position of a\nrecorded event in a stream. Each recorded event is recorded at a position in a stream.\nEach stream position is occupied by only one recorded event. New events are recorded at the\nnext unoccupied position. All sequences of stream positions are zero-based and gapless.\n\nThe `commit_position` attribute is a Python `int`, used to indicate the position in the\ndatabase at which an event was recorded.\n\nIn EventStoreDB, a \"commit position\" is an integer representing the position of a\nrecorded event in the database. Each recorded event is recorded at a position in the\ndatabase. Each commit position is occupied by only one recorded event. Commit positions\nare zero-based and increase monotonically as new events are recorded. But, unlike stream\npositions, the sequence of successive commit positions is not gapless. Indeed, there are\nusually large differences between the commit positions of successively recorded events.\n\nPlease note, in EventStoreDB 21.10, the `commit_position` of all `RecordedEvent` objects\nobtained from `read_stream()` is `None`, whereas those obtained from `read_all()` have\nthe actual commit position of the recorded event. This was changed in version 22.10, so\nthat event objects obtained from both `get_stream()` and `read_all()` have the actual\ncommit position. The `commit_position` attribute of the `RecordedEvent` class is\nannotated with the type `Optional[int]` for this reason only.\n\nThe `recorded_at` attribute is a Python `datetime`, used to indicate when an event was\nrecorded by the database.\n\nThe `link` attribute is an optional `RecordedEvent` that carries information about\na \"link event\" that has been \"resolved\" to the linked event. This allows access to\nthe link event attributes when link events have been resolved, for example access\nto the correct event ID to be used when acknowledging or negatively acknowledging\nlink events. Link events are \"resolved\" when the `resolve_links` argument is `True`\nand when replaying parked events (negatively acknowledging an event received from\na persistent subscription with the `'park'` action will create a link event, and\nwhen parked events are replayed they are received as resolved events). 
The\n`ack_id` property helps with obtaining the correct event ID to use when acknowledging\nor negatively acknowledging events received from persistent subscriptions.\n\nThe `retry_count` attribute is a Python `int`, used to indicate the number of times a\npersistent subscription has retried sending the event to a consumer.\n\n\n```python\nfrom dataclasses import dataclass\nfrom datetime import datetime\nfrom typing import Optional\nfrom uuid import UUID\n\n@dataclass(frozen=True)\nclass RecordedEvent:\n    \"\"\"\n    Encapsulates event data that has been recorded in EventStoreDB.\n    \"\"\"\n\n    type: str\n    data: bytes\n    metadata: bytes\n    content_type: str\n    id: UUID\n    stream_name: str\n    stream_position: int\n    commit_position: Optional[int]\n    recorded_at: Optional[datetime] = None\n    link: Optional[\"RecordedEvent\"] = None\n    retry_count: Optional[int] = None\n\n    @property\n    def ack_id(self) -> UUID:\n        if self.link is not None:\n            return self.link.id\n        else:\n            return self.id\n\n    @property\n    def is_system_event(self) -> bool:\n        return self.type.startswith(\"$\")\n\n    @property\n    def is_link_event(self) -> bool:\n        return self.type == \"$>\"\n\n    @property\n    def is_resolved_event(self) -> bool:\n        return self.link is not None\n\n    @property\n    def is_checkpoint(self) -> bool:\n        return False\n```\n\nThe property `ack_id` can be used to obtain the correct event ID to `ack()` or `nack()`\nevents received when reading persistent subscriptions. The returned value is either the\nvalue of the `id` attribute of the `link` attribute, if `link` is not `None`, otherwise\nit is the value of the `id` attribute.\n\nThe property `is_system_event` indicates whether the event is a \"system event\". System\nevents have a `type` value that starts with `'$'`.\n\nThe property `is_link_event` indicates whether the event is a \"link event\". Link\nevents have a `type` value of `'$>'`.\n\nThe property `is_resolved_event` indicates whether the event has been resolved from a\n\"link event\". The returned value is `True` if `link` is not `None`.\n\nThe property `is_checkpoint` is `False`. This can be used to identify `Checkpoint`\ninstances, which may be returned when receiving events with `include_checkpoints=True`.
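\n\nThe behavior of these properties can be illustrated by constructing instances of the dataclass shown above. This is a sketch for illustration only: in practice you do not construct `RecordedEvent` objects yourself, and the link event's `data` value here is merely indicative.\n\n```python\nimport uuid\n\n# Illustration only: a \"link event\" and an event resolved from it.\nlink_event = RecordedEvent(\n    type='$>',\n    data=b'0@stream-1',\n    metadata=b'',\n    content_type='application/json',\n    id=uuid.uuid4(),\n    stream_name='projected-stream',\n    stream_position=0,\n    commit_position=None,\n)\nresolved_event = RecordedEvent(\n    type='SomethingHappened',\n    data=b'{}',\n    metadata=b'',\n    content_type='application/json',\n    id=uuid.uuid4(),\n    stream_name='stream-1',\n    stream_position=0,\n    commit_position=None,\n    link=link_event,\n)\n\nassert link_event.is_link_event and link_event.is_system_event\nassert resolved_event.is_resolved_event\nassert resolved_event.ack_id == link_event.id  # the link's ID\n```\n\n\n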
## Streams<a id=\"streams\"></a>\n\nIn EventStoreDB, a \"stream\" is a sequence of recorded events that all have\nthe same \"stream name\". There will normally be many streams in a database,\neach with many recorded events. Each recorded event has a position in its stream\n(the \"stream position\"), and a position in the database (the \"commit position\").\nStream positions are zero-based and gapless. Commit positions are also zero-based,\nbut are not gapless.\n\nThe methods `append_to_stream()`, `get_stream()` and `read_all()` can\nbe used to record and read events in the database.\n\n### Append events<a id=\"append-events\"></a>\n\n*requires leader*\n\nThe `append_to_stream()` method can be used to atomically record a sequence of new events.\nIf the operation is successful, it returns the commit position of the last event in the\nsequence that has been recorded.\n\nThis method has three required arguments, `stream_name`, `current_version`\nand `events`.\n\nThe required `stream_name` argument is a Python `str` that uniquely identifies a\nstream to which a sequence of events will be appended.\n\nThe required `current_version` argument is expected to be either a Python `int`\nthat indicates the stream position of the last recorded event in the stream, or\n`StreamState.NO_STREAM` if the stream does not yet exist or has been deleted. Stream\npositions are zero-based and gapless, so that if a stream has two events, the\n`current_version` should be 1. If an incorrect value is given, this method will raise a\n`WrongCurrentVersion` exception. This behavior is designed to provide concurrency\ncontrol when recording new events. The correct value of `current_version` for any stream\ncan be obtained by calling `get_current_version()`. However, the typical approach is to\nreconstruct an aggregate from the recorded events, so that the version of the aggregate\nis the stream position of the last recorded event, then have the aggregate generate new\nevents, and then use the current version of the aggregate as the value of the\n`current_version` argument when appending the new aggregate events. This ensures\nthe consistency of the recorded aggregate events, because operations that generate\nnew aggregate events can be retried with a freshly reconstructed aggregate if\na `WrongCurrentVersion` exception is encountered when recording new events. This\ncontrolling behavior can be entirely disabled by setting the value of the `current_version`\nargument to the constant `StreamState.ANY`. More selectively, this behavior can be\ndisabled for existing streams by setting the value of the `current_version`\nargument to the constant `StreamState.EXISTS`.\n\nThe required `events` argument is expected to be a sequence of new event objects. The\n`NewEvent` class should be used to construct new event objects. The `append_to_stream()`\noperation is atomic, so that either all or none of the new events will be recorded. It\nis not possible with EventStoreDB to record new events atomically in more than one stream.\n\nThis method has an optional `timeout` argument, which is a Python `float`\nthat sets a maximum duration, in seconds, for the completion of the gRPC operation.\n\nThis method has an optional `credentials` argument, which can be used to\noverride call credentials derived from the connection string URI.\n\nIn the example below, a new event, `event1`, is appended to a new stream. The stream\ndoes not yet exist, so `current_version` is `StreamState.NO_STREAM`.\n\n```python\n# Construct a new event object.\nevent1 = NewEvent(type='OrderCreated', data=b'{}')\n\n# Define a new stream name.\nstream_name1 = str(uuid.uuid4())\n\n# Append the new event to the new stream.\ncommit_position1 = client.append_to_stream(\n    stream_name=stream_name1,\n    current_version=StreamState.NO_STREAM,\n    events=[event1],\n)\n```
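\n\nAt this point the stream exists, so appending again with `current_version=StreamState.NO_STREAM` would fail. The hedged sketch below illustrates this, assuming `WrongCurrentVersion` can be imported from `esdbclient.exceptions` in the same way as `NotFound` (see below).\n\n```python\nfrom esdbclient.exceptions import WrongCurrentVersion\n\ntry:\n    client.append_to_stream(\n        stream_name=stream_name1,\n        current_version=StreamState.NO_STREAM,  # wrong: the stream now exists\n        events=[NewEvent(type='OrderUpdated', data=b'{}')],\n    )\nexcept WrongCurrentVersion:\n    pass  # No events were recorded.\n```\n\n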
In the example below, two subsequent events are appended to an existing\nstream. The stream has one recorded event, so `current_version` is `0`.\n\n```python\nevent2 = NewEvent(type='OrderUpdated', data=b'{}')\nevent3 = NewEvent(type='OrderDeleted', data=b'{}')\n\ncommit_position2 = client.append_to_stream(\n    stream_name=stream_name1,\n    current_version=0,\n    events=[event2, event3],\n)\n```\n\nThe returned values, `commit_position1` and `commit_position2`, are the\ncommit positions in the database of the last events in the recorded sequences.\nThat is, `commit_position1` is the commit position of `event1` and\n`commit_position2` is the commit position of `event3`.\n\nCommit positions that are returned in this way can be used by a user interface to poll\na downstream component until it has processed all the newly recorded events. For example,\nconsider a user interface command that results in the recording of new events, and an\neventually consistent materialized view in a downstream component that is updated from\nthese events. If the new events have not yet been processed, the view might be stale,\nor out-of-date. Instead of displaying a stale view, the user interface can poll the\ndownstream component until it has processed the newly recorded events, and then display\nan up-to-date view to the user.\n\n\n### Idempotent append operations<a id=\"idempotent-append-operations\"></a>\n\nThe `append_to_stream()` method is \"idempotent\" with respect to the `id` value of a\n`NewEvent` object. That is to say, if `append_to_stream()` is called with events\nwhose `id` values are equal to those already recorded in the stream, then the\nmethod call will successfully return, with the commit position of the last new event,\nwithout making any changes to the database.\n\nThis is because it may sometimes happen, when calling `append_to_stream()`, that the new\nevents are successfully recorded, but something goes wrong before the method call\ncan return successfully to the caller. In this case, we cannot be sure that the events have\nin fact been recorded, and so we may wish to retry.\n\nIf the events were in fact successfully recorded, it is convenient for the retried method call\nto return successfully, and without either raising an exception (when `current_version`\nis either `StreamState.NO_STREAM` or an integer value) or creating further event records\n(when `current_version` is `StreamState.ANY` or `StreamState.EXISTS`), as it would\nif the `append_to_stream()` method were not idempotent.\n\nIf the method call initially failed and the new events were not in fact recorded, it\nmakes good sense, when the method call is retried, that the new events are recorded\nand that the method call returns successfully. If the concurrency controls have not been disabled,\nthat is if `current_version` is either `StreamState.NO_STREAM` or an integer value, and\nif a `WrongCurrentVersion` exception is raised when retrying the method call, then we can assume\nboth that the initial method call did not in fact successfully record the events, and also\nthat subsequent events have in the meantime been recorded by somebody else. In this case,\nan application command which generated the new events may need to be executed again. And\nthe user of the application may need to be given an opportunity to decide if they still wish to\nproceed with their original intention, by displaying a suitable error with an up-to-date view of\nthe recorded state. 
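\n\nPutting this together, a retry wrapper might look like the sketch below. This is illustrative only, not part of the client: the helper name `append_with_retry` and its retry policy are assumptions, and the broad `except Exception` clause stands in for whichever transient errors are considered retryable.\n\n```python\ndef append_with_retry(stream_name, current_version, events, retries=3):\n    # Relies on append_to_stream() being idempotent with respect to\n    # event IDs: retrying with the same NewEvent objects will not\n    # record them twice.\n    for attempt in range(retries):\n        try:\n            return client.append_to_stream(\n                stream_name=stream_name,\n                current_version=current_version,\n                events=events,\n            )\n        except WrongCurrentVersion:\n            # Somebody else recorded events in the meantime: the command\n            # that generated these events may need to be executed again.\n            raise\n        except Exception:\n            # Ambiguous failure: the events may or may not have been\n            # recorded. It is safe to retry with the same event objects.\n            if attempt == retries - 1:\n                raise\n```\n\n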
In the case where concurrency controls have been disabled, by using `StreamState.ANY` or\n`StreamState.EXISTS` as the value of `current_version`, retrying a method call that failed to\nreturn successfully will, more simply, just attempt to ensure the new events are recorded, regardless\nof their resulting stream positions. In either case, when the method call does return successfully, we\ncan be sure the events have been recorded.\n\nThe example below shows the `append_to_stream()` method being called again with events\n`event2` and `event3`, and with `current_version=0`. We can see that repeating the call\nto `append_to_stream()` returns successfully without raising a `WrongCurrentVersion`\nexception, as it would if the `append_to_stream()` operation were not idempotent.\n\n```python\n# Retry appending event2 and event3.\ncommit_position_retry = client.append_to_stream(\n    stream_name=stream_name1,\n    current_version=0,\n    events=[event2, event3],\n)\n```\n\nWe can see that the same commit position is returned as above.\n\n```python\nassert commit_position_retry == commit_position2\n```\n\nThe example below shows the `append_to_stream()` method being called again with events\n`event2` and `event3`, with `current_version=StreamState.ANY`.\n\n```python\n# Retry appending event2 and event3, with concurrency control disabled.\ncommit_position_retry = client.append_to_stream(\n    stream_name=stream_name1,\n    current_version=StreamState.ANY,\n    events=[event2, event3],\n)\n```\n\nWe can see that the same commit position is returned as above.\n\n```python\nassert commit_position_retry == commit_position2\n```\n\nBy calling `get_stream()`, we can also see the stream is unchanged.\nThat is, there are still only three events in the stream.\n\n```python\nevents = client.get_stream(\n    stream_name=stream_name1\n)\n\nassert len(events) == 3\n```\n\nThis idempotent behavior depends on the `id` attribute of the `NewEvent` class.\nThis attribute is, by default, assigned a new and unique version-4 UUID when an\ninstance of `NewEvent` is constructed. The optional `id` constructor argument can\nbe used to set the `id` value of a `NewEvent` explicitly.\n\n\n### Read stream events<a id=\"read-stream-events\"></a>\n\nThe `read_stream()` method can be used to get events that have been appended\nto a stream. This method returns a \"read response\" object.\n\nA \"read response\" object is a Python iterator. Recorded events can be\nobtained by iterating over the \"read response\" object. Recorded events are\nstreamed from the server to the client as the iteration proceeds. The iteration\nwill automatically stop when there are no more recorded events to be returned.\nThe streaming of events, and hence the iterator, can also be stopped by calling\nthe `stop()` method on the \"read response\" object.\n\nThe `get_stream()` method can be used to get events that have been appended\nto a stream. This method returns a Python `tuple` of recorded event objects.\nThe recorded event objects are instances of the `RecordedEvent` class. 
It\ncalls `read_stream()` and passes the \"read response\" iterator into a Python\n`tuple`, so that the streaming will complete before the method returns.\n\nThe `read_stream()` and `get_stream()` methods have one required argument, `stream_name`.\n\nThe required `stream_name` argument is a Python `str` that uniquely identifies a\nstream from which recorded events will be returned.\n\nThe `read_stream()` and `get_stream()` methods also have six optional arguments,\n`stream_position`, `backwards`, `resolve_links`, `limit`, `timeout`, and `credentials`.\n\nThe optional `stream_position` argument is a Python `int` that can be used to\nindicate the position in the stream from which to start reading. The default value\nof `stream_position` is `None`. When reading a stream from a specific position in the\nstream, the recorded event at that position will be included, both when reading\nforwards from that position, and when reading backwards.\n\nThe optional `backwards` argument is a Python `bool`. The default value of `backwards`\nis `False`, which means the stream will be read forwards, so that events are returned\nin the order they were recorded. If `backwards` is `True`, the events are returned in\nreverse order.\n\nIf `backwards` is `False` and `stream_position` is `None`, the stream's events will be\nreturned in the order they were recorded, starting from the first recorded event. If\n`backwards` is `True` and `stream_position` is `None`, the stream's events will be\nreturned in reverse order, starting from the last recorded event.\n\nThe optional `resolve_links` argument is a Python `bool`. The default value of `resolve_links`\nis `False`, which means any event links will not be resolved, so that the events that are\nreturned may represent event links. If `resolve_links` is `True`, any event links will\nbe resolved, so that the linked events will be returned instead of the event links.\n\nThe optional `limit` argument is a Python `int` which restricts the number of events\nthat will be returned. The default value of `limit` is `sys.maxsize`.\n\nThe optional `timeout` argument is a Python `float` which sets a\nmaximum duration, in seconds, for the completion of the gRPC operation.\n\nThe optional `credentials` argument can be used to override call credentials derived\nfrom the connection string URI. A suitable value for this argument can be constructed\nby calling the client method `construct_call_credentials()`.\n\nThe example below shows the default behavior, which is to return all the recorded\nevents of a stream forwards from the first recorded event to the last.\n\n```python\nevents = client.get_stream(\n    stream_name=stream_name1\n)\n\nassert len(events) == 3\nassert events[0] == event1\nassert events[1] == event2\nassert events[2] == event3\n```\n\nThe example below shows how to use the `stream_position` argument to read a stream\nfrom a specific stream position to the end of the stream. 
Stream positions are\nzero-based, and so `stream_position=1` corresponds to the second event that was\nrecorded in the stream, in this case `event2`.\n\n```python\nevents = client.get_stream(\n    stream_name=stream_name1,\n    stream_position=1,\n)\n\nassert len(events) == 2\nassert events[0] == event2\nassert events[1] == event3\n```\n\nThe example below shows how to use the `backwards` argument to read a stream backwards.\n\n```python\nevents = client.get_stream(\n    stream_name=stream_name1,\n    backwards=True,\n)\n\nassert len(events) == 3\nassert events[0] == event3\nassert events[1] == event2\nassert events[2] == event1\n```\n\nThe example below shows how to use the `limit` argument to read a limited number of\nevents.\n\n```python\nevents = client.get_stream(\n    stream_name=stream_name1,\n    limit=2,\n)\n\nassert len(events) == 2\nassert events[0] == event1\nassert events[1] == event2\n```\n\nThe `read_stream()` and `get_stream()` methods will raise a `NotFound` exception if the\nnamed stream has never existed or has been deleted.\n\n```python\nfrom esdbclient.exceptions import NotFound\n\n\ntry:\n    client.get_stream('does-not-exist')\nexcept NotFound:\n    pass  # The stream does not exist.\nelse:\n    raise Exception(\"Shouldn't get here\")\n```\n\nPlease note, the `get_stream()` method is decorated with the `@autoreconnect` and\n`@retrygrpc` decorators, whilst the `read_stream()` method is not. This means that\nall errors due to connection issues will be caught by the retry and reconnect decorators\nwhen calling the `get_stream()` method, but not when calling `read_stream()`. The\n`read_stream()` method has no such decorators because the streaming only starts\nwhen iterating over the \"read response\" starts, which means that the method returns\nbefore the streaming starts, and so there is no chance for any decorators to catch\nany connection issues.\n\nFor the same reason, `read_stream()` will not raise a `NotFound` exception when\nthe stream does not exist, until iterating over the \"read response\" object begins.\n\nIf you are reading a very large stream, then you might prefer to call `read_stream()`,\nand begin iterating through the recorded events whilst they are being streamed from\nthe server, rather than waiting for them all to accumulate in memory.
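\n\nFor example, the hedged sketch below streams events from the server one at a time, and then stops the streaming gRPC call early.\n\n```python\n# Stream events with read_stream(), then stop the iterator early.\nread_response = client.read_stream(stream_name=stream_name1)\n\nfirst_event = next(read_response)\nassert first_event == event1\n\n# Cancel the underlying streaming gRPC call.\nread_response.stop()\n```\n\n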
### Get current version<a id=\"get-current-version\"></a>\n\nThe `get_current_version()` method is a convenience method that essentially calls\n`get_stream()` with `backwards=True` and `limit=1`. This method returns\nthe value of the `stream_position` attribute of the last recorded event in a\nstream. If a stream does not exist, the returned value is `StreamState.NO_STREAM`.\nThe returned value is the correct value of `current_version` when appending events\nto a stream, and when deleting or tombstoning a stream.\n\nThis method has one required argument, `stream_name`.\n\nThe required `stream_name` argument is a Python `str` that uniquely identifies a\nstream from which a stream position will be returned.\n\nThis method has an optional `timeout` argument, which is a Python `float`\nthat sets a maximum duration, in seconds, for the completion of the gRPC operation.\n\nThis method has an optional `credentials` argument, which can be used to\noverride call credentials derived from the connection string URI.\n\nIn the example below, the last stream position of `stream_name1` is obtained.\nSince three events have been appended to `stream_name1`, and positions in a\nstream are zero-based and gapless, the current version is `2`.\n\n```python\ncurrent_version = client.get_current_version(\n    stream_name=stream_name1\n)\n\nassert current_version == 2\n```\n\nIf a stream has never existed or has been deleted, the returned value is\n`StreamState.NO_STREAM`, which is the correct value of the `current_version`\nargument both when appending the first event of a new stream, and also when\nappending events to a stream that has been deleted.\n\n```python\ncurrent_version = client.get_current_version(\n    stream_name='does-not-exist'\n)\n\nassert current_version is StreamState.NO_STREAM\n```\n\n### How to implement snapshotting with EventStoreDB<a id=\"how-to-implement-snapshotting-with-eventstoredb\"></a>\n\nSnapshots can improve the performance of aggregates that would otherwise be\nreconstructed from very long streams. However, it is generally recommended to design\naggregates to have a finite lifecycle, and so to have relatively short streams,\nthereby avoiding the need for snapshotting. This \"how to\" section is intended merely\nto show how snapshotting of aggregates can be implemented with EventStoreDB using\nthis Python client.\n\nEvent-sourced aggregates are typically reconstructed from recorded events by calling\na mutator function for each recorded event, evolving from an initial state\n`None` to the current state of the aggregate. The function `get_aggregate()` shows\nhow this can be done. The aggregate ID is used as a stream name. The exception\n`AggregateNotFound` is raised if the aggregate stream is not found.\n\n```python\nclass AggregateNotFound(Exception):\n    \"\"\"Raised when an aggregate is not found.\"\"\"\n\n\ndef get_aggregate(aggregate_id, mutator_func):\n    stream_name = aggregate_id\n\n    # Get recorded events.\n    try:\n        events = client.get_stream(\n            stream_name=stream_name,\n            stream_position=None\n        )\n    except NotFound as e:\n        raise AggregateNotFound(aggregate_id) from e\n    else:\n        # Reconstruct aggregate from recorded events.\n        aggregate = None\n        for event in events:\n            aggregate = mutator_func(aggregate, event)\n        return aggregate\n```\n\nSnapshotting of aggregates can be implemented by recording the current state of\nan aggregate as a new event.\n\nIf an aggregate object has a version number that corresponds to the stream position of\nthe last event that was used to reconstruct the aggregate, and this version number\nis recorded in the snapshot metadata, then any events that are recorded after the\nsnapshot can be selected using this version number. 
The aggregate can then be\nreconstructed from the last snapshot and any subsequent events, without having\nto replay the entire history.\n\nWe will use a separate stream for an aggregate's snapshots that is named after the\nstream used for recording its events. The name of the snapshot stream will be\nconstructed by prefixing the aggregate's stream name with `'snapshot-$'`.\n\n```python\nSNAPSHOT_STREAM_NAME_PREFIX = 'snapshot-$'\n\ndef make_snapshot_stream_name(stream_name):\n    return f'{SNAPSHOT_STREAM_NAME_PREFIX}{stream_name}'\n\n\ndef remove_snapshot_stream_prefix(snapshot_stream_name):\n    assert snapshot_stream_name.startswith(SNAPSHOT_STREAM_NAME_PREFIX)\n    return snapshot_stream_name[len(SNAPSHOT_STREAM_NAME_PREFIX):]\n```\n\nNow, let's redefine the `get_aggregate()` function, so that it looks for a snapshot event,\nthen selects subsequent aggregate events, and then calls a mutator function for each\nrecorded event.\n\nNotice that the aggregate events are read from a stream for serialized aggregate\nevents, whilst the snapshot is read from a separate stream for serialized aggregate\nsnapshots. We will use JSON to serialize and deserialize event data.\n\n\n```python\nimport json\n\n\ndef get_aggregate(aggregate_id, mutator_func):\n    stream_name = aggregate_id\n    recorded_events = []\n\n    # Look for a snapshot.\n    try:\n        snapshots = client.get_stream(\n            stream_name=make_snapshot_stream_name(stream_name),\n            backwards=True,\n            limit=1\n        )\n    except NotFound:\n        stream_position = None\n    else:\n        assert len(snapshots) == 1\n        snapshot = snapshots[0]\n        stream_position = deserialize(snapshot.metadata)['version'] + 1\n        recorded_events.append(snapshot)\n\n    # Get subsequent events.\n    try:\n        events = client.get_stream(\n            stream_name=stream_name,\n            stream_position=stream_position\n        )\n    except NotFound as e:\n        raise AggregateNotFound(aggregate_id) from e\n    else:\n        recorded_events += events\n\n    # Reconstruct aggregate from recorded events.\n    aggregate = None\n    for event in recorded_events:\n        aggregate = mutator_func(aggregate, event)\n\n    return aggregate\n\n\ndef serialize(d):\n    return json.dumps(d).encode('utf8')\n\n\ndef deserialize(s):\n    return json.loads(s.decode('utf8'))\n```\n\nTo show how `get_aggregate()` can be used, let's define a `Dog` aggregate class, with\nattributes `name` and `tricks`. The attributes `id` and `version` will indicate an\naggregate object's ID and version number. 
The attribute `is_from_snapshot` is added\nhere merely to demonstrate below when an aggregate object has been reconstructed using\na snapshot.\n\n```python\nfrom dataclasses import dataclass\n\n\n@dataclass(frozen=True)\nclass Aggregate:\n    id: str\n    version: int\n    is_from_snapshot: bool\n\n\n@dataclass(frozen=True)\nclass Dog(Aggregate):\n    name: str\n    tricks: list\n```\n\nLet's also define a mutator function `mutate_dog()` that evolves the state of a\n`Dog` aggregate given various different types of events, `'DogRegistered'`,\n`'DogLearnedTrick'`, and `'Snapshot'`.\n\n```python\ndef mutate_dog(dog, event):\n    data = deserialize(event.data)\n    if event.type == 'DogRegistered':\n        return Dog(\n            id=event.stream_name,\n            version=event.stream_position,\n            is_from_snapshot=False,\n            name=data['name'],\n            tricks=[],\n        )\n    elif event.type == 'DogLearnedTrick':\n        assert event.stream_position == dog.version + 1\n        assert event.stream_name == dog.id, (event.stream_name, dog.id)\n        return Dog(\n            id=dog.id,\n            version=event.stream_position,\n            is_from_snapshot=dog.is_from_snapshot,\n            name=dog.name,\n            tricks=dog.tricks + [data['trick']],\n        )\n    elif event.type == 'Snapshot':\n        return Dog(\n            id=remove_snapshot_stream_prefix(event.stream_name),\n            version=deserialize(event.metadata)['version'],\n            is_from_snapshot=True,\n            name=data['name'],\n            tricks=data['tricks'],\n        )\n    else:\n        raise Exception(f\"Unknown event type: {event.type}\")\n```\n\nFor convenience, let's also define a `get_dog()` function that calls `get_aggregate()`\nwith the `mutate_dog()` function as the value of its `mutator_func` argument.\n\n```python\ndef get_dog(dog_id):\n    return get_aggregate(\n        aggregate_id=dog_id,\n        mutator_func=mutate_dog,\n    )\n```\n\nWe can also define some \"command\" functions that append new events to the\ndatabase. The `register_dog()` function appends a `DogRegistered` event. The\n`record_trick_learned()` appends a `DogLearnedTrick` event. The function\n`snapshot_dog()` appends a `Snapshot` event. 
Notice that the\n`record_trick_learned()` and `snapshot_dog()` functions use `get_dog()`.\n\nNotice also that the `DogRegistered` and `DogLearnedTrick` events are appended to a\nstream for aggregate events, whilst the `Snapshot` event is appended to a separate\nstream for aggregate snapshots.\n\n```python\ndef register_dog(name):\n    dog_id = str(uuid.uuid4())\n    event = NewEvent(\n        type='DogRegistered',\n        data=serialize({'name': name}),\n    )\n    client.append_to_stream(\n        stream_name=dog_id,\n        current_version=StreamState.NO_STREAM,\n        events=event,\n    )\n    return dog_id\n\n\ndef record_trick_learned(dog_id, trick):\n    dog = get_dog(dog_id)\n    event = NewEvent(\n        type='DogLearnedTrick',\n        data=serialize({'trick': trick}),\n    )\n    client.append_to_stream(\n        stream_name=dog_id,\n        current_version=dog.version,\n        events=event,\n    )\n\n\ndef snapshot_dog(dog_id):\n    dog = get_dog(dog_id)\n    event = NewEvent(\n        type='Snapshot',\n        data=serialize({'name': dog.name, 'tricks': dog.tricks}),\n        metadata=serialize({'version': dog.version}),\n    )\n    client.append_to_stream(\n        stream_name=make_snapshot_stream_name(dog_id),\n        current_version=StreamState.ANY,\n        events=event,\n    )\n```\n\nWe can call `register_dog()` to register a new dog.\n\n```python\n# Register a new dog.\ndog_id = register_dog('Fido')\n\ndog = get_dog(dog_id)\nassert dog.name == 'Fido'\nassert dog.tricks == []\nassert dog.version == 0\nassert dog.is_from_snapshot is False\n\n```\n\nWe can call `record_trick_learned()` to record that some tricks have been learned.\n\n```python\n\n# Record that 'Fido' learned a new trick.\nrecord_trick_learned(dog_id, trick='roll over')\n\ndog = get_dog(dog_id)\nassert dog.name == 'Fido'\nassert dog.tricks == ['roll over']\nassert dog.version == 1\nassert dog.is_from_snapshot is False\n\n\n# Record that 'Fido' learned another new trick.\nrecord_trick_learned(dog_id, trick='fetch ball')\n\ndog = get_dog(dog_id)\nassert dog.name == 'Fido'\nassert dog.tricks == ['roll over', 'fetch ball']\nassert dog.version == 2\nassert dog.is_from_snapshot is False\n```\n\nWe can call `snapshot_dog()` to record a snapshot of the current state of the `Dog`\naggregate. After we call `snapshot_dog()`, the `get_dog()` function will return a `Dog`\nobject that has been constructed using the `Snapshot` event.\n\n```python\n# Snapshot 'Fido'.\nsnapshot_dog(dog_id)\n\ndog = get_dog(dog_id)\nassert dog.name == 'Fido'\nassert dog.tricks == ['roll over', 'fetch ball']\nassert dog.version == 2\nassert dog.is_from_snapshot is True\n```\n\nWe can continue to evolve the state of the `Dog` aggregate, using\nthe snapshot both during the call to `record_trick_learned()` and\nwhen calling `get_dog()` directly.\n\n```python\nrecord_trick_learned(dog_id, trick='sit')\n\ndog = get_dog(dog_id)\nassert dog.name == 'Fido'\nassert dog.tricks == ['roll over', 'fetch ball', 'sit']\nassert dog.version == 3\nassert dog.is_from_snapshot is True\n```\n\nWe can see from the `is_from_snapshot` attribute that the `Dog` object was indeed\nreconstructed from the snapshot.\n\nSnapshots can be created at fixed version number intervals, fixed time\nperiods, after a particular type of event, immediately after events are\nappended, or as a background process.\n\n\n### Read all events<a id=\"read-all-events\"></a>\n\nThe `read_all()` method can be used to get all recorded events\nin the database in the order they were recorded. 
This method returns\na \"read response\" object, just like `read_stream()`.\n\nA \"read response\" is an iterator, and not a sequence. Recorded events can be\nobtained by iterating over the \"read response\" object. Recorded events are\nstreamed from the server to the client as the iteration proceeds. The iteration\nwill automatically stop when there are no more recorded events to be returned.\nThe streaming of events, and hence the iterator, can also be stopped by calling\nthe `stop()` method on the \"read response\" object. The recorded event objects\nare instances of the `RecordedEvent` class.\n\nThis method has nine optional arguments, `commit_position`, `backwards`, `resolve_links`,\n`filter_exclude`, `filter_include`, `filter_by_stream_name`, `limit`, `timeout`,\nand `credentials`.\n\nThe optional `commit_position` argument is a Python `int` that can be used to\nspecify a commit position from which to start reading. The default value of\n`commit_position` is `None`. Please note, if a commit position is specified,\nit must be an actually existing commit position in the database. When reading\nforwards, the event at the commit position may be included, depending upon the\nfilter. When reading backwards, the event at the commit position will not be\nincluded.\n\nThe optional `backwards` argument is a Python `bool`. The default value of `backwards` is\n`False`, which means events are returned in the order they were recorded. If\n`backwards` is `True`, then events are returned in reverse order.\n\nIf `backwards` is `False` and `commit_position` is `None`, the database's events will\nbe returned in the order they were recorded, starting from the first recorded event.\nThis is the default behavior of `read_all()`. If `backwards` is `True` and\n`commit_position` is `None`, the database's events will be returned in reverse order,\nstarting from the last recorded event.\n\nThe optional `resolve_links` argument is a Python `bool`. The default value of `resolve_links`\nis `False`, which means any event links will not be resolved, so that the events that are\nreturned may represent event links. If `resolve_links` is `True`, any event links will\nbe resolved, so that the linked events will be returned instead of the event links.\n\nThe optional `filter_exclude` argument is a sequence of regular expressions that\nspecifies which recorded events should not be returned. This argument is ignored\nif `filter_include` is set to a non-empty sequence. The default value of this\nargument matches the event types of EventStoreDB \"system events\", so that system\nevents will not normally be included. See the Notes section below for more\ninformation about filter expressions.\n\nThe optional `filter_include` argument is a sequence of regular expressions\nthat specifies which recorded events should be returned. By default, this\nargument is an empty tuple. If this argument is set to a non-empty sequence,\nthe `filter_exclude` argument is ignored.\n\nThe optional `filter_by_stream_name` argument is a Python `bool` that indicates\nwhether the filtering will apply to event types or stream names. By default, this\nvalue is `False` and so the filtering will apply to the event type strings of\nrecorded events.\n\nThe optional `limit` argument is an integer which restricts the number of events that\nwill be returned. The default value is `sys.maxsize`.\n\nThe optional `timeout` argument is a Python `float` which sets a\nmaximum duration, in seconds, for the completion of the gRPC operation.\n\nThe optional `credentials` argument can be used to\noverride call credentials derived from the connection string URI.\n\nThe filtering of events is done on the EventStoreDB server. The\n`limit` argument is applied on the server after filtering.
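\n\nFor example, the hedged sketch below uses `filter_include` to select events by type. The regular expression is illustrative, and how much of the type string the expression must match depends on the server's filter semantics (see the Notes section below).\n\n```python\n# Read only events whose type matches the regular expression.\nread_response = client.read_all(\n    filter_include=['Order.*'],\n)\norder_event_types = set(event.type for event in read_response)\n```\n\n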
The default value of `limit` is `sys.maxsize`.

The optional `timeout` argument is a Python `float` which sets a maximum duration, in seconds, for the completion of the gRPC operation.

The optional `credentials` argument can be used to override call credentials derived from the connection string URI.

The filtering of events is done on the EventStoreDB server. The `limit` argument is applied on the server after filtering.
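For example, something like the call below would select only events whose type matches one of the given regular expressions; the patterns are illustrative, matching the order event types used elsewhere in this document.

```python
# Read only events whose type matches one of these regular
# expressions (illustrative patterns).
read_response = client.read_all(
    filter_include=['OrderCreated', 'OrderUpdated', 'OrderDeleted'],
)
order_events = tuple(read_response)
```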
The example below shows how to get all the events we have recorded in the database so far, in the order they were recorded. We can see the three events of `stream_name1` (`event1`, `event2` and `event3`) are included, along with others.

```python
# Read all events (creates a streaming gRPC call).
read_response = client.read_all()

# Convert the iterator into a sequence of recorded events.
events = tuple(read_response)
assert len(events) > 3  # more than three

# Convert the sequence of recorded events into a set of event IDs.
event_ids = set(e.id for e in events)
assert event1.id in event_ids
assert event2.id in event_ids
assert event3.id in event_ids
```

The example below shows how to read all recorded events in the database from a particular commit position, in this case `commit_position1`. When reading forwards from a specific commit position, the event at the specified position will be included. The value of `commit_position1` is the position we obtained when appending `event1`. And so `event1` is the first recorded event we shall receive, `event2` is the second, and `event3` is the third.

```python
# Read all events forwards from a commit position.
read_response = client.read_all(
    commit_position=commit_position1
)

# Step through the "read response" iterator.
assert next(read_response) == event1
assert next(read_response) == event2
assert next(read_response) == event3

# Stop the iterator.
read_response.stop()
```

The example below shows how to read all events recorded in the database in reverse order. We can see that the first events we receive are the last events that were recorded: the events of the `Dog` aggregate from the section about snapshotting and the snapshot.

```python
# Read all events backwards from the end.
read_response = client.read_all(
    backwards=True
)

# Step through the "read response" iterator.
assert next(read_response).type == "DogLearnedTrick"
assert next(read_response).type == "Snapshot"
assert next(read_response).type == "DogLearnedTrick"
assert next(read_response).type == "DogLearnedTrick"
assert next(read_response).type == "DogRegistered"

# Stop the iterator.
read_response.stop()
```

The example below shows how to read a limited number of events forwards from a specific commit position.

```python
events = tuple(
    client.read_all(
        commit_position=commit_position1,
        limit=1,
    )
)

assert len(events) == 1
assert events[0] == event1
```

The example below shows how to read a limited number of the recorded events in the database backwards from the end. In this case, the limit is 1, and so we receive the last recorded event.

```python
events = tuple(
    client.read_all(
        backwards=True,
        limit=1,
    )
)

assert len(events) == 1

assert events[0].type == 'DogLearnedTrick'
assert deserialize(events[0].data)['trick'] == 'sit'
```

Please note, like the `read_stream()` method, the `read_all()` method is not decorated with retry and reconnect decorators, because the streaming of recorded events from the server only starts when iterating over the "read response" starts, which means that the method returns before the streaming starts, and so there is no chance for any decorators to catch any connection issues.

### Get commit position<a id="get-commit-position"></a>

The `get_commit_position()` method can be used to get the commit position of the last recorded event in the database. It simply calls `read_all()` with `backwards=True` and `limit=1`, and returns the value of the `commit_position` attribute of the last recorded event.

```python
commit_position = client.get_commit_position()
```

This method has five optional arguments, `filter_exclude`, `filter_include`, `filter_by_stream_name`, `timeout` and `credentials`. These values are passed to `read_all()`.

The optional `filter_exclude`, `filter_include` and `filter_by_stream_name` arguments work in the same way as they do in the `read_all()` method.

The optional `timeout` argument is a Python `float` which sets a maximum duration, in seconds, for the completion of the gRPC operation.

The optional `credentials` argument can be used to override call credentials derived from the connection string URI.

This method might be used to measure the progress of a downstream component that is processing all recorded events, by comparing the current commit position with the recorded commit position of the last event successfully processed by the downstream component. In this case, the values of the `filter_exclude`, `filter_include` and `filter_by_stream_name` arguments should equal those used by the downstream component to obtain recorded events.
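For example, something like the sketch below could report how far behind a downstream component is. The `get_last_processed_position()` function is assumed to be supplied by your application, such as the one sketched later in the section about exactly-once event processing. The difference between two commit positions is not a count of events, but a difference of zero does mean the component has caught up.

```python
def measure_progress(client, get_last_processed_position):
    # Compare the server's current commit position with the position
    # recorded by the downstream component (hypothetical function).
    current = client.get_commit_position()
    processed = get_last_processed_position() or 0
    return current - processed  # zero means the component has caught up
```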
### Get stream metadata<a id="get-stream-metadata"></a>

The `get_stream_metadata()` method returns the metadata for a stream, along with the version of the stream metadata.

This method has one required argument, `stream_name`, which is a Python `str` that uniquely identifies a stream for which stream metadata will be obtained.

This method has an optional `timeout` argument, which is a Python `float` that sets a maximum duration, in seconds, for the completion of the gRPC operation.

This method has an optional `credentials` argument, which can be used to override call credentials derived from the connection string URI.

In the example below, metadata for `stream_name1` is obtained.

```python
metadata, metadata_version = client.get_stream_metadata(stream_name=stream_name1)
```

The returned `metadata` value is a Python `dict`. The returned `metadata_version` value is either an `int` if the stream exists, or `StreamState.NO_STREAM` if the stream does not exist and no metadata has been set. These values can be used as the arguments of `set_stream_metadata()`.

### Set stream metadata<a id="set-stream-metadata"></a>

*requires leader*

The method `set_stream_metadata()` sets metadata for a stream. Stream metadata can be set before appending events to a stream.

This method has one required argument, `stream_name`, which is a Python `str` that uniquely identifies a stream for which stream metadata will be set.

This method has an optional `timeout` argument, which is a Python `float` that sets a maximum duration, in seconds, for the completion of the gRPC operation.

This method has an optional `credentials` argument, which can be used to override call credentials derived from the connection string URI.

In the example below, metadata for `stream_name1` is set.

```python
metadata["foo"] = "bar"

client.set_stream_metadata(
    stream_name=stream_name1,
    metadata=metadata,
    current_version=metadata_version,
)
```

The `current_version` argument should be the current version of the stream metadata obtained from `get_stream_metadata()`.

Please refer to the EventStoreDB documentation for more information about stream metadata.

### Delete stream<a id="delete-stream"></a>

*requires leader*

The method `delete_stream()` can be used to "delete" a stream.

This method has two required arguments, `stream_name` and `current_version`.

The required `stream_name` argument is a Python `str` that uniquely identifies the stream to be deleted.

The required `current_version` argument is expected to be a Python `int` that indicates the stream position of the last recorded event in the stream.

This method has an optional `timeout` argument, which is a Python `float` that sets a maximum duration, in seconds, for the completion of the gRPC operation.

This method has an optional `credentials` argument, which can be used to override call credentials derived from the connection string URI.

In the example below, `stream_name1` is deleted.

```python
commit_position = client.delete_stream(stream_name=stream_name1, current_version=2)
```

After deleting a stream, it's still possible to append new events. Reading from a deleted stream will return only events that have been appended after it was deleted.

### Tombstone stream<a id="tombstone-stream"></a>

*requires leader*

The method `tombstone_stream()` can be used to "tombstone" a stream.

This method has two required arguments, `stream_name` and `current_version`.

The required `stream_name` argument is a Python `str` that uniquely identifies the stream to be tombstoned.

The required `current_version` argument is expected to be a Python `int` that indicates the stream position of the last recorded event in the stream.

This method has an optional `timeout` argument, which is a Python `float` that sets a maximum duration, in seconds, for the completion of the gRPC operation.

This method has an optional `credentials` argument, which can be used to override call credentials derived from the connection string URI.

In the example below, `stream_name1` is tombstoned.

```python
commit_position = client.tombstone_stream(stream_name=stream_name1, current_version=2)
```

After tombstoning a stream, it's not possible to append new events.

## Catch-up subscriptions<a id="catch-up-subscriptions"></a>

A "catch-up" subscription can be used to receive events that have already been recorded and events that are recorded subsequently.
A catch-up subscription can be used by an event-processing component that processes recorded events with "exactly-once" semantics.

The `subscribe_to_all()` method starts a catch-up subscription that can receive all events in the database. The `subscribe_to_stream()` method starts a catch-up subscription that can receive events from a specific stream. Both methods return a "catch-up subscription" object, which is a Python iterator. Recorded events can be obtained by iteration. Recorded event objects obtained in this way are instances of the `RecordedEvent` class.

Before the "catch-up subscription" object is returned to the caller, the client will first obtain a "confirmation" response from the server, which allows the client to detect that both the gRPC connection and the streaming gRPC call are operational. For this reason, the `subscribe_to_all()` and `subscribe_to_stream()` methods are both usefully decorated with the reconnect and retry decorators. However, once the method has returned, the decorators will have exited, and any exceptions that are raised due to connection issues whilst iterating over the subscription object will have to be handled by your code, as sketched below.
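For example, a long-running component might wrap the iteration in a loop that resubscribes from the commit position of the last received event whenever iteration fails. This is a minimal sketch, assuming a `process()` function supplied by your application; a real component would probably catch the client's specific exception classes rather than `Exception`.

```python
def run_forever(client, process, commit_position=None):
    # Minimal reconnect loop (sketch): resubscribe from the commit
    # position of the last received event whenever iteration fails.
    while True:
        subscription = client.subscribe_to_all(commit_position=commit_position)
        try:
            for event in subscription:
                process(event)
                commit_position = event.commit_position
        except Exception:
            continue  # reconnect, resuming after the last received event
```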
A "catch-up subscription" iterator will not automatically stop when there are no more events to be returned; instead, the iteration will block until new events are subsequently recorded in the database. Any subsequently recorded events will then be immediately streamed to the client, and the iteration will continue. The streaming of events, and hence the iteration, can be stopped by calling the `stop()` method on the "catch-up subscription" object.

### Subscribe to all events<a id="subscribe-to-all-events"></a>

The `subscribe_to_all()` method can be used to start a catch-up subscription from which all events recorded in the database can be obtained in the order they were recorded. This method returns a "catch-up subscription" iterator.

This method also has ten optional arguments, `commit_position`, `from_end`, `resolve_links`, `filter_exclude`, `filter_include`, `filter_by_stream_name`, `include_checkpoints`, `include_caught_up`, `timeout` and `credentials`.

The optional `commit_position` argument specifies a commit position. The default value of `commit_position` is `None`, which means the catch-up subscription will start from the first recorded event in the database. If a commit position is given, it must match an actually existing commit position in the database. Only events recorded after that position will be obtained.

The optional `from_end` argument specifies whether or not the catch-up subscription will start from the last recorded event in the database. By default, this argument is `False`. If `from_end` is `True`, only events recorded after the subscription is started will be obtained. This argument will be disregarded if `commit_position` is not `None`.

The optional `resolve_links` argument is a Python `bool`. The default value of `resolve_links` is `False`, which means any event links will not be resolved, so that the events that are returned may represent event links. If `resolve_links` is `True`, any event links will be resolved, so that the linked events will be returned instead of the event links.

The optional `filter_exclude` argument is a sequence of regular expressions that specifies which recorded events should be returned. This argument is ignored if `filter_include` is set to a non-empty sequence. The default value of this argument matches the event types of EventStoreDB "system events", so that system events will not normally be included. See the Notes section below for more information about filter expressions.

The optional `filter_include` argument is a sequence of regular expressions that specifies which recorded events should be returned. By default, this argument is an empty tuple. If this argument is set to a non-empty sequence, the `filter_exclude` argument is ignored.

The optional `filter_by_stream_name` argument is a Python `bool` that indicates whether the filtering will apply to event types or stream names. By default, this value is `False` and so the filtering will apply to the event type strings of recorded events.

The optional `include_checkpoints` argument is a Python `bool` which indicates whether "checkpoint" messages should be included when recorded events are received. Checkpoints have a `commit_position` value that can be used by an event-processing component to update its recorded commit position value, so that, when lots of events are being filtered out, the subscriber does not have to start from the same old position when the event-processing component is restarted (see the sketch below).

The optional `include_caught_up` argument is a Python `bool` which indicates whether "caught up" messages should be included when recorded events are received. The default value of `include_caught_up` is `False`.

The optional `timeout` argument is a Python `float` which sets a maximum duration, in seconds, for the completion of the gRPC operation.

The optional `credentials` argument can be used to override call credentials derived from the connection string URI.
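For example, a subscriber that filters events could record the `commit_position` of everything it receives, since checkpoint messages carry a `commit_position` just like recorded events. In the sketch below, the `handle()` and `save_position()` functions are assumed to be supplied by your application, and it is assumed (not shown in this section) that checkpoint messages can be distinguished from ordinary recorded events with the client's `Checkpoint` class.

```python
from esdbclient import Checkpoint  # assumed import

subscription = client.subscribe_to_all(include_checkpoints=True)
for item in subscription:
    if not isinstance(item, Checkpoint):
        handle(item)  # hypothetical event handler
    # Both events and checkpoints carry a commit position.
    save_position(item.commit_position)  # hypothetical persistence function
```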
The example below shows how to start a catch-up subscription that starts from the first recorded event in the database.

```python
# Subscribe from the first recorded event in the database.
catchup_subscription = client.subscribe_to_all()
```

The example below shows that catch-up subscriptions do not stop automatically, but block when the last recorded event is received, and then continue when subsequent events are recorded.

```python
from time import sleep
from threading import Thread


# Append a new event to a new stream.
stream_name2 = str(uuid.uuid4())
event4 = NewEvent(type='OrderCreated', data=b'{}')

client.append_to_stream(
    stream_name=stream_name2,
    current_version=StreamState.NO_STREAM,
    events=[event4],
)


# Receive events from the catch-up subscription in a different thread.
received_events = []

def receive_events():
    for event in catchup_subscription:
        received_events.append(event)


def wait_for_event(event):
    for _ in range(100):
        for received in reversed(received_events):
            if event == received:
                return
        else:
            sleep(0.1)
    else:
        raise AssertionError("Event wasn't received")


thread = Thread(target=receive_events, daemon=True)
thread.start()

# Wait to receive event4.
wait_for_event(event4)

# Append another event whilst the subscription is running.
event5 = NewEvent(type='OrderUpdated', data=b'{}')
client.append_to_stream(
    stream_name=stream_name2,
    current_version=0,
    events=[event5],
)

# Wait for the subscription to block.
wait_for_event(event5)

# Stop the subscription.
catchup_subscription.stop()
thread.join()
```

The example below shows how to subscribe to events recorded after a particular commit position, in this case from the commit position of the last recorded event that was received above. Then, another event is recorded before the subscription is restarted. And three more events are recorded whilst the subscription is running. These four events are received in the order they were recorded.

```python
# Append another event.
event6 = NewEvent(type='OrderDeleted', data=b'{}')
client.append_to_stream(
    stream_name=stream_name2,
    current_version=1,
    events=[event6],
)

# Restart subscribing to all events after the
# commit position of the last received event.
catchup_subscription = client.subscribe_to_all(
    commit_position=received_events[-1].commit_position
)

thread = Thread(target=receive_events, daemon=True)
thread.start()

# Wait for event6.
wait_for_event(event6)

# Append three more events to a new stream.
stream_name3 = str(uuid.uuid4())
event7 = NewEvent(type='OrderCreated', data=b'{}')
event8 = NewEvent(type='OrderUpdated', data=b'{}')
event9 = NewEvent(type='OrderDeleted', data=b'{}')

client.append_to_stream(
    stream_name=stream_name3,
    current_version=StreamState.NO_STREAM,
    events=[event7, event8, event9],
)

# Wait for events 7, 8 and 9.
wait_for_event(event7)
wait_for_event(event8)
wait_for_event(event9)

# Stop the subscription.
catchup_subscription.stop()
thread.join()
```

The catch-up subscription call is ended as soon as the subscription object's `stop()` method is called. This also happens automatically when the subscription object goes out of scope, or when it is explicitly deleted from memory using the Python `del` keyword.

### Subscribe to stream events<a id="subscribe-to-stream-events"></a>

The `subscribe_to_stream()` method can be used to start a catch-up subscription from which events recorded in a single stream can be obtained. This method returns a "catch-up subscription" iterator.

This method has a required `stream_name` argument, which specifies the name of the stream from which recorded events will be received.

This method also has six optional arguments, `stream_position`, `from_end`, `resolve_links`, `include_caught_up`, `timeout` and `credentials`.

The optional `stream_position` argument specifies a position in the stream from which to start subscribing. The default value of `stream_position` is `None`, which means that all events recorded in the stream will be obtained in the order they were recorded, unless `from_end` is set to `True`. If a stream position is given, then only events recorded after that position will be obtained.

The optional `from_end` argument specifies that the subscription will start from the last position in the stream. The default value of `from_end` is `False`. If `from_end` is `True`, then only events recorded after the subscription was created will be obtained. This argument is ignored if `stream_position` is set.

The optional `resolve_links` argument is a Python `bool`. The default value of `resolve_links` is `False`, which means any event links will not be resolved, so that the events that are returned may represent event links. If `resolve_links` is `True`, any event links will be resolved, so that the linked events will be returned instead of the event links.

The optional `include_caught_up` argument is a Python `bool` which indicates whether "caught up" messages should be included when recorded events are received.
The default value of `include_caught_up` is `False`.

The optional `timeout` argument is a Python `float` which sets a maximum duration, in seconds, for the completion of the gRPC operation.

The optional `credentials` argument can be used to override call credentials derived from the connection string URI.

The example below shows how to start a catch-up subscription from the first recorded event in a stream.

```python
# Subscribe from the start of 'stream2'.
subscription = client.subscribe_to_stream(stream_name=stream_name2)
```

The example below shows how to start a catch-up subscription from a particular stream position.

```python
# Subscribe to stream2, from the second recorded event.
subscription = client.subscribe_to_stream(
    stream_name=stream_name2,
    stream_position=1,
)
```

### How to implement exactly-once event processing<a id="how-to-implement-exactly-once-event-processing"></a>

The commit positions of recorded events that are received and processed by a downstream component are usefully recorded by the downstream component, so that the commit position of the last processed event can be determined when processing is resumed.

The last recorded commit position can be used to specify the commit position from which to subscribe when processing is resumed. Since this commit position represents the position of the last successfully processed event in the downstream component, it will be usual to want the next event after this position, because that is the next event that has not yet been processed. For this reason, when subscribing for events from a specific commit position using a catch-up subscription in EventStoreDB, the recorded event at the specified commit position will NOT be included in the sequence of recorded events that are received.

To accomplish "exactly-once" processing of recorded events in a downstream component when using a catch-up subscription, the commit position of a recorded event should be recorded atomically and uniquely along with the result of processing recorded events, for example in the same database as materialised views when implementing eventually-consistent CQRS, or in the same database as a downstream analytics, reporting, or archiving application. By recording the commit position of recorded events atomically with the new state that results from processing recorded events, "dual writing" in the consumption of recorded events can be avoided. By also recording the commit position uniquely, the new state cannot be recorded twice, and hence the recorded state of the downstream component will be updated only once for any recorded event. By using the greatest recorded commit position to resume a catch-up subscription, all recorded events will eventually be processed. The combination of the "at-most-once" condition and the "at-least-once" condition gives the "exactly-once" condition.

The danger with "dual writing" in the consumption of recorded events is that if a recorded event is successfully processed and new state recorded atomically in one transaction, with the commit position recorded in a separate transaction, one may happen and not the other. If the new state is recorded but the position is lost, and then the processing is stopped and resumed, the recorded event may be processed twice. On the other hand, if the commit position is recorded but the new state is lost, the recorded event may effectively not be processed at all.
By either processing an event more than once, or by failing to process an event, the recorded state of the downstream component might be inaccurate, or possibly inconsistent, and perhaps catastrophically so. Such consequences may or may not matter in your situation. But sometimes inconsistencies may halt processing until the issue is resolved. You can avoid "dual writing" in the consumption of events by recording the commit position of a recorded event along with the new state that results from processing that event, in the same atomic transaction. By making the recording of the commit positions unique, so that transactions will be rolled back when there is a conflict, you will prevent the results of any duplicate processing of a recorded event being committed.

Recorded events received from a catch-up subscription cannot be acknowledged back to the EventStoreDB server. Acknowledging events, however, is an aspect of "persistent subscriptions". Hoping to rely on acknowledging events to an upstream component is an example of dual writing.
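The sketch below shows one way to apply this advice, using SQLite as the downstream database. The commit position is inserted into a tracking table with a primary key constraint, in the same transaction as the state changes made by a `project()` function, so a duplicate event rolls the whole transaction back. The table name and the `project()` function are illustrative assumptions, not part of `esdbclient`.

```python
import sqlite3

db = sqlite3.connect('downstream.db')
db.execute(
    'CREATE TABLE IF NOT EXISTS tracking (commit_position INTEGER PRIMARY KEY)'
)


def process_exactly_once(event, project):
    # Record the new state and the commit position in one transaction.
    # The primary key makes a duplicate commit position roll everything back.
    with db:
        db.execute(
            'INSERT INTO tracking (commit_position) VALUES (?)',
            (event.commit_position,),
        )
        project(db, event)  # hypothetical function that updates materialised views


def get_last_processed_position():
    # The position from which to resume a catch-up subscription.
    return db.execute('SELECT MAX(commit_position) FROM tracking').fetchone()[0]
```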
## Persistent subscriptions<a id="persistent-subscriptions"></a>

In EventStoreDB, "persistent" subscriptions are similar to catch-up subscriptions, in that reading a persistent subscription will block when there are no more recorded events to be received, and then continue when new events are subsequently recorded.

Persistent subscriptions can be consumed by a group of consumers operating with one of the supported "consumer strategies".

The significant difference with persistent subscriptions is that the server keeps track of the progress of the consumers. The consumer of a persistent subscription therefore needs to "acknowledge" when a recorded event has been processed successfully, and otherwise to "negatively acknowledge" a recorded event that has been received but was not successfully processed.

All of this means that for persistent subscriptions there are "create", "read", "update", "delete", "ack", and "nack" operations to consider.

Whilst persistent subscriptions have some advantages, in particular the concurrent processing of recorded events by a group of consumers, tracking in the server the position of events that have been processed raises the issue of "dual writing" in the consumption of events. Reliability in the processing of recorded events by a group of persistent subscription consumers will therefore rely on their idempotent handling of duplicate messages, and on their resilience to out-of-order delivery.

### Create subscription to all<a id="create-subscription-to-all"></a>

*requires leader*

The `create_subscription_to_all()` method can be used to create a "persistent subscription" to all the recorded events in the database across all streams.

This method has a required `group_name` argument, which is the name of a "group" of consumers of the subscription.

This method has nineteen optional arguments, `from_end`, `commit_position`, `resolve_links`, `filter_exclude`, `filter_include`, `filter_by_stream_name`, `consumer_strategy`, `message_timeout`, `max_retry_count`, `min_checkpoint_count`, `max_checkpoint_count`, `checkpoint_after`, `max_subscriber_count`, `live_buffer_size`, `read_batch_size`, `history_buffer_size`, `extra_statistics`, `timeout` and `credentials`.

The optional `from_end` argument can be used to specify that the group of consumers of the subscription should only receive events that were recorded after the subscription was created.

Alternatively, the optional `commit_position` argument can be used to specify a commit position from which the group of consumers of the subscription should receive events. Please note, the recorded event at the specified commit position might be included in the recorded events received by the group of consumers.

If neither `from_end` nor `commit_position` is specified, the group of consumers of the subscription will potentially receive all recorded events in the database.

The optional `resolve_links` argument is a Python `bool`. The default value of `resolve_links` is `False`, which means any event links will not be resolved, so that the events that are returned may represent event links. If `resolve_links` is `True`, any event links will be resolved, so that the linked events will be returned instead of the event links.

The optional `filter_exclude` argument is a sequence of regular expressions that specifies which recorded events should be returned. This argument is ignored if `filter_include` is set to a non-empty sequence. The default value of this argument matches the event types of EventStoreDB "system events", so that system events will not normally be included. See the Notes section below for more information about filter expressions.

The optional `filter_include` argument is a sequence of regular expressions that specifies which recorded events should be returned. By default, this argument is an empty tuple. If this argument is set to a non-empty sequence, the `filter_exclude` argument is ignored.

The optional `filter_by_stream_name` argument is a Python `bool` that indicates whether the filtering will apply to event types or stream names. By default, this value is `False` and so the filtering will apply to the event type strings of recorded events.

The optional `consumer_strategy` argument is a Python `str` that defines the consumer strategy for this persistent subscription. The value of this argument can be `'DispatchToSingle'`, `'RoundRobin'`, `'Pinned'`, or `'PinnedByCorrelation'`.
The default value is `'DispatchToSingle'`.

The optional `message_timeout` argument is a Python `float` which sets a maximum duration, in seconds, from the server sending a recorded event to a consumer of the persistent subscription until either an "acknowledgement" (ack) or a "negative acknowledgement" (nack) is received by the server, after which the server will retry sending the event. The default value of `message_timeout` is `30.0`.

The optional `max_retry_count` argument is a Python `int` which sets the number of times the server will retry sending an event. The default value of `max_retry_count` is `10`.

The optional `min_checkpoint_count` argument is a Python `int` which sets the minimum number of "acknowledgements" (acks) received by the server before the server may record the acknowledgements. The default value of `min_checkpoint_count` is `10`.

The optional `max_checkpoint_count` argument is a Python `int` which sets the maximum number of "acknowledgements" (acks) received by the server before the server must record the acknowledgements. The default value of `max_checkpoint_count` is `1000`.

The optional `checkpoint_after` argument is a Python `float` which sets the maximum duration in seconds between recording "acknowledgements" (acks). The default value of `checkpoint_after` is `2.0`.

The optional `max_subscriber_count` argument is a Python `int` which sets the maximum number of concurrent readers of the persistent subscription, beyond which attempts to read the persistent subscription will raise a `MaximumSubscriptionsReached` error.

The optional `live_buffer_size` argument is a Python `int` which sets the size of the buffer (in-memory) holding newly recorded events. The default value of `live_buffer_size` is 500.

The optional `read_batch_size` argument is a Python `int` which sets the number of recorded events read from disk when catching up. The default value of `read_batch_size` is 200.

The optional `history_buffer_size` argument is a Python `int` which sets the number of recorded events to cache in memory when catching up. The default value of `history_buffer_size` is 500.

The optional `extra_statistics` argument is a Python `bool` which enables tracking of extra statistics on this subscription. The default value of `extra_statistics` is `False`.

The optional `timeout` argument is a Python `float` which sets a maximum duration, in seconds, for the completion of the gRPC operation.

The optional `credentials` argument can be used to override call credentials derived from the connection string URI.

The method `create_subscription_to_all()` does not return a value. Recorded events are obtained by calling the `read_subscription_to_all()` method.

In the example below, a persistent subscription is created to operate from the first recorded non-system event in the database.

```python
# Create a persistent subscription.
group_name1 = f"group-{uuid.uuid4()}"
client.create_subscription_to_all(group_name=group_name1)
```
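The optional arguments described above can be combined as required. For example, something like the call below would create a subscription that starts from the end of the database and has a tighter retry policy; the particular values are illustrative only.

```python
# Create a persistent subscription from the end of the database, with
# a shorter message timeout and fewer retries (illustrative values).
group_name_tuned = f"group-{uuid.uuid4()}"
client.create_subscription_to_all(
    group_name=group_name_tuned,
    from_end=True,
    message_timeout=10.0,
    max_retry_count=3,
)
```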
### Read subscription to all<a id="read-subscription-to-all"></a>

*requires leader*

The `read_subscription_to_all()` method can be used by a group of consumers to receive recorded events from a persistent subscription that has been created using the `create_subscription_to_all()` method.

This method has a required `group_name` argument, which is the name of a "group" of consumers of the subscription specified when `create_subscription_to_all()` was called.

This method has an optional `timeout` argument, which is a Python `float` that sets a maximum duration, in seconds, for the completion of the gRPC operation.

This method has an optional `credentials` argument, which can be used to override call credentials derived from the connection string URI.

This method returns a `PersistentSubscription` object, which is an iterator giving `RecordedEvent` objects. It also has `ack()`, `nack()` and `stop()` methods.

```python
subscription = client.read_subscription_to_all(group_name=group_name1)
```

The `ack()` method should be used by a consumer to "acknowledge" to the server that it has received and successfully processed a recorded event. This will prevent that recorded event being received by another consumer in the same group. The `ack()` method has an `item` argument which can be either a `RecordedEvent` or a `UUID`. If you pass in a `RecordedEvent`, the value of its `ack_id` attribute will be used to acknowledge the event to the server. If you pass in a `UUID`, then use the value of the `ack_id` attribute of the `RecordedEvent` that is being acknowledged, in case the event has been resolved from a link event (which can happen both when the persistent subscription setting `resolve_links` is `True` and also when replaying parked events, regardless of the `resolve_links` setting).

The example below iterates over the subscription object, and calls `ack()` with the received `RecordedEvent` objects. The subscription's `stop()` method is called when we have received `event9`, stopping the iteration, so that we can continue with the examples below.

```python
received_events = []

for event in subscription:
    received_events.append(event)

    # Acknowledge the received event.
    subscription.ack(event)

    # Stop when 'event9' has been received.
    if event == event9:
        subscription.stop()
```

The `nack()` method should be used by a consumer to "negatively acknowledge" to the server that it has received but not successfully processed a recorded event. The `nack()` method has an `item` argument that works in the same way as `ack()`: use the recorded event or its `ack_id` attribute.
The `nack()` method also has an `action` argument, which should be a Python `str`: either `'unknown'`, `'park'`, `'retry'`, `'skip'` or `'stop'`.

The `stop()` method can be used to stop the gRPC streaming operation.

### How to write a persistent subscription consumer<a id="how-to-write-a-persistent-subscription-consumer"></a>

The reading of a persistent subscription can be encapsulated in a "consumer" that calls a "policy" function when a recorded event is received, and then automatically calls `ack()` if the policy function returns normally, and `nack()` if it raises an exception, perhaps retrying the event a certain number of times before parking it.

The simple example below shows how this might be done. We can see that 'event9' is acknowledged before 'event5' is finally parked.

The number of times a `RecordedEvent` has been retried is represented by its `retry_count` attribute.

```python
acked_events = {}
nacked_events = {}


class ExampleConsumer:
    def __init__(self, subscription, max_retry_count, final_action):
        self.subscription = subscription
        self.max_retry_count = max_retry_count
        self.final_action = final_action
        self.error = None

    def run(self):
        with self.subscription:
            for event in self.subscription:
                try:
                    self.policy(event)
                except Exception:
                    if event.retry_count < self.max_retry_count:
                        action = "retry"
                    else:
                        action = self.final_action
                    self.subscription.nack(event, action)
                    self.after_nack(event, action)
                else:
                    self.subscription.ack(event)
                    self.after_ack(event)

    def stop(self):
        self.subscription.stop()

    def policy(self, event):
        # Raise an exception when we see "event5".
        if event == event5:
            raise Exception()

    def after_ack(self, event):
        # Track retry count of acked events.
        acked_events[event.id] = event.retry_count

    def after_nack(self, event, action):
        # Track retry count of nacked events.
        nacked_events[event.id] = event.retry_count

        if action == self.final_action:
            # Stop the consumer, so we can continue with the examples.
            self.stop()


# Create subscription.
group_name = f"group-{uuid.uuid4()}"
client.create_subscription_to_all(group_name, commit_position=commit_position1)

# Read subscription.
subscription = client.read_subscription_to_all(group_name)

# Construct consumer.
consumer = ExampleConsumer(
    subscription=subscription,
    max_retry_count=5,
    final_action="park",
)

# Run consumer.
consumer.run()

# Check 'event5' was nacked and never acked.
assert event5.id in nacked_events
assert event5.id not in acked_events
assert nacked_events[event5.id] == 5

# Check 'event9' was acked and never nacked.
assert event9.id in acked_events
assert event9.id not in nacked_events
```

### Update subscription to all<a id="update-subscription-to-all"></a>

*requires leader*

The `update_subscription_to_all()` method can be used to update a "persistent subscription".
Please note, the filter options and consumer strategy cannot be adjusted.

This method has a required `group_name` argument, which is the name of a "group" of consumers of the subscription.

This method also has sixteen optional arguments, `from_end`, `commit_position`, `resolve_links`, `consumer_strategy`, `message_timeout`, `max_retry_count`, `min_checkpoint_count`, `max_checkpoint_count`, `checkpoint_after`, `max_subscriber_count`, `live_buffer_size`, `read_batch_size`, `history_buffer_size`, `extra_statistics`, `timeout` and `credentials`.

The optional arguments `from_end`, `commit_position`, `resolve_links`, `consumer_strategy`, `message_timeout`, `max_retry_count`, `min_checkpoint_count`, `max_checkpoint_count`, `checkpoint_after`, `max_subscriber_count`, `live_buffer_size`, `read_batch_size`, `history_buffer_size`, and `extra_statistics` can be used to adjust the values set on previous calls to `create_subscription_to_all()` and `update_subscription_to_all()`. If any of these arguments are not mentioned in a call to `update_subscription_to_all()`, the corresponding settings of the persistent subscription will be unchanged.

The optional `timeout` argument is a Python `float` which sets a maximum duration, in seconds, for the completion of the gRPC operation.

The optional `credentials` argument can be used to override call credentials derived from the connection string URI.

The method `update_subscription_to_all()` does not return a value.

In the example below, a persistent subscription is updated to run from the end of the database.

```python
# Update the persistent subscription.
client.update_subscription_to_all(group_name=group_name1, from_end=True)
```

### Create subscription to stream<a id="create-subscription-to-stream"></a>

*requires leader*

The `create_subscription_to_stream()` method can be used to create a persistent subscription to a stream.

This method has two required arguments, `group_name` and `stream_name`. The `group_name` argument names the group of consumers that will receive events from this subscription. The `stream_name` argument specifies which stream the subscription will follow. The values of both these arguments are expected to be Python `str` objects.

This method also has sixteen optional arguments, `stream_position`, `from_end`, `resolve_links`, `consumer_strategy`, `message_timeout`, `max_retry_count`, `min_checkpoint_count`, `max_checkpoint_count`, `checkpoint_after`, `max_subscriber_count`, `live_buffer_size`, `read_batch_size`, `history_buffer_size`, `extra_statistics`, `timeout` and `credentials`.

The optional `stream_position` argument specifies a stream position from which to subscribe. The recorded event at this stream position will be received when reading the subscription.

The optional `from_end` argument is a Python `bool`. By default, the value of this argument is `False`. If this argument is set to `True`, reading from the subscription will receive only events recorded after the subscription was created. That is, it is not inclusive of the current stream position.

The optional `resolve_links` argument is a Python `bool`. The default value of `resolve_links` is `False`, which means any event links will not be resolved, so that the events that are returned may represent event links.
If `resolve_links` is `True`, any event links will be resolved, so that the linked events will be returned instead of the event links.

The optional `consumer_strategy` argument is a Python `str` that defines the consumer strategy for this persistent subscription. The value of this argument can be `'DispatchToSingle'`, `'RoundRobin'`, `'Pinned'`, or `'PinnedByCorrelation'`. The default value is `'DispatchToSingle'`.

The optional `message_timeout` argument is a Python `float` which sets a maximum duration, in seconds, from the server sending a recorded event to a consumer of the persistent subscription until either an "acknowledgement" (ack) or a "negative acknowledgement" (nack) is received by the server, after which the server will retry sending the event. The default value of `message_timeout` is `30.0`.

The optional `max_retry_count` argument is a Python `int` which sets the number of times the server will retry sending an event. The default value of `max_retry_count` is `10`.

The optional `min_checkpoint_count` argument is a Python `int` which sets the minimum number of "acknowledgements" (acks) received by the server before the server may record the acknowledgements. The default value of `min_checkpoint_count` is `10`.

The optional `max_checkpoint_count` argument is a Python `int` which sets the maximum number of "acknowledgements" (acks) received by the server before the server must record the acknowledgements. The default value of `max_checkpoint_count` is `1000`.

The optional `checkpoint_after` argument is a Python `float` which sets the maximum duration in seconds between recording "acknowledgements" (acks). The default value of `checkpoint_after` is `2.0`.

The optional `max_subscriber_count` argument is a Python `int` which sets the maximum number of concurrent readers of the persistent subscription, beyond which attempts to read the persistent subscription will raise a `MaximumSubscriptionsReached` error.

The optional `live_buffer_size` argument is a Python `int` which sets the size of the buffer (in-memory) holding newly recorded events. The default value of `live_buffer_size` is 500.

The optional `read_batch_size` argument is a Python `int` which sets the number of recorded events read from disk when catching up. The default value of `read_batch_size` is 200.

The optional `history_buffer_size` argument is a Python `int` which sets the number of recorded events to cache in memory when catching up. The default value of `history_buffer_size` is 500.

The optional `extra_statistics` argument is a Python `bool` which enables tracking of extra statistics on this subscription. The default value of `extra_statistics` is `False`.

The optional `timeout` argument is a Python `float` which sets a maximum duration, in seconds, for the completion of the gRPC operation.

The optional `credentials` argument can be used to override call credentials derived from the connection string URI.

This method does not return a value. Events can be received by calling `read_subscription_to_stream()`.

The example below creates a persistent stream subscription from the start of the stream.

```python
# Create a persistent stream subscription from the start of the stream.
group_name2 = f"group-{uuid.uuid4()}"
client.create_subscription_to_stream(
    group_name=group_name2,
    stream_name=stream_name2,
)
```
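The optional arguments work in the same way as for `create_subscription_to_all()`. For example, something like the call below would create a subscription whose consumers receive events from a particular stream position onwards; the values are illustrative only.

```python
# Create a persistent stream subscription from stream position 1, so
# that the event at position 1 will also be received (illustrative).
group_name3 = f"group-{uuid.uuid4()}"
client.create_subscription_to_stream(
    group_name=group_name3,
    stream_name=stream_name2,
    stream_position=1,
)
```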
### Read subscription to stream<a id="read-subscription-to-stream"></a>

*requires leader*

The `read_subscription_to_stream()` method can be used to read a persistent subscription to a stream.

This method has two required arguments, `group_name` and `stream_name`, which should match the values of the arguments used when calling `create_subscription_to_stream()`.

This method has an optional `timeout` argument, which is a Python `float` that sets a maximum duration, in seconds, for the completion of the gRPC operation.

This method has an optional `credentials` argument, which can be used to override call credentials derived from the connection string URI.

This method returns a `PersistentSubscription` object, which is an iterator giving `RecordedEvent` objects, and which also has `ack()`, `nack()` and `stop()` methods.

```python
subscription = client.read_subscription_to_stream(
    group_name=group_name2,
    stream_name=stream_name2,
)
```

The example below iterates over the subscription object, and calls `ack()`. The subscription's `stop()` method is called when we have received `event6`, stopping the iteration, so that we can continue with the examples below.

```python
events = []
for event in subscription:
    events.append(event)

    # Acknowledge the received event.
    subscription.ack(event)

    # Stop when 'event6' has been received.
    if event == event6:
        subscription.stop()
```

We can check we received all the events that were appended to `stream_name2` in the examples above.

```python
assert len(events) == 3
assert events[0] == event4
assert events[1] == event5
assert events[2] == event6
```

### Update subscription to stream<a id="update-subscription-to-stream"></a>

*requires leader*

The `update_subscription_to_stream()` method can be used to update a persistent subscription to a stream. Please note, the consumer strategy cannot be adjusted.

This method has a required `group_name` argument, which is the name of a "group" of consumers of the subscription, and a required `stream_name` argument, which is the name of a stream.

This method also has sixteen optional arguments, `from_end`, `stream_position`, `resolve_links`, `consumer_strategy`, `message_timeout`, `max_retry_count`, `min_checkpoint_count`, `max_checkpoint_count`, `checkpoint_after`, `max_subscriber_count`, `live_buffer_size`, `read_batch_size`, `history_buffer_size`, `extra_statistics`, `timeout` and `credentials`.

The optional arguments `from_end`, `stream_position`, `resolve_links`, `consumer_strategy`, `message_timeout`, `max_retry_count`, `min_checkpoint_count`, `max_checkpoint_count`, `checkpoint_after`, `max_subscriber_count`, `live_buffer_size`, `read_batch_size`, `history_buffer_size`, and `extra_statistics` can be used to adjust the values set on previous calls to `create_subscription_to_stream()` and `update_subscription_to_stream()`.
If any of these arguments are not mentioned in a call to `update_subscription_to_stream()`, the corresponding settings of the persistent subscription will be unchanged.

The optional `timeout` argument is a Python `float` which sets a maximum duration, in seconds, for the completion of the gRPC operation.

The optional `credentials` argument can be used to override call credentials derived from the connection string URI.

The `update_subscription_to_stream()` method does not return a value.

In the example below, a persistent subscription to a stream is updated to run from the end of the stream.

```python
# Update the persistent subscription.
client.update_subscription_to_stream(
    group_name=group_name2,
    stream_name=stream_name2,
    from_end=True,
)
```

### Replay parked events<a id="replay-parked-events"></a>

*requires leader*

The `replay_parked_events()` method can be used to "replay" events that have been "parked" (negatively acknowledged with the action `'park'`) when reading a persistent subscription. Parked events will then be received again by consumers reading from the persistent subscription.

This method has a required `group_name` argument and an optional `stream_name` argument. The values of these arguments should match those used when calling `create_subscription_to_all()` or `create_subscription_to_stream()`.

This method has an optional `timeout` argument, which is a Python `float` that sets a maximum duration, in seconds, for the completion of the gRPC operation.

This method has an optional `credentials` argument, which can be used to override call credentials derived from the connection string URI.

The example below replays parked events for group `group_name1`.

```python
client.replay_parked_events(
    group_name=group_name1,
)
```

The example below replays parked events for group `group_name2`.

```python
client.replay_parked_events(
    group_name=group_name2,
    stream_name=stream_name2,
)
```

### Get subscription info<a id="get-subscription-info"></a>

*requires leader*

The `get_subscription_info()` method can be used to get information for a persistent subscription.

This method has a required `group_name` argument and an optional `stream_name` argument, which should match the values of the arguments used when calling either `create_subscription_to_all()` or `create_subscription_to_stream()`.

This method has an optional `timeout` argument, which is a Python `float` that sets a maximum duration, in seconds, for the completion of the gRPC operation.

This method has an optional `credentials` argument, which can be used to override call credentials derived from the connection string URI.

The example below gets information for the persistent subscription `group_name1` which was created by calling `create_subscription_to_all()`.

```python
subscription_info = client.get_subscription_info(
    group_name=group_name1,
)
```

The example below gets information for the persistent subscription `group_name2` on `stream_name2` which was created by calling `create_subscription_to_stream()`.

```python
subscription_info = client.get_subscription_info(
    group_name=group_name2,
    stream_name=stream_name2,
)
```

The returned value is a `SubscriptionInfo` object.

### List subscriptions<a id="list-subscriptions"></a>

*requires leader*

The `list_subscriptions()` method can be used to get information for all existing persistent subscriptions, both "subscriptions to all" and "subscriptions to stream".

This method has an optional `timeout` argument, which is a Python `float` that sets a maximum duration, in seconds, for the completion of the gRPC operation.

This method has an optional `credentials` argument, which can be used to override call credentials derived from the connection string URI.

The example below lists all the existing persistent subscriptions.

```python
subscriptions = client.list_subscriptions()
```

The returned value is a list of `SubscriptionInfo` objects.

### List subscriptions to stream<a id="list-subscriptions-to-stream"></a>

*requires leader*

The `list_subscriptions_to_stream()` method can be used to get information for all the persistent subscriptions to a stream.

This method has one required argument, `stream_name`.

This method has an optional `timeout` argument, which is a Python `float` that sets a maximum duration, in seconds, for the completion of the gRPC operation.

This method has an optional `credentials` argument, which can be used to override call credentials derived from the connection string URI.

```python
subscriptions = client.list_subscriptions_to_stream(
    stream_name=stream_name2,
)
```

The returned value is a list of `SubscriptionInfo` objects.

### Delete subscription<a id="delete-subscription"></a>

*requires leader*

The `delete_subscription()` method can be used to delete a persistent subscription.

This method has a required `group_name` argument and an optional `stream_name` argument, which should match the values of the arguments used when calling either `create_subscription_to_all()` or `create_subscription_to_stream()`.

This method has an optional `timeout` argument, which is a Python `float` that sets a maximum duration, in seconds, for the completion of the gRPC operation.

This method has an optional `credentials` argument, which can be used to override call credentials derived from the connection string URI.

The example below deletes the persistent subscription `group_name1` which was created by calling `create_subscription_to_all()`.

```python
client.delete_subscription(
    group_name=group_name1,
)
```

The example below deletes the persistent subscription `group_name2` on `stream_name2` which was created by calling `create_subscription_to_stream()`.

```python
client.delete_subscription(
    group_name=group_name2,
    stream_name=stream_name2,
)
```

## Projections<a id="projections"></a>

Please refer to the [EventStoreDB documentation](https://developers.eventstore.com/server/v23.10/projections.html) for more information on projections in EventStoreDB.

### Create projection<a id="create-projection"></a>

*requires leader*

The `create_projection()` method can be used to create a "continuous" projection.

This method has two required arguments, `name` and `query`.

The required `name` argument is a Python `str` that specifies the name of the projection.

The required `query` argument is a Python `str` that defines what the projection will do.

This method also has four optional arguments, `emit_enabled`, `track_emitted_streams`, `timeout`, and `credentials`.

The optional `emit_enabled` argument is a Python `bool` which specifies whether a projection will be able to emit events.
If a `True` value is specified, the projection will be able to emit events, otherwise it will not be able to emit events. The default value of `emit_enabled` is `False`.

Please note, `emit_enabled` must be `True` if your projection query includes a call to `emit()`, otherwise the projection will not run.

The optional `track_emitted_streams` argument is a Python `bool` which specifies whether a projection will have its emitted streams tracked. If a `True` value is specified, the projection will have its emitted streams tracked, otherwise it will not. The default value of `track_emitted_streams` is `False`.

The purpose of tracking emitted streams is that they can optionally be deleted when a projection is deleted (see the `delete_projection()` method for more details).

Please note, if you set `track_emitted_streams` to `True`, then you must also set `emit_enabled` to `True`, otherwise an error will be raised by this method.

The optional `timeout` argument is a Python `float` which sets a maximum duration, in seconds, for the completion of the gRPC operation.

The optional `credentials` argument can be used to override call credentials derived from the connection string URI.

In the example below, a projection is created that processes events appended to `stream_name2`. The "state" of the projection is initialised to have a "count" that is incremented once for each event.

```python
projection_name = str(uuid.uuid4())

projection_query = """fromStream('%s')
.when({
  $init: function(){
    return {
      count: 0
    };
  },
  OrderCreated: function(s,e){
    s.count += 1;
  },
  OrderUpdated: function(s,e){
    s.count += 1;
  },
  OrderDeleted: function(s,e){
    s.count += 1;
  }
})
.outputState()
""" % stream_name2

client.create_projection(
    name=projection_name,
    query=projection_query,
)
```

Please note, the `outputState()` call is optional, and causes the state of the projection to be persisted in a "result" stream. If `outputState()` is called, an event representing the state of the projection will immediately be written to a "result" stream.

The default name of the "result" stream for a projection with name `projection_name` is `$projections-{projection_name}-result`. This stream name can be used to read from and subscribe to the "result" stream, with the `get_stream()`, `read_stream()`, or `subscribe_to_stream()` methods, or with `create_subscription_to_stream()` and `read_subscription_to_stream()`, as sketched below.
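For example, a minimal sketch of reading the "result" stream for the projection created above might look like the following, assuming the projection is running and has had time to write its first "Result" event.

```python
# Read the projection's "result" stream (sketch; assumes the projection
# has already written at least one 'Result' event).
result_stream_name = f"$projections-{projection_name}-result"

for event in client.read_stream(stream_name=result_stream_name):
    assert event.type == 'Result'
```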
### Get projection state<a id="get-projection-state"></a>

*requires leader*

The `get_projection_state()` method can be used to get a projection's "state".

This method has a required `name` argument, which is a Python `str` that
specifies the name of a projection.

This method also has two optional arguments, `timeout` and `credentials`.

The optional `timeout` argument is a Python `float` which sets a
maximum duration, in seconds, for the completion of the gRPC operation.

The optional `credentials` argument can be used to
override call credentials derived from the connection string URI.

In the example below, after sleeping for 1 second to allow the projection
to process all the recorded events, the projection "state" is obtained.
We can see that the projection has processed three events.

```python
sleep(1)  # allow time for projection to process recorded events

projection_state = client.get_projection_state(name=projection_name)

assert projection_state.value == {'count': 3}
```
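A fixed sleep can be unreliable on slow or busy machines. As an alternative, the
hedged sketch below polls `get_projection_state()` until the expected count is
reached or a deadline passes. The five-second deadline and the poll interval are
arbitrary illustrative choices.

```python
from time import monotonic, sleep

# Poll the projection state instead of relying on a single fixed sleep.
deadline = monotonic() + 5.0  # arbitrary illustrative deadline
while True:
    projection_state = client.get_projection_state(name=projection_name)
    if projection_state.value.get("count") == 3:
        break
    if monotonic() > deadline:
        raise TimeoutError("projection did not process all events in time")
    sleep(0.1)  # arbitrary illustrative poll interval
```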
### Get projection statistics<a id="get-projection-statistics"></a>

*requires leader*

The `get_projection_statistics()` method can be used to get projection statistics.

This method has a required `name` argument, which is a Python `str` that specifies the
name of a projection.

This method also has two optional arguments, `timeout` and `credentials`.

The optional `timeout` argument is a Python `float` which sets a
maximum duration, in seconds, for the completion of the gRPC operation.

The optional `credentials` argument can be used to
override call credentials derived from the connection string URI.

This method returns a `ProjectionStatistics` object that represents
the named projection.

```python
statistics = client.get_projection_statistics(name=projection_name)
```

The attributes of the returned `ProjectionStatistics` object have values that
represent the progress of the projection.
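For example, the progress of a projection can be inspected by reading attributes off
the returned object. The attribute names used below (`name`, `status` and `progress`)
are illustrative assumptions; check the `ProjectionStatistics` class in your installed
version for the exact field names.

```python
# Hypothetical attribute names, for illustration only.
print(statistics.name, statistics.status, statistics.progress)
```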
### Update projection<a id="update-projection"></a>

*requires leader*

The `update_projection()` method can be used to update a projection.

This method has two required arguments, `name` and `query`.

The required `name` argument is a Python `str` which specifies the name of the projection
to be updated.

The required `query` argument is a Python `str` which defines what the projection will do.

This method also has three optional arguments, `emit_enabled`, `timeout`, and `credentials`.

The optional `emit_enabled` argument is a Python `bool` which specifies whether the
projection will be able to emit events. The default value of `emit_enabled` is `False`.

Please note, `emit_enabled` must be `True` if your projection query includes a call
to `emit()`, otherwise the projection will not run.

Please note, it is not possible to update `track_emitted_streams` via the gRPC API.

The optional `timeout` argument is a Python `float` which sets a
maximum duration, in seconds, for the completion of the gRPC operation.

The optional `credentials` argument can be used to
override call credentials derived from the connection string URI.

```python
client.update_projection(name=projection_name, query=projection_query)
```

### Enable projection<a id="enable-projection"></a>

*requires leader*

The `enable_projection()` method can be used to enable (start running) a projection
that was previously disabled (stopped).

This method has a required `name` argument, which is a Python `str` that
specifies the name of the projection to be enabled.

This method also has two optional arguments, `timeout` and `credentials`.

The optional `timeout` argument is a Python `float` which sets a
maximum duration, in seconds, for the completion of the gRPC operation.

The optional `credentials` argument can be used to
override call credentials derived from the connection string URI.

```python
client.enable_projection(name=projection_name)
```

### Disable projection<a id="disable-projection"></a>

*requires leader*

The `disable_projection()` method can be used to disable (stop running) a projection.

This method has a required `name` argument, which is a Python `str` that
specifies the name of the projection to be disabled.

This method also has two optional arguments, `timeout` and `credentials`.

The optional `timeout` argument is a Python `float` which sets a
maximum duration, in seconds, for the completion of the gRPC operation.

The optional `credentials` argument can be used to
override call credentials derived from the connection string URI.

```python
client.disable_projection(name=projection_name)
```

### Reset projection<a id="reset-projection"></a>

*requires leader*

The `reset_projection()` method can be used to reset a projection.

This method has a required `name` argument, which is a Python `str` that
specifies the name of the projection to be reset.

This method also has two optional arguments, `timeout` and `credentials`.

The optional `timeout` argument is a Python `float` which sets a
maximum duration, in seconds, for the completion of the gRPC operation.

The optional `credentials` argument can be used to
override call credentials derived from the connection string URI.

```python
client.reset_projection(name=projection_name)
```

Please note, a projection must be disabled before it can be reset.
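Putting these methods together, the sketch below disables the projection, resets it,
and then enables it again so that it resumes processing from the start.

```python
# Reset requires the projection to be disabled first.
client.disable_projection(name=projection_name)
client.reset_projection(name=projection_name)

# Re-enable the projection so that it starts running again.
client.enable_projection(name=projection_name)
```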
### Delete projection<a id="delete-projection"></a>

*requires leader*

The `delete_projection()` method can be used to delete a projection.

This method has a required `name` argument, which is a Python `str` that
specifies the name of the projection to be deleted.

This method also has five optional arguments, `delete_emitted_streams`,
`delete_state_stream`, `delete_checkpoint_stream`, `timeout`, and `credentials`.

The optional `delete_emitted_streams` argument is a Python `bool` which specifies
that all "emitted" streams that have been tracked will be deleted. For emitted streams
to be deleted, they must have been tracked (see the `track_emitted_streams` argument of
the `create_projection()` method).

The optional `delete_state_stream` argument is a Python `bool` which specifies that
the projection's "state" stream should also be deleted. The "state" stream is like
the "result" stream, but events are written to it only occasionally, along with
events written to the "checkpoint" stream, rather than immediately in the way that
a call to `outputState()` writes events to the "result" stream.

The optional `delete_checkpoint_stream` argument is a Python `bool` which specifies
that the projection's "checkpoint" stream should also be deleted.

The optional `timeout` argument is a Python `float` which sets a
maximum duration, in seconds, for the completion of the gRPC operation.

The optional `credentials` argument can be used to
override call credentials derived from the connection string URI.

```python
client.delete_projection(name=projection_name)
```

Please note, a projection must be disabled before it can be deleted.
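For a projection that was created with `track_emitted_streams=True`, the cleanup
arguments can be combined as in the sketch below, which first disables the projection
and then deletes it along with its tracked "emitted" streams and its "state" and
"checkpoint" streams. (If emitted streams were not tracked, omit `delete_emitted_streams`.)

```python
# A projection must be disabled before it can be deleted.
client.disable_projection(name=projection_name)

# Delete the projection together with its associated streams.
client.delete_projection(
    name=projection_name,
    delete_emitted_streams=True,
    delete_state_stream=True,
    delete_checkpoint_stream=True,
)
```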
### Restart projections subsystem<a id="restart-projections-subsystem"></a>

*requires leader*

The `restart_projections_subsystem()` method can be used to restart the projections subsystem.

This method has two optional arguments, `timeout` and `credentials`.

The optional `timeout` argument is a Python `float` which sets a
maximum duration, in seconds, for the completion of the gRPC operation.

The optional `credentials` argument can be used to
override call credentials derived from the connection string URI.

```python
client.restart_projections_subsystem()
```


## Call credentials<a id="call-credentials"></a>

Default call credentials are derived by the client from the user info part of the
connection string URI.

Many of the client methods described above have an optional `credentials` argument,
which can be used to set call credentials for an individual method call that override
those derived from the connection string URI.

Call credentials are sent to "secure" servers in a "basic auth" authorization header.
This authorization header is used by the server to authenticate the client. The
authorization header is not sent to "insecure" servers.


### Construct call credentials<a id="construct-call-credentials"></a>

The client method `construct_call_credentials()` can be used to construct a call
credentials object from a username and password.

```python
call_credentials = client.construct_call_credentials(
    username='admin', password='changeit'
)
```

The call credentials object can be used as the value of the `credentials`
argument in other client methods.
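For example, a single read can be performed with the constructed credentials, leaving
the client's default credentials untouched for all other calls. This sketch assumes
`get_stream()` is among the methods that accept the optional `credentials` argument.

```python
# Use the constructed call credentials for this one call only.
events = client.get_stream(
    stream_name2,
    credentials=call_credentials,
)
```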
## Connection<a id="connection"></a>

### Reconnect<a id="reconnect"></a>

The `reconnect()` method can be used to manually reconnect the client to a
suitable EventStoreDB node. It uses the same routine that is used when the client is
constructed: reading the cluster node states, and then connecting to a suitable node
according to the node preference specified in the client's connection string URI.
This method is thread-safe, in that when it is called by several threads at the same
time, only one reconnection will occur. Concurrent attempts to reconnect will block
until the client has reconnected successfully, and then they will all return normally.

```python
client.reconnect()
```

An example of when it might be desirable to reconnect manually is when (for performance
reasons) the client's node preference is to be connected to a follower node in the
cluster, and, after a cluster leader election, the follower becomes the leader.
Reconnecting to a follower node in this case is currently beyond the capabilities of
this client, but this behavior might be implemented in a future release.

Reconnection will happen automatically in many cases, due to the `@autoreconnect`
decorator.

### Close<a id="close"></a>

The `close()` method can be used to cleanly close the client's gRPC connection.

```python
client.close()
```


## Asyncio client<a id="asyncio-client"></a>

The `esdbclient` package also provides an asynchronous I/O gRPC Python client for
EventStoreDB. It is functionally equivalent to the multithreaded client. It uses
the `grpc.aio` package and the `asyncio` module, instead of `grpc` and `threading`.

It supports both the "esdb" and the "esdb+discover" connection string URI schemes,
and can connect to both "secure" and "insecure" EventStoreDB servers.

The class `AsyncEventStoreDBClient` can be used to construct an instance of the
asynchronous I/O gRPC Python client. It can be imported from `esdbclient`. The
async method `connect()` should be called after constructing the client.

The asyncio client has exactly the same methods as the multithreaded `EventStoreDBClient`.
These methods are defined as `async def` methods, and so calls to these methods will
return Python "awaitables" that must be awaited to obtain the method return values.
The methods have the same behaviors, the same arguments, and the same or equivalent
return values. The methods are similarly decorated with reconnect and retry decorators
that selectively reconnect and retry when connection issues or server errors are
encountered.

When awaited, the methods `read_all()` and `read_stream()` return an `AsyncReadResponse`
object. The methods `subscribe_to_all()` and `subscribe_to_stream()` return an
`AsyncCatchupSubscription` object. The methods `read_subscription_to_all()` and
`read_subscription_to_stream()` return an `AsyncPersistentSubscription` object.
These objects are asyncio iterables, which you can iterate over with Python's `async for`
syntax to obtain `RecordedEvent` objects. They are also asyncio context managers,
supporting the `async with` syntax, and they have a `stop()` method which can be
used to terminate the iterator in a way that actively cancels the streaming gRPC call
to the server. When used as a context manager, the `stop()` method will be called when
the context manager exits.

The `AsyncPersistentSubscription` objects returned by `read_subscription_to_all()`
and `read_subscription_to_stream()` also have async methods `ack()` and `nack()` that
work in the same way as the methods on `PersistentSubscription`, supporting the
acknowledgement and negative acknowledgement of recorded events that have been
received from a persistent subscription. See above for details.
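As a hedged sketch of consuming a persistent subscription with the asyncio client,
the loop below acknowledges each received event after processing it. It assumes that
a persistent subscription group (here called `'group1'`) has already been created with
`create_subscription_to_all()`, and that `ack()` accepts the received event, as
described in the persistent subscriptions section above.

```python
from esdbclient import AsyncEventStoreDBClient


# A minimal consumer sketch for an async persistent subscription.
# Assumes a persistent subscription group named 'group1' already exists.
async def consume(client: AsyncEventStoreDBClient) -> None:
    subscription = await client.read_subscription_to_all(group_name="group1")
    async with subscription:
        async for event in subscription:
            ...  # process the event here, then acknowledge it
            await subscription.ack(event)
```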
### Synopsis<a id="synopsis-1"></a>

The example below demonstrates the async `append_to_stream()`, `get_stream()` and
`subscribe_to_all()` methods. These are the most useful methods for writing
an event-sourced application, allowing new aggregate events to be recorded, the
recorded events of an aggregate to be obtained so that aggregates can be reconstructed,
and the state of an application to be propagated and processed with "exactly-once"
semantics.

```python
import asyncio
import os
import uuid

from esdbclient import AsyncEventStoreDBClient, NewEvent, StreamState


async def demonstrate_async_client():

    # Construct client.
    client = AsyncEventStoreDBClient(
        uri=os.getenv("ESDB_URI"),
        root_certificates=os.getenv("ESDB_ROOT_CERTIFICATES"),
    )

    # Connect to EventStoreDB.
    await client.connect()

    # Append events.
    stream_name = str(uuid.uuid4())
    event1 = NewEvent("OrderCreated", data=b'{}')
    event2 = NewEvent("OrderUpdated", data=b'{}')
    event3 = NewEvent("OrderDeleted", data=b'{}')

    commit_position = await client.append_to_stream(
        stream_name=stream_name,
        current_version=StreamState.NO_STREAM,
        events=[event1, event2, event3]
    )

    # Get stream events.
    recorded = await client.get_stream(stream_name)
    assert len(recorded) == 3
    assert recorded[0] == event1
    assert recorded[1] == event2
    assert recorded[2] == event3

    # Subscribe to all events.
    received = []
    async with await client.subscribe_to_all(commit_position=0) as subscription:
        async for event in subscription:
            received.append(event)
            if event.commit_position == commit_position:
                break
    assert received[-3] == event1
    assert received[-2] == event2
    assert received[-1] == event3

    # Close the client.
    await client.close()


# Run the demo.
asyncio.run(demonstrate_async_client())
```

### FastAPI example<a id="fastapi"></a>

The example below shows how to use `AsyncEventStoreDBClient` with [FastAPI](https://fastapi.tiangolo.com).

```python
from contextlib import asynccontextmanager

from fastapi import FastAPI

from esdbclient import AsyncEventStoreDBClient


client: AsyncEventStoreDBClient


@asynccontextmanager
async def lifespan(_: FastAPI):
    # Construct the client.
    global client
    client = AsyncEventStoreDBClient(
        uri="esdb+discover://localhost:2113?Tls=false",
    )
    await client.connect()

    yield

    # Close the client.
    await client.close()


app = FastAPI(lifespan=lifespan)


@app.get("/commit_position")
async def commit_position():
    commit_position = await client.get_commit_position()
    return {"commit_position": commit_position}
```

If you put this code in a file called `fastapi_example.py` and then run the command
`uvicorn fastapi_example:app --host 0.0.0.0 --port 80`, the FastAPI application
will return something like `{"commit_position":628917}` when a browser is pointed
at `http://localhost/commit_position`. Use Ctrl-C to exit the process.

## Notes<a id="notes"></a>

### Regular expression filters<a id="regular-expression-filters"></a>

The `read_all()`, `subscribe_to_all()`, `create_subscription_to_all()`
and `get_commit_position()` methods have `filter_exclude` and `filter_include`
arguments. This section provides some more details about the values of these
arguments.

The first thing to note is that the values of these arguments should be sequences
of regular expressions. They are concatenated together by the client as bracketed
alternatives in a larger regular expression that is anchored to the start and end
of the strings being matched, so there is no need to include the `'^'` and `'$'`
anchor assertions.

You should use wildcards if you want to match substrings, for example `'.*Snapshot'`
to match all strings that end with `'Snapshot'`, or `'Order.*'` to match all strings
that start with `'Order'`.

System events generated by EventStoreDB have `type` strings that start with
the `$` sign. Persistent subscription events generated when manipulating
persistent subscriptions have `type` strings that start with `PersistentConfig`.

For example, to match the type of EventStoreDB system events, use the regular
expression string `r'\$.+'`. The constant `ESDB_SYSTEM_EVENTS_REGEX` is
set to this value. You can import this constant from `esdbclient` and use it when
building longer sequences of regular expressions.

Similarly, to match the type of EventStoreDB persistent subscription events, use the
regular expression `r'PersistentConfig\d+'`. The constant `ESDB_PERSISTENT_CONFIG_EVENTS_REGEX`
is set to this value. You can import this constant from `esdbclient` and use it when
building longer sequences of regular expressions.

The constant `DEFAULT_EXCLUDE_FILTER` is a sequence of regular expressions that includes
both `ESDB_SYSTEM_EVENTS_REGEX` and `ESDB_PERSISTENT_CONFIG_EVENTS_REGEX`. In all methods
that have a `filter_exclude` argument, it is the default value of that argument, so that
the "system" and "persistent subscription config" events generated internally by
EventStoreDB, which would otherwise be included, are excluded by default.

This value can be extended. For example, if you want to exclude system events and
persistent subscription events and also events that have a type that ends with
`'Snapshot'`, then you can use `DEFAULT_EXCLUDE_FILTER + ['.*Snapshot']` as the
`filter_exclude` argument.

The `filter_include` and `filter_exclude` arguments are designed to have exactly
the opposite effect from each other: a sequence of strings given to `filter_include`
will return exactly those events which would be excluded if the same value were
given to `filter_exclude`, and vice versa.
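For example, the sketch below reads all recorded events whilst excluding system
events, persistent subscription config events, and any events whose type ends
with `'Snapshot'`.

```python
from esdbclient import DEFAULT_EXCLUDE_FILTER

# Exclude the default internal event types, plus snapshot events.
events = client.read_all(
    filter_exclude=DEFAULT_EXCLUDE_FILTER + ['.*Snapshot'],
)
```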
### Reconnect and retry method decorators<a id="reconnect-and-retry-method-decorators"></a>

Please note, nearly all the client methods are decorated with the `@autoreconnect` and
the `@retrygrpc` decorators.

The `@autoreconnect` decorator will reconnect to a suitable node in the cluster when
the server to which the client has been connected has become unavailable, or when the
client's gRPC channel happens to have been closed. The client will also reconnect when
a method is called that requires a leader, and the client's node preference is to be
connected to a leader, but the node that the client has been connected to stops being
the leader. In this case, the client will reconnect to the current leader. After
reconnecting, the failed operation will be retried.

The `@retrygrpc` decorator selectively retries gRPC operations that have failed due to
a timeout, network error, or server error. It doesn't retry operations that fail due to
bad requests that will certainly fail again.

Please also note, the reconnect and retry behaviours do not cover iteration over the
objects returned by methods that return iterators. For example, consider
the "read response" iterator returned from the `read_all()` method. The
`read_all()` method will have returned, and the method decorators will therefore
have exited, before iterating over the "read response" begins. Therefore, if a
connection issue occurs whilst iterating over the "read response", it isn't possible
for any decorator on the `read_all()` method to trigger a reconnection.

With the "catch-up subscription" objects, there is an initial "confirmation" response
from the server which is received and checked by the client. And so, when a call is
made to `subscribe_to_all()` or `subscribe_to_stream()`, if the server is unavailable,
or if the channel has somehow been closed, or if the request fails for some other reason,
then the client will reconnect and retry. However, if an exception is raised when iterating over a
successfully returned "catch-up subscription" object, the catch-up subscription will
need to be restarted. Similarly, when reading persistent subscriptions, if there are
connection issues whilst iterating over a successfully received response, the consumer
will need to be restarted.
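One common way to restart a catch-up subscription is to wrap it in a loop that records
the last processed commit position and resubscribes from there after a streaming error.
The sketch below is illustrative rather than prescriptive; in a real application the
checkpoint would be persisted to durable storage rather than held in memory.

```python
from time import sleep

# Restartable catch-up subscription sketch: resume from the last
# processed commit position after a streaming error.
checkpoint = 0  # in a real application, load this from durable storage

while True:
    try:
        subscription = client.subscribe_to_all(commit_position=checkpoint)
        for event in subscription:
            ...  # process the event here
            checkpoint = event.commit_position  # persist this in real code
    except Exception:
        sleep(1)  # brief pause before restarting the subscription
```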
## Instrumentation<a id="instrumentation"></a>

Instrumentation is the act of modifying software so that analysis can be performed on it.
Instrumentation helps enterprises reveal areas or features where users frequently
encounter errors or slowdowns in their software or platform.

Instrumentation helps you understand the inner state of your software systems.
Instrumented applications measure what code is doing when it responds to active
requests by collecting data such as metrics, events, logs, and traces.

Instrumentation provides immediate visibility into your application, often using
charts and graphs to illustrate what is going on “under the hood.”

This package supports instrumenting the EventStoreDB clients with OpenTelemetry.

### OpenTelemetry<a id="open-telemetry"></a>

The [OpenTelemetry](https://opentelemetry.io) project provides a collection of APIs,
SDKs, and tools for instrumenting, generating, collecting, and exporting telemetry data,
that can help you analyze your software’s performance and behavior. It is vendor-neutral,
100% Free and Open Source, and adopted and supported by industry leaders in the
observability space.

This package provides OpenTelemetry instrumentors for both the `EventStoreDBClient`
and the `AsyncEventStoreDBClient` clients. These instrumentors depend on various
OpenTelemetry Python packages, which you will need to install, preferably with this
project's "opentelemetry" package extra to ensure verified version compatibility.

For example, you can install the "opentelemetry" package extra with pip.

    $ pip install esdbclient[opentelemetry]

Or you can use Poetry to add it to your pyproject.toml file and install it.

    $ poetry add esdbclient[opentelemetry]


You can then use the OpenTelemetry instrumentor `EventStoreDBClientInstrumentor` to
instrument the `EventStoreDBClient`.

```python
from esdbclient.instrumentation.opentelemetry import EventStoreDBClientInstrumentor

# Activate instrumentation.
EventStoreDBClientInstrumentor().instrument()

# Deactivate instrumentation.
EventStoreDBClientInstrumentor().uninstrument()
```

You can also use the OpenTelemetry instrumentor `AsyncEventStoreDBClientInstrumentor`
to instrument the `AsyncEventStoreDBClient`.

```python
from esdbclient.instrumentation.opentelemetry import AsyncEventStoreDBClientInstrumentor

# Activate instrumentation.
AsyncEventStoreDBClientInstrumentor().instrument()

# Deactivate instrumentation.
AsyncEventStoreDBClientInstrumentor().uninstrument()
```

The instrumentors use a global OpenTelemetry "tracer provider", which you will need to
initialise in order to export telemetry data.

For example, to export data to the console you will need to install the Python
package `opentelemetry-sdk`, and use the classes `TracerProvider`, `BatchSpanProcessor`,
and `ConsoleSpanExporter` in the following way.

```python
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter
from opentelemetry.trace import set_tracer_provider

resource = Resource.create(
    attributes={
        SERVICE_NAME: "eventstoredb",
    }
)
provider = TracerProvider(resource=resource)
provider.add_span_processor(BatchSpanProcessor(ConsoleSpanExporter()))
set_tracer_provider(provider)
```

Or to export to an OpenTelemetry compatible data collector, such as
[Jaeger](https://www.jaegertracing.io), you will need to install the Python package
`opentelemetry-exporter-otlp-proto-http`, and then use the class `OTLPSpanExporter`
from the `opentelemetry.exporter.otlp.proto.http.trace_exporter` module, with an
appropriate `endpoint` argument for your collector.

```python
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.trace import set_tracer_provider

resource = Resource.create(
    attributes={
        SERVICE_NAME: "eventstoredb",
    }
)
provider = TracerProvider(resource=resource)
provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter(endpoint="http://localhost:4318/v1/traces")))
set_tracer_provider(provider)
```

You can start Jaeger locally by running the following command.

    $ docker run -d -p 4318:4318 -p 16686:16686 --name jaeger jaegertracing/all-in-one:latest

You can then navigate to `http://localhost:16686` to access the Jaeger UI, and telemetry
data can be sent by an OpenTelemetry tracer provider to `http://localhost:4318/v1/traces`.

At this time, the instrumented methods are `append_to_stream()`, `subscribe_to_stream()`,
`subscribe_to_all()`, `read_subscription_to_stream()`, and `read_subscription_to_all()`.

The `append_to_stream()` method is instrumented by spanning the method call with a
"producer" span kind. It also adds span context information to the new event metadata
so that consumers can associate "consumer" spans with the "producer" span.

The subscription methods are instrumented by instrumenting the response iterators,
creating a "consumer" span for each recorded event received. This extracts span
context information from the recorded event metadata and associates each "consumer"
span with a "producer" span, by making the "consumer" span a child of the "producer"
span.


## Communities<a id="communities"></a>

- [Issues](https://github.com/pyeventsourcing/esdbclient/issues)
- [Discuss](https://discuss.eventstore.com/)
- [Discord (Event Store)](https://discord.gg/Phn9pmCw3t)
## Contributors<a id="contributors"></a>

### Install Poetry<a id="install-poetry"></a>

The first thing is to check you have Poetry installed.

    $ poetry --version

If you don't, then please [install Poetry](https://python-poetry.org/docs/#installing-with-the-official-installer).

    $ curl -sSL https://install.python-poetry.org | python3 -

It will help to make sure Poetry's bin directory is in your `PATH` environment variable.

But in any case, make sure you know the path to the `poetry` executable. The Poetry
installer tells you where it has been installed, and how to configure your shell.

Please refer to the [Poetry docs](https://python-poetry.org/docs/) for guidance on
using Poetry.

### Setup for PyCharm users<a id="setup-for-pycharm-users"></a>

You can easily obtain the project files using PyCharm (menu "Git > Clone...").
PyCharm will then usually prompt you to open the project.

Open the project in a new window. PyCharm will then usually prompt you to create
a new virtual environment.

Create a new Poetry virtual environment for the project. If PyCharm doesn't already
know where your `poetry` executable is, then set the path to your `poetry` executable
in the "New Poetry Environment" form input field labelled "Poetry executable". In the
"New Poetry Environment" form, you will also have the opportunity to select which
Python executable will be used by the virtual environment.

PyCharm will then create a new Poetry virtual environment for your project, using
a particular version of Python, and also install into this virtual environment the
project's package dependencies according to the project's `poetry.lock` file.

You can add different Poetry environments for different Python versions, and switch
between them using the "Python Interpreter" settings of PyCharm. If you want to use
a version of Python that isn't installed, either use your favourite package manager,
or install Python by downloading an installer for recent versions of Python directly
from the [Python website](https://www.python.org/downloads/).

Once project dependencies have been installed, you should be able to run tests
from within PyCharm (right-click on the `tests` folder and select the 'Run' option).

Because of a conflict between pytest and PyCharm's debugger and the coverage tool,
you may need to add ``--no-cov`` as an option to the test runner template. Alternatively,
just use the Python Standard Library's ``unittest`` module.

You should also be able to open a terminal window in PyCharm, and run the project's
Makefile commands from the command line (see below).

### Setup from command line<a id="setup-from-command-line"></a>

Obtain the project files, using Git or a suitable alternative.

In a terminal application, change your current working directory
to the root folder of the project files. There should be a Makefile
in this folder.

Use the Makefile to create a new Poetry virtual environment for the
project and install the project's package dependencies into it,
using the following command.

    $ make install-packages

It's also possible to install the project in 'editable mode'.

    $ make install

Please note, if you create the virtual environment in this way, and then try to
open the project in PyCharm and configure the project to use this virtual
environment as an "Existing Poetry Environment", PyCharm sometimes has
issues (for unclear reasons) which might be problematic. If you encounter such
issues, you can resolve them by deleting the virtual environment
and creating the Poetry virtual environment using PyCharm (see above).

### Project Makefile commands<a id="project-makefile-commands"></a>

You can start EventStoreDB using the following command.

    $ make start-eventstoredb

You can run tests using the following command (needs EventStoreDB to be running).

    $ make test

You can stop EventStoreDB using the following command.

    $ make stop-eventstoredb

You can check the formatting of the code using the following command.

    $ make lint

You can reformat the code using the following command.

    $ make fmt

Tests belong in `./tests`. Code-under-test belongs in `./esdbclient`.

Edit package dependencies in `pyproject.toml`. Update installed packages (and the
`poetry.lock` file) using the following command.

    $ make update-packages
    "bugtrack_url": null,
    "license": "BSD 3-Clause",
    "summary": "Python gRPC Client for EventStoreDB",
    "version": "1.1.3",
    "project_urls": {
        "Homepage": "https://github.com/pyeventsourcing/esdbclient",
        "Repository": "https://github.com/pyeventsourcing/esdbclient"
    },
    "split_keywords": [],
    "urls": [
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "78f3567db05e7c2d8efe0211263340854f9fab3284213b0d3062bbb589109978",
                "md5": "af96dfaf12de797845c826354079720f",
                "sha256": "bc3f337ff6d20e5b2dfe0c552d8bf4ce45ab8d60129b4ebca6120668a4052c10"
            },
            "downloads": -1,
            "filename": "esdbclient-1.1.3-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "af96dfaf12de797845c826354079720f",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": "<4.0,>=3.8",
            "size": 145031,
            "upload_time": "2024-11-08T02:41:52",
            "upload_time_iso_8601": "2024-11-08T02:41:52.257200Z",
            "url": "https://files.pythonhosted.org/packages/78/f3/567db05e7c2d8efe0211263340854f9fab3284213b0d3062bbb589109978/esdbclient-1.1.3-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "16ea39338bea95be626ef583c88db91d4e7ff27e339e6754126db5a8e0326d49",
                "md5": "cf580e3717064b938d3d1ed1978ed0db",
                "sha256": "be40ed914e322ca954d279a56ecb0a2b44894554bdb24ff817e946ebed9a89c6"
            },
            "downloads": -1,
            "filename": "esdbclient-1.1.3.tar.gz",
            "has_sig": false,
            "md5_digest": "cf580e3717064b938d3d1ed1978ed0db",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": "<4.0,>=3.8",
            "size": 162993,
            "upload_time": "2024-11-08T02:41:54",
            "upload_time_iso_8601": "2024-11-08T02:41:54.654414Z",
            "url": "https://files.pythonhosted.org/packages/16/ea/39338bea95be626ef583c88db91d4e7ff27e339e6754126db5a8e0326d49/esdbclient-1.1.3.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2024-11-08 02:41:54",
    "github": true,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "github_user": "pyeventsourcing",
    "github_project": "esdbclient",
    "travis_ci": false,
    "coveralls": true,
    "github_actions": true,
    "lcname": "esdbclient"
}
        