py-event-sourcing

Name: py-event-sourcing
Version: 1.0.3
Summary: A minimal, `asyncio`-native event sourcing library
Upload time: 2025-09-03 12:43:52
Requires Python: >=3.11
License: MIT
Keywords: asyncio, database, event-sourcing, sqlite, stream-processing
Requirements: none recorded

# py-event-sourcing - A minimal, `asyncio`-native event sourcing library

[![CI](https://github.com/johnlogsdon/py-event-sourcing/actions/workflows/ci.yml/badge.svg)](https://github.com/johnlogsdon/py-event-sourcing/actions/workflows/ci.yml)
[![PyPI version](https://badge.fury.io/py/py-event-sourcing.svg)](https://pypi.org/project/py-event-sourcing/)
[![Python Versions](https://img.shields.io/pypi/pyversions/py-event-sourcing)](https://pypi.org/project/py-event-sourcing/)
[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)

This library provides core components for building event-sourced systems in Python. It uses SQLite for persistence and offers a simple API for `write`, `read`, and `watch` operations on event streams.

For a deeper dive into the concepts and design, please see:
*   **[Concepts](https://github.com/johnlogsdon/py-event-sourcing/blob/main/docs/CONCEPTS.md)**: An introduction to the Event Sourcing pattern.
*   **[Design](https://github.com/johnlogsdon/py-event-sourcing/blob/main/docs/DESIGN.md)**: An overview of the library's architecture and components.

## Key Features

*   **Simple & Serverless**: Uses SQLite for a file-based, zero-dependency persistence layer.
*   **Global Event Stream**: Query all events across all streams in their global sequence using the special `'@all'` stream, perfect for building cross-stream read models or for auditing.
*   **Idempotent Writes**: Prevents duplicate events in distributed systems by using an optional `id` on each event.
*   **Optimistic Concurrency Control**: Ensures data integrity by allowing writes only against a specific, expected stream version; see the sketch after this list.
*   **Efficient Watching**: A single, centralized notifier polls the database on behalf of all watchers, avoiding the "thundering herd" problem while keeping update latency low.
*   **Snapshot Support**: Accelerate state reconstruction for long-lived streams by saving and loading state snapshots. This is also supported for the `'@all'` stream, which is ideal for complex, system-wide projections.
*   **Fully Async API**: Built from the ground up with `asyncio` for high-performance, non-blocking I/O.
*   **Extensible by Design**: Core logic is decoupled from the storage implementation via `Protocol`-based adapters. While this package ships a highly optimized SQLite backend, you can create your own adapters for other databases (e.g., PostgreSQL, Firestore).
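
The sketch below ties several of these features together. It is a sketch rather than verified API usage: the `expected_version=` keyword and the placement of `id` as a `CandidateEvent` field are assumptions drawn from the feature descriptions above; the factory, `open_stream`, `write`, `read`, and the `'@all'` stream name come from this README.

```python
import asyncio
import os
import tempfile

from py_event_sourcing import CandidateEvent, sqlite_stream_factory

async def main():
    with tempfile.TemporaryDirectory() as tmpdir:
        db_path = os.path.join(tmpdir, "features.db")
        async with sqlite_stream_factory(db_path) as open_stream:
            async with open_stream("orders") as stream:
                # Idempotent write: the optional `id` lets a retried write be
                # deduplicated instead of appending a duplicate event.
                event = CandidateEvent(
                    type="OrderPlaced",
                    data=b'{"order": 1}',
                    id="order-1-placed",
                )
                # `expected_version=` is an assumed keyword for the optimistic
                # concurrency check (0 meaning "I expect an empty stream");
                # consult the API docs for the exact name and semantics.
                await stream.write([event], expected_version=0)

            # Global event stream: iterate every event across all streams in
            # their global order via the special '@all' stream.
            async with open_stream("@all") as all_events:
                async for e in all_events.read():
                    print(f"{e.type} (stream version {e.version})")

asyncio.run(main())
```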

## Installation
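
To use the released package in your own application, install it from PyPI:

```bash
pip install py-event-sourcing
```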

For local development, this project uses `uv` for dependency management:

1.  **Create and activate the virtual environment:**
    ```bash
    uv venv
    source .venv/bin/activate
    ```

2.  **Install the package in editable mode with dev dependencies:**
    ```bash
    uv pip install -e ".[dev]"
    ```

## Quick Start

Here’s a quick example of writing to and reading from a stream.

```python
import asyncio
import os
import tempfile
from py_event_sourcing import sqlite_stream_factory, CandidateEvent

async def main():
    # Use a temporary file for the database to keep the example self-contained.
    with tempfile.TemporaryDirectory() as tmpdir:
        db_path = os.path.join(tmpdir, "example.db")

        # The factory is an async context manager that handles all resources.
        async with sqlite_stream_factory(db_path) as open_stream:
            stream_id = "my_first_stream"

            # Write an event
            async with open_stream(stream_id) as stream:
                event = CandidateEvent(type="UserRegistered", data=b'{"user": "Alice"}')
                await stream.write([event])
                print(f"Event written. Stream version is now {stream.version}.")

            # Read the event back
            async with open_stream(stream_id) as stream:
                all_events = [e async for e in stream.read()]
                print(f"Read {len(all_events)} event(s) from the stream.")
                print(f"  -> Event type: {all_events[0].type}, Data: {all_events[0].data.decode()}, Version: {all_events[0].version}")

if __name__ == "__main__":
    asyncio.run(main())
```

## Basic Usage

For a detailed, fully commented example covering all major features (writing, reading, snapshots, and watching), see [`basic_usage.py`](https://github.com/johnlogsdon/py-event-sourcing/blob/main/basic_usage.py).

To run the example:
```bash
uv run python3 basic_usage.py
```

**Sample Output:**
```
--- Example 1: Writing and Reading Events ---
Stream 'counter_stream_1' is now at version 3.
Reading all events from the stream:
  - Event version 1: Increment
  - Event version 2: Increment
  - Event version 3: Decrement

--- Example 2: Reconstructing State from Events (Read Model & Projector) ---
Reconstructed state: Counter is 1.

--- Example 3: Watching for New Events ---
Watching for events (historical and new)...
  - Watched event: Increment (version 1)
  - Watched event: Increment (version 2)
  - Watched event: Decrement (version 3)
Writing new events to trigger the watcher...
  - Watched event: Increment (version 4)
  - Watched event: Increment (version 5)

--- Example 4: Using Snapshots for Efficiency ---
Snapshot for 'counter' projection saved at version 5 with state: count = 3
State for 'counter' projection restored from snapshot at version 5. Count is 3.
Replaying events since snapshot...
  - Applying event version 6: Increment
Final reconstructed state: Counter is 4.
```
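
For reference, a projector in the spirit of Example 2 needs nothing beyond the API shown in the Quick Start: fold a stream's events into the current value. This is a minimal sketch, assuming `write` accepts a batch of events; the snapshot-aware variant (Example 4) lives in `basic_usage.py`.

```python
import asyncio
import os
import tempfile

from py_event_sourcing import CandidateEvent, sqlite_stream_factory

async def project_counter(open_stream, stream_id: str) -> int:
    """Fold a stream's events into the current counter value."""
    count = 0
    async with open_stream(stream_id) as stream:
        async for event in stream.read():
            if event.type == "Increment":
                count += 1
            elif event.type == "Decrement":
                count -= 1
    return count

async def main():
    with tempfile.TemporaryDirectory() as tmpdir:
        db_path = os.path.join(tmpdir, "counter.db")
        async with sqlite_stream_factory(db_path) as open_stream:
            async with open_stream("counter_stream_1") as stream:
                await stream.write([
                    CandidateEvent(type="Increment", data=b"{}"),
                    CandidateEvent(type="Increment", data=b"{}"),
                    CandidateEvent(type="Decrement", data=b"{}"),
                ])
            count = await project_counter(open_stream, "counter_stream_1")
            print(f"Reconstructed state: Counter is {count}.")  # -> Counter is 1.

asyncio.run(main())
```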

## Benchmarks

A benchmark script is included to measure write/read throughput and to demonstrate the performance benefit of snapshots: with a snapshot, reconstruction cost is proportional to the number of events written since the snapshot rather than to the full history. You can find the script at [`benchmark.py`](https://github.com/johnlogsdon/py-event-sourcing/blob/main/benchmark.py).
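
For intuition, here is a minimal, self-contained throughput sketch in the same spirit. It is not the real `benchmark.py`; the batched `write()` call (passing many events in one list) is an assumption suggested by the benchmark's throughput figures.

```python
import asyncio
import os
import tempfile
import time

from py_event_sourcing import CandidateEvent, sqlite_stream_factory

N = 10_000  # kept small so the sketch runs in a moment

async def main():
    with tempfile.TemporaryDirectory() as tmpdir:
        db_path = os.path.join(tmpdir, "bench.db")
        async with sqlite_stream_factory(db_path) as open_stream:
            events = [CandidateEvent(type="Increment", data=b"{}") for _ in range(N)]
            async with open_stream("bench_stream") as stream:
                start = time.monotonic()
                await stream.write(events)  # assumes write() accepts a batch
                elapsed = time.monotonic() - start
            print(f"Wrote {N:,} events in {elapsed:.2f}s "
                  f"({N / elapsed:,.0f} events/sec)")

asyncio.run(main())
```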

To run the benchmark:
```bash
uv run python3 benchmark.py
```

**Sample Output:**
```
--- Running benchmark with 1,000,000 events ---
Writing 1,000,000 events...

Finished writing 1,000,000 events in 9.41 seconds.
Write throughput: 106,320.88 events/sec.

--- Benchmark 1: Reconstructing state from all events ---
Reconstructed state (all events): Counter is 1,000,000.
Time to reconstruct from all events: 2.66 seconds.
Read throughput: 375,273.94 events/sec.

--- Benchmark 2: Reconstructing state using snapshot ---
Creating snapshot...
Snapshot created.
Writing 100 additional events...
Finished writing 100 additional events.
State restored from snapshot at version 1,000,000.
Reconstructed state (with snapshot): Counter is 1,000,100.
Time to reconstruct with snapshot: 0.00 seconds.

--- Benchmark Summary ---
Time to reconstruct from all 1,000,100 events: 2.66 seconds.
Time to reconstruct with snapshot (after 1,000,000 events): 0.0007 seconds.

Database file size: 148.13 MB
```

## Testing

The test suite uses `pytest`. To run all tests, use the following command:

```bash
uv run pytest
```

## Contributing

We welcome contributions! Please see our [Contributing Guide](https://github.com/johnlogsdon/py-event-sourcing/blob/main/CONTRIBUTING.md) for details on:

- Setting up a development environment
- Code style guidelines
- Testing requirements
- Pull request process

## License

This project is licensed under the MIT License - see the [LICENSE](https://github.com/johnlogsdon/py-event-sourcing/blob/main/LICENSE) file for details.

## Changelog

See [CHANGELOG.md](https://github.com/johnlogsdon/py-event-sourcing/blob/main/CHANGELOG.md) for a list of changes and version history.
            

Raw data

{
    "_id": null,
    "home_page": null,
    "name": "py-event-sourcing",
    "maintainer": null,
    "docs_url": null,
    "requires_python": ">=3.11",
    "maintainer_email": "John <john@example.com>",
    "keywords": "asyncio, database, event-sourcing, sqlite, stream-processing",
    "author": null,
    "author_email": "John <john@example.com>",
    "download_url": "https://files.pythonhosted.org/packages/b1/f8/a63999269e8a804dbf534e53b41c7ea7ad9bfad7f8450b8fb26f10baf6da/py_event_sourcing-1.0.3.tar.gz",
    "platform": null,
    "description": "# py-event-sourcing - A minimal, `asyncio`-native event sourcing library\n\n[![CI](https://github.com/johnlogsdon/py-event-sourcing/actions/workflows/ci.yml/badge.svg)](https://github.com/johnlogsdon/py-event-sourcing/actions/workflows/ci.yml)\n[![PyPI version](https://badge.fury.io/py/py-event-sourcing.svg)](https://pypi.org/project/py-event-sourcing/)\n[![Python Versions](https://img.shields.io/pypi/pyversions/py-event-sourcing)](https://pypi.org/project/py-event-sourcing/)\n[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)\n\nThis library provides core components for building event-sourced systems in Python. It uses SQLite for persistence and offers a simple API for `write`, `read`, and `watch` operations on event streams.\n\nFor a deeper dive into the concepts and design, please see:\n*   **[Concepts](https://github.com/johnlogsdon/py-event-sourcing/blob/main/docs/CONCEPTS.md)**: An introduction to the Event Sourcing pattern.\n*   **[Design](https://github.com/johnlogsdon/py-event-sourcing/blob/main/docs/DESIGN.md)**: An overview of the library's architecture and components.\n\n## Key Features\n\n*   **Simple & Serverless**: Uses SQLite for a file-based, zero-dependency persistence layer.\n*   **Global Event Stream**: Query all events across all streams in their global sequence using the special `'@all'` stream, perfect for building cross-stream read models or for auditing.\n*   **Idempotent Writes**: Prevents duplicate events in distributed systems by using an optional `id` on each event.\n*   **Optimistic Concurrency Control**: Ensures data integrity by allowing writes only against a specific, expected stream version.\n*   **Efficient Watching**: A centralized notifier polls the database once to serve all watchers, avoiding the \"thundering herd\" problem and ensuring low-latency updates.\n*   **Snapshot Support**: Accelerate state reconstruction for long-lived streams by saving and loading state snapshots. This is also supported for the `'@all'` stream, which is ideal for complex, system-wide projections.\n*   **Fully Async API**: Built from the ground up with `asyncio` for high-performance, non-blocking I/O.\n*   **Extensible by Design**: Core logic is decoupled from storage implementation via `Protocol`-based adapters. While this package provides a highly-optimized SQLite backend, you can easily create your own adapters for other databases (e.g., PostgreSQL, Firestore).\n\n## Installation\n\nThis project uses `uv` for dependency management.\n\n1.  **Create and activate the virtual environment:**\n    ```bash\n    uv venv\n    source .venv/bin/activate\n    ```\n\n2.  
**Install the package in editable mode with dev dependencies:**\n    ```bash\n    uv pip install -e \".[dev]\"\n    ```\n\n## Quick Start\n\nHere\u2019s a quick example of writing to and reading from a stream.\n\n```python\nimport asyncio\nimport os\nimport tempfile\nfrom py_event_sourcing import sqlite_stream_factory, CandidateEvent\n\nasync def main():\n    # Use a temporary file for the database to keep the example self-contained.\n    with tempfile.TemporaryDirectory() as tmpdir:\n        db_path = os.path.join(tmpdir, \"example.db\")\n\n        # The factory is an async context manager that handles all resources.\n        async with sqlite_stream_factory(db_path) as open_stream:\n            stream_id = \"my_first_stream\"\n\n            # Write an event\n            async with open_stream(stream_id) as stream:\n                event = CandidateEvent(type=\"UserRegistered\", data=b'{\"user\": \"Alice\"}')\n                await stream.write([event])\n                print(f\"Event written. Stream version is now {stream.version}.\")\n\n            # Read the event back\n            async with open_stream(stream_id) as stream:\n                all_events = [e async for e in stream.read()]\n                print(f\"Read {len(all_events)} event(s) from the stream.\")\n                print(f\"  -> Event type: {all_events[0].type}, Data: {all_events[0].data.decode()}, Version: {all_events[0].version}\")\n\nif __name__ == \"__main__\":\n    asyncio.run(main())\n```\n\n## Basic Usage\n\nFor a detailed, fully-commented example covering all major features (writing, reading, snapshots, and watching), please see [`basic_usage.py`](https://github.com/johnlogsdon/py-event-sourcing/blob/main/basic_usage.py).\n\nTo run the example:\n```bash\nuv run python3 basic_usage.py\n```\n\n**Sample Output:**\n```\n--- Example 1: Writing and Reading Events ---\nStream 'counter_stream_1' is now at version 3.\nReading all events from the stream:\n  - Event version 1: Increment\n  - Event version 2: Increment\n  - Event version 3: Decrement\n\n--- Example 2: Reconstructing State from Events (Read Model & Projector) ---\nReconstructed state: Counter is 1.\n\n--- Example 3: Watching for New Events ---\nWatching for events (historical and new)...\n  - Watched event: Increment (version 1)\n  - Watched event: Increment (version 2)\n  - Watched event: Decrement (version 3)\nWriting new events to trigger the watcher...\n  - Watched event: Increment (version 4)\n  - Watched event: Increment (version 5)\n\n--- Example 4: Using Snapshots for Efficiency ---\nSnapshot for 'counter' projection saved at version 5 with state: count = 3\nState for 'counter' projection restored from snapshot at version 5. Count is 3.\nReplaying events since snapshot...\n  - Applying event version 6: Increment\nFinal reconstructed state: Counter is 4.\n```\n\n## Benchmarks\n\nA benchmark script is included to measure write/read throughput and demonstrate the performance benefits of using snapshots. 
You can find the script at [`benchmark.py`](https://github.com/johnlogsdon/py-event-sourcing/blob/main/benchmark.py).\n\nTo run the benchmark:\n```bash\nuv run python3 benchmark.py\n```\n\n**Sample Output:**\n```\n--- Running benchmark with 1,000,000 events ---\nWriting 1,000,000 events...\n\nFinished writing 1,000,000 events in 9.41 seconds.\nWrite throughput: 106,320.88 events/sec.\n\n--- Benchmark 1: Reconstructing state from all events ---\nReconstructed state (all events): Counter is 1,000,000.\nTime to reconstruct from all events: 2.66 seconds.\nRead throughput: 375,273.94 events/sec.\n\n--- Benchmark 2: Reconstructing state using snapshot ---\nCreating snapshot...\nSnapshot created.\nWriting 100 additional events...\nFinished writing 100 additional events.\nState restored from snapshot at version 1,000,000.\nReconstructed state (with snapshot): Counter is 1,000,100.\nTime to reconstruct with snapshot: 0.00 seconds.\n\n--- Benchmark Summary ---\nTime to reconstruct from all 1,000,100 events: 2.66 seconds.\nTime to reconstruct with snapshot (after 1,000,000 events): 0.0007 seconds.\n\nDatabase file size: 148.13 MB\n```\n\n## Testing\n\nThe test suite uses `pytest`. To run all tests, use the following command:\n\n```bash\nuv run pytest\n```\n\n## Contributing\n\nWe welcome contributions! Please see our [Contributing Guide](https://github.com/johnlogsdon/py-event-sourcing/blob/main/CONTRIBUTING.md) for details on:\n\n- Setting up a development environment\n- Code style guidelines\n- Testing requirements\n- Pull request process\n\n## License\n\nThis project is licensed under the MIT License - see the [LICENSE](https://github.com/johnlogsdon/py-event-sourcing/blob/main/LICENSE) file for details.\n\n## Changelog\n\nSee [CHANGELOG.md](https://github.com/johnlogsdon/py-event-sourcing/blob/main/CHANGELOG.md) for a list of changes and version history.",
    "bugtrack_url": null,
    "license": "MIT",
    "summary": "A minimal, `asyncio`-native event sourcing library",
    "version": "1.0.3",
    "project_urls": {
        "Changelog": "https://github.com/johnlogsdon/py-event-sourcing/blob/main/CHANGELOG.md",
        "Documentation": "https://github.com/johnlogsdon/py-event-sourcing#readme",
        "Homepage": "https://github.com/johnlogsdon/py-event-sourcing",
        "Issues": "https://github.com/johnlogsdon/py-event-sourcing/issues",
        "Repository": "https://github.com/johnlogsdon/py-event-sourcing.git"
    },
    "split_keywords": [
        "asyncio",
        " database",
        " event-sourcing",
        " sqlite",
        " stream-processing"
    ],
    "urls": [
        {
            "comment_text": null,
            "digests": {
                "blake2b_256": "2c2c5df0c06c5873872531791302e7ecdf55de6603e00f0429c806782766baa2",
                "md5": "6a5589520ccbcf40a7b2063ca870d83f",
                "sha256": "5693dd7208f65d8f7bc30a88bc7aec07825f1de601f1a786f3b255c0729021e1"
            },
            "downloads": -1,
            "filename": "py_event_sourcing-1.0.3-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "6a5589520ccbcf40a7b2063ca870d83f",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": ">=3.11",
            "size": 20430,
            "upload_time": "2025-09-03T12:43:50",
            "upload_time_iso_8601": "2025-09-03T12:43:50.465345Z",
            "url": "https://files.pythonhosted.org/packages/2c/2c/5df0c06c5873872531791302e7ecdf55de6603e00f0429c806782766baa2/py_event_sourcing-1.0.3-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": null,
            "digests": {
                "blake2b_256": "b1f8a63999269e8a804dbf534e53b41c7ea7ad9bfad7f8450b8fb26f10baf6da",
                "md5": "151ac80065c4c88ec1f754492336b6ec",
                "sha256": "b3931495bf88b27aba7a90e06ebdf9b39e1738d22e61d493218f2849afb31de3"
            },
            "downloads": -1,
            "filename": "py_event_sourcing-1.0.3.tar.gz",
            "has_sig": false,
            "md5_digest": "151ac80065c4c88ec1f754492336b6ec",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": ">=3.11",
            "size": 85248,
            "upload_time": "2025-09-03T12:43:52",
            "upload_time_iso_8601": "2025-09-03T12:43:52.143582Z",
            "url": "https://files.pythonhosted.org/packages/b1/f8/a63999269e8a804dbf534e53b41c7ea7ad9bfad7f8450b8fb26f10baf6da/py_event_sourcing-1.0.3.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2025-09-03 12:43:52",
    "github": true,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "github_user": "johnlogsdon",
    "github_project": "py-event-sourcing",
    "travis_ci": false,
    "coveralls": false,
    "github_actions": true,
    "lcname": "py-event-sourcing"
}
        