async-py-bus


* Name: async-py-bus
* Version: 0.1.11
* Summary: Async message bus framework designed for event-driven, cqrs python projects
* Upload time: 2025-01-28 16:10:03
* Requires Python: >=3.11
* Keywords: async, bus, cqrs, edd, python
* Requirements: no requirements were recorded

# async-py-bus

The library is designed for asynchronous `event-driven` and `cqrs` Python projects,
has no third-party dependencies, and is useful for handling domain events, queries and commands.

See [more examples](https://github.com/andrei-samofalov/async-py-bus/tree/master/docs/examples) on
GitHub.

## Basic usage

The core of the library is the `Dispatcher` class.
It handles three types of messages: `events`, `commands` and `queries`.
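
As a rough illustration of the three message kinds, here is a minimal sketch that models them as frozen dataclasses. The README does not say that messages must be dataclasses, so that choice, and the field names `data` and `user_id`, are assumptions made only for this example. The class names are borrowed from the snippets later in this document.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class CreateUserCommand:
    """Command: a request to change state."""
    data: dict


@dataclass(frozen=True)
class UserCreated:
    """Event: a fact that has already happened."""
    data: dict


@dataclass(frozen=True)
class UserQuery:
    """Query: a request to read state without changing it."""
    user_id: int  # hypothetical field, not taken from the library


cmd = CreateUserCommand(data={"name": "alice"})
event = UserCreated(data=cmd.data)
query = UserQuery(user_id=1)
```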

### Initializing the dispatcher

You can use the default dispatcher object exported by `pybus`:

```python
from pybus import dispatcher as dp
```

Note that the default dispatcher does not enable the query engine.
If your application uses queries, instantiate the `Dispatcher` class yourself
and pass a router class for queries:

```python
from pybus import Dispatcher, RequestRouter

dp = Dispatcher(
    queries_router_cls=RequestRouter,
)
```

### Handler signatures

A basic handler is an asynchronous function that takes either zero arguments or one argument (the message).

```python
async def handler_with_arg(event: EventType):
    # do something with event...
    ...


async def handler_without_arg():
    # do something
    ...
```

Additionally, a handler can accept any number of keyword arguments (how to pass them to the handler
is explained below). However, if the parameter that receives the message is positional-only,
you must specify the `argname` parameter when registering the handler.
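
In Python terms, a "strictly positional" parameter is a positional-only one, declared before a `/` in the signature. The sketch below uses only the standard `inspect` module to show how the two kinds differ. It does not show how the library itself inspects handlers or what values `argname` accepts. The handler names and the helper `message_param_kind` are hypothetical.

```python
import inspect


async def keyword_ok(event, repo=None):
    ...


async def positional_only(event, /, repo=None):
    # `event` cannot be passed by keyword because it precedes `/`.
    ...


def message_param_kind(handler):
    """Return the kind of the first parameter of ``handler``."""
    first = next(iter(inspect.signature(handler).parameters.values()))
    return first.kind


kind_a = message_param_kind(keyword_ok)
kind_b = message_param_kind(positional_only)
```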

A handler can also be a class that implements the `HandlerProtocol` protocol or
defines an asynchronous `__call__` method.
The signature rules for the `handle` and `__call__` methods are the same as for a regular function.

```python
from typing import Any, Protocol


class HandlerProtocol(Protocol):

    async def handle(self, message: MessageType, **kwargs) -> Any:
        """Handle message."""

    async def add_event(self, event: EventType) -> None:
        """Add event to emit later."""

    async def dump_events(self) -> list[EventType]:
        """Return list of collected events."""
```
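
For illustration, a class that satisfies these three methods might look like the sketch below. It is not taken from the library. `CreateUserHandler`, the dictionary-shaped event, and the decision to clear the buffer in `dump_events` are all assumptions made for this example.

```python
import asyncio
from typing import Any


class CreateUserHandler:
    """Hypothetical handler providing the three protocol methods."""

    def __init__(self) -> None:
        self._events: list[Any] = []

    async def handle(self, message: Any, **kwargs: Any) -> Any:
        # Collect an event during processing instead of returning it.
        await self.add_event({"type": "UserCreated", "data": message})
        return None

    async def add_event(self, event: Any) -> None:
        self._events.append(event)

    async def dump_events(self) -> list[Any]:
        # Assumption: hand over the collected events and reset the buffer.
        events, self._events = self._events, []
        return events


async def main() -> list[Any]:
    handler = CreateUserHandler()
    await handler.handle({"name": "alice"})
    return await handler.dump_events()


collected = asyncio.run(main())
```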

If your class needs initialization parameters, you can specify them as keyword arguments
when registering the handler.
These arguments will be passed directly to the class’s `__init__` method
(make sure it has no positional-only parameters).

```python
class CustomHandler:
    def __init__(self, repo: RepoType):
        self._repo = repo

    async def __call__(self, cmd: CreateUserCommand):
        user = await self._repo.create(cmd.data)
        return UserCreated(data=user)
```
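
The sketch below emulates by hand what passing registration keyword arguments to `__init__` presumably amounts to: the class is instantiated with those keywords and the instance is then awaited with the message. It does not call the library. `InMemoryRepo`, the dataclass message definitions, and the `init_kwargs` dictionary are hypothetical stand-ins.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class CreateUserCommand:
    data: dict


@dataclass
class UserCreated:
    data: dict


class InMemoryRepo:
    """Hypothetical repository used only for this example."""

    def __init__(self) -> None:
        self.rows: list[dict] = []

    async def create(self, data: dict) -> dict:
        self.rows.append(data)
        return data


class CustomHandler:
    def __init__(self, repo: InMemoryRepo):
        self._repo = repo

    async def __call__(self, cmd: CreateUserCommand):
        user = await self._repo.create(cmd.data)
        return UserCreated(data=user)


# Keyword arguments supplied at registration time (assumed shape).
init_kwargs = {"repo": InMemoryRepo()}
handler = CustomHandler(**init_kwargs)
result = asyncio.run(handler(CreateUserCommand(data={"name": "alice"})))
```

Because the arguments arrive by keyword, a positional-only `__init__` parameter could not be filled this way, which is why the text above warns against them.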

The same applies to a plain function handler that takes more than one parameter
without a default value: pass the extra arguments as keyword arguments at registration.

```python
async def create_user_handler(cmd: CreateUserCommand, repo: RepoType):
    user = await repo.create(cmd.data)
    return UserCreated(data=user)


# main.py
user_repo = UserRepoImpl()
dp.commands.bind(CreateUserCommand, handler=create_user_handler, repo=user_repo)
```

You may notice that both the `create_user_handler` function and the `__call__` method of
the `CustomHandler` class return an event.
This lets you pass new outgoing events to the dispatcher, which forwards them
to the appropriate handler.
A class implementing `HandlerProtocol` can additionally collect events during
processing inside its `handle` method.
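
The forwarding idea can be sketched with a toy loop like the one below. This is not the library's dispatcher. The `HANDLERS` mapping, the `dispatch` function, and the handler names are hypothetical and exist only to show a returned event being routed to its own handler.

```python
import asyncio
from collections import deque
from dataclasses import dataclass


@dataclass
class CreateUserCommand:
    name: str


@dataclass
class UserCreated:
    name: str


log: list[str] = []


async def create_user(cmd: CreateUserCommand) -> UserCreated:
    log.append(f"created {cmd.name}")
    return UserCreated(name=cmd.name)  # outgoing event


async def send_welcome(event: UserCreated) -> None:
    log.append(f"welcomed {event.name}")


HANDLERS = {CreateUserCommand: create_user, UserCreated: send_welcome}


async def dispatch(message) -> None:
    """Toy loop: handle a message, then forward any returned message."""
    queue = deque([message])
    while queue:
        current = queue.popleft()
        result = await HANDLERS[type(current)](current)
        if result is not None:
            queue.append(result)


asyncio.run(dispatch(CreateUserCommand(name="alice")))
```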

### Binding handlers

We’ve talked a lot about how to declare handlers; now let’s register them.

There are currently three ways to do this:

* Pass the handler to the dispatcher’s `register_<message>_handler` method,
  where `<message>` is one of `event`, `command`, or `query`.
  ```python
  dp.register_event_handler(UserCreated, on_user_created)
  ```
* Use the `bind` method of one of the dispatcher’s routers (`dp.events`, `dp.commands`,
  `dp.queries`).
  ```python
  dp.events.bind(UserCreated, on_user_created)
  ```
* Use the `register` decorator from one of the dispatcher’s routers (`dp.events`, `dp.commands`,
  `dp.queries`).
  ```python
  @dp.events.register(UserCreated)
  async def on_user_created(event): ...
  ```
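
One way to picture the relationship between `bind` and the `register` decorator is the toy router below, where the decorator is thin sugar over `bind`. `ToyRouter` is an illustrative stand-in written for this example. Whether the library's routers are implemented this way is an assumption.

```python
class ToyRouter:
    """Illustrative stand-in; not the library's router."""

    def __init__(self) -> None:
        self.handlers: dict[type, object] = {}

    def bind(self, message_type, handler, **kwargs):
        # Extra keyword arguments are accepted but ignored in this toy.
        self.handlers[message_type] = handler

    def register(self, message_type, **kwargs):
        def decorator(handler):
            self.bind(message_type, handler, **kwargs)
            return handler  # keep the function usable under its own name

        return decorator


class UserCreated: ...


async def on_user_created(event): ...


direct = ToyRouter()
direct.bind(UserCreated, on_user_created)

decorated = ToyRouter()


@decorated.register(UserCreated)
async def on_user_created_too(event): ...
```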

### Starting the dispatcher

After registering the handlers, start the dispatcher:

```python
dp.start()
```

Starting the dispatcher finalizes the handler map,
so you won’t be able to register new handlers afterwards. Please keep this in mind.
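
A minimal sketch of what "finalizing" a handler map could look like is shown below, using a read-only `MappingProxyType` view. The README does not say what the library does on a late registration, so raising `RuntimeError` here is an assumption, and `ToyRegistry` is a hypothetical class.

```python
from types import MappingProxyType


class ToyRegistry:
    """Illustrative registry that freezes its handler map on start()."""

    def __init__(self) -> None:
        self._handlers = {}
        self._started = False

    def bind(self, message_type, handler) -> None:
        if self._started:
            # Assumed behaviour; the real library may react differently.
            raise RuntimeError("cannot register handlers after start()")
        self._handlers[message_type] = handler

    def start(self) -> None:
        # Swap in a read-only view so the map can no longer change.
        self._handlers = MappingProxyType(dict(self._handlers))
        self._started = True


registry = ToyRegistry()
registry.bind(str, print)
registry.start()

try:
    registry.bind(int, print)
    late_error = None
except RuntimeError as exc:
    late_error = str(exc)
```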

### Utils

For simple dependency injection, the library provides two classes: `Singleton` and `Factory`.
They use slice syntax inside square brackets: the name of the resource, a colon, and the callable
that retrieves it (the callable can be either synchronous or asynchronous).

For anything beyond simple cases, there are more convenient and better optimized libraries for this task.

```python
def get_user_repo() -> UserRepo: ...


@dp.queries.register(UserQuery, repo=Singleton["repo": get_user_repo])
async def user_query_handler(query: UserQuery, repo: UserRepo) -> UserQueryResult: ...
```
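
The slice syntax works because Python turns `Cls["name": getter]` into a call to `__class_getitem__` with a `slice` object. The sketch below shows that mechanism with toy classes. `ToyProvider`, `ToySingleton`, and `ToyFactory` are hypothetical. The caching semantics (one shared instance versus a fresh one per call) are inferred from the names, not confirmed by the README. Asynchronous getters are not handled here.

```python
class ToyProvider:
    """Toy provider; the library's Singleton/Factory may differ."""

    def __init__(self, name, getter):
        self.name = name
        self.getter = getter

    def __class_getitem__(cls, item):
        # Cls["name": getter] arrives here as slice("name", getter, None).
        if not isinstance(item, slice):
            raise TypeError("expected Cls['name': getter]")
        return cls(item.start, item.stop)


class ToySingleton(ToyProvider):
    _missing = object()

    def __init__(self, name, getter):
        super().__init__(name, getter)
        self._value = self._missing

    def get(self):
        # Call the getter once and reuse the result afterwards.
        if self._value is self._missing:
            self._value = self.getter()
        return self._value


class ToyFactory(ToyProvider):
    def get(self):
        # Call the getter on every access.
        return self.getter()


def get_user_repo():
    return object()


single = ToySingleton["repo": get_user_repo]
factory = ToyFactory["repo": get_user_repo]
```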

###### The docs are being updated...

            

Raw data

* Author and maintainer email: Andrei Samofalov <andrei.e.samofalov@gmail.com>
* Repository: https://github.com/andrei-samofalov/async-py-bus
* Issues: https://github.com/andrei-samofalov/async-py-bus/issues
* Files for version 0.1.11 (uploaded 2025-01-28):
  * `async_py_bus-0.1.11-py3-none-any.whl` (bdist_wheel, 22234 bytes),
    sha256 `5248be69c64e45a9581c6009012f1171e391b44e79147ac6c3d4c72165365059`
  * `async_py_bus-0.1.11.tar.gz` (sdist, 22875 bytes),
    sha256 `81def1fcbaea5b38e1634d8a89d4732c43580937e02aaec24422f1728f01fdd1`