| Field | Value |
| --- | --- |
| Name | eventsourcingdb |
| Version | 1.5.0 |
| Summary | The official Python client SDK for EventSourcingDB. |
| upload_time | 2025-07-29 20:35:55 |
| requires_python | <=3.13,>=3.11 |
# eventsourcingdb
The official Python client SDK for [EventSourcingDB](https://www.eventsourcingdb.io) – a purpose-built database for event sourcing.
EventSourcingDB enables you to build and operate event-driven applications with native support for writing, reading, and observing events. This client SDK provides convenient access to its capabilities in Python.
For more information on EventSourcingDB, see its [official documentation](https://docs.eventsourcingdb.io/).
This client SDK includes support for [Testcontainers](https://testcontainers.com/) to spin up EventSourcingDB instances in integration tests. For details, see [Using Testcontainers](#using-testcontainers).
## Getting Started
Install the client SDK:
```shell
pip install eventsourcingdb
```
Import the `Client` class and create an instance by providing the URL of your EventSourcingDB instance and the API token to use:
```python
from eventsourcingdb import Client

url = 'http://localhost:3000'
api_token = 'secret'

client = Client(
    base_url = url,
    api_token = api_token,
)
```
Then call the `ping` function to check whether the instance is reachable. If it is not, the function will raise an error:
```python
await client.ping()
```
*Note that `ping` does not require authentication, so the call may succeed even if the API token is invalid.*
If you want to verify the API token, call `verify_api_token`. If the token is invalid, the function will raise an error:
```python
await client.verify_api_token()
```
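The snippets in this README use `await` at the top level, which only works inside a coroutine or an async-aware REPL. The following is a minimal sketch of running both checks from a regular script with `asyncio.run`. `check_connection` and `FakeClient` are hypothetical names introduced for illustration; `FakeClient` merely stands in for a real `Client` so that the sketch runs without a server, and the exception types it raises are assumptions, since the SDK's error classes are not described here.

```python
import asyncio


class FakeClient:
    """Hypothetical stand-in for eventsourcingdb.Client (illustration only)."""

    def __init__(self, reachable=True, token_valid=True):
        self.reachable = reachable
        self.token_valid = token_valid

    async def ping(self):
        if not self.reachable:
            raise ConnectionError('instance is not reachable')

    async def verify_api_token(self):
        if not self.token_valid:
            raise PermissionError('API token is invalid')


async def check_connection(client):
    """Return 'ok', 'unreachable', or 'invalid token' for the given client."""
    try:
        await client.ping()
    except Exception:
        # The concrete error type is not documented here, so catch broadly.
        return 'unreachable'
    try:
        await client.verify_api_token()
    except Exception:
        return 'invalid token'
    return 'ok'


if __name__ == '__main__':
    print(asyncio.run(check_connection(FakeClient())))
```

With a real `Client`, the same `check_connection` coroutine should work unchanged, because it only relies on `ping` and `verify_api_token`.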
### Writing Events
Call the `write_events` function and hand over a list with one or more events. You do not have to provide all event fields – some are automatically added by the server.
Specify `source`, `subject`, `type`, and `data` according to the [CloudEvents](https://docs.eventsourcingdb.io/fundamentals/cloud-events/) format.
The function returns the written events, including the fields added by the server:
```python
written_events = await client.write_events(
    events = [
        {
            'source': 'https://library.eventsourcingdb.io',
            'subject': '/books/42',
            'type': 'io.eventsourcingdb.library.book-acquired',
            'data': {
                'title': '2001 – A Space Odyssey',
                'author': 'Arthur C. Clarke',
                'isbn': '978-0756906788',
            },
        }
    ]
)
```
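If you write many events, a small helper can keep the event dictionaries consistent. The sketch below is not part of the SDK: `make_event` is a hypothetical function, and its two checks only mirror the shape of the examples in this README (subjects starting with `/`, `data` being a dictionary) rather than rules confirmed by the SDK.

```python
def make_event(source, subject, event_type, data):
    """Build an event dict with the four fields named above (hypothetical helper)."""
    # Assumption: subjects are paths that start with '/', as in the examples.
    if not subject.startswith('/'):
        raise ValueError('subject is expected to start with "/"')
    # Assumption: data is a JSON object, represented as a dict.
    if not isinstance(data, dict):
        raise TypeError('data is expected to be a dict')
    return {
        'source': source,
        'subject': subject,
        'type': event_type,
        'data': data,
    }


event = make_event(
    source='https://library.eventsourcingdb.io',
    subject='/books/42',
    event_type='io.eventsourcingdb.library.book-acquired',
    data={'title': '2001 – A Space Odyssey'},
)
```

The resulting `event` can then be placed in the `events` list passed to `write_events`.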
#### Using the `isSubjectPristine` precondition
If you only want to write events in case a subject (such as `/books/42`) does not yet have any events, import the `IsSubjectPristine` class and pass an instance in the `preconditions` list, which is the second argument:
```python
from eventsourcingdb import IsSubjectPristine

written_events = await client.write_events(
    events = [
        # events
    ],
    preconditions = [
        IsSubjectPristine('/books/42')
    ],
)
```
#### Using the `isSubjectOnEventId` precondition
If you only want to write events in case the last event of a subject (such as `/books/42`) has a specific ID (e.g., `0`), import the `IsSubjectOnEventId` class and pass an instance in the `preconditions` list, which is the second argument:
```python
from eventsourcingdb import IsSubjectOnEventId

written_events = await client.write_events(
    events = [
        # events
    ],
    preconditions = [
        IsSubjectOnEventId('/books/42', '0')
    ],
)
```
*Note that according to the CloudEvents standard, event IDs must be of type string.*
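These two preconditions are typically used for optimistic concurrency control: a write is accepted only if the subject is still in the state the writer last saw. The toy model below illustrates that idea in memory. It is a sketch under assumptions, not how the server is implemented; `InMemoryLog` and its parameters are hypothetical names, and the sequential string IDs are chosen only for the example.

```python
class InMemoryLog:
    """Toy model of the two precondition checks (illustration only)."""

    def __init__(self):
        self._events = []  # (event_id, subject) tuples in write order

    def last_event_id(self, subject):
        ids = [event_id for event_id, s in self._events if s == subject]
        return ids[-1] if ids else None

    def write(self, subject, expected_last_id=None, must_be_pristine=False):
        last = self.last_event_id(subject)
        # Mirrors the idea of IsSubjectPristine: no events for the subject yet.
        if must_be_pristine and last is not None:
            raise RuntimeError('precondition failed: subject is not pristine')
        # Mirrors the idea of IsSubjectOnEventId: last event has the expected ID.
        if expected_last_id is not None and last != expected_last_id:
            raise RuntimeError('precondition failed: subject is on another event')
        event_id = str(len(self._events))  # IDs are strings, as noted above
        self._events.append((event_id, subject))
        return event_id


log = InMemoryLog()
first = log.write('/books/42', must_be_pristine=True)
second = log.write('/books/42', expected_last_id=first)
```

A second writer that still expects `first` to be the last event would now be rejected, which is the conflict these preconditions are meant to surface.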
#### Using the `isEventQlTrue` precondition
If you want to write events only if an EventQL query evaluates to true, import the `IsEventQlTrue` class and pass an instance in the `preconditions` list, which is the second argument:
```python
from eventsourcingdb import IsEventQlTrue

written_events = await client.write_events(
    events = [
        # events
    ],
    preconditions = [
        IsEventQlTrue('FROM e IN events WHERE e.type == "io.eventsourcingdb.library.book-borrowed" PROJECT INTO COUNT() < 10')
    ],
)
```
*Note that the query must return a single row with a single value, which is interpreted as a boolean.*
### Reading Events
To read all events of a subject, call the `read_events` function with the subject as the first argument and an options object as the second argument. Set the `recursive` option to `False`. This ensures that only events of the given subject are returned, not events of nested subjects.
The function returns an asynchronous generator, which you can use e.g. inside an `async for` loop:
```python
from eventsourcingdb import ReadEventsOptions

async for event in client.read_events(
    subject = '/books/42',
    options = ReadEventsOptions(
        recursive = False
    ),
):
    pass
```
#### Reading From Subjects Recursively
If you want to read not only all the events of a subject, but also the events of all nested subjects, set the `recursive` option to `True`:
```python
from eventsourcingdb import ReadEventsOptions

async for event in client.read_events(
    subject = '/books/42',
    options = ReadEventsOptions(
        recursive = True
    ),
):
    pass
```
This also allows you to read *all* events ever written. To do so, provide `/` as the subject and set `recursive` to `True`, since all subjects are nested under the root subject.
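To make the difference between the two modes concrete, the following sketch models which event subjects a read would cover. It assumes that "nested" means nested by path segment, so `/books/42/pages/1` is nested under `/books/42` while `/books/420` is not. `in_scope` is a hypothetical helper for illustration and not part of the SDK.

```python
def in_scope(event_subject, subject, recursive):
    """Return True if a read on `subject` would cover `event_subject` (toy model)."""
    if event_subject == subject:
        return True
    if not recursive:
        return False
    # Compare whole path segments, so '/books/42' does not match '/books/420'.
    prefix = subject if subject.endswith('/') else subject + '/'
    return event_subject.startswith(prefix)


subjects = ['/books/42', '/books/42/pages/1', '/books/420', '/authors/7']
exact = [s for s in subjects if in_scope(s, '/books/42', recursive=False)]
nested = [s for s in subjects if in_scope(s, '/books/42', recursive=True)]
everything = [s for s in subjects if in_scope(s, '/', recursive=True)]
```

Here `exact` contains only `/books/42`, `nested` additionally contains `/books/42/pages/1`, and `everything` contains all four subjects.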
#### Reading in Anti-Chronological Order
By default, events are read in chronological order. To read in anti-chronological order, provide the `order` option and set it to `Order.ANTICHRONOLOGICAL`:
```python
from eventsourcingdb import (
    Order,
    ReadEventsOptions,
)

async for event in client.read_events(
    subject = '/books/42',
    options = ReadEventsOptions(
        recursive = False,
        order = Order.ANTICHRONOLOGICAL,
    ),
):
    pass
```
*Note that you can also specify `Order.CHRONOLOGICAL` to explicitly enforce the default order.*
#### Specifying Bounds
Sometimes you do not want to read all events, but only a range of events. For that, you can specify the `lower_bound` and `upper_bound` options – either one of them or even both at the same time.
Specify the ID and whether to include or exclude it, for both the lower and upper bound:
```python
from eventsourcingdb import (
    Bound,
    BoundType,
    ReadEventsOptions,
)

async for event in client.read_events(
    subject = '/books/42',
    options = ReadEventsOptions(
        recursive = False,
        lower_bound = Bound(id = '100', type = BoundType.INCLUSIVE),
        upper_bound = Bound(id = '200', type = BoundType.EXCLUSIVE),
    ),
):
    pass
```
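The effect of inclusive and exclusive bounds can be sketched with a small range check. This is a toy model under a loud assumption: event IDs are treated as numeric strings that are ordered numerically. `within_bounds` is a hypothetical helper and not part of the SDK.

```python
def within_bounds(event_id, lower=None, upper=None):
    """Toy range check; `lower` and `upper` are (id, inclusive) tuples or None."""
    value = int(event_id)  # assumption: IDs are numeric strings
    if lower is not None:
        bound, inclusive = int(lower[0]), lower[1]
        if value < bound or (value == bound and not inclusive):
            return False
    if upper is not None:
        bound, inclusive = int(upper[0]), upper[1]
        if value > bound or (value == bound and not inclusive):
            return False
    return True


ids = [str(number) for number in range(98, 203)]
selected = [
    event_id for event_id in ids
    if within_bounds(event_id, lower=('100', True), upper=('200', False))
]
```

With an inclusive lower bound of `100` and an exclusive upper bound of `200`, `selected` starts at `'100'` and ends at `'199'`.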
#### Starting From the Latest Event of a Given Type
To read starting from the latest event of a given type, provide the `from_latest_event` option and specify the subject, the type, and how to proceed if no such event exists.
Possible options are `IfEventIsMissingDuringRead.READ_NOTHING`, which skips reading entirely, or `IfEventIsMissingDuringRead.READ_EVERYTHING`, which effectively behaves as if `from_latest_event` was not specified:
```python
from eventsourcingdb import (
    IfEventIsMissingDuringRead,
    ReadEventsOptions,
    ReadFromLatestEvent,
)

async for event in client.read_events(
    subject = '/books/42',
    options = ReadEventsOptions(
        recursive = False,
        from_latest_event = ReadFromLatestEvent(
            subject = '/books/42',
            type = 'io.eventsourcingdb.library.book-borrowed',
            if_event_is_missing = IfEventIsMissingDuringRead.READ_EVERYTHING,
        ),
    ),
):
    pass
```
*Note that `from_latest_event` and `lower_bound` cannot be provided at the same time.*
#### Aborting Reading
If you need to abort reading, use `break` or `return` within the `async for` loop. However, this only takes effect while the loop body is running, i.e. after an event has been received.
To abort reading independently of that, store the generator in a variable, and close it explicitly:
```python
from eventsourcingdb import ReadEventsOptions

events = client.read_events(
    subject = '/books/42',
    options = ReadEventsOptions(
        recursive = False,
    ),
)

async for event in events:
    pass

# Somewhere else, abort the generator, which will cause
# reading to end.
await events.aclose()
```
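The following self-contained sketch shows explicit closing with a plain Python asynchronous generator, so it runs without a server. `fake_read_events` is a hypothetical stand-in for `client.read_events(...)`, and `take` is a hypothetical helper; only `aclose` is taken from the text above. The `finally` block ensures the generator is closed even if the loop body raises.

```python
import asyncio


async def fake_read_events(closed):
    """Hypothetical stand-in for client.read_events(...); yields events forever."""
    number = 0
    try:
        while True:
            await asyncio.sleep(0)  # simulate waiting for the next event
            yield {'id': str(number)}
            number += 1
    finally:
        closed.append(True)  # record that the generator was cleaned up


async def take(events, limit):
    """Collect the first `limit` events (limit >= 1), then close the generator."""
    collected = []
    try:
        async for event in events:
            collected.append(event)
            if len(collected) >= limit:
                break
    finally:
        # Closing explicitly runs the generator's cleanup right away.
        await events.aclose()
    return collected


closed = []
first_three = asyncio.run(take(fake_read_events(closed), 3))
```

After the call, `first_three` holds three events and `closed` shows that the generator's cleanup code has run.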
### Running EventQL Queries
To run an EventQL query, call the `run_eventql_query` function and provide the query as an argument. The function returns an asynchronous generator, which you can use e.g. inside an `async for` loop:
```python
async for row in client.run_eventql_query(
    query = '''
        FROM e IN events
        PROJECT INTO e
    ''',
):
    pass
```
*Note that each row returned by the generator matches the projection specified in your query.*
#### Aborting a Query
If you need to abort a query, use `break` or `return` within the `async for` loop. However, this only takes effect while the loop body is running, i.e. after a row has been received.
To abort the query independently of that, store the generator in a variable, and close it explicitly:
```python
rows = client.run_eventql_query(
    query = '''
        FROM e IN events
        PROJECT INTO e
    ''',
)

async for row in rows:
    pass

# Somewhere else, abort the generator, which will cause
# the query to end.
await rows.aclose()
```
### Observing Events
To observe all events of a subject, call the `observe_events` function with the subject as the first argument and an options object as the second argument. Set the `recursive` option to `False`. This ensures that only events of the given subject are returned, not events of nested subjects.
The function returns an asynchronous generator, which you can use e.g. inside an `async for` loop:
```python
from eventsourcingdb import ObserveEventsOptions

async for event in client.observe_events(
    subject = '/books/42',
    options = ObserveEventsOptions(
        recursive = False
    ),
):
    pass
```
#### Observing From Subjects Recursively
If you want to observe not only all the events of a subject, but also the events of all nested subjects, set the `recursive` option to `True`:
```python
from eventsourcingdb import ObserveEventsOptions

async for event in client.observe_events(
    subject = '/books/42',
    options = ObserveEventsOptions(
        recursive = True
    ),
):
    pass
```
This also allows you to observe *all* events ever written. To do so, provide `/` as the subject and set `recursive` to `True`, since all subjects are nested under the root subject.
#### Specifying Bounds
Sometimes you do not want to observe all events, but only a range of events. For that, you can specify the `lower_bound` option.
Specify the ID and whether to include or exclude it:
```python
from eventsourcingdb import (
    Bound,
    BoundType,
    ObserveEventsOptions,
)

async for event in client.observe_events(
    subject = '/books/42',
    options = ObserveEventsOptions(
        recursive = False,
        lower_bound = Bound(id = '100', type = BoundType.INCLUSIVE),
    ),
):
    pass
```
#### Starting From the Latest Event of a Given Type
To observe starting from the latest event of a given type, provide the `from_latest_event` option and specify the subject, the type, and how to proceed if no such event exists.
Possible options are `IfEventIsMissingDuringObserve.WAIT_FOR_EVENT`, which waits for an event of the given type to happen, or `IfEventIsMissingDuringObserve.READ_EVERYTHING`, which effectively behaves as if `from_latest_event` was not specified:
```python
from eventsourcingdb import (
    IfEventIsMissingDuringObserve,
    ObserveEventsOptions,
    ObserveFromLatestEvent,
)

async for event in client.observe_events(
    subject = '/books/42',
    options = ObserveEventsOptions(
        recursive = False,
        from_latest_event = ObserveFromLatestEvent(
            subject = '/books/42',
            type = 'io.eventsourcingdb.library.book-borrowed',
            if_event_is_missing = IfEventIsMissingDuringObserve.READ_EVERYTHING,
        ),
    ),
):
    pass
```
*Note that `from_latest_event` and `lower_bound` cannot be provided at the same time.*
#### Aborting Observing
If you need to abort observing, use `break` or `return` within the `async for` loop. However, this only takes effect while the loop body is running, i.e. after an event has been received.
To abort observing independently of that, store the generator in a variable, and close it explicitly:
```python
from eventsourcingdb import ObserveEventsOptions

events = client.observe_events(
    subject = '/books/42',
    options = ObserveEventsOptions(
        recursive = False
    ),
)

async for event in events:
    pass

# Somewhere else, abort the generator, which will cause
# observing to end.
await events.aclose()
```
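An observation may wait a long time for the next event, so it can be useful to stop after a fixed duration. One way to do that, sketched below with the standard library only, is to run the loop under `asyncio.wait_for` and let the timeout cancel it. `fake_observe_events` is a hypothetical stand-in for `client.observe_events(...)`, and `observe_for` is a hypothetical helper; how the real SDK reacts to cancellation is an assumption and not confirmed here.

```python
import asyncio


async def fake_observe_events():
    """Hypothetical stand-in for client.observe_events(...)."""
    yield {'id': '0'}
    yield {'id': '1'}
    await asyncio.sleep(3600)  # no further events arrive for a long time
    yield {'id': '2'}


async def observe_for(events, seconds):
    """Consume events for at most `seconds`, then return what was seen."""
    seen = []

    async def consume():
        async for event in events:
            seen.append(event)

    try:
        await asyncio.wait_for(consume(), timeout=seconds)
    except asyncio.TimeoutError:
        pass  # the timeout cancelled the consumer while it was waiting
    return seen


seen = asyncio.run(observe_for(fake_observe_events(), 0.05))
```

In this sketch, `seen` contains the two events that arrived before the timeout; the third is never delivered.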
### Registering an Event Schema
To register an event schema, call the `register_event_schema` function and hand over an event type and the desired schema:
```python
await client.register_event_schema(
    event_type = 'io.eventsourcingdb.library.book-acquired',
    json_schema = {
        'type': 'object',
        'properties': {
            'title': { 'type': 'string' },
            'author': { 'type': 'string' },
            'isbn': { 'type': 'string' },
        },
        'required': [
            'title',
            'author',
            'isbn'
        ],
        'additionalProperties': False,
    },
)
```
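To see what the schema above accepts and rejects, you can check sample data locally before writing events. The sketch below is a deliberately small, hand-rolled check that covers only `required`, `additionalProperties`, and string-typed properties; it is not a JSON Schema validator and not part of the SDK. `find_violations` is a hypothetical helper, and it assumes that data of a registered type is expected to match its schema.

```python
def find_violations(data, schema):
    """Check `data` against the small subset of JSON Schema used above (toy check)."""
    violations = []
    properties = schema.get('properties', {})
    for name in schema.get('required', []):
        if name not in data:
            violations.append(f'missing required property: {name}')
    for name, value in data.items():
        if name not in properties:
            if schema.get('additionalProperties', True) is False:
                violations.append(f'unexpected property: {name}')
            continue
        if properties[name].get('type') == 'string' and not isinstance(value, str):
            violations.append(f'property is not a string: {name}')
    return violations


schema = {
    'type': 'object',
    'properties': {
        'title': {'type': 'string'},
        'author': {'type': 'string'},
        'isbn': {'type': 'string'},
    },
    'required': ['title', 'author', 'isbn'],
    'additionalProperties': False,
}

valid = find_violations(
    {'title': '2001', 'author': 'Arthur C. Clarke', 'isbn': '978-0756906788'},
    schema,
)
invalid = find_violations({'title': 2001, 'pages': 224}, schema)
```

For anything beyond such a quick check, a full validator is the better choice, since this sketch ignores most of the JSON Schema specification.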
### Listing Subjects
To list all subjects, call the `read_subjects` function with `/` as the base subject. The function returns an asynchronous generator, which you can use e.g. inside an `async for` loop:
```python
async for subject in client.read_subjects(
    base_subject = '/'
):
    pass
```
If you only want to list subjects within a specific branch, provide the desired base subject instead:
```python
async for subject in client.read_subjects(
    base_subject = '/books'
):
    pass
```
#### Aborting Listing
If you need to abort listing, use `break` or `return` within the `async for` loop. However, this only takes effect while the loop body is running, i.e. after a subject has been received.
To abort listing independently of that, store the generator in a variable, and close it explicitly:
```python
subjects = client.read_subjects(
    base_subject = '/'
)

async for subject in subjects:
    pass

# Somewhere else, abort the generator, which will cause
# listing to end.
await subjects.aclose()
```
### Listing Event Types
To list all event types, call the `read_event_types` function. The function returns an asynchronous generator, which you can use e.g. inside an `async for` loop:
```python
async for event_type in client.read_event_types():
    pass
```
#### Aborting Listing
If you need to abort listing, use `break` or `return` within the `async for` loop. However, this only takes effect while the loop body is running, i.e. after an event type has been received.
To abort listing independently of that, store the generator in a variable, and close it explicitly:
```python
event_types = client.read_event_types()

async for event_type in event_types:
    pass

# Somewhere else, abort the generator, which will cause
# listing to end.
await event_types.aclose()
```
### Listing a Specific Event Type
To list a specific event type, call the `read_event_type` function with the event type as an argument. The function returns the detailed event type, which includes the schema:
```python
event_type = await client.read_event_type("io.eventsourcingdb.library.book-acquired")
```
### Using Testcontainers
Import the `Container` class, create an instance, call the `start` function to run a test container, get a client, run your test code, and finally call the `stop` function to stop the test container:
```python
from eventsourcingdb import Container
container = Container()
container.start()
client = container.get_client()
# ...
container.stop()
```
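If the test code between `start` and `stop` raises, the container would keep running. A small context manager can guarantee that `stop` is always called. The sketch below uses only the `start`, `get_client`, and `stop` functions described above; `running` is a hypothetical helper, and `FakeContainer` is a hypothetical stand-in so that the sketch runs without Docker.

```python
from contextlib import contextmanager


@contextmanager
def running(container):
    """Start the container, yield a client, and always stop the container."""
    container.start()
    try:
        yield container.get_client()
    finally:
        container.stop()


class FakeContainer:
    """Hypothetical stand-in for eventsourcingdb.Container (illustration only)."""

    def __init__(self):
        self.calls = []

    def start(self):
        self.calls.append('start')

    def get_client(self):
        self.calls.append('get_client')
        return object()

    def stop(self):
        self.calls.append('stop')


fake = FakeContainer()
try:
    with running(fake) as client:
        raise ValueError('simulated test failure')
except ValueError:
    pass
```

Even though the body of the `with` block fails, `fake.calls` ends with `'stop'`. The same pattern should carry over to a real `Container` instance, for example inside a test fixture.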
To check if the test container is running, call the `is_running` function:
```python
is_running = container.is_running()
```
#### Configuring the Container Instance
By default, `Container` uses the `latest` tag of the official EventSourcingDB Docker image. To change that, call the `with_image_tag` function:
```python
container = (
    Container()
    .with_image_tag('1.0.0')
)
```
Similarly, you can configure the port to use and the API token. Call the `with_port` or the `with_api_token` function respectively:
```python
container = (
    Container()
    .with_port(4000)
    .with_api_token('secret')
)
```
#### Configuring the Client Manually
In case you need to set up the client yourself, use the following functions to get details on the container:
- `get_host()` returns the host name
- `get_mapped_port()` returns the port
- `get_base_url()` returns the full URL of the container
- `get_api_token()` returns the API token
Raw data
{
"_id": null,
"home_page": null,
"name": "eventsourcingdb",
"maintainer": null,
"docs_url": null,
"requires_python": "<=3.13,>=3.11",
"maintainer_email": null,
"keywords": null,
"author": null,
"author_email": "the native web GmbH <hello@thenativeweb.io>",
"download_url": "https://files.pythonhosted.org/packages/50/f1/f5a2dde785ce089d07c18d201feae1009d3e04dbaa31ff04c236f01fa208/eventsourcingdb-1.5.0.tar.gz",
"platform": null,
"bugtrack_url": null,
"license": null,
"summary": "The official Python client SDK for EventSourcingDB.",
"version": "1.5.0",
"project_urls": null,
"split_keywords": [],
"urls": [
{
"comment_text": null,
"digests": {
"blake2b_256": "69bcd7b366b7f8b278851d53a5c8d2c35516b8199a1b366b9e6d4fa374ada32b",
"md5": "1fa148a00edd581439bd5bcfe7d06518",
"sha256": "b1b3ecb37b123b942b1b546ac04342a547da763a01ba7feb8dbdf620954a978f"
},
"downloads": -1,
"filename": "eventsourcingdb-1.5.0-py3-none-any.whl",
"has_sig": false,
"md5_digest": "1fa148a00edd581439bd5bcfe7d06518",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": "<=3.13,>=3.11",
"size": 25889,
"upload_time": "2025-07-29T20:35:54",
"upload_time_iso_8601": "2025-07-29T20:35:54.255011Z",
"url": "https://files.pythonhosted.org/packages/69/bc/d7b366b7f8b278851d53a5c8d2c35516b8199a1b366b9e6d4fa374ada32b/eventsourcingdb-1.5.0-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": null,
"digests": {
"blake2b_256": "50f1f5a2dde785ce089d07c18d201feae1009d3e04dbaa31ff04c236f01fa208",
"md5": "ed63ddba36a7f67772cf9178fddc5d9f",
"sha256": "200bffa45b4d7e087d7d31d6ad1e55615d1bf4f503ac50d96d9f43de94b11426"
},
"downloads": -1,
"filename": "eventsourcingdb-1.5.0.tar.gz",
"has_sig": false,
"md5_digest": "ed63ddba36a7f67772cf9178fddc5d9f",
"packagetype": "sdist",
"python_version": "source",
"requires_python": "<=3.13,>=3.11",
"size": 14387,
"upload_time": "2025-07-29T20:35:55",
"upload_time_iso_8601": "2025-07-29T20:35:55.359590Z",
"url": "https://files.pythonhosted.org/packages/50/f1/f5a2dde785ce089d07c18d201feae1009d3e04dbaa31ff04c236f01fa208/eventsourcingdb-1.5.0.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2025-07-29 20:35:55",
"github": false,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"lcname": "eventsourcingdb"
}