# py-event-sourcing - A minimal, `asyncio`-native event sourcing library
This library provides core components for building event-sourced systems in Python. It uses SQLite for persistence and offers a simple API for `write`, `read`, and `watch` operations on event streams.
For a deeper dive into the concepts and design, please see:
* **[Concepts](https://github.com/johnlogsdon/py-event-sourcing/blob/main/docs/CONCEPTS.md)**: An introduction to the Event Sourcing pattern.
* **[Design](https://github.com/johnlogsdon/py-event-sourcing/blob/main/docs/DESIGN.md)**: An overview of the library's architecture and components.
## Key Features
* **Simple & Serverless**: Uses SQLite for a file-based, zero-dependency persistence layer.
* **Global Event Stream**: Query all events across all streams in their global sequence using the special `'@all'` stream, perfect for building cross-stream read models or for auditing.
* **Idempotent Writes**: Prevents duplicate events in distributed systems by using an optional `id` on each event.
* **Optimistic Concurrency Control**: Ensures data integrity by allowing writes only against a specific, expected stream version.
* **Efficient Watching**: A single centralized notifier polls the database on behalf of all watchers, so polling load does not grow with the number of watchers. This avoids the "thundering herd" problem and keeps update latency low.
* **Snapshot Support**: Accelerate state reconstruction for long-lived streams by saving and loading state snapshots. This is also supported for the `'@all'` stream, which is ideal for complex, system-wide projections.
* **Fully Async API**: Built from the ground up with `asyncio` for high-performance, non-blocking I/O.
* **Extensible by Design**: Core logic is decoupled from the storage implementation via `Protocol`-based adapters. This package ships a highly optimized SQLite backend, and you can write your own adapters for other databases (e.g., PostgreSQL, Firestore).
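As a rough illustration of how three of the features above can fit together in SQLite (a global sequence for `'@all'`, idempotent writes keyed on an event `id`, and an expected-version check), here is a minimal sketch using only the standard-library `sqlite3` module. The schema, the `append` and `read_all` helpers, the `expected_version` parameter, and the `VersionConflict` exception are all hypothetical names chosen for this example. They are not this library's schema or API, which is asynchronous; see the Design document linked above for the actual architecture.

```python
import sqlite3

# Hypothetical schema for illustration only; not the library's actual tables.
SCHEMA = """
CREATE TABLE events (
    seq       INTEGER PRIMARY KEY AUTOINCREMENT,  -- global order (the @all view)
    stream_id TEXT    NOT NULL,
    version   INTEGER NOT NULL,                   -- per-stream version, starting at 1
    event_id  TEXT    UNIQUE,                     -- optional idempotency key
    type      TEXT    NOT NULL,
    data      BLOB    NOT NULL,
    UNIQUE (stream_id, version)
);
"""


class VersionConflict(Exception):
    """Raised when the stream is not at the version the writer expected."""


def current_version(conn, stream_id):
    """Return the highest version stored for a stream, or 0 if it is empty."""
    row = conn.execute(
        "SELECT COALESCE(MAX(version), 0) FROM events WHERE stream_id = ?",
        (stream_id,),
    ).fetchone()
    return row[0]


def append(conn, stream_id, events, expected_version=None):
    """Append (event_id, type, data) tuples; return the new stream version."""
    with conn:  # one transaction: commit on success, roll back on error
        version = current_version(conn, stream_id)
        if expected_version is not None and version != expected_version:
            raise VersionConflict(f"expected {expected_version}, found {version}")
        for event_id, type_, data in events:
            if event_id is not None:
                seen = conn.execute(
                    "SELECT 1 FROM events WHERE event_id = ?", (event_id,)
                ).fetchone()
                if seen:
                    continue  # idempotent: this event was already stored
            version += 1
            conn.execute(
                "INSERT INTO events (stream_id, version, event_id, type, data) "
                "VALUES (?, ?, ?, ?, ?)",
                (stream_id, version, event_id, type_, data),
            )
    return version


def read_all(conn):
    """Read every event in global order, like reading the '@all' stream."""
    return conn.execute(
        "SELECT stream_id, version, type FROM events ORDER BY seq"
    ).fetchall()


conn = sqlite3.connect(":memory:")
conn.executescript(SCHEMA)

append(conn, "cart-1", [("e1", "ItemAdded", b"{}")], expected_version=0)
append(conn, "cart-2", [("e2", "ItemAdded", b"{}")], expected_version=0)
# Retrying the same event id stores nothing new, so the version stays at 1.
retry_version = append(conn, "cart-1", [("e1", "ItemAdded", b"{}")])

try:
    # A writer that still believes the stream is empty is rejected.
    append(conn, "cart-1", [("e3", "ItemRemoved", b"{}")], expected_version=0)
    conflict = False
except VersionConflict:
    conflict = True

print(retry_version, conflict, read_all(conn))
```

In this sketch the idempotency key is unique across all streams and a duplicate is silently skipped; whether the library scopes ids per stream or reports duplicates differently is not stated here, so treat both choices as assumptions.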
## Installation
This project uses `uv` for dependency management and requires Python 3.11 or later.
1. **Create and activate the virtual environment:**
   ```bash
   uv venv
   source .venv/bin/activate
   ```
2. **Install the package in editable mode with dev dependencies:**
   ```bash
   uv pip install -e ".[dev]"
   ```
## Quick Start
Here’s a quick example of writing to and reading from a stream.
```python
import asyncio
import os
import tempfile

from py_event_sourcing import sqlite_stream_factory, CandidateEvent


async def main():
    # Use a temporary directory for the database file to keep the example self-contained.
    with tempfile.TemporaryDirectory() as tmpdir:
        db_path = os.path.join(tmpdir, "example.db")

        # The factory is an async context manager that handles all resources.
        async with sqlite_stream_factory(db_path) as open_stream:
            stream_id = "my_first_stream"

            # Write an event
            async with open_stream(stream_id) as stream:
                event = CandidateEvent(type="UserRegistered", data=b'{"user": "Alice"}')
                await stream.write([event])
                print(f"Event written. Stream version is now {stream.version}.")

            # Read the event back
            async with open_stream(stream_id) as stream:
                all_events = [e async for e in stream.read()]
                print(f"Read {len(all_events)} event(s) from the stream.")
                print(f" -> Event type: {all_events[0].type}, Data: {all_events[0].data.decode()}, Version: {all_events[0].version}")


if __name__ == "__main__":
    asyncio.run(main())
```
## Basic Usage
For a detailed, fully-commented example covering all major features (writing, reading, snapshots, and watching), please see [`basic_usage.py`](https://github.com/johnlogsdon/py-event-sourcing/blob/main/basic_usage.py).
To run the example:
```bash
uv run python3 basic_usage.py
```
**Sample Output:**
```
--- Example 1: Writing and Reading Events ---
Stream 'counter_stream_1' is now at version 3.
Reading all events from the stream:
  - Event version 1: Increment
  - Event version 2: Increment
  - Event version 3: Decrement

--- Example 2: Reconstructing State from Events (Read Model & Projector) ---
Reconstructed state: Counter is 1.

--- Example 3: Watching for New Events ---
Watching for events (historical and new)...
  - Watched event: Increment (version 1)
  - Watched event: Increment (version 2)
  - Watched event: Decrement (version 3)
Writing new events to trigger the watcher...
  - Watched event: Increment (version 4)
  - Watched event: Increment (version 5)

--- Example 4: Using Snapshots for Efficiency ---
Snapshot for 'counter' projection saved at version 5 with state: count = 3
State for 'counter' projection restored from snapshot at version 5. Count is 3.
Replaying events since snapshot...
  - Applying event version 6: Increment
Final reconstructed state: Counter is 4.
```
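The projector and snapshot steps in Examples 2 and 4 can be sketched without the library at all. The following is a minimal, standalone illustration that reproduces the counter values from the sample output; the `Event` and `Snapshot` dataclasses and the `apply` and `project` functions are hypothetical stand-ins written for this sketch, not the types or functions used in `basic_usage.py`.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Event:
    """Stand-in for a stored event: just a type and a stream version."""
    type: str
    version: int


@dataclass(frozen=True)
class Snapshot:
    """Stand-in for a saved projection state at a given stream version."""
    state: int
    version: int


def apply(count, event):
    """Projector step: fold one event into the counter state."""
    if event.type == "Increment":
        return count + 1
    if event.type == "Decrement":
        return count - 1
    return count  # ignore event types this projection does not know


def project(events, snapshot=None):
    """Rebuild state, starting from a snapshot when one is available."""
    count = snapshot.state if snapshot else 0
    start = snapshot.version if snapshot else 0
    for event in events:
        if event.version > start:  # replay only what the snapshot has not seen
            count = apply(count, event)
    return count


# The six events from the sample output, versions 1 through 6.
types = ["Increment", "Increment", "Decrement", "Increment", "Increment", "Increment"]
history = [Event(t, v) for v, t in enumerate(types, start=1)]

after_three = project(history[:3])                          # Example 2
snapshot = Snapshot(state=project(history[:5]), version=5)  # Example 4: save
final = project(history, snapshot)                          # Example 4: restore and replay

print(after_three, snapshot.state, final)
```

For simplicity this sketch filters an in-memory list. The saving comes from a store fetching only the events after the snapshot version instead of the whole stream.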
## Benchmarks
A benchmark script is included to measure write/read throughput and demonstrate the performance benefits of using snapshots. You can find the script at [`benchmark.py`](https://github.com/johnlogsdon/py-event-sourcing/blob/main/benchmark.py).
To run the benchmark:
```bash
uv run python3 benchmark.py
```
**Sample Output:**
```
--- Running benchmark with 1,000,000 events ---
Writing 1,000,000 events...

Finished writing 1,000,000 events in 9.41 seconds.
Write throughput: 106,320.88 events/sec.

--- Benchmark 1: Reconstructing state from all events ---
Reconstructed state (all events): Counter is 1,000,000.
Time to reconstruct from all events: 2.66 seconds.
Read throughput: 375,273.94 events/sec.

--- Benchmark 2: Reconstructing state using snapshot ---
Creating snapshot...
Snapshot created.
Writing 100 additional events...
Finished writing 100 additional events.
State restored from snapshot at version 1,000,000.
Reconstructed state (with snapshot): Counter is 1,000,100.
Time to reconstruct with snapshot: 0.00 seconds.

--- Benchmark Summary ---
Time to reconstruct from all 1,000,100 events: 2.66 seconds.
Time to reconstruct with snapshot (after 1,000,000 events): 0.0007 seconds.

Database file size: 148.13 MB
```
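To make the relationship between these figures explicit, the short calculation below recomputes the elapsed times from the reported throughputs and derives the snapshot speedup. It uses only the numbers printed in the sample output above; results on other hardware will differ.

```python
# Figures copied from the sample output above.
events_written = 1_000_000
write_throughput = 106_320.88      # events/sec
read_throughput = 375_273.94       # events/sec
full_replay_seconds = 2.66
snapshot_replay_seconds = 0.0007

# Throughput is events / elapsed time, so elapsed time is events / throughput.
write_seconds = events_written / write_throughput
read_seconds = events_written / read_throughput

# How many times faster reconstruction was with the snapshot in this run.
speedup = full_replay_seconds / snapshot_replay_seconds

print(f"write: {write_seconds:.2f} s, read: {read_seconds:.2f} s")
print(f"snapshot speedup: about {speedup:,.0f}x")
```

The recomputed times agree with the reported 9.41 and 2.66 seconds, and in this particular run restoring from the snapshot and replaying 100 events was several thousand times faster than replaying the full stream.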
## Testing
The test suite uses `pytest`. To run all tests, use the following command:
```bash
uv run pytest
```
## Contributing
We welcome contributions! Please see our [Contributing Guide](https://github.com/johnlogsdon/py-event-sourcing/blob/main/CONTRIBUTING.md) for details on:
- Setting up a development environment
- Code style guidelines
- Testing requirements
- Pull request process
## License
This project is licensed under the MIT License - see the [LICENSE](https://github.com/johnlogsdon/py-event-sourcing/blob/main/LICENSE) file for details.
## Changelog
See [CHANGELOG.md](https://github.com/johnlogsdon/py-event-sourcing/blob/main/CHANGELOG.md) for a list of changes and version history.