Name | async-py-bus |
Version | 0.1.11 |
home_page | None |
Summary | Async message bus framework designed for event-driven, cqrs python projects |
upload_time | 2025-01-28 16:10:03 |
maintainer | None |
docs_url | None |
author | None |
requires_python | >=3.11 |
license | None |
keywords | async, bus, cqrs, edd, python |
VCS | |
bugtrack_url | |
requirements | No requirements were recorded. |
Travis-CI | No Travis. |
coveralls test coverage | No coveralls. |
# async-py-bus
The library is designed for asynchronous `event-driven` and `cqrs` Python projects,
has no third-party dependencies, and is useful for handling domain events, queries and commands.
See [more examples](https://github.com/andrei-samofalov/async-py-bus/tree/master/docs/examples) on
GitHub.
## Basic usage
The core of the library is the `Dispatcher` class.
It handles three types of messages: `events`, `commands` and `queries`.
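The README does not show what a message looks like. As a minimal sketch, assuming messages are plain frozen dataclasses: the class names below mirror those used later in this README, but their fields are hypothetical, and the library may not require dataclasses at all.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class CreateUserCommand:
    """Command: asks the system to do something."""
    data: dict


@dataclass(frozen=True)
class UserCreated:
    """Event: records something that has already happened."""
    data: dict


@dataclass(frozen=True)
class UserQuery:
    """Query: asks for data without changing state."""
    user_id: int


# Hypothetical instances, only to show the shape of each message type.
cmd = CreateUserCommand(data={"name": "alice"})
event = UserCreated(data=cmd.data)
query = UserQuery(user_id=1)
```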
### Initializing the dispatcher
You can use the default object from `pybus`:
```python
from pybus import dispatcher as dp
```
Note that the default dispatcher does not enable the query engine.
If your application uses queries, instantiate the `Dispatcher` class yourself
and pass the `queries_router_cls` parameter:
```python
from pybus import Dispatcher, RequestRouter

dp = Dispatcher(
    queries_router_cls=RequestRouter,
)
```
### Handler signatures
A basic handler is an asynchronous function that takes either one or zero arguments.
```python
async def handler_with_arg(event: EventType):
    # do something with the event...
    ...

async def handler_without_arg():
    # do something
    ...
```
Additionally, a handler can accept any number of keyword arguments (how to pass them to the handler
is explained below). However, if the parameter expecting the message is strictly positional,
you must specify the `argname` parameter when registering the handler.
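To check which case a handler falls into, you can inspect its signature with the standard library. This sketch assumes that "strictly positional" means positional-only (declared with `/`). It uses only `inspect`, and `message_param_kind` is a hypothetical helper, not part of the library.

```python
import inspect


async def positional_only_handler(event, /, **kwargs):
    """The message parameter cannot be passed by keyword."""


async def regular_handler(event, **kwargs):
    """The message parameter can be passed by position or by keyword."""


def message_param_kind(handler) -> str:
    """Return the kind of the handler's first parameter as a string."""
    first = next(iter(inspect.signature(handler).parameters.values()))
    return first.kind.name


print(message_param_kind(positional_only_handler))  # POSITIONAL_ONLY
print(message_param_kind(regular_handler))          # POSITIONAL_OR_KEYWORD
```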
A handler can also be a class that implements `HandlerProtocol` or
defines an asynchronous `__call__` method.
The signature rules for the `handle` and `__call__` methods are the same as for a regular function.
```python
from typing import Any, Protocol


class HandlerProtocol(Protocol):
    async def handle(self, message: MessageType, **kwargs) -> Any:
        """Handle message."""

    async def add_event(self, event: EventType) -> None:
        """Add event to emit later."""

    async def dump_events(self) -> list[EventType]:
        """Return list of collected events."""
```
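A class satisfying this protocol could look roughly like the sketch below. `CollectingHandler` and the dict-shaped event are hypothetical. Clearing the buffer in `dump_events` is an assumption about the intended semantics, not documented library behavior.

```python
import asyncio
from typing import Any


class CollectingHandler:
    """Hypothetical handler that buffers events while handling a message."""

    def __init__(self) -> None:
        self._events: list[Any] = []

    async def handle(self, message: Any, **kwargs: Any) -> Any:
        # Record a follow-up event instead of returning it directly.
        await self.add_event({"type": "handled", "source": message})
        return None

    async def add_event(self, event: Any) -> None:
        self._events.append(event)

    async def dump_events(self) -> list[Any]:
        # Hand over the buffered events and reset the buffer (assumed behavior).
        events, self._events = self._events, []
        return events


async def main() -> list[Any]:
    handler = CollectingHandler()
    await handler.handle("create-user")
    return await handler.dump_events()


collected = asyncio.run(main())
print(collected)
```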
If your class needs initialization parameters, pass them as keyword arguments
when registering the handler.
They are forwarded directly to the class’s `__init__` method,
so make sure `__init__` has no strictly positional parameters.
```python
class CustomHandler:
    def __init__(self, repo: RepoType):
        self._repo = repo

    async def __call__(self, cmd: CreateUserCommand):
        user = await self._repo.create(cmd.data)
        return UserCreated(data=user)
```
The same applies when the handler is a plain function that takes more than one parameter
without a default value: pass the extra parameters as keyword arguments at registration.
```python
async def create_user_handler(cmd: CreateUserCommand, repo: RepoType):
    user = await repo.create(cmd.data)
    return UserCreated(data=user)


# main.py
user_repo = UserRepoImpl()
dp.commands.bind(CreateUserCommand, handler=create_user_handler, repo=user_repo)
```
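The idea behind passing extra keyword arguments at registration can be sketched without the library. The `bind` and `send` functions below are toy stand-ins, not the `pybus` implementation, and `InMemoryRepo` is a hypothetical repository. The sketch only shows the extras being stored with the handler and supplied again on each call.

```python
import asyncio
from typing import Any, Awaitable, Callable

Handler = Callable[..., Awaitable[Any]]
_bindings: dict[type, tuple[Handler, dict[str, Any]]] = {}


def bind(message_type: type, handler: Handler, **extras: Any) -> None:
    """Toy stand-in for a router's bind: remember the handler and its extras."""
    _bindings[message_type] = (handler, extras)


async def send(message: Any) -> Any:
    """Look up the handler by message type and call it with the stored extras."""
    handler, extras = _bindings[type(message)]
    return await handler(message, **extras)


class InMemoryRepo:
    async def create(self, data: dict) -> dict:
        return {"id": 1, **data}


class CreateUserCommand:
    def __init__(self, data: dict) -> None:
        self.data = data


async def create_user_handler(cmd: CreateUserCommand, repo: InMemoryRepo) -> dict:
    return await repo.create(cmd.data)


bind(CreateUserCommand, handler=create_user_handler, repo=InMemoryRepo())
created = asyncio.run(send(CreateUserCommand({"name": "alice"})))
print(created)  # {'id': 1, 'name': 'alice'}
```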
You may notice that both the `create_user_handler` function and the `__call__` method of
the `CustomHandler` class return an event.
This allows you to pass new outgoing events to the dispatcher, which will forward them
to the appropriate handler.
A class implementing `HandlerProtocol` can also add events during processing,
from inside its `handle` method.
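The forwarding of returned events can be pictured with a toy dispatcher. This is a sketch of the general pattern only, not how `pybus` is implemented. `HANDLERS`, `dispatch`, and the message classes are all hypothetical.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class CreateUser:      # hypothetical command
    name: str


@dataclass
class UserCreated:     # hypothetical event
    name: str


log: list[str] = []


async def create_user(cmd: CreateUser) -> UserCreated:
    log.append(f"command:{cmd.name}")
    return UserCreated(name=cmd.name)   # the returned event is dispatched next


async def send_welcome(event: UserCreated) -> None:
    log.append(f"event:{event.name}")


HANDLERS = {CreateUser: [create_user], UserCreated: [send_welcome]}


async def dispatch(message) -> None:
    """Toy dispatcher: run the handlers, then dispatch whatever they return."""
    for handler in HANDLERS.get(type(message), []):
        result = await handler(message)
        if result is not None:
            await dispatch(result)


asyncio.run(dispatch(CreateUser(name="alice")))
print(log)  # ['command:alice', 'event:alice']
```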
### Binding handlers
We’ve talked a lot about how to declare handlers; now let’s register them.
Currently, there are several ways to do this:
* Pass the handler to the dispatcher’s `register_<message>_handler` method,
  where `<message>` is one of `event`, `command`, or `query`.
  ```python
  dp.register_event_handler(UserCreated, create_user_handler)
  ```
* Use the `bind` method of one of the dispatcher’s routers (`dp.events`, `dp.commands`,
  `dp.queries`).
  ```python
  dp.events.bind(UserCreated, create_user_handler)
  ```
* Use the `register` decorator from one of the dispatcher’s routers (`dp.events`, `dp.commands`,
  `dp.queries`).
  ```python
  @dp.events.register(UserCreated)
  async def create_user_handler(event): ...
  ```
### Starting the dispatcher
After registering the handlers, you just need to start the dispatcher:
```python
dp.start()
```
This call finalizes the handler map,
so you cannot register new handlers afterwards.
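One common way to get this "freeze on start" behavior is sketched below with a toy registry. `ToyRegistry` is hypothetical. The README does not say how the library reacts to a late registration, so raising `RuntimeError` here is an assumption.

```python
from types import MappingProxyType


class ToyRegistry:
    """Hypothetical registry that rejects registration after start()."""

    def __init__(self) -> None:
        self._handlers: dict[type, list] = {}
        self._started = False

    def bind(self, message_type: type, handler) -> None:
        if self._started:
            raise RuntimeError("cannot register handlers after start()")
        self._handlers.setdefault(message_type, []).append(handler)

    def start(self) -> None:
        # Freeze: expose a read-only view and reject further registration.
        self.handler_map = MappingProxyType(
            {k: tuple(v) for k, v in self._handlers.items()}
        )
        self._started = True


registry = ToyRegistry()
registry.bind(str, print)
registry.start()

error = None
try:
    registry.bind(int, print)   # too late: the map is already finalized
except RuntimeError as exc:
    error = str(exc)
print(error)
```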
### Utils
For simple `dependency injection`, the library provides two classes: `Singleton` and `Factory`.
Both use slice syntax inside square brackets: the name of the resource, a colon, and
the `callable` that retrieves it (the callable can be either synchronous or asynchronous).
In general, there are more convenient and optimized libraries for this task.
```python
def get_user_repo() -> UserRepo: ...


@dp.queries.register(UserQuery, repo=Singleton["repo": get_user_repo])
async def user_query_handler(query: UserQuery, repo: UserRepo) -> UserQueryResult: ...
```
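The slice syntax works because Python turns `X["name": f]` into a subscript call with a `slice` object. The sketch below shows that mechanism with a toy `Resource` class. It is not the library's `Singleton` or `Factory`, whose internals are not shown in this README.

```python
class Resource:
    """Toy marker built with Resource["name": provider]."""

    def __init__(self, name: str, provider) -> None:
        self.name = name
        self.provider = provider

    def __class_getitem__(cls, item: slice) -> "Resource":
        # Python passes  Resource["a": f]  as  slice("a", f, None).
        if not isinstance(item, slice):
            raise TypeError("expected Resource['name': provider]")
        return cls(item.start, item.stop)


def get_user_repo() -> dict:
    # Hypothetical provider; a real one would build a repository object.
    return {"users": []}


marker = Resource["repo": get_user_repo]
print(marker.name)        # repo
print(marker.provider())  # {'users': []}
```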
###### The docs are being updated...
Raw data
{
"_id": null,
"home_page": null,
"name": "async-py-bus",
"maintainer": null,
"docs_url": null,
"requires_python": ">=3.11",
"maintainer_email": "Andrei Samofalov <andrei.e.samofalov@gmail.com>",
"keywords": "async, bus, cqrs, edd, python",
"author": null,
"author_email": "Andrei Samofalov <andrei.e.samofalov@gmail.com>",
"download_url": "https://files.pythonhosted.org/packages/e6/70/7f4976dbc43e16b85ad14ba3084d0a1a9ddffbd08fcf9d497bc030f6f405/async_py_bus-0.1.11.tar.gz",
"platform": null,
"bugtrack_url": null,
"license": null,
"summary": "Async message bus framework designed for event-driven, cqrs python projects",
"version": "0.1.11",
"project_urls": {
"Issues": "https://github.com/andrei-samofalov/async-py-bus/issues",
"Repository": "https://github.com/andrei-samofalov/async-py-bus"
},
"split_keywords": [
"async",
" bus",
" cqrs",
" edd",
" python"
],
"urls": [
{
"comment_text": null,
"digests": {
"blake2b_256": "f47acd7c0c4d8216d4780743c7aac45ad7f47c7c94eab44c86e9840f037bf386",
"md5": "cbaca1b301359d100a8f3c26075e1b50",
"sha256": "5248be69c64e45a9581c6009012f1171e391b44e79147ac6c3d4c72165365059"
},
"downloads": -1,
"filename": "async_py_bus-0.1.11-py3-none-any.whl",
"has_sig": false,
"md5_digest": "cbaca1b301359d100a8f3c26075e1b50",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": ">=3.11",
"size": 22234,
"upload_time": "2025-01-28T16:10:01",
"upload_time_iso_8601": "2025-01-28T16:10:01.861380Z",
"url": "https://files.pythonhosted.org/packages/f4/7a/cd7c0c4d8216d4780743c7aac45ad7f47c7c94eab44c86e9840f037bf386/async_py_bus-0.1.11-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": null,
"digests": {
"blake2b_256": "e6707f4976dbc43e16b85ad14ba3084d0a1a9ddffbd08fcf9d497bc030f6f405",
"md5": "75f4ff646d2a36508537233e74cfba7b",
"sha256": "81def1fcbaea5b38e1634d8a89d4732c43580937e02aaec24422f1728f01fdd1"
},
"downloads": -1,
"filename": "async_py_bus-0.1.11.tar.gz",
"has_sig": false,
"md5_digest": "75f4ff646d2a36508537233e74cfba7b",
"packagetype": "sdist",
"python_version": "source",
"requires_python": ">=3.11",
"size": 22875,
"upload_time": "2025-01-28T16:10:03",
"upload_time_iso_8601": "2025-01-28T16:10:03.290660Z",
"url": "https://files.pythonhosted.org/packages/e6/70/7f4976dbc43e16b85ad14ba3084d0a1a9ddffbd08fcf9d497bc030f6f405/async_py_bus-0.1.11.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2025-01-28 16:10:03",
"github": true,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"github_user": "andrei-samofalov",
"github_project": "async-py-bus",
"travis_ci": false,
"coveralls": false,
"github_actions": false,
"lcname": "async-py-bus"
}