# eventsourcingdb

- Version: 1.5.0
- Upload time: 2025-07-29 20:35:55
- Requires Python: >=3.11, <=3.13

The official Python client SDK for [EventSourcingDB](https://www.eventsourcingdb.io) – a purpose-built database for event sourcing.

EventSourcingDB enables you to build and operate event-driven applications with native support for writing, reading, and observing events. This client SDK provides convenient access to its capabilities in Python.

For more information on EventSourcingDB, see its [official documentation](https://docs.eventsourcingdb.io/).

This client SDK includes support for [Testcontainers](https://testcontainers.com/) to spin up EventSourcingDB instances in integration tests. For details, see [Using Testcontainers](#using-testcontainers).

## Getting Started

Install the client SDK:

```shell
pip install eventsourcingdb
```

Import the `Client` class and create an instance by providing the URL of your EventSourcingDB instance and the API token to use:

```python
from eventsourcingdb import Client

url = 'http://localhost:3000'
api_token = 'secret'

client = Client(
  base_url = url,
  api_token = api_token,
)
```

Then call the `ping` function to check whether the instance is reachable. If it is not, the function will raise an error:

```python
await client.ping()
```

*Note that `ping` does not require authentication, so the call may succeed even if the API token is invalid.*

If you want to verify the API token, call `verify_api_token`. If the token is invalid, the function will raise an error:

```python
await client.verify_api_token()
```
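
The snippets in this README use `await`, so they have to run inside a coroutine. The following is a minimal sketch of a connectivity check built on `ping` and `verify_api_token`. The names `check_connection` and `StubClient` are hypothetical: the stub only stands in for a real `Client` so that the example runs without a server. Since the exception types are not named here, the sketch catches `Exception` broadly.

```python
import asyncio


async def check_connection(client) -> bool:
  """Return True if the instance is reachable and the API token is valid."""
  try:
    await client.ping()
    await client.verify_api_token()
  except Exception:
    # The concrete exception types are not documented here, so this
    # sketch treats any error as a failed check.
    return False
  return True


class StubClient:
  """Stand-in for eventsourcingdb.Client; replace it with a real client."""

  def __init__(self, token_is_valid: bool):
    self.token_is_valid = token_is_valid

  async def ping(self) -> None:
    pass

  async def verify_api_token(self) -> None:
    if not self.token_is_valid:
      raise RuntimeError('invalid API token')


ok = asyncio.run(check_connection(StubClient(token_is_valid = True)))
rejected = asyncio.run(check_connection(StubClient(token_is_valid = False)))
```

With a real client, `asyncio.run(check_connection(client))` should work the same way.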

### Writing Events

Call the `write_events` function and hand over a list with one or more events. You do not have to provide all event fields – some are automatically added by the server.

Specify `source`, `subject`, `type`, and `data` according to the [CloudEvents](https://docs.eventsourcingdb.io/fundamentals/cloud-events/) format.

The function returns the written events, including the fields added by the server:

```python
written_events = await client.write_events(
  events = [
    {
      'source': 'https://library.eventsourcingdb.io',
      'subject': '/books/42',
      'type': 'io.eventsourcingdb.library.book-acquired',
      'data': {
        'title': '2001 – A Space Odyssey',
        'author': 'Arthur C. Clarke',
        'isbn': '978-0756906788',
      },
    }
  ]
)
```
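
If you write many events of the same type, a small factory function can keep the four fields consistent. The following sketch assumes a hypothetical helper named `book_acquired`; the field values are taken from the example above.

```python
def book_acquired(book_id: str, title: str, author: str, isbn: str) -> dict:
  """Build an event with the four fields the client expects."""
  return {
    'source': 'https://library.eventsourcingdb.io',
    'subject': f'/books/{book_id}',
    'type': 'io.eventsourcingdb.library.book-acquired',
    'data': {
      'title': title,
      'author': author,
      'isbn': isbn,
    },
  }


event = book_acquired(
  '42',
  '2001 – A Space Odyssey',
  'Arthur C. Clarke',
  '978-0756906788',
)
```

The resulting dictionary can then be passed in the `events` list of `write_events`.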

#### Using the `isSubjectPristine` precondition

If you only want to write events in case a subject (such as `/books/42`) does not yet have any events, import the `IsSubjectPristine` class and pass an instance of it in the list of preconditions, which is the second argument:

```python
from eventsourcingdb import IsSubjectPristine

written_events = await client.write_events(
  events = [
    # events
  ],
  preconditions = [
    IsSubjectPristine('/books/42')
  ],
)
```

#### Using the `isSubjectOnEventId` precondition

If you only want to write events in case the last event of a subject (such as `/books/42`) has a specific ID (e.g., `0`), import the `IsSubjectOnEventId` class and pass an instance of it in the list of preconditions, which is the second argument:

```python
from eventsourcingdb import IsSubjectOnEventId

written_events = await client.write_events(
  events = [
    # events
  ],
  preconditions = [
    IsSubjectOnEventId('/books/42', '0')
  ],
)
```

*Note that according to the CloudEvents standard, event IDs must be of type string.*
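
To make the two subject-based preconditions more tangible, here is a conceptual model of the checks as they are described above. It is not the server's implementation: `is_subject_pristine` and `is_subject_on_event_id` are hypothetical functions that work on a plain list of dictionaries, and the model ignores details such as nested subjects.

```python
def is_subject_pristine(events: list, subject: str) -> bool:
  """True if no stored event belongs to the given subject."""
  return all(event['subject'] != subject for event in events)


def is_subject_on_event_id(events: list, subject: str, event_id: str) -> bool:
  """True if the last event of the subject has the given ID."""
  subject_events = [event for event in events if event['subject'] == subject]
  return bool(subject_events) and subject_events[-1]['id'] == event_id


# Events in write order; IDs are strings, as noted above.
stored = [
  { 'id': '0', 'subject': '/books/42' },
  { 'id': '1', 'subject': '/books/43' },
]

pristine_42 = is_subject_pristine(stored, '/books/42')
pristine_44 = is_subject_pristine(stored, '/books/44')
on_event_0 = is_subject_on_event_id(stored, '/books/42', '0')
```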

#### Using the `isEventQlTrue` precondition

If you want to write events depending on an EventQL query, import the `IsEventQlTrue` class and pass an instance of it in the list of preconditions, which is the second argument:

```python
from eventsourcingdb import IsEventQlTrue

written_events = await client.write_events(
  events = [
    # events
  ],
  preconditions = [
    IsEventQlTrue('FROM e IN events WHERE e.type == "io.eventsourcingdb.library.book-borrowed" PROJECT INTO COUNT() < 10')
  ],
)
```

*Note that the query must return a single row with a single value, which is interpreted as a boolean.*

### Reading Events

To read all events of a subject, call the `read_events` function with the subject as the first argument and an options object as the second argument. Set the `recursive` option to `False`. This ensures that only events of the given subject are returned, not events of nested subjects.

The function returns an asynchronous generator, which you can use e.g. inside an `async for` loop:

```python
from eventsourcingdb import ReadEventsOptions

async for event in client.read_events(
  subject = '/books/42',
  options = ReadEventsOptions(
    recursive = False
  ),
):
  pass
```
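
If you prefer a list over a loop, an asynchronous comprehension can drain the generator. The following sketch assumes a hypothetical helper named `collect`. `stub_read_events` is a stand-in for `client.read_events(...)` so that the example runs without a server, and it yields plain dictionaries rather than the SDK's event objects.

```python
import asyncio


async def collect(stream) -> list:
  """Drain an asynchronous generator into a list."""
  return [item async for item in stream]


async def stub_read_events():
  # Stand-in for client.read_events(...).
  for event_id in ('0', '1', '2'):
    yield { 'id': event_id, 'subject': '/books/42' }


events = asyncio.run(collect(stub_read_events()))
```

Keep in mind that this loads all events into memory, so it is only suitable for subjects with a manageable number of events.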

#### Reading From Subjects Recursively

If you want to read not only all the events of a subject, but also the events of all nested subjects, set the `recursive` option to `True`:

```python
from eventsourcingdb import ReadEventsOptions

async for event in client.read_events(
  subject = '/books/42',
  options = ReadEventsOptions(
    recursive = True
  ),
):
  pass
```

This also allows you to read *all* events ever written. To do so, provide `/` as the subject and set `recursive` to `True`, since all subjects are nested under the root subject.
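
The effect of the `recursive` option can be pictured with a small model. It assumes that nesting follows the path segments of a subject, which the examples above suggest; the matching itself happens on the server, and `is_in_scope` is a hypothetical function used for illustration only.

```python
def is_in_scope(event_subject: str, subject: str, recursive: bool) -> bool:
  """Model which event subjects a read on `subject` would cover."""
  if event_subject == subject:
    return True
  if not recursive:
    return False
  # Nested subjects start with the subject followed by a slash.
  prefix = subject if subject.endswith('/') else subject + '/'
  return event_subject.startswith(prefix)


exact_only = is_in_scope('/books/42/chapters/1', '/books/42', recursive = False)
nested = is_in_scope('/books/42/chapters/1', '/books/42', recursive = True)
everything = is_in_scope('/books/42', '/', recursive = True)
```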

#### Reading in Anti-Chronological Order

By default, events are read in chronological order. To read in anti-chronological order, provide the `order` option and set it to `Order.ANTICHRONOLOGICAL`:

```python
from eventsourcingdb import (
  Order,
  ReadEventsOptions,
)

async for event in client.read_events(
  subject = '/books/42',
  options = ReadEventsOptions(
    recursive = False,
    order = Order.ANTICHRONOLOGICAL,
  ),
):
  pass
```

*Note that you can also specify `Order.CHRONOLOGICAL` to explicitly enforce the default order.*

#### Specifying Bounds

Sometimes you do not want to read all events, but only a range of events. For that, you can specify the `lower_bound` and `upper_bound` options – either one of them or even both at the same time.

Specify the ID and whether to include or exclude it, for both the lower and upper bound:

```python
from eventsourcingdb import (
  Bound,
  BoundType,
  ReadEventsOptions,
)

async for event in client.read_events(
  subject = '/books/42',
  options = ReadEventsOptions(
    recursive = False,
    lower_bound = Bound(id = '100', type = BoundType.INCLUSIVE),
    upper_bound = Bound(id = '200', type = BoundType.EXCLUSIVE),
  ),
):
  pass
```
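
To illustrate how inclusive and exclusive bounds differ, the following model filters a few IDs the way the example above would. It assumes that event IDs are strings holding integers that grow in write order; the IDs `'100'` and `'200'` suggest this, but it is not stated here. `is_within_bounds` is a hypothetical function, not part of the SDK.

```python
def is_within_bounds(
  event_id,
  lower_id = None,
  lower_inclusive = True,
  upper_id = None,
  upper_inclusive = True,
) -> bool:
  """Model the range check, assuming integer-like string IDs."""
  value = int(event_id)
  if lower_id is not None:
    lower = int(lower_id)
    if value < lower or (value == lower and not lower_inclusive):
      return False
  if upper_id is not None:
    upper = int(upper_id)
    if value > upper or (value == upper and not upper_inclusive):
      return False
  return True


# Inclusive lower bound '100', exclusive upper bound '200'.
selected = [
  event_id
  for event_id in ('99', '100', '199', '200')
  if is_within_bounds(
    event_id,
    lower_id = '100',
    lower_inclusive = True,
    upper_id = '200',
    upper_inclusive = False,
  )
]
```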

#### Starting From the Latest Event of a Given Type

To read starting from the latest event of a given type, provide the `from_latest_event` option and specify the subject, the type, and how to proceed if no such event exists.

Possible options are `IfEventIsMissingDuringRead.READ_NOTHING`, which skips reading entirely, or `IfEventIsMissingDuringRead.READ_EVERYTHING`, which effectively behaves as if `from_latest_event` was not specified:

```python
from eventsourcingdb import (
  IfEventIsMissingDuringRead,
  ReadEventsOptions,
  ReadFromLatestEvent,
)

async for event in client.read_events(
  subject = '/books/42',
  options = ReadEventsOptions(
    recursive = False,
    from_latest_event = ReadFromLatestEvent(
      subject = '/books/42',
      type = 'io.eventsourcingdb.library.book-borrowed',
      if_event_is_missing = IfEventIsMissingDuringRead.READ_EVERYTHING,
    ),
  ),
):
  pass
```

*Note that `from_latest_event` and `lower_bound` cannot be provided at the same time.*

#### Aborting Reading

If you need to abort reading, use `break` or `return` within the `async for` loop. However, this only works if there is currently an iteration going on.

To abort reading independently of that, store the generator in a variable, and close it explicitly:

```python
from eventsourcingdb import ReadEventsOptions

events = client.read_events(
  subject = '/books/42',
  options = ReadEventsOptions(
    recursive = False,
  ),
)

async for event in events:
  pass

# Somewhere else, abort the generator, which will cause
# reading to end.
await events.aclose()
```
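
Both approaches can be combined. The following sketch reads at most a given number of items, leaves the loop with `break`, and then closes the generator explicitly. In CPython, leaving an `async for` loop does not close an asynchronous generator right away, so the `finally` block makes the cleanup explicit. `take` and `stub_read_events` are hypothetical names; the stub stands in for `client.read_events(...)`.

```python
import asyncio


async def take(stream, limit: int) -> list:
  """Read at most `limit` items, then close the generator."""
  items = []
  if limit <= 0:
    await stream.aclose()
    return items
  try:
    async for item in stream:
      items.append(item)
      if len(items) >= limit:
        break
  finally:
    # Close explicitly, no matter how the loop was left.
    await stream.aclose()
  return items


closed = []


async def stub_read_events():
  # Stand-in for client.read_events(...); records when it gets closed.
  try:
    for event_id in ('0', '1', '2', '3'):
      yield event_id
  finally:
    closed.append(True)


first_two = asyncio.run(take(stub_read_events(), 2))
```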

### Running EventQL Queries

To run an EventQL query, call the `run_eventql_query` function and provide the query as an argument. The function returns an asynchronous generator, which you can use e.g. inside an `async for` loop:

```python
async for row in client.run_eventql_query(
  query = '''
    FROM e IN events
    PROJECT INTO e
  ''',
):
  pass
```

*Note that each row returned by the generator matches the projection specified in your query.*

#### Aborting a Query

If you need to abort a query, use `break` or `return` within the `async for` loop. However, this only works if there is currently an iteration going on.

To abort the query independently of that, store the generator in a variable, and close it explicitly:

```python
rows = client.run_eventql_query(
  query = '''
    FROM e IN events
    PROJECT INTO e
  ''',
)

async for row in rows:
  pass

# Somewhere else, abort the generator, which will cause
# the query to end.
await rows.aclose()
```

### Observing Events

To observe all events of a subject, call the `observe_events` function with the subject as the first argument and an options object as the second argument. Set the `recursive` option to `False`. This ensures that only events of the given subject are returned, not events of nested subjects.

The function returns an asynchronous generator, which you can use e.g. inside an `async for` loop:

```python
from eventsourcingdb import ObserveEventsOptions

async for event in client.observe_events(
  subject = '/books/42',
  options = ObserveEventsOptions(
    recursive = False
  ),
):
  pass
```

#### Observing From Subjects Recursively

If you want to observe not only all the events of a subject, but also the events of all nested subjects, set the `recursive` option to `True`:

```python
from eventsourcingdb import ObserveEventsOptions

async for event in client.observe_events(
  subject = '/books/42',
  options = ObserveEventsOptions(
    recursive = True
  ),
):
  pass
```

This also allows you to observe *all* events ever written. To do so, provide `/` as the subject and set `recursive` to `True`, since all subjects are nested under the root subject.

#### Specifying Bounds

Sometimes you do not want to observe all events, but only a range of events. For that, you can specify the `lower_bound` option.

Specify the ID and whether to include or exclude it:

```python
from eventsourcingdb import (
  Bound,
  BoundType,
  ObserveEventsOptions,
)

async for event in client.observe_events(
  subject = '/books/42',
  options = ObserveEventsOptions(
    recursive = False,
    lower_bound = Bound(id = '100', type = BoundType.INCLUSIVE),
  ),
):
  pass
```

#### Starting From the Latest Event of a Given Type

To observe starting from the latest event of a given type, provide the `from_latest_event` option and specify the subject, the type, and how to proceed if no such event exists.

Possible options are `IfEventIsMissingDuringObserve.WAIT_FOR_EVENT`, which waits for an event of the given type to happen, or `IfEventIsMissingDuringObserve.READ_EVERYTHING`, which effectively behaves as if `from_latest_event` was not specified:

```python
from eventsourcingdb import (
  IfEventIsMissingDuringObserve,
  ObserveEventsOptions,
  ObserveFromLatestEvent,
)

async for event in client.observe_events(
  subject = '/books/42',
  options = ObserveEventsOptions(
    recursive = False,
    from_latest_event = ObserveFromLatestEvent(
      subject = '/books/42',
      type = 'io.eventsourcingdb.library.book-borrowed',
      if_event_is_missing = IfEventIsMissingDuringObserve.READ_EVERYTHING,
    ),
  ),
):
  pass
```

*Note that `from_latest_event` and `lower_bound` cannot be provided at the same time.*

#### Aborting Observing

If you need to abort observing, use `break` or `return` within the `async for` loop. However, this only works if there is currently an iteration going on.

To abort observing independently of that, store the generator in a variable, and close it explicitly:

```python
from eventsourcingdb import ObserveEventsOptions

events = client.observe_events(
  subject = '/books/42',
  options = ObserveEventsOptions(
    recursive = False
  ),
)

async for event in events:
  pass

# Somewhere else, abort the generator, which will cause
# observing to end.
await events.aclose()
```
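
A common case is to observe only for a limited time. The following sketch assumes that an observation keeps waiting for new events until it is aborted. `observe_for` is a hypothetical helper, and `stub_observe_events` stands in for `client.observe_events(...)`; a real stream may react differently to cancellation.

```python
import asyncio


async def observe_for(stream, seconds: float, handle) -> None:
  """Pass events to `handle` until `seconds` have elapsed."""

  async def consume() -> None:
    async for event in stream:
      handle(event)

  try:
    await asyncio.wait_for(consume(), timeout = seconds)
  except asyncio.TimeoutError:
    # The time budget is used up; stop observing.
    pass
  finally:
    await stream.aclose()


async def stub_observe_events():
  # Stand-in for client.observe_events(...): two events, then silence.
  yield 'first'
  yield 'second'
  await asyncio.sleep(3600)


seen = []
asyncio.run(observe_for(stub_observe_events(), 0.1, seen.append))
```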

### Registering an Event Schema

To register an event schema, call the `register_event_schema` function and hand over an event type and the desired schema:

```python
await client.register_event_schema(
  event_type = 'io.eventsourcingdb.library.book-acquired',
  json_schema = {
    'type': 'object',
    'properties': {
      'title': { 'type': 'string' },
      'author': { 'type': 'string' },
      'isbn': { 'type': 'string' },
    },
    'required': [
      'title',
      'author',
      'isbn'
    ],
    'additionalProperties': False,
  },
)
```
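
To see what the schema above accepts and rejects, here is a deliberately small checker. It only understands the keywords used in the example (`required`, `properties` with type `string`, and `additionalProperties`) and is not a full JSON Schema validator; `conforms` is a hypothetical function and not part of the SDK.

```python
def conforms(data: dict, schema: dict) -> bool:
  """Check `data` against the few schema keywords used above."""
  properties = schema.get('properties', {})
  if any(key not in data for key in schema.get('required', [])):
    return False
  if schema.get('additionalProperties', True) is False:
    if any(key not in properties for key in data):
      return False
  for key, rule in properties.items():
    if key in data and rule.get('type') == 'string':
      if not isinstance(data[key], str):
        return False
  return True


schema = {
  'type': 'object',
  'properties': {
    'title': { 'type': 'string' },
    'author': { 'type': 'string' },
    'isbn': { 'type': 'string' },
  },
  'required': ['title', 'author', 'isbn'],
  'additionalProperties': False,
}

book = {
  'title': '2001 – A Space Odyssey',
  'author': 'Arthur C. Clarke',
  'isbn': '978-0756906788',
}

valid = conforms(book, schema)
missing_isbn = conforms({ 'title': 'x', 'author': 'y' }, schema)
extra_field = conforms({ **book, 'pages': 224 }, schema)
```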

### Listing Subjects

To list all subjects, call the `read_subjects` function with `/` as the base subject. The function returns an asynchronous generator, which you can use e.g. inside an `async for` loop:

```python
async for subject in client.read_subjects(
  base_subject = '/'
):
  pass
```

If you only want to list subjects within a specific branch, provide the desired base subject instead:

```python
async for subject in client.read_subjects(
  base_subject = '/books'
):
  pass
```

#### Aborting Listing

If you need to abort listing, use `break` or `return` within the `async for` loop. However, this only works if there is currently an iteration going on.

To abort listing independently of that, store the generator in a variable, and close it explicitly:

```python
subjects = client.read_subjects(
  base_subject = '/'
)

async for subject in subjects:
  pass

# Somewhere else, abort the generator, which will cause
# listing to end.
await subjects.aclose()
```

### Listing Event Types

To list all event types, call the `read_event_types` function. The function returns an asynchronous generator, which you can use e.g. inside an `async for` loop:

```python
async for event_type in client.read_event_types():
  pass
```

#### Aborting Listing

If you need to abort listing, use `break` or `return` within the `async for` loop. However, this only works if there is currently an iteration going on.

To abort listing independently of that, store the generator in a variable, and close it explicitly:

```python
event_types = client.read_event_types()

async for event_type in event_types:
  pass

# Somewhere else, abort the generator, which will cause
# listing to end.
await event_types.aclose()
```

### Listing a Specific Event Type

To list a specific event type, call the `read_event_type` function with the event type as an argument. The function returns the detailed event type, which includes the schema:

```python
event_type = await client.read_event_type('io.eventsourcingdb.library.book-acquired')
```

### Using Testcontainers

Import the `Container` class, create an instance, call the `start` function to run a test container, get a client, run your test code, and finally call the `stop` function to stop the test container:

```python
from eventsourcingdb import Container

container = Container()
container.start()

client = container.get_client()

# ...

container.stop()
```
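
If the test code raises an error, `stop` is never reached in the example above. A context manager can make sure the container is stopped in any case. The following sketch uses only `start`, `get_client`, and `stop`; `running_container` is a hypothetical helper, and `StubContainer` is a stand-in for `Container` so that the example runs without Docker.

```python
from contextlib import contextmanager


@contextmanager
def running_container(container):
  """Start the container, yield a client, and always stop the container."""
  container.start()
  try:
    yield container.get_client()
  finally:
    container.stop()


class StubContainer:
  """Stand-in for eventsourcingdb.Container; it only records the calls."""

  def __init__(self):
    self.calls = []

  def start(self):
    self.calls.append('start')

  def get_client(self):
    self.calls.append('get_client')
    return 'client'

  def stop(self):
    self.calls.append('stop')


container = StubContainer()

try:
  with running_container(container) as client:
    raise RuntimeError('simulated test failure')
except RuntimeError:
  pass
```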

To check if the test container is running, call the `is_running` function:

```python
is_running = container.is_running()
```

#### Configuring the Container Instance

By default, `Container` uses the `latest` tag of the official EventSourcingDB Docker image. To change that, call the `with_image_tag` function:

```python
container = (
  Container()
    .with_image_tag('1.0.0')
)
```

Similarly, you can configure the port to use and the API token. Call the `with_port` or the `with_api_token` function, respectively:

```python
container = (
  Container()
    .with_port(4000)
    .with_api_token('secret')
)
```

#### Configuring the Client Manually

In case you need to set up the client yourself, use the following functions to get details on the container:

- `get_host()` returns the host name
- `get_mapped_port()` returns the port
- `get_base_url()` returns the full URL of the container
- `get_api_token()` returns the API token
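
A manual setup could look like the following sketch, which combines `get_base_url` and `get_api_token` with the `base_url` and `api_token` parameters shown in Getting Started. `client_from_container` is a hypothetical helper; the two stub classes stand in for a started `Container` and for `Client` so that the example runs on its own.

```python
def client_from_container(container, client_class):
  """Create a client from the connection details a container exposes."""
  return client_class(
    base_url = container.get_base_url(),
    api_token = container.get_api_token(),
  )


class StubContainer:
  """Stand-in for a started eventsourcingdb.Container."""

  def get_base_url(self) -> str:
    # Example value only; a real container reports its own URL.
    return 'http://localhost:4000'

  def get_api_token(self) -> str:
    return 'secret'


class StubClient:
  """Stand-in for eventsourcingdb.Client; it only records its arguments."""

  def __init__(self, base_url: str, api_token: str):
    self.base_url = base_url
    self.api_token = api_token


client = client_from_container(StubContainer(), StubClient)
```

With the real classes, the call would be `client_from_container(container, Client)`.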

            

Raw data

{
    "_id": null,
    "home_page": null,
    "name": "eventsourcingdb",
    "maintainer": null,
    "docs_url": null,
    "requires_python": "<=3.13,>=3.11",
    "maintainer_email": null,
    "keywords": null,
    "author": null,
    "author_email": "the native web GmbH <hello@thenativeweb.io>",
    "download_url": "https://files.pythonhosted.org/packages/50/f1/f5a2dde785ce089d07c18d201feae1009d3e04dbaa31ff04c236f01fa208/eventsourcingdb-1.5.0.tar.gz",
    "platform": null,
    "bugtrack_url": null,
    "license": null,
    "summary": "The official Python client SDK for EventSourcingDB.",
    "version": "1.5.0",
    "project_urls": null,
    "split_keywords": [],
    "urls": [
        {
            "comment_text": null,
            "digests": {
                "blake2b_256": "69bcd7b366b7f8b278851d53a5c8d2c35516b8199a1b366b9e6d4fa374ada32b",
                "md5": "1fa148a00edd581439bd5bcfe7d06518",
                "sha256": "b1b3ecb37b123b942b1b546ac04342a547da763a01ba7feb8dbdf620954a978f"
            },
            "downloads": -1,
            "filename": "eventsourcingdb-1.5.0-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "1fa148a00edd581439bd5bcfe7d06518",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": "<=3.13,>=3.11",
            "size": 25889,
            "upload_time": "2025-07-29T20:35:54",
            "upload_time_iso_8601": "2025-07-29T20:35:54.255011Z",
            "url": "https://files.pythonhosted.org/packages/69/bc/d7b366b7f8b278851d53a5c8d2c35516b8199a1b366b9e6d4fa374ada32b/eventsourcingdb-1.5.0-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": null,
            "digests": {
                "blake2b_256": "50f1f5a2dde785ce089d07c18d201feae1009d3e04dbaa31ff04c236f01fa208",
                "md5": "ed63ddba36a7f67772cf9178fddc5d9f",
                "sha256": "200bffa45b4d7e087d7d31d6ad1e55615d1bf4f503ac50d96d9f43de94b11426"
            },
            "downloads": -1,
            "filename": "eventsourcingdb-1.5.0.tar.gz",
            "has_sig": false,
            "md5_digest": "ed63ddba36a7f67772cf9178fddc5d9f",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": "<=3.13,>=3.11",
            "size": 14387,
            "upload_time": "2025-07-29T20:35:55",
            "upload_time_iso_8601": "2025-07-29T20:35:55.359590Z",
            "url": "https://files.pythonhosted.org/packages/50/f1/f5a2dde785ce089d07c18d201feae1009d3e04dbaa31ff04c236f01fa208/eventsourcingdb-1.5.0.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2025-07-29 20:35:55",
    "github": false,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "lcname": "eventsourcingdb"
}
        