Django CQRS
===========
![pyversions](https://img.shields.io/pypi/pyversions/django-cqrs.svg)
![PyPI](https://img.shields.io/pypi/v/django-cqrs)
[![Docs](https://readthedocs.org/projects/django-cqrs/badge/?version=latest)](https://readthedocs.org/projects/django-cqrs)
[![Coverage](https://sonarcloud.io/api/project_badges/measure?project=django-cqrs&metric=coverage)](https://sonarcloud.io/dashboard?id=django-cqrs)
![GitHub Workflow Status](https://img.shields.io/github/actions/workflow/status/cloudblue/django-cqrs/build.yml)
[![PyPI status](https://img.shields.io/pypi/status/django-cqrs.svg)](https://pypi.python.org/pypi/django-cqrs/)
[![Quality Gate Status](https://sonarcloud.io/api/project_badges/measure?project=django-cqrs&metric=alert_status)](https://sonarcloud.io/dashboard?id=django-cqrs)
[![PyPI Downloads](https://img.shields.io/pypi/dm/django-cqrs)](https://pypi.org/project/django-cqrs/)
![GitHub](https://img.shields.io/github/license/cloudblue/django-cqrs)
`django-cqrs` is a Django application that implements CQRS data synchronisation between several Django microservices.
CQRS
----
In Connect we have a rather complex domain model. There are many microservices that are [decomposed by subdomain](https://microservices.io/patterns/decomposition/decompose-by-subdomain.html) and follow the [database-per-service](https://microservices.io/patterns/data/database-per-service.html) pattern. These microservices have rich and consistent APIs. They are deployed in a cloud k8s cluster and scale automatically under load. Many of these services aggregate data from other services, and usually [API Composition](https://microservices.io/patterns/data/api-composition.html) is sufficient. However, some services become too slow when they join data through API calls, so another pattern needs to be applied.

The pattern that solves this issue is called [CQRS - Command Query Responsibility Segregation](https://microservices.io/patterns/data/cqrs.html). The core idea behind this pattern is that view databases (replicas) are defined for efficient querying and DB joins. Applications keep their replicas up to date by subscribing to [Domain events](https://microservices.io/patterns/data/domain-event.html) published by the service that owns the data. Data is [eventually consistent](https://en.wikipedia.org/wiki/Eventual_consistency), which is acceptable for non-critical business transactions.
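
The replica idea can be illustrated with a minimal, framework-free sketch. It is not how `django-cqrs` is implemented: the `MasterStore` and `ReplicaStore` classes, the event layout, and the in-memory `deque` standing in for the message broker are all assumptions made for illustration.

```python
from collections import deque


class MasterStore:
    """Owns the data and publishes a domain event for every write."""

    def __init__(self, queue):
        self.rows = {}
        self.queue = queue

    def save(self, pk, data):
        self.rows[pk] = dict(data)
        # Publish a copy so later writes do not mutate queued events.
        self.queue.append({'signal': 'SAVE', 'pk': pk, 'data': dict(data)})

    def delete(self, pk):
        del self.rows[pk]
        self.queue.append({'signal': 'DELETE', 'pk': pk})


class ReplicaStore:
    """Keeps a local copy for querying by consuming the master's events."""

    def __init__(self, queue):
        self.rows = {}
        self.queue = queue

    def consume(self):
        # Apply every pending event in order; return how many were applied.
        applied = 0
        while self.queue:
            event = self.queue.popleft()
            if event['signal'] == 'SAVE':
                self.rows[event['pk']] = event['data']
            else:
                self.rows.pop(event['pk'], None)
            applied += 1
        return applied


queue = deque()  # hypothetical stand-in for the transport
master = MasterStore(queue)
replica = ReplicaStore(queue)

master.save(1, {'name': 'Alice'})
stale_view = dict(replica.rows)  # the replica has not consumed the event yet
replica.consume()
fresh_view = dict(replica.rows)  # the replica has caught up
```

Between the write on the master and the call to `consume()`, the replica serves an older view of the data. That window is what eventual consistency means in practice.
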
Documentation
=============
Full documentation is available at [https://django-cqrs.readthedocs.org](https://django-cqrs.readthedocs.org).
Examples
========
You can find an example project [here](examples/demo_project/README.md).
Integration
-----------
* Set up `RabbitMQ`
* Install `django-cqrs`
* Apply changes to the master service, according to your RabbitMQ settings
```python
# models.py

from django.db import models
from dj_cqrs.mixins import MasterMixin, RawMasterMixin


class Account(MasterMixin, models.Model):
    CQRS_ID = 'account'
    CQRS_PRODUCE = True  # set this to False to prevent sending instances to Transport


class Author(MasterMixin, models.Model):
    CQRS_ID = 'author'
    CQRS_SERIALIZER = 'app.api.AuthorSerializer'


# For cases of Diamond Multi-inheritance or in case of Proxy Django-models the following approach could be used:
from mptt.models import MPTTModel
from dj_cqrs.metas import MasterMeta


class ComplexInheritanceModel(MPTTModel, RawMasterMixin):
    CQRS_ID = 'diamond'


class BaseModel(RawMasterMixin):
    CQRS_ID = 'base'


class ProxyModel(BaseModel):
    class Meta:
        proxy = True


MasterMeta.register(ComplexInheritanceModel)
MasterMeta.register(BaseModel)
```
```python
# settings.py

CQRS = {
    'transport': 'dj_cqrs.transport.rabbit_mq.RabbitMQTransport',
    'host': RABBITMQ_HOST,
    'port': RABBITMQ_PORT,
    'user': RABBITMQ_USERNAME,
    'password': RABBITMQ_PASSWORD,
}
```
* Apply changes to the replica service, according to your RabbitMQ settings
```python
from django.db import models
from dj_cqrs.mixins import ReplicaMixin


class AccountRef(ReplicaMixin, models.Model):
    CQRS_ID = 'account'

    id = models.IntegerField(primary_key=True)


class AuthorRef(ReplicaMixin, models.Model):
    CQRS_ID = 'author'
    CQRS_CUSTOM_SERIALIZATION = True

    @classmethod
    def cqrs_create(cls, sync, mapped_data, previous_data=None, meta=None):
        # Override here
        pass

    def cqrs_update(self, sync, mapped_data, previous_data=None, meta=None):
        # Override here
        pass
```
```python
# settings.py

CQRS = {
    'transport': 'dj_cqrs.transport.RabbitMQTransport',
    'queue': 'account_replica',
    'host': RABBITMQ_HOST,
    'port': RABBITMQ_PORT,
    'user': RABBITMQ_USERNAME,
    'password': RABBITMQ_PASSWORD,
}
```
* Apply migrations on both services
* Run the consumer worker on the replica service. Management command: `python manage.py cqrs_consume -w 2`
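
The settings snippets above reference `RABBITMQ_HOST` and similar names without defining them. One possible way to supply them is sketched below, assuming they come from environment variables of the same names. The helper `build_cqrs_settings` and its fallback values (RabbitMQ's usual local defaults) are illustrative and not part of `django-cqrs`.

```python
import os


def build_cqrs_settings(env=os.environ, queue=None):
    """Assemble a CQRS settings dict from environment-style key/value pairs."""
    settings = {
        'transport': 'dj_cqrs.transport.RabbitMQTransport',
        # Fallbacks are assumptions: RabbitMQ's common local defaults.
        'host': env.get('RABBITMQ_HOST', 'localhost'),
        'port': int(env.get('RABBITMQ_PORT', '5672')),
        'user': env.get('RABBITMQ_USERNAME', 'guest'),
        'password': env.get('RABBITMQ_PASSWORD', 'guest'),
    }
    # Only the replica service needs a queue name.
    if queue is not None:
        settings['queue'] = queue
    return settings


# Replica-side usage in settings.py; omit `queue` on the master service.
CQRS = build_cqrs_settings(queue='account_replica')
```

Keeping the broker credentials out of `settings.py` lets the master and replica services share the same code path and differ only in the `queue` argument.
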
Notes
-----
* When master models have related entities in `CQRS_SERIALIZER`, it's important to perform operations within atomic transactions. CQRS sync will happen on transaction commit.
* Please avoid saving different instances of the same entity within a transaction, to reduce syncing and potential race conditions on the replica side.
* Updating a related model won't trigger automatic CQRS synchronization for the master model. This needs to be done manually.
* By default, saving with `update_fields` doesn't trigger CQRS logic, but this can be overridden for the whole application in settings:
```python
settings.CQRS = {
    ...
    'master': {
        'CQRS_AUTO_UPDATE_FIELDS': True,
    },
    ...
}
```
or a special flag can be used in each place where the CQRS flow needs to be triggered:
```python
instance.save(update_fields=['name'], update_cqrs_fields=True)
```
* When only some instances need to be synchronized, the `is_sync_instance` method can be used to set a filtering rule.
It's important to understand that CQRS counting works even without syncing, and the rule is applied every time the model is updated.

Example:
```python
class FilteredSimplestModel(MasterMixin, models.Model):
    CQRS_ID = 'filter'

    name = models.CharField(max_length=200)

    def is_sync_instance(self):
        return len(str(self.name)) > 2
```
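
The interaction between counting and filtering can be pictured with a plain-Python stand-in. This is a sketch rather than the library's code: it assumes that "CQRS counting" refers to a per-instance change counter, and the `SketchMaster` class with its `revision` and `sent` attributes is hypothetical.

```python
class SketchMaster:
    """Plain-Python stand-in for a master model; not the library's code."""

    def __init__(self, name):
        self.name = name
        self.revision = 0  # hypothetical change counter
        self.sent = []     # payloads that would be handed to the transport

    def is_sync_instance(self):
        return len(str(self.name)) > 2

    def save(self):
        # The counter advances on every save, whether or not a sync happens.
        self.revision += 1
        # The filtering rule is re-evaluated on every save.
        if self.is_sync_instance():
            self.sent.append({'name': self.name, 'revision': self.revision})


obj = SketchMaster('ab')
obj.save()        # filtered out: the name is too short
obj.name = 'abc'
obj.save()        # passes the filter and is "sent"
```

In this sketch the first payload that reaches the replica already carries revision 2, because the filtered-out save still advanced the counter.
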
Django Admin
-----------
Add an action to synchronize master items from the Django Admin page.
```python
from django.contrib import admin

from dj_cqrs.admin_mixins import CQRSAdminMasterSyncMixin

from . import models  # the app's own models module, which defines Account


class AccountAdmin(CQRSAdminMasterSyncMixin, admin.ModelAdmin):
    ...


admin.site.register(models.Account, AccountAdmin)
```
* If necessary, override `_cqrs_sync_queryset` from `CQRSAdminMasterSyncMixin` to adjust the QuerySet used for synchronization.
Utilities
---------
Bulk synchronizer without transport (usage example: initial configuration). It may be used during planned downtime.
* On master service: `python manage.py cqrs_bulk_dump --cqrs-id=author` -> `author.dump`
* On replica service: `python manage.py cqrs_bulk_load -i=author.dump`
Filter synchronizer over transport (usage example: sync some specific records to a given replica). Can be used dynamically.
* To sync all replicas: `python manage.py cqrs_sync --cqrs-id=author -f='{"id__in": [1, 2]}'`
* To sync all instances with only one replica: `python manage.py cqrs_sync --cqrs-id=author -f={} -q=replica`
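
Because the `-f` value is JSON, it is easy to get the shell quoting wrong when the command is assembled by hand. A small sketch of building the command line from Python follows. The helper name `build_cqrs_sync_command` is hypothetical; only the flags shown in the list above are used.

```python
import json
import shlex


def build_cqrs_sync_command(cqrs_id, filter_kwargs, queue=None):
    """Return a shell-safe `cqrs_sync` command line as a single string."""
    args = [
        'python',
        'manage.py',
        'cqrs_sync',
        f'--cqrs-id={cqrs_id}',
        # json.dumps yields the JSON text expected after -f.
        f'-f={json.dumps(filter_kwargs)}',
    ]
    if queue is not None:
        args.append(f'-q={queue}')
    # shlex.join quotes every argument that contains shell-special characters.
    return shlex.join(args)


print(build_cqrs_sync_command('author', {'id__in': [1, 2]}))
print(build_cqrs_sync_command('author', {}, queue='replica'))
```

Generating the string with `shlex.join` means the JSON braces, quotes, and spaces reach `manage.py` as one argument.
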
Set of diff synchronization tools:
* To get diff and synchronize master service with replica service in K8S:
```bash
kubectl exec -i MASTER_CONTAINER -- python manage.py cqrs_diff_master --cqrs-id=author |
    kubectl exec -i REPLICA_CONTAINER -- python manage.py cqrs_diff_replica |
    kubectl exec -i MASTER_CONTAINER -- python manage.py cqrs_diff_sync
```
* To check sync and clean up deleted objects within the replica service in K8S:
```bash
kubectl exec -i REPLICA_CONTAINER -- python manage.py cqrs_deleted_diff_replica --cqrs-id=author |
    kubectl exec -i MASTER_CONTAINER -- python manage.py cqrs_deleted_diff_master |
    kubectl exec -i REPLICA_CONTAINER -- python manage.py cqrs_deleted_sync_replica
```
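
Conceptually, both pipelines compare what the master has with what the replica has and act on the difference. The sketch below illustrates that idea only. It assumes each side can report a mapping of primary key to a revision number, and the functions `diff_revisions` and `deleted_on_master` are hypothetical; they do not describe the output format or the algorithm of the management commands.

```python
def diff_revisions(master_revisions, replica_revisions):
    """Return primary keys whose replica copy is missing or out of date."""
    return sorted(
        pk
        for pk, revision in master_revisions.items()
        if replica_revisions.get(pk) != revision
    )


def deleted_on_master(master_pks, replica_pks):
    """Return primary keys that exist on the replica but not on the master."""
    return sorted(set(replica_pks) - set(master_pks))


# Hypothetical snapshots: primary key -> revision number.
master = {1: 3, 2: 1, 3: 5}
replica = {1: 3, 2: 0, 4: 2}

to_resync = diff_revisions(master, replica)     # candidates for re-sending
to_delete = deleted_on_master(master, replica)  # candidates for cleanup
```

The first function corresponds to the "diff and synchronize" direction, the second to the cleanup of objects that were deleted on the master.
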
Development
===========
1. Python >= 3.8
2. Install dependencies from `requirements/dev.txt`
3. We use the `isort` library to order and format our imports, and `black` to format the code.
We check this with the `flake8-isort` and `flake8-black` libraries (automatically on each `flake8` run).
For convenience, you may run `isort . && black .` to format the code.
Testing
=======
Unit testing
------
1. Python >= 3.8
2. Install dependencies from `requirements/test.txt`
3. `export PYTHONPATH=/your/path/to/django-cqrs/`
Run tests with various RDBMS:
- `cd integration_tests`
- `DB=postgres docker-compose -f docker-compose.yml -f rdbms.yml run app_test`
- `DB=mysql docker-compose -f docker-compose.yml -f rdbms.yml run app_test`
Check code style: `flake8`
Run tests: `pytest`
Test reports are generated in `tests/reports`.
* `out.xml` - JUnit test results
* `coverage.xml` - Coverage XML results
To generate HTML coverage reports use:
`--cov-report html:tests/reports/cov_html`
Integration testing
------
1. docker-compose
2. `cd integration_tests`
3. `docker-compose run master`

Package metadata
================
* Name: `django-cqrs`
* Version: 2.7.3 (uploaded 2024-04-19)
* Summary: Django CQRS data synchronisation
* Author: CloudBlue LLC
* License: Apache-2.0
* Requires Python: >=3.8, <4
* Keywords: django, cqrs, sql, mixin, amqp
* Homepage: https://django-cqrs.readthedocs.org
* Repository: https://github.com/cloudblue/django-cqrs
* Wheel: [django_cqrs-2.7.3-py3-none-any.whl](https://files.pythonhosted.org/packages/49/bb/f0fa585813f322dba87412d8cc029ce767219561ddeadaf9e1884899e453/django_cqrs-2.7.3-py3-none-any.whl), 54277 bytes, sha256 `aa087a23939300d48a0e52b5d694ecb9994ee8b651d97b795c871987d17a3fe5`
* Source distribution: [django_cqrs-2.7.3.tar.gz](https://files.pythonhosted.org/packages/87/d1/62711b84e15cda4d294e876e46de58ead34a7bc3b3829f72ce227c29bd77/django_cqrs-2.7.3.tar.gz), 40080 bytes, sha256 `d156fe83e657d2080ece9b8c37e4d1b424303ddf9a5df8ec793e4731c4966e28`