Name | aioddd JSON |
Version |
1.4.0
JSON |
| download |
home_page | |
Summary | Async Python DDD utilities library. |
upload_time | 2023-11-06 09:27:29 |
maintainer | |
docs_url | None |
author | |
requires_python | >=3.7 |
license | MIT |
keywords |
ddd
hexagonal
cqrs
aio
async
|
VCS |
|
bugtrack_url |
|
requirements |
No requirements were recorded.
|
Travis-CI |
No Travis.
|
coveralls test coverage |
No coveralls.
|
# Async Python DDD utilities library
[![PyPI version](https://badge.fury.io/py/aioddd.svg)](https://badge.fury.io/py/aioddd)
[![PyPIDownloads](https://static.pepy.tech/badge/aioddd)](https://pepy.tech/project/aioddd)
[![CI](https://github.com/aiopy/python-aioddd/actions/workflows/ci.yml/badge.svg?branch=master)](https://github.com/aiopy/python-aioddd/actions/workflows/ci.yml)
aioddd is an async Python DDD utilities library.
## Installation
Use the package manager [pip](https://pypi.org/project/aioddd/) to install aioddd.
```bash
pip install aioddd
```
## Documentation
- Visit [aioddd docs](https://aiopy.github.io/python-aioddd/).
## Usage
```python
from asyncio import get_event_loop
from dataclasses import dataclass
from typing import Type
from aioddd import NotFoundError, \
Command, CommandHandler, SimpleCommandBus, \
Query, QueryHandler, OptionalResponse, SimpleQueryBus, Event
_products = []
class ProductStored(Event):
@dataclass
class Attributes:
ref: str
attributes: Attributes
class StoreProductCommand(Command):
def __init__(self, ref: str):
self.ref = ref
class StoreProductCommandHandler(CommandHandler):
def subscribed_to(self) -> Type[Command]:
return StoreProductCommand
async def handle(self, command: StoreProductCommand) -> None:
_products.append(command.ref)
class ProductNotFoundError(NotFoundError):
_code = 'product_not_found'
_title = 'Product not found'
class FindProductQuery(Query):
def __init__(self, ref: str):
self.ref = ref
class FindProductQueryHandler(QueryHandler):
def subscribed_to(self) -> Type[Query]:
return FindProductQuery
async def handle(self, query: FindProductQuery) -> OptionalResponse:
if query.ref != '123':
raise ProductNotFoundError.create(detail={'ref': query.ref})
return {'ref': query.ref}
async def main() -> None:
commands_bus = SimpleCommandBus([StoreProductCommandHandler()])
await commands_bus.dispatch(StoreProductCommand('123'))
query_bus = SimpleQueryBus([FindProductQueryHandler()])
response = await query_bus.ask(FindProductQuery('123'))
print(response)
if __name__ == '__main__':
get_event_loop().run_until_complete(main())
```
## Requirements
- Python >= 3.7
## Contributing
Pull requests are welcome. For major changes, please open an issue first to discuss what you would like to change.
Please make sure to update tests as appropriate.
## License
[MIT](https://github.com/aiopy/python-aioddd/blob/master/LICENSE)
Raw data
{
"_id": null,
"home_page": "",
"name": "aioddd",
"maintainer": "",
"docs_url": null,
"requires_python": ">=3.7",
"maintainer_email": "ticdenis <denisnavarroalcaide@outlook.es>",
"keywords": "ddd,hexagonal,cqrs,aio,async",
"author": "",
"author_email": "ticdenis <denisnavarroalcaide@outlook.es>, yasti4 <adria_4_@hotmail.com>",
"download_url": "https://files.pythonhosted.org/packages/2b/b1/72fe30efe37043291746555f143ecc3cbeb02c9ced9a2439bf90fdea2221/aioddd-1.4.0.tar.gz",
"platform": null,
"description": "# Async Python DDD utilities library\n\n[![PyPI version](https://badge.fury.io/py/aioddd.svg)](https://badge.fury.io/py/aioddd)\n[![PyPIDownloads](https://static.pepy.tech/badge/aioddd)](https://pepy.tech/project/aioddd)\n[![CI](https://github.com/aiopy/python-aioddd/actions/workflows/ci.yml/badge.svg?branch=master)](https://github.com/aiopy/python-aioddd/actions/workflows/ci.yml)\n\naioddd is an async Python DDD utilities library.\n\n## Installation\n\nUse the package manager [pip](https://pypi.org/project/aioddd/) to install aioddd.\n\n```bash\npip install aioddd\n```\n\n## Documentation\n\n- Visit [aioddd docs](https://aiopy.github.io/python-aioddd/).\n\n## Usage\n\n```python\nfrom asyncio import get_event_loop\nfrom dataclasses import dataclass\nfrom typing import Type\nfrom aioddd import NotFoundError, \\\n Command, CommandHandler, SimpleCommandBus, \\\n Query, QueryHandler, OptionalResponse, SimpleQueryBus, Event\n\n_products = []\n\nclass ProductStored(Event):\n @dataclass\n class Attributes:\n ref: str\n\n attributes: Attributes\n\nclass StoreProductCommand(Command):\n def __init__(self, ref: str):\n self.ref = ref\n\nclass StoreProductCommandHandler(CommandHandler):\n def subscribed_to(self) -> Type[Command]:\n return StoreProductCommand\n\n async def handle(self, command: StoreProductCommand) -> None:\n _products.append(command.ref)\n\nclass ProductNotFoundError(NotFoundError):\n _code = 'product_not_found'\n _title = 'Product not found'\n\nclass FindProductQuery(Query):\n def __init__(self, ref: str):\n self.ref = ref\n\nclass FindProductQueryHandler(QueryHandler):\n def subscribed_to(self) -> Type[Query]:\n return FindProductQuery\n\n async def handle(self, query: FindProductQuery) -> OptionalResponse:\n if query.ref != '123':\n raise ProductNotFoundError.create(detail={'ref': query.ref})\n return {'ref': query.ref}\n\nasync def main() -> None:\n commands_bus = SimpleCommandBus([StoreProductCommandHandler()])\n await commands_bus.dispatch(StoreProductCommand('123'))\n query_bus = SimpleQueryBus([FindProductQueryHandler()])\n response = await query_bus.ask(FindProductQuery('123'))\n print(response)\n\n\nif __name__ == '__main__':\n get_event_loop().run_until_complete(main())\n```\n\n## Requirements\n\n- Python >= 3.7\n\n## Contributing\n\nPull requests are welcome. For major changes, please open an issue first to discuss what you would like to change.\n\nPlease make sure to update tests as appropriate.\n\n## License\n\n[MIT](https://github.com/aiopy/python-aioddd/blob/master/LICENSE)\n",
"bugtrack_url": null,
"license": "MIT",
"summary": "Async Python DDD utilities library.",
"version": "1.4.0",
"project_urls": {
"documentation": "https://aiopy.github.io/python-aioddd/",
"repository": "https://github.com/aiopy/python-aioddd"
},
"split_keywords": [
"ddd",
"hexagonal",
"cqrs",
"aio",
"async"
],
"urls": [
{
"comment_text": "",
"digests": {
"blake2b_256": "2296ee099145d3aef8c3b102b83c8210c6367dc2b941bad65595edb096388d69",
"md5": "2d53ad6744edbd8b69fd2949dc0892d8",
"sha256": "b6805f4d60e100e673a29fe498a78b36aac89910fe96bd0337f924bb81c4caa6"
},
"downloads": -1,
"filename": "aioddd-1.4.0-py3-none-any.whl",
"has_sig": false,
"md5_digest": "2d53ad6744edbd8b69fd2949dc0892d8",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": ">=3.7",
"size": 12313,
"upload_time": "2023-11-06T09:27:27",
"upload_time_iso_8601": "2023-11-06T09:27:27.959505Z",
"url": "https://files.pythonhosted.org/packages/22/96/ee099145d3aef8c3b102b83c8210c6367dc2b941bad65595edb096388d69/aioddd-1.4.0-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": "",
"digests": {
"blake2b_256": "2bb172fe30efe37043291746555f143ecc3cbeb02c9ced9a2439bf90fdea2221",
"md5": "e953dbf0957677dfa72a5e36678d6ee1",
"sha256": "f80df75a67bfab3bb54557da2664c6824fc2b5689c6d95bf84f9c5009267883e"
},
"downloads": -1,
"filename": "aioddd-1.4.0.tar.gz",
"has_sig": false,
"md5_digest": "e953dbf0957677dfa72a5e36678d6ee1",
"packagetype": "sdist",
"python_version": "source",
"requires_python": ">=3.7",
"size": 12643,
"upload_time": "2023-11-06T09:27:29",
"upload_time_iso_8601": "2023-11-06T09:27:29.444896Z",
"url": "https://files.pythonhosted.org/packages/2b/b1/72fe30efe37043291746555f143ecc3cbeb02c9ced9a2439bf90fdea2221/aioddd-1.4.0.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2023-11-06 09:27:29",
"github": true,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"github_user": "aiopy",
"github_project": "python-aioddd",
"travis_ci": false,
"coveralls": false,
"github_actions": true,
"lcname": "aioddd"
}