# ZeroEventHub
This README file contains information specific to the Python port of ZeroEventHub.
Please see the [main readme file](../../README.md) for an overview of what this project is about.
## Client
We recommend that you store the latest checkpoint/cursor for each partition in the client's
database. Below is an example of simple single-partition consumption. *Notes about the example*:
* Things starting with "my" are supplied by you
* Things starting with "their" are supplied by the service you connect to
```python
>>> import zeroeventhub
>>> import httpx
>>> import asyncio
>>> from typing import Sequence
>>> from unittest.mock import MagicMock, Mock, PropertyMock
>>> my_db = MagicMock()
>>> my_person_event_repository = Mock()
>>> my_person_event_repository.read_cursors_from_db.return_value = None
# Step 1: Setup
>>> their_partition_count = 1 # documented contract with server
>>> their_service_url = "https://localhost:8192/person/feed/v1"
>>> my_zeh_session = httpx.AsyncClient() # you can setup the authentication on the session
>>> client = zeroeventhub.Client(their_service_url, their_partition_count, my_zeh_session)
# Step 2: Load the cursors from last time we ran
>>> cursors = my_person_event_repository.read_cursors_from_db()
>>> if not cursors:
... # we have never run before, so we can get all events with FIRST_CURSOR
... # (if we just want to receive new events from now, we would use LAST_CURSOR)
... cursors = [
... zeroeventhub.Cursor(partition_id, zeroeventhub.FIRST_CURSOR)
... for partition_id in range(their_partition_count)
... ]
# Step 3: Enter listening loop...
>>> my_still_want_to_read_events = PropertyMock(side_effect=[True, False])
>>> async def poll_for_events(cursors: Sequence[zeroeventhub.Cursor]) -> None:
... page_of_events = zeroeventhub.PageEventReceiver()
... while my_still_want_to_read_events():
... # Step 4: Use ZeroEventHub client to fetch the next page of events.
... await zeroeventhub.receive_events(page_of_events,
... client.fetch_events(cursors),
... )
...
... # Step 5: Write the effect of changes to our own database and the updated
... # cursor value in the same transaction.
... with my_db.begin_transaction() as tx:
... my_person_event_repository.write_effect_of_events_to_db(tx, page_of_events.events)
... my_person_event_repository.write_cursors_to_db(tx, page_of_events.latest_checkpoints)
... tx.commit()
...
... cursors = page_of_events.latest_checkpoints
... page_of_events.clear()
>>> asyncio.run(poll_for_events(cursors))
```
## Server
This library makes it easy to set up a ZeroEventHub feed endpoint with FastAPI.
```python
>>> from typing import Annotated, Any, AsyncGenerator, Dict, Optional, Sequence
>>> from fastapi import Depends, FastAPI, Request
>>> from fastapi.responses import StreamingResponse
>>> from zeroeventhub import (
... Cursor,
... DataReader,
... ZeroEventHubFastApiHandler,
... )
>>> from unittest.mock import Mock
>>> app = FastAPI()
>>> PersonEventRepository = Mock
>>> class PersonDataReader(DataReader):
... def __init__(self, person_event_repository: PersonEventRepository) -> None:
... self._person_event_repository = person_event_repository
...
... def get_data(
... self, cursors: Sequence[Cursor], headers: Optional[Sequence[str]], page_size: Optional[int]
... ) -> AsyncGenerator[Dict[str, Any], Any]:
... return (
... self._person_event_repository.get_events_since(cursors[0].cursor)
... .take(page_size)
... .with_headers(headers)
... )
>>> def get_person_data_reader() -> PersonDataReader:
... return PersonDataReader(PersonEventRepository())
>>> PersonDataReaderDependency = Annotated[
... PersonDataReader,
... Depends(get_person_data_reader, use_cache=True),
... ]
>>> @app.get("person/feed/v1")
... async def feed(request: Request, person_data_reader: PersonDataReaderDependency) -> StreamingResponse:
... api_handler = ZeroEventHubFastApiHandler(data_reader=person_data_reader, server_partition_count=1)
... return api_handler.handle(request)
```
## Development
To run the test suite, assuming you already have Python 3.13 or later installed and on your `PATH`:
```sh
pip install poetry==1.8.4
poetry config virtualenvs.in-project true
poetry install --sync
poetry run coverage run --branch -m pytest
poetry run coverage html
```
Then, you can open the `htmlcov/index.html` file in your browser to look at the code coverage report.
Also, to pass the CI checks, you may want to run the following before pushing your changes:
```sh
poetry run ruff format
poetry run ruff check
poetry run pyright
```
Raw data
{
"_id": null,
"home_page": "https://github.com/vippsas/zeroeventhub",
"name": "zeroeventhub",
"maintainer": null,
"docs_url": null,
"requires_python": "<4.0,>=3.13",
"maintainer_email": null,
"keywords": "event-streaming",
"author": "Vipps MobilePay",
"author_email": null,
"download_url": "https://files.pythonhosted.org/packages/14/68/56f1ab751d80de876db97f210eb219248b3b9ca4a134764cffa969c9ebb8/zeroeventhub-0.2.4.tar.gz",
"platform": null,
"description": "# ZeroEventHub\n\nThis README file contains information specific to the Python port of the ZeroEventHub.\nPlease see the [main readme file](../../README.md) for an overview of what this project is about.\n\n## Client\n\nWe recommend that you store the latest checkpoint/cursor for each partition in the client's\ndatabase. Example of simple single-partition consumption. *Note about the example*:\n\n* Things starting with \"my\" is supplied by you\n* Things starting with \"their\" is supplied by the service you connect to\n\n```python\n>>> import zeroeventhub\n>>> import httpx\n>>> import asyncio\n>>> from typing import Sequence\n>>> from unittest.mock import MagicMock, Mock, PropertyMock\n\n>>> my_db = MagicMock()\n>>> my_person_event_repository = Mock()\n>>> my_person_event_repository.read_cursors_from_db.return_value = None\n\n# Step 1: Setup\n>>> their_partition_count = 1 # documented contract with server\n>>> their_service_url = \"https://localhost:8192/person/feed/v1\"\n>>> my_zeh_session = httpx.AsyncClient() # you can setup the authentication on the session\n>>> client = zeroeventhub.Client(their_service_url, their_partition_count, my_zeh_session)\n\n# Step 2: Load the cursors from last time we ran\n>>> cursors = my_person_event_repository.read_cursors_from_db()\n>>> if not cursors:\n... # we have never run before, so we can get all events with FIRST_CURSOR\n... # (if we just want to receive new events from now, we would use LAST_CURSOR)\n... cursors = [\n... zeroeventhub.Cursor(partition_id, zeroeventhub.FIRST_CURSOR)\n... for partition_id in range(their_partition_count)\n... ]\n\n# Step 3: Enter listening loop...\n>>> my_still_want_to_read_events = PropertyMock(side_effect=[True, False])\n\n>>> async def poll_for_events(cursors: Sequence[zeroeventhub.Cursor]) -> None:\n... page_of_events = zeroeventhub.PageEventReceiver()\n... while my_still_want_to_read_events():\n... # Step 4: Use ZeroEventHub client to fetch the next page of events.\n... 
await zeroeventhub.receive_events(page_of_events,\n... client.fetch_events(cursors),\n... )\n...\n... # Step 5: Write the effect of changes to our own database and the updated\n... # cursor value in the same transaction.\n... with my_db.begin_transaction() as tx:\n... my_person_event_repository.write_effect_of_events_to_db(tx, page_of_events.events)\n... my_person_event_repository.write_cursors_to_db(tx, page_of_events.latest_checkpoints)\n... tx.commit()\n...\n... cursors = page_of_events.latest_checkpoints\n... page_of_events.clear()\n\n>>> asyncio.run(poll_for_events(cursors))\n\n```\n\n## Server\n\nThis library makes it easy to setup a zeroeventhub feed endpoint with FastAPI.\n\n```python\n>>> from typing import Annotated, Any, AsyncGenerator, Dict, Optional, Sequence\n>>> from fastapi import Depends, FastAPI, Request\n>>> from fastapi.responses import StreamingResponse\n>>> from zeroeventhub import (\n... Cursor,\n... DataReader,\n... ZeroEventHubFastApiHandler,\n... )\n>>> from unittest.mock import Mock\n\n>>> app = FastAPI()\n\n>>> PersonEventRepository = Mock\n\n>>> class PersonDataReader(DataReader):\n... def __init__(self, person_event_repository: PersonEventRepository) -> None:\n... self._person_event_repository = person_event_repository\n...\n... def get_data(\n... self, cursors: Sequence[Cursor], headers: Optional[Sequence[str]], page_size: Optional[int]\n... ) -> AsyncGenerator[Dict[str, Any], Any]:\n... return (\n... self._person_event_repository.get_events_since(cursors[0].cursor)\n... .take(page_size)\n... .with_headers(headers)\n... )\n\n>>> def get_person_data_reader() -> PersonDataReader:\n... return PersonDataReader(PersonEventRepository())\n\n>>> PersonDataReaderDependency = Annotated[\n... PersonDataReader,\n... Depends(get_person_data_reader, use_cache=True),\n... ]\n\n>>> @app.get(\"person/feed/v1\")\n... async def feed(request: Request, person_data_reader: PersonDataReaderDependency) -> StreamingResponse:\n... 
api_handler = ZeroEventHubFastApiHandler(data_reader=person_data_reader, server_partition_count=1)\n... return api_handler.handle(request)\n\n```\n\n## Development\n\nTo run the test suite, assuming you already have Python 3.10 or later installed and on your `PATH`:\n```sh\npip install poetry==1.8.4\npoetry config virtualenvs.in-project true\npoetry install --sync\npoetry run coverage run --branch -m pytest\npoetry run coverage html\n```\n\nThen, you can open the `htmlcov/index.html` file in your browser to look at the code coverage report.\n\nAlso, to pass the CI checks, you may want to run the following before pushing your changes:\n\n```sh\npoetry run ruff format\npoetry run ruff check\npoetry run pyright\n```\n\n",
"bugtrack_url": null,
"license": "MIT",
"summary": "Broker-less event streaming over HTTP",
"version": "0.2.4",
"project_urls": {
"Homepage": "https://github.com/vippsas/zeroeventhub",
"Repository": "https://github.com/vippsas/zeroeventhub"
},
"split_keywords": [
"event-streaming"
],
"urls": [
{
"comment_text": "",
"digests": {
"blake2b_256": "764b51460e0e317bb8689718dbe31a0ceb971175074bf4463551a429eff62095",
"md5": "3ba53901a98b5e9da2eaae2898f63a67",
"sha256": "b4bbb0b8339db58cd1aed910ec2b2ec4c02e869f5de41e496cc763485b1ab99e"
},
"downloads": -1,
"filename": "zeroeventhub-0.2.4-py3-none-any.whl",
"has_sig": false,
"md5_digest": "3ba53901a98b5e9da2eaae2898f63a67",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": "<4.0,>=3.13",
"size": 12587,
"upload_time": "2024-11-27T13:32:02",
"upload_time_iso_8601": "2024-11-27T13:32:02.196214Z",
"url": "https://files.pythonhosted.org/packages/76/4b/51460e0e317bb8689718dbe31a0ceb971175074bf4463551a429eff62095/zeroeventhub-0.2.4-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": "",
"digests": {
"blake2b_256": "146856f1ab751d80de876db97f210eb219248b3b9ca4a134764cffa969c9ebb8",
"md5": "4a21c33c434fac9a2fbb56213c1baa85",
"sha256": "59fb74e05aa6a97284b85867ab1128fcb8f3c86cd1a08d26148328e953e419b4"
},
"downloads": -1,
"filename": "zeroeventhub-0.2.4.tar.gz",
"has_sig": false,
"md5_digest": "4a21c33c434fac9a2fbb56213c1baa85",
"packagetype": "sdist",
"python_version": "source",
"requires_python": "<4.0,>=3.13",
"size": 12193,
"upload_time": "2024-11-27T13:32:03",
"upload_time_iso_8601": "2024-11-27T13:32:03.678772Z",
"url": "https://files.pythonhosted.org/packages/14/68/56f1ab751d80de876db97f210eb219248b3b9ca4a134764cffa969c9ebb8/zeroeventhub-0.2.4.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2024-11-27 13:32:03",
"github": true,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"github_user": "vippsas",
"github_project": "zeroeventhub",
"travis_ci": false,
"coveralls": false,
"github_actions": true,
"lcname": "zeroeventhub"
}