zeroeventhub


* Name: zeroeventhub
* Version: 0.2.4
* Home page: https://github.com/vippsas/zeroeventhub
* Summary: Broker-less event streaming over HTTP
* Upload time: 2024-11-27 13:32:03
* Author: Vipps MobilePay
* Requires Python: <4.0,>=3.13
* License: MIT
* Keywords: event-streaming

# ZeroEventHub

This README contains information specific to the Python port of ZeroEventHub.
Please see the [main readme file](../../README.md) for an overview of what this project is about.

## Client

We recommend that you store the latest checkpoint/cursor for each partition in the client's
database. The example below shows simple single-partition consumption. In the example:

* Names starting with "my" are supplied by you
* Names starting with "their" are supplied by the service you connect to

```python
>>> import zeroeventhub
>>> import httpx
>>> import asyncio
>>> from typing import Sequence
>>> from unittest.mock import MagicMock, Mock, PropertyMock

>>> my_db = MagicMock()
>>> my_person_event_repository = Mock()
>>> my_person_event_repository.read_cursors_from_db.return_value = None

# Step 1: Setup
>>> their_partition_count = 1 # documented contract with server
>>> their_service_url = "https://localhost:8192/person/feed/v1"
>>> my_zeh_session = httpx.AsyncClient() # you can set up authentication on the session
>>> client = zeroeventhub.Client(their_service_url, their_partition_count, my_zeh_session)

# Step 2: Load the cursors from last time we ran
>>> cursors = my_person_event_repository.read_cursors_from_db()
>>> if not cursors:
...     # we have never run before, so we can get all events with FIRST_CURSOR
...     # (if we just want to receive new events from now, we would use LAST_CURSOR)
...     cursors = [
...         zeroeventhub.Cursor(partition_id, zeroeventhub.FIRST_CURSOR)
...         for partition_id in range(their_partition_count)
...     ]

# Step 3: Enter listening loop...
>>> my_still_want_to_read_events = PropertyMock(side_effect=[True, False])

>>> async def poll_for_events(cursors: Sequence[zeroeventhub.Cursor]) -> None:
...     page_of_events = zeroeventhub.PageEventReceiver()
...     while my_still_want_to_read_events():
...         # Step 4: Use ZeroEventHub client to fetch the next page of events.
...         await zeroeventhub.receive_events(page_of_events,
...             client.fetch_events(cursors),
...         )
...
...         # Step 5: Write the effect of changes to our own database and the updated
...         #         cursor value in the same transaction.
...         with my_db.begin_transaction() as tx:
...             my_person_event_repository.write_effect_of_events_to_db(tx, page_of_events.events)
...             my_person_event_repository.write_cursors_to_db(tx, page_of_events.latest_checkpoints)
...             tx.commit()
...
...         cursors = page_of_events.latest_checkpoints
...         page_of_events.clear()

>>> asyncio.run(poll_for_events(cursors))

```
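The recommendation to persist cursors alongside the effect of the events can be sketched with the standard library's `sqlite3` module. This is only an illustration under stated assumptions: `StoredCursor`, the `feed_cursor` and `person` tables, and the event shape (`id`, `name`) are hypothetical and are not part of the zeroeventhub API. The point it shows is that the events and the cursors are written in a single transaction, so a failure leaves both unchanged and the same page is fetched again on the next run.

```python
import sqlite3
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Sequence


@dataclass(frozen=True)
class StoredCursor:
    """Hypothetical stand-in for a (partition id, cursor) pair."""

    partition_id: int
    cursor: Optional[str]


def create_schema(conn: sqlite3.Connection) -> None:
    # One row per partition holds the latest checkpoint we have processed.
    conn.execute(
        "CREATE TABLE IF NOT EXISTS feed_cursor ("
        "partition_id INTEGER PRIMARY KEY, cursor TEXT NOT NULL)"
    )
    # Hypothetical table holding the effect of the events.
    conn.execute(
        "CREATE TABLE IF NOT EXISTS person (id TEXT PRIMARY KEY, name TEXT NOT NULL)"
    )


def read_cursors(conn: sqlite3.Connection) -> List[StoredCursor]:
    rows = conn.execute(
        "SELECT partition_id, cursor FROM feed_cursor ORDER BY partition_id"
    ).fetchall()
    return [StoredCursor(partition_id, cursor) for partition_id, cursor in rows]


def write_page(
    conn: sqlite3.Connection,
    events: Sequence[Dict[str, Any]],
    cursors: Sequence[StoredCursor],
) -> None:
    # "with conn" commits if the block succeeds and rolls back if it raises,
    # so the events and the cursors are stored together or not at all.
    with conn:
        conn.executemany(
            "INSERT OR REPLACE INTO person (id, name) VALUES (:id, :name)",
            events,
        )
        conn.executemany(
            "INSERT OR REPLACE INTO feed_cursor (partition_id, cursor) VALUES (?, ?)",
            [(c.partition_id, c.cursor) for c in cursors],
        )
```

If `read_cursors` returns an empty list, the consumer has never run before and can start from the first cursor, as in Step 2 of the example above.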

## Server

This library makes it easy to set up a ZeroEventHub feed endpoint with FastAPI.

```python
>>> from typing import Annotated, Any, AsyncGenerator, Dict, Optional, Sequence
>>> from fastapi import Depends, FastAPI, Request
>>> from fastapi.responses import StreamingResponse
>>> from zeroeventhub import (
...     Cursor,
...     DataReader,
...     ZeroEventHubFastApiHandler,
... )
>>> from unittest.mock import Mock

>>> app = FastAPI()

>>> PersonEventRepository = Mock

>>> class PersonDataReader(DataReader):
...     def __init__(self, person_event_repository: PersonEventRepository) -> None:
...         self._person_event_repository = person_event_repository
...
...     def get_data(
...         self, cursors: Sequence[Cursor], headers: Optional[Sequence[str]], page_size: Optional[int]
...     ) -> AsyncGenerator[Dict[str, Any], Any]:
...         return (
...             self._person_event_repository.get_events_since(cursors[0].cursor)
...             .take(page_size)
...             .with_headers(headers)
...         )

>>> def get_person_data_reader() -> PersonDataReader:
...     return PersonDataReader(PersonEventRepository())

>>> PersonDataReaderDependency = Annotated[
...    PersonDataReader,
...    Depends(get_person_data_reader, use_cache=True),
... ]

>>> @app.get("/person/feed/v1")
... async def feed(request: Request, person_data_reader: PersonDataReaderDependency) -> StreamingResponse:
...     api_handler = ZeroEventHubFastApiHandler(data_reader=person_data_reader, server_partition_count=1)
...     return api_handler.handle(request)

```
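In the example above the repository is a `Mock`, so the shape of the data source stays abstract. As a rough sketch of what such a source could look like, the class below serves events from an in-memory list through an async generator. Everything here is an assumption made for illustration: the class name, the use of the list position as the cursor (with `"0"` meaning "from the start"), and the `{"cursor": ..., "data": ...}` dictionaries are hypothetical and are not the library's wire format.

```python
import asyncio
from typing import Any, AsyncGenerator, Dict, List, Optional


class InMemoryPersonEventRepository:
    """Hypothetical event store; the cursor is the list position as a string."""

    def __init__(self, events: List[Dict[str, Any]]) -> None:
        self._events = events

    async def get_events_since(
        self, cursor: str, page_size: Optional[int] = None
    ) -> AsyncGenerator[Dict[str, Any], None]:
        start = int(cursor)
        # Without a page size, return everything after the cursor.
        end = len(self._events) if page_size is None else start + page_size
        for position, data in enumerate(self._events[start:end], start=start):
            # Each event carries the cursor a client would send to resume after it.
            yield {"cursor": str(position + 1), "data": data}


async def collect(
    repo: InMemoryPersonEventRepository, cursor: str, page_size: Optional[int]
) -> List[Dict[str, Any]]:
    return [event async for event in repo.get_events_since(cursor, page_size)]


repo = InMemoryPersonEventRepository([{"name": "a"}, {"name": "b"}, {"name": "c"}])
first_page = asyncio.run(collect(repo, "0", 2))
```

A client that resumes from the cursor of the last event in `first_page` receives only the remaining events, which is the property a real feed needs to provide per partition.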

## Development

To run the test suite, assuming you already have Python 3.13 or later installed and on your `PATH`:

```sh
pip install poetry==1.8.4
poetry config virtualenvs.in-project true
poetry install --sync
poetry run coverage run --branch -m pytest
poetry run coverage html
```

Then, you can open the `htmlcov/index.html` file in your browser to look at the code coverage report.

Also, to pass the CI checks, you may want to run the following before pushing your changes:

```sh
poetry run ruff format
poetry run ruff check
poetry run pyright
```


            
