# Meator
[![image](https://img.shields.io/pypi/v/meator.svg)](https://pypi.python.org/pypi/meator)
<a href="http://mypy-lang.org/" target="_blank"><img src="https://img.shields.io/badge/mypy-checked-1F5082.svg" alt="Mypy checked"></a>
[![codecov](https://codecov.io/gh/likeinlife/cqrs_mediator/graph/badge.svg?token=BVDRFQ61R6)](https://codecov.io/gh/likeinlife/cqrs_mediator)
[![image](https://img.shields.io/pypi/l/meator.svg)](https://github.com/likeinlife/cqrs_mediator/blob/main/LICENSE)
[![Ruff](https://img.shields.io/endpoint?url=https://raw.githubusercontent.com/astral-sh/ruff/main/assets/badge/v2.json)](https://github.com/astral-sh/ruff)
[![image](https://img.shields.io/pypi/pyversions/meator.svg)](https://pypi.python.org/pypi/meator)
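Meator is a Python implementation of the CQRS pattern built around a mediator: dispatchers route commands and queries to their handlers, and observers deliver events.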
[**Docs**](https://github.com/likeinlife/cqrs_mediator/wiki)
# Installation
```shell
pip install meator
```
# Available
- Dispatchers:
  - Command
  - Query
- Observers:
  - Event
- Entities:
  - Command
  - Event
  - Query
- Middlewares
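
The split between dispatchers and observers in the list above can be illustrated with a small standalone sketch. It does not use or reproduce meator's code: `SketchDispatcher`, `SketchObserver`, and `demo` are hypothetical names, and the sketch assumes the usual CQRS convention that a command or query has exactly one handler while an event may have several.

```python
import asyncio
from collections import defaultdict
from typing import Any, Awaitable, Callable

# Hypothetical alias for this sketch only; not a meator type.
SketchHandler = Callable[[Any], Awaitable[Any]]


class SketchDispatcher:
    """Routes each request type to exactly one handler (command/query style)."""

    def __init__(self) -> None:
        self._handlers: dict[type, SketchHandler] = {}

    def register(self, request_type: type, handler: SketchHandler) -> None:
        self._handlers[request_type] = handler

    async def handle(self, request: Any) -> Any:
        # One handler per type, so its result can be returned to the caller.
        return await self._handlers[type(request)](request)


class SketchObserver:
    """Fans each event out to every subscribed handler (event style)."""

    def __init__(self) -> None:
        self._handlers: defaultdict[type, list[SketchHandler]] = defaultdict(list)

    def register(self, event_type: type, handler: SketchHandler) -> None:
        self._handlers[event_type].append(handler)

    async def handle(self, event: Any) -> None:
        # Every subscriber sees the event; nothing is returned.
        for handler in self._handlers[type(event)]:
            await handler(event)


async def demo() -> tuple[int, list[str]]:
    seen: list[str] = []

    async def double(request: int) -> int:
        return request * 2

    async def first_listener(event: str) -> None:
        seen.append(f"first:{event}")

    async def second_listener(event: str) -> None:
        seen.append(f"second:{event}")

    dispatcher = SketchDispatcher()
    dispatcher.register(int, double)

    observer = SketchObserver()
    observer.register(str, first_listener)
    observer.register(str, second_listener)

    result = await dispatcher.handle(21)
    await observer.handle("created")
    return result, seen


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The built-in types `int` and `str` stand in for request and event classes only to keep the sketch short; in practice these would be dedicated dataclasses.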
# Use cases
## Command/Event/Query
```python
import asyncio
from dataclasses import dataclass

from meator.dispatchers import CommandDispatcherImpl
from meator.entities import Command
from meator.interfaces import CommandHandler


@dataclass
class IntCommand(Command[int]):
    answer: int


class IntCommandHandler(CommandHandler[IntCommand, int]):
    async def __call__(self, request: IntCommand) -> int:
        return request.answer


async def main():
    c = CommandDispatcherImpl()
    # Bind the command type to the handler that processes it.
    c.register(IntCommand, IntCommandHandler())
    # Dispatch a command instance to the registered handler.
    await c.handle(IntCommand(1))


if __name__ == "__main__":
    asyncio.run(main())
```
## Middleware
```python
import asyncio
from dataclasses import dataclass

from meator.dispatchers import CommandDispatcherImpl
from meator.entities import Command, Request
from meator.interfaces import CommandHandler, Handler, Middleware


class SimpleMiddleware(Middleware):
    async def __call__(self, call_next: Handler, request: Request):
        # Pass the request on unchanged; add logic before or after this call.
        return await call_next(request)


@dataclass
class IntCommand(Command[int]):
    answer: int


class IntCommandHandler(CommandHandler[IntCommand, int]):
    async def __call__(self, request: IntCommand) -> int:
        return request.answer


async def main():
    c = CommandDispatcherImpl(middlewares=[SimpleMiddleware()])
    c.register(IntCommand, IntCommandHandler())
    await c.handle(IntCommand(1))


if __name__ == "__main__":
    asyncio.run(main())
```
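
To make the `call_next` idea concrete, here is a minimal standalone sketch of how a list of middlewares can be folded around a handler. It is not meator's implementation: `wrap`, `SketchHandler`, `SketchMiddleware`, and `demo` are hypothetical names, and the ordering (first middleware in the list runs outermost) is an assumption of this sketch rather than documented library behavior.

```python
import asyncio
from functools import partial
from typing import Any, Awaitable, Callable

# Hypothetical aliases for this sketch only; not meator types.
SketchHandler = Callable[[Any], Awaitable[Any]]
SketchMiddleware = Callable[[SketchHandler, Any], Awaitable[Any]]


def wrap(handler: SketchHandler, middlewares: list[SketchMiddleware]) -> SketchHandler:
    """Wrap handler so the first middleware in the list runs outermost."""
    wrapped = handler
    # Iterate in reverse so the last middleware ends up closest to the handler.
    for middleware in reversed(middlewares):
        # partial binds the current chain as the middleware's call_next.
        wrapped = partial(middleware, wrapped)
    return wrapped


async def demo() -> tuple[int, list[str]]:
    trace: list[str] = []

    async def outer(call_next: SketchHandler, request: int) -> int:
        trace.append("outer:before")
        result = await call_next(request)
        trace.append("outer:after")
        return result

    async def inner(call_next: SketchHandler, request: int) -> int:
        trace.append("inner:before")
        # A middleware may also alter the request before passing it on.
        result = await call_next(request + 1)
        trace.append("inner:after")
        return result

    async def handler(request: int) -> int:
        trace.append("handler")
        return request * 10

    result = await wrap(handler, [outer, inner])(1)
    return result, trace


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Using `functools.partial` instead of a lambda avoids the late-binding pitfall where every closure created in the loop would refer to the final value of the loop variable.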
# Tests
Run the test suite with pytest:

- `pytest tests`
# Additional
Inspired by [didiator](https://github.com/SamWarden/didiator).
# Raw data
{
"_id": null,
"home_page": "https://pypi.org/project/meator/",
"name": "meator",
"maintainer": null,
"docs_url": null,
"requires_python": "<4.0,>=3.11",
"maintainer_email": null,
"keywords": "dispatcher, command, event, query, mediator, cqrs",
"author": "likeinlife",
"author_email": "likeinlife@outlook.com",
"download_url": "https://files.pythonhosted.org/packages/16/32/c33cd5516f8bbb0ccd20308a242326230057a690acd9f3fa2904e67fe7df/meator-2.0.1.tar.gz",
"platform": null,
"bugtrack_url": null,
"license": "MIT",
"summary": "Mediator pattern impl",
"version": "2.0.1",
"project_urls": {
"Documentation": "https://github.com/likeinlife/cqrs_mediator/wiki",
"Homepage": "https://pypi.org/project/meator/",
"Repository": "https://github.com/likeinlife/cqrs_mediator"
},
"split_keywords": [
"dispatcher",
" command",
" event",
" query",
" mediator",
" cqrs"
],
"urls": [
{
"comment_text": "",
"digests": {
"blake2b_256": "be5e2fee9a3432c2a72721927d4373d729655582a855274bf3f7cdead70295ef",
"md5": "8ad9cad259d95ad040dcdcbe27ddc155",
"sha256": "a86851c8248830fd96f199382a16ceda3088c0903be645a6675ebbfcf2cab796"
},
"downloads": -1,
"filename": "meator-2.0.1-py3-none-any.whl",
"has_sig": false,
"md5_digest": "8ad9cad259d95ad040dcdcbe27ddc155",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": "<4.0,>=3.11",
"size": 14065,
"upload_time": "2024-08-26T12:23:16",
"upload_time_iso_8601": "2024-08-26T12:23:16.987328Z",
"url": "https://files.pythonhosted.org/packages/be/5e/2fee9a3432c2a72721927d4373d729655582a855274bf3f7cdead70295ef/meator-2.0.1-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": "",
"digests": {
"blake2b_256": "1632c33cd5516f8bbb0ccd20308a242326230057a690acd9f3fa2904e67fe7df",
"md5": "b5792cfe87ce83339e0e7b3220a4e82d",
"sha256": "82f1b0aededcaa985191c4aa66788e95fb40fecbf765565001aeca5cb1a2557f"
},
"downloads": -1,
"filename": "meator-2.0.1.tar.gz",
"has_sig": false,
"md5_digest": "b5792cfe87ce83339e0e7b3220a4e82d",
"packagetype": "sdist",
"python_version": "source",
"requires_python": "<4.0,>=3.11",
"size": 7233,
"upload_time": "2024-08-26T12:23:18",
"upload_time_iso_8601": "2024-08-26T12:23:18.406660Z",
"url": "https://files.pythonhosted.org/packages/16/32/c33cd5516f8bbb0ccd20308a242326230057a690acd9f3fa2904e67fe7df/meator-2.0.1.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2024-08-26 12:23:18",
"github": true,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"github_user": "likeinlife",
"github_project": "cqrs_mediator",
"travis_ci": false,
"coveralls": false,
"github_actions": true,
"lcname": "meator"
}