aioddd


Nameaioddd JSON
Version 1.5.0 PyPI version JSON
download
home_pageNone
SummaryAsync Python DDD utilities library.
upload_time2024-09-26 06:54:38
maintainerNone
docs_urlNone
authorNone
requires_python>=3.9
licenseMIT
keywords ddd hexagonal cqrs aio async
VCS
bugtrack_url
requirements No requirements were recorded.
Travis-CI No Travis.
coveralls test coverage No coveralls.
            # Async Python DDD utilities library

[![PyPI version](https://badge.fury.io/py/aioddd.svg)](https://badge.fury.io/py/aioddd)
[![PyPIDownloads](https://static.pepy.tech/badge/aioddd)](https://pepy.tech/project/aioddd)
[![CI](https://github.com/aiopy/python-aioddd/actions/workflows/ci.yml/badge.svg?branch=master)](https://github.com/aiopy/python-aioddd/actions/workflows/ci.yml)

aioddd is an async Python DDD utilities library.

## Installation

Use the package manager [pip](https://pypi.org/project/aioddd/) to install aioddd.

```bash
pip install aioddd
```

## Documentation

- Visit [aioddd docs](https://aiopy.github.io/python-aioddd/).

## Usage

```python
from asyncio import get_event_loop
from dataclasses import dataclass
from typing import Type
from aioddd import NotFoundError, \
    Command, CommandHandler, SimpleCommandBus, \
    Query, QueryHandler, OptionalResponse, SimpleQueryBus, Event

_products = []

class ProductStored(Event):
    @dataclass
    class Attributes:
        ref: str

    attributes: Attributes

class StoreProductCommand(Command):
    def __init__(self, ref: str):
        self.ref = ref

class StoreProductCommandHandler(CommandHandler):
    def subscribed_to(self) -> Type[Command]:
        return StoreProductCommand

    async def handle(self, command: StoreProductCommand) -> None:
        _products.append(command.ref)

class ProductNotFoundError(NotFoundError):
    _code = 'product_not_found'
    _title = 'Product not found'

class FindProductQuery(Query):
    def __init__(self, ref: str):
        self.ref = ref

class FindProductQueryHandler(QueryHandler):
    def subscribed_to(self) -> Type[Query]:
        return FindProductQuery

    async def handle(self, query: FindProductQuery) -> OptionalResponse:
        if query.ref != '123':
            raise ProductNotFoundError.create(detail={'ref': query.ref})
        return {'ref': query.ref}

async def main() -> None:
    commands_bus = SimpleCommandBus([StoreProductCommandHandler()])
    await commands_bus.dispatch(StoreProductCommand('123'))
    query_bus = SimpleQueryBus([FindProductQueryHandler()])
    response = await query_bus.ask(FindProductQuery('123'))
    print(response)


if __name__ == '__main__':
    get_event_loop().run_until_complete(main())
```

## Requirements

- Python >= 3.9

## Contributing

Pull requests are welcome. For major changes, please open an issue first to discuss what you would like to change.

Please make sure to update tests as appropriate.

## License

[MIT](https://github.com/aiopy/python-aioddd/blob/master/LICENSE)

            

Raw data

            {
    "_id": null,
    "home_page": null,
    "name": "aioddd",
    "maintainer": null,
    "docs_url": null,
    "requires_python": ">=3.9",
    "maintainer_email": "ticdenis <navarrodenis940503@outlook.com>",
    "keywords": "ddd, hexagonal, cqrs, aio, async",
    "author": null,
    "author_email": "ticdenis <navarrodenis940503@outlook.com>, yasti4 <adria_4_@hotmail.com>",
    "download_url": "https://files.pythonhosted.org/packages/4d/7e/0e59e220ee9f4c9b7bba822790f08c7ebb991e9e45c1677ecae3a263ba8d/aioddd-1.5.0.tar.gz",
    "platform": null,
    "description": "# Async Python DDD utilities library\n\n[![PyPI version](https://badge.fury.io/py/aioddd.svg)](https://badge.fury.io/py/aioddd)\n[![PyPIDownloads](https://static.pepy.tech/badge/aioddd)](https://pepy.tech/project/aioddd)\n[![CI](https://github.com/aiopy/python-aioddd/actions/workflows/ci.yml/badge.svg?branch=master)](https://github.com/aiopy/python-aioddd/actions/workflows/ci.yml)\n\naioddd is an async Python DDD utilities library.\n\n## Installation\n\nUse the package manager [pip](https://pypi.org/project/aioddd/) to install aioddd.\n\n```bash\npip install aioddd\n```\n\n## Documentation\n\n- Visit [aioddd docs](https://aiopy.github.io/python-aioddd/).\n\n## Usage\n\n```python\nfrom asyncio import get_event_loop\nfrom dataclasses import dataclass\nfrom typing import Type\nfrom aioddd import NotFoundError, \\\n    Command, CommandHandler, SimpleCommandBus, \\\n    Query, QueryHandler, OptionalResponse, SimpleQueryBus, Event\n\n_products = []\n\nclass ProductStored(Event):\n    @dataclass\n    class Attributes:\n        ref: str\n\n    attributes: Attributes\n\nclass StoreProductCommand(Command):\n    def __init__(self, ref: str):\n        self.ref = ref\n\nclass StoreProductCommandHandler(CommandHandler):\n    def subscribed_to(self) -> Type[Command]:\n        return StoreProductCommand\n\n    async def handle(self, command: StoreProductCommand) -> None:\n        _products.append(command.ref)\n\nclass ProductNotFoundError(NotFoundError):\n    _code = 'product_not_found'\n    _title = 'Product not found'\n\nclass FindProductQuery(Query):\n    def __init__(self, ref: str):\n        self.ref = ref\n\nclass FindProductQueryHandler(QueryHandler):\n    def subscribed_to(self) -> Type[Query]:\n        return FindProductQuery\n\n    async def handle(self, query: FindProductQuery) -> OptionalResponse:\n        if query.ref != '123':\n            raise ProductNotFoundError.create(detail={'ref': query.ref})\n        return {'ref': query.ref}\n\nasync def main() -> None:\n    commands_bus = SimpleCommandBus([StoreProductCommandHandler()])\n    await commands_bus.dispatch(StoreProductCommand('123'))\n    query_bus = SimpleQueryBus([FindProductQueryHandler()])\n    response = await query_bus.ask(FindProductQuery('123'))\n    print(response)\n\n\nif __name__ == '__main__':\n    get_event_loop().run_until_complete(main())\n```\n\n## Requirements\n\n- Python >= 3.9\n\n## Contributing\n\nPull requests are welcome. For major changes, please open an issue first to discuss what you would like to change.\n\nPlease make sure to update tests as appropriate.\n\n## License\n\n[MIT](https://github.com/aiopy/python-aioddd/blob/master/LICENSE)\n",
    "bugtrack_url": null,
    "license": "MIT",
    "summary": "Async Python DDD utilities library.",
    "version": "1.5.0",
    "project_urls": {
        "documentation": "https://aiopy.github.io/python-aioddd/",
        "repository": "https://github.com/aiopy/python-aioddd"
    },
    "split_keywords": [
        "ddd",
        " hexagonal",
        " cqrs",
        " aio",
        " async"
    ],
    "urls": [
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "7384c3f56765f0a9d08c922a0a9bbe0b57621f4f775a6bc48bbd173acfdf7d18",
                "md5": "a8e82c3f52f00f7e2c2ef1f422da66e2",
                "sha256": "c7d0fbf9d4f678cb43b63a72f95eb8cd442e0b8312eb3d5ea754d8ba44ff3911"
            },
            "downloads": -1,
            "filename": "aioddd-1.5.0-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "a8e82c3f52f00f7e2c2ef1f422da66e2",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": ">=3.9",
            "size": 12330,
            "upload_time": "2024-09-26T06:54:37",
            "upload_time_iso_8601": "2024-09-26T06:54:37.082996Z",
            "url": "https://files.pythonhosted.org/packages/73/84/c3f56765f0a9d08c922a0a9bbe0b57621f4f775a6bc48bbd173acfdf7d18/aioddd-1.5.0-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "4d7e0e59e220ee9f4c9b7bba822790f08c7ebb991e9e45c1677ecae3a263ba8d",
                "md5": "965cad5ece748a669e48316e1bdd9c47",
                "sha256": "1cc96309aab329a6a62f2f8d7058ec0872d0fd882d6b5ee1c220687356bf8545"
            },
            "downloads": -1,
            "filename": "aioddd-1.5.0.tar.gz",
            "has_sig": false,
            "md5_digest": "965cad5ece748a669e48316e1bdd9c47",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": ">=3.9",
            "size": 13300,
            "upload_time": "2024-09-26T06:54:38",
            "upload_time_iso_8601": "2024-09-26T06:54:38.541142Z",
            "url": "https://files.pythonhosted.org/packages/4d/7e/0e59e220ee9f4c9b7bba822790f08c7ebb991e9e45c1677ecae3a263ba8d/aioddd-1.5.0.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2024-09-26 06:54:38",
    "github": true,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "github_user": "aiopy",
    "github_project": "python-aioddd",
    "travis_ci": false,
    "coveralls": false,
    "github_actions": true,
    "lcname": "aioddd"
}
        
Elapsed time: 0.33138s