Name | aioddd |
Version | 1.5.0 |
home_page | None |
Summary | Async Python DDD utilities library. |
upload_time | 2024-09-26 06:54:38 |
maintainer | None |
docs_url | None |
author | None |
requires_python | >=3.9 |
license | MIT |
keywords | ddd, hexagonal, cqrs, aio, async |
VCS | |
bugtrack_url | |
requirements | No requirements were recorded. |
Travis-CI | No Travis. |
coveralls test coverage | No coveralls. |
# Async Python DDD utilities library
[![PyPI version](https://badge.fury.io/py/aioddd.svg)](https://badge.fury.io/py/aioddd)
[![PyPIDownloads](https://static.pepy.tech/badge/aioddd)](https://pepy.tech/project/aioddd)
[![CI](https://github.com/aiopy/python-aioddd/actions/workflows/ci.yml/badge.svg?branch=master)](https://github.com/aiopy/python-aioddd/actions/workflows/ci.yml)
aioddd is an async Python DDD utilities library.
## Installation
Use the package manager [pip](https://pypi.org/project/aioddd/) to install aioddd.
```bash
pip install aioddd
```
## Documentation
- Visit [aioddd docs](https://aiopy.github.io/python-aioddd/).
## Usage
```python
import asyncio
from dataclasses import dataclass
from typing import Type

from aioddd import NotFoundError, \
    Command, CommandHandler, SimpleCommandBus, \
    Query, QueryHandler, OptionalResponse, SimpleQueryBus, Event

# In-memory store used only by this example.
_products = []


class ProductStored(Event):
    @dataclass
    class Attributes:
        ref: str

    attributes: Attributes


class StoreProductCommand(Command):
    def __init__(self, ref: str):
        self.ref = ref


class StoreProductCommandHandler(CommandHandler):
    def subscribed_to(self) -> Type[Command]:
        return StoreProductCommand

    async def handle(self, command: StoreProductCommand) -> None:
        _products.append(command.ref)


class ProductNotFoundError(NotFoundError):
    _code = 'product_not_found'
    _title = 'Product not found'


class FindProductQuery(Query):
    def __init__(self, ref: str):
        self.ref = ref


class FindProductQueryHandler(QueryHandler):
    def subscribed_to(self) -> Type[Query]:
        return FindProductQuery

    async def handle(self, query: FindProductQuery) -> OptionalResponse:
        # Only the product with ref '123' exists in this example.
        if query.ref != '123':
            raise ProductNotFoundError.create(detail={'ref': query.ref})
        return {'ref': query.ref}


async def main() -> None:
    commands_bus = SimpleCommandBus([StoreProductCommandHandler()])
    await commands_bus.dispatch(StoreProductCommand('123'))
    query_bus = SimpleQueryBus([FindProductQueryHandler()])
    response = await query_bus.ask(FindProductQuery('123'))
    print(response)


if __name__ == '__main__':
    # asyncio.run creates and closes the event loop for us.
    asyncio.run(main())
```
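To make the dispatch pattern in the example above easier to follow, here is a minimal, standard-library-only sketch of a bus that routes each command to the handler whose `subscribed_to()` returns that command's type. It is not aioddd's implementation; `MiniCommandBus`, `StoreProduct`, `StoreProductHandler`, and `demo` are hypothetical names introduced only for illustration, and the choice to raise `LookupError` for an unregistered command is an assumption.

```python
import asyncio
from typing import Dict, List, Type


class Command:
    """Marker base class for commands (hypothetical stand-in)."""


class CommandHandler:
    """Hypothetical handler interface mirroring the usage example."""

    def subscribed_to(self) -> Type[Command]:
        raise NotImplementedError

    async def handle(self, command: Command) -> None:
        raise NotImplementedError


class MiniCommandBus:
    """Routes each command to the handler registered for its exact type."""

    def __init__(self, handlers: List[CommandHandler]) -> None:
        # Build a lookup table keyed by the command type each handler declares.
        self._handlers: Dict[Type[Command], CommandHandler] = {
            handler.subscribed_to(): handler for handler in handlers
        }

    async def dispatch(self, command: Command) -> None:
        handler = self._handlers.get(type(command))
        if handler is None:
            # Assumption: an unregistered command is treated as an error.
            raise LookupError(f'No handler for {type(command).__name__}')
        await handler.handle(command)


class StoreProduct(Command):
    def __init__(self, ref: str) -> None:
        self.ref = ref


class StoreProductHandler(CommandHandler):
    def __init__(self) -> None:
        self.stored: List[str] = []

    def subscribed_to(self) -> Type[Command]:
        return StoreProduct

    async def handle(self, command: StoreProduct) -> None:
        self.stored.append(command.ref)


async def demo() -> List[str]:
    handler = StoreProductHandler()
    bus = MiniCommandBus([handler])
    await bus.dispatch(StoreProduct('123'))
    return handler.stored
```

Keeping the handler's state on the instance rather than in a module-level list makes the sketch easier to test in isolation.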
## Requirements
- Python >= 3.9
## Contributing
Pull requests are welcome. For major changes, please open an issue first to discuss what you would like to change.
Please make sure to update tests as appropriate.
## License
[MIT](https://github.com/aiopy/python-aioddd/blob/master/LICENSE)
Raw data
{
"_id": null,
"home_page": null,
"name": "aioddd",
"maintainer": null,
"docs_url": null,
"requires_python": ">=3.9",
"maintainer_email": "ticdenis <navarrodenis940503@outlook.com>",
"keywords": "ddd, hexagonal, cqrs, aio, async",
"author": null,
"author_email": "ticdenis <navarrodenis940503@outlook.com>, yasti4 <adria_4_@hotmail.com>",
"download_url": "https://files.pythonhosted.org/packages/4d/7e/0e59e220ee9f4c9b7bba822790f08c7ebb991e9e45c1677ecae3a263ba8d/aioddd-1.5.0.tar.gz",
"platform": null,
"description": "# Async Python DDD utilities library\n\n[![PyPI version](https://badge.fury.io/py/aioddd.svg)](https://badge.fury.io/py/aioddd)\n[![PyPIDownloads](https://static.pepy.tech/badge/aioddd)](https://pepy.tech/project/aioddd)\n[![CI](https://github.com/aiopy/python-aioddd/actions/workflows/ci.yml/badge.svg?branch=master)](https://github.com/aiopy/python-aioddd/actions/workflows/ci.yml)\n\naioddd is an async Python DDD utilities library.\n\n## Installation\n\nUse the package manager [pip](https://pypi.org/project/aioddd/) to install aioddd.\n\n```bash\npip install aioddd\n```\n\n## Documentation\n\n- Visit [aioddd docs](https://aiopy.github.io/python-aioddd/).\n\n## Usage\n\n```python\nfrom asyncio import get_event_loop\nfrom dataclasses import dataclass\nfrom typing import Type\nfrom aioddd import NotFoundError, \\\n Command, CommandHandler, SimpleCommandBus, \\\n Query, QueryHandler, OptionalResponse, SimpleQueryBus, Event\n\n_products = []\n\nclass ProductStored(Event):\n @dataclass\n class Attributes:\n ref: str\n\n attributes: Attributes\n\nclass StoreProductCommand(Command):\n def __init__(self, ref: str):\n self.ref = ref\n\nclass StoreProductCommandHandler(CommandHandler):\n def subscribed_to(self) -> Type[Command]:\n return StoreProductCommand\n\n async def handle(self, command: StoreProductCommand) -> None:\n _products.append(command.ref)\n\nclass ProductNotFoundError(NotFoundError):\n _code = 'product_not_found'\n _title = 'Product not found'\n\nclass FindProductQuery(Query):\n def __init__(self, ref: str):\n self.ref = ref\n\nclass FindProductQueryHandler(QueryHandler):\n def subscribed_to(self) -> Type[Query]:\n return FindProductQuery\n\n async def handle(self, query: FindProductQuery) -> OptionalResponse:\n if query.ref != '123':\n raise ProductNotFoundError.create(detail={'ref': query.ref})\n return {'ref': query.ref}\n\nasync def main() -> None:\n commands_bus = SimpleCommandBus([StoreProductCommandHandler()])\n await commands_bus.dispatch(StoreProductCommand('123'))\n query_bus = SimpleQueryBus([FindProductQueryHandler()])\n response = await query_bus.ask(FindProductQuery('123'))\n print(response)\n\n\nif __name__ == '__main__':\n get_event_loop().run_until_complete(main())\n```\n\n## Requirements\n\n- Python >= 3.9\n\n## Contributing\n\nPull requests are welcome. For major changes, please open an issue first to discuss what you would like to change.\n\nPlease make sure to update tests as appropriate.\n\n## License\n\n[MIT](https://github.com/aiopy/python-aioddd/blob/master/LICENSE)\n",
"bugtrack_url": null,
"license": "MIT",
"summary": "Async Python DDD utilities library.",
"version": "1.5.0",
"project_urls": {
"documentation": "https://aiopy.github.io/python-aioddd/",
"repository": "https://github.com/aiopy/python-aioddd"
},
"split_keywords": [
"ddd",
" hexagonal",
" cqrs",
" aio",
" async"
],
"urls": [
{
"comment_text": "",
"digests": {
"blake2b_256": "7384c3f56765f0a9d08c922a0a9bbe0b57621f4f775a6bc48bbd173acfdf7d18",
"md5": "a8e82c3f52f00f7e2c2ef1f422da66e2",
"sha256": "c7d0fbf9d4f678cb43b63a72f95eb8cd442e0b8312eb3d5ea754d8ba44ff3911"
},
"downloads": -1,
"filename": "aioddd-1.5.0-py3-none-any.whl",
"has_sig": false,
"md5_digest": "a8e82c3f52f00f7e2c2ef1f422da66e2",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": ">=3.9",
"size": 12330,
"upload_time": "2024-09-26T06:54:37",
"upload_time_iso_8601": "2024-09-26T06:54:37.082996Z",
"url": "https://files.pythonhosted.org/packages/73/84/c3f56765f0a9d08c922a0a9bbe0b57621f4f775a6bc48bbd173acfdf7d18/aioddd-1.5.0-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": "",
"digests": {
"blake2b_256": "4d7e0e59e220ee9f4c9b7bba822790f08c7ebb991e9e45c1677ecae3a263ba8d",
"md5": "965cad5ece748a669e48316e1bdd9c47",
"sha256": "1cc96309aab329a6a62f2f8d7058ec0872d0fd882d6b5ee1c220687356bf8545"
},
"downloads": -1,
"filename": "aioddd-1.5.0.tar.gz",
"has_sig": false,
"md5_digest": "965cad5ece748a669e48316e1bdd9c47",
"packagetype": "sdist",
"python_version": "source",
"requires_python": ">=3.9",
"size": 13300,
"upload_time": "2024-09-26T06:54:38",
"upload_time_iso_8601": "2024-09-26T06:54:38.541142Z",
"url": "https://files.pythonhosted.org/packages/4d/7e/0e59e220ee9f4c9b7bba822790f08c7ebb991e9e45c1677ecae3a263ba8d/aioddd-1.5.0.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2024-09-26 06:54:38",
"github": true,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"github_user": "aiopy",
"github_project": "python-aioddd",
"travis_ci": false,
"coveralls": false,
"github_actions": true,
"lcname": "aioddd"
}
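The `digests` entries in the record above can be used to check a downloaded file before installing it. The following is a minimal sketch using only the standard library; the helper names `sha256_of` and `matches_digest` are hypothetical, and the local file path in the usage note is an assumption about where the archive was saved.

```python
import hashlib
from pathlib import Path

# sha256 of aioddd-1.5.0.tar.gz, copied from the "urls" entry in the record.
SDIST_SHA256 = '1cc96309aab329a6a62f2f8d7058ec0872d0fd882d6b5ee1c220687356bf8545'


def sha256_of(path: Path, chunk_size: int = 65536) -> str:
    """Return the hex SHA-256 digest of a file, read in fixed-size chunks."""
    digest = hashlib.sha256()
    with path.open('rb') as fh:
        for chunk in iter(lambda: fh.read(chunk_size), b''):
            digest.update(chunk)
    return digest.hexdigest()


def matches_digest(path: Path, expected_sha256: str) -> bool:
    """Compare a file's SHA-256 digest with an expected hex string."""
    return sha256_of(path) == expected_sha256.lower()
```

Assuming the sdist was saved in the current directory, `matches_digest(Path('aioddd-1.5.0.tar.gz'), SDIST_SHA256)` should return `True` for an intact download.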