meator


Namemeator JSON
Version 2.0.1 PyPI version JSON
download
home_pagehttps://pypi.org/project/meator/
SummaryMediator pattern impl
upload_time2024-08-26 12:23:18
maintainerNone
docs_urlNone
authorlikeinlife
requires_python<4.0,>=3.11
licenseMIT
keywords dispatcher command event query mediator cqrs
VCS
bugtrack_url
requirements No requirements were recorded.
Travis-CI No Travis.
coveralls test coverage No coveralls.
            # Meator

[![image](https://img.shields.io/pypi/v/meator.svg)](https://pypi.python.org/pypi/meator)
<a href="http://mypy-lang.org/" target="_blank"><img src="https://img.shields.io/badge/mypy-checked-1F5082.svg" alt="Mypy checked"></a>
[![codecov](https://codecov.io/gh/likeinlife/cqrs_mediator/graph/badge.svg?token=BVDRFQ61R6)](https://codecov.io/gh/likeinlife/cqrs_mediator)
[![image](https://img.shields.io/pypi/l/meator.svg)](https://github.com/likeinlife/cqrs_mediator/blob/main/LICENSE)
[![Ruff](https://img.shields.io/endpoint?url=https://raw.githubusercontent.com/astral-sh/ruff/main/assets/badge/v2.json)](https://github.com/astral-sh/ruff)
[![image](https://img.shields.io/pypi/pyversions/meator.svg)](https://pypi.python.org/pypi/meator)

Python CQRS pattern implementation.

[**Docs**](https://github.com/likeinlife/cqrs_mediator/wiki)

# Installation

```shell
pip install meator
```

# Available

- Dispatchers:
    - Command
    - Query
- Observers:
    - Event
- Entities:
    - Command
    - Event
    - Query
- Middlewares

# Usecases

## Command/Event/Query

```python
from dataclasses import dataclass

from meator.dispatchers import CommandDispatcherImpl
from meator.entities import Command
from meator.interfaces import CommandHandler


@dataclass
class IntCommand(Command[int]):
    answer: int


class IntCommandHandler(CommandHandler[IntCommand, int]):
    async def __call__(self, request: IntCommand) -> int:
        return request.answer


async def main():
    c = CommandDispatcherImpl()

    c.register(IntCommand, IntCommandHandler())

    await c.handle(IntCommand(1))

```

## Middleware

```python
from dataclasses import dataclass

from meator.dispatchers import CommandDispatcherImpl
from meator.entities import Command, Request
from meator.interfaces import CommandHandler, Handler, Middleware


class SimpleMiddleware(Middleware):
    async def __call__(self, call_next: Handler, request: Request):
        return await call_next(request)


@dataclass
class IntCommand(Command[int]):
    answer: int


class IntCommandHandler(CommandHandler[IntCommand, int]):
    async def __call__(self, request: IntCommand) -> int:
        return request.answer


async def main():
    c = CommandDispatcherImpl(middlewares=[SimpleMiddleware()])

    c.register(IntCommand, IntCommandHandler())

    await c.handle(IntCommand(1))
```
# Tests

- `pytest tests`

# Additional

Inspired by [didator](https://github.com/SamWarden/didiator)

            

Raw data

            {
    "_id": null,
    "home_page": "https://pypi.org/project/meator/",
    "name": "meator",
    "maintainer": null,
    "docs_url": null,
    "requires_python": "<4.0,>=3.11",
    "maintainer_email": null,
    "keywords": "dispatcher, command, event, query, mediator, cqrs",
    "author": "likeinlife",
    "author_email": "likeinlife@outlook.com",
    "download_url": "https://files.pythonhosted.org/packages/16/32/c33cd5516f8bbb0ccd20308a242326230057a690acd9f3fa2904e67fe7df/meator-2.0.1.tar.gz",
    "platform": null,
    "description": "# Meator\n\n[![image](https://img.shields.io/pypi/v/meator.svg)](https://pypi.python.org/pypi/meator)\n<a href=\"http://mypy-lang.org/\" target=\"_blank\"><img src=\"https://img.shields.io/badge/mypy-checked-1F5082.svg\" alt=\"Mypy checked\"></a>\n[![codecov](https://codecov.io/gh/likeinlife/cqrs_mediator/graph/badge.svg?token=BVDRFQ61R6)](https://codecov.io/gh/likeinlife/cqrs_mediator)\n[![image](https://img.shields.io/pypi/l/meator.svg)](https://github.com/likeinlife/cqrs_mediator/blob/main/LICENSE)\n[![Ruff](https://img.shields.io/endpoint?url=https://raw.githubusercontent.com/astral-sh/ruff/main/assets/badge/v2.json)](https://github.com/astral-sh/ruff)\n[![image](https://img.shields.io/pypi/pyversions/meator.svg)](https://pypi.python.org/pypi/meator)\n\nPython CQRS pattern implementation.\n\n[**Docs**](https://github.com/likeinlife/cqrs_mediator/wiki)\n\n# Installation\n\n```shell\npip install meator\n```\n\n# Available\n\n- Dispatchers:\n    - Command\n    - Query\n- Observers:\n    - Event\n- Entities:\n    - Command\n    - Event\n    - Query\n- Middlewares\n\n# Usecases\n\n## Command/Event/Query\n\n```python\nfrom dataclasses import dataclass\n\nfrom meator.dispatchers import CommandDispatcherImpl\nfrom meator.entities import Command\nfrom meator.interfaces import CommandHandler\n\n\n@dataclass\nclass IntCommand(Command[int]):\n    answer: int\n\n\nclass IntCommandHandler(CommandHandler[IntCommand, int]):\n    async def __call__(self, request: IntCommand) -> int:\n        return request.answer\n\n\nasync def main():\n    c = CommandDispatcherImpl()\n\n    c.register(IntCommand, IntCommandHandler())\n\n    await c.handle(IntCommand(1))\n\n```\n\n## Middleware\n\n```python\nfrom dataclasses import dataclass\n\nfrom meator.dispatchers import CommandDispatcherImpl\nfrom meator.entities import Command, Request\nfrom meator.interfaces import CommandHandler, Handler, Middleware\n\n\nclass SimpleMiddleware(Middleware):\n    async def __call__(self, call_next: Handler, request: Request):\n        return await call_next(request)\n\n\n@dataclass\nclass IntCommand(Command[int]):\n    answer: int\n\n\nclass IntCommandHandler(CommandHandler[IntCommand, int]):\n    async def __call__(self, request: IntCommand) -> int:\n        return request.answer\n\n\nasync def main():\n    c = CommandDispatcherImpl(middlewares=[SimpleMiddleware()])\n\n    c.register(IntCommand, IntCommandHandler())\n\n    await c.handle(IntCommand(1))\n```\n# Tests\n\n- `pytest tests`\n\n# Additional\n\nInspired by [didator](https://github.com/SamWarden/didiator)\n",
    "bugtrack_url": null,
    "license": "MIT",
    "summary": "Mediator pattern impl",
    "version": "2.0.1",
    "project_urls": {
        "Documentation": "https://github.com/likeinlife/cqrs_mediator/wiki",
        "Homepage": "https://pypi.org/project/meator/",
        "Repository": "https://github.com/likeinlife/cqrs_mediator"
    },
    "split_keywords": [
        "dispatcher",
        " command",
        " event",
        " query",
        " mediator",
        " cqrs"
    ],
    "urls": [
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "be5e2fee9a3432c2a72721927d4373d729655582a855274bf3f7cdead70295ef",
                "md5": "8ad9cad259d95ad040dcdcbe27ddc155",
                "sha256": "a86851c8248830fd96f199382a16ceda3088c0903be645a6675ebbfcf2cab796"
            },
            "downloads": -1,
            "filename": "meator-2.0.1-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "8ad9cad259d95ad040dcdcbe27ddc155",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": "<4.0,>=3.11",
            "size": 14065,
            "upload_time": "2024-08-26T12:23:16",
            "upload_time_iso_8601": "2024-08-26T12:23:16.987328Z",
            "url": "https://files.pythonhosted.org/packages/be/5e/2fee9a3432c2a72721927d4373d729655582a855274bf3f7cdead70295ef/meator-2.0.1-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "1632c33cd5516f8bbb0ccd20308a242326230057a690acd9f3fa2904e67fe7df",
                "md5": "b5792cfe87ce83339e0e7b3220a4e82d",
                "sha256": "82f1b0aededcaa985191c4aa66788e95fb40fecbf765565001aeca5cb1a2557f"
            },
            "downloads": -1,
            "filename": "meator-2.0.1.tar.gz",
            "has_sig": false,
            "md5_digest": "b5792cfe87ce83339e0e7b3220a4e82d",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": "<4.0,>=3.11",
            "size": 7233,
            "upload_time": "2024-08-26T12:23:18",
            "upload_time_iso_8601": "2024-08-26T12:23:18.406660Z",
            "url": "https://files.pythonhosted.org/packages/16/32/c33cd5516f8bbb0ccd20308a242326230057a690acd9f3fa2904e67fe7df/meator-2.0.1.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2024-08-26 12:23:18",
    "github": true,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "github_user": "likeinlife",
    "github_project": "cqrs_mediator",
    "travis_ci": false,
    "coveralls": false,
    "github_actions": true,
    "lcname": "meator"
}
        
Elapsed time: 1.56250s