django-microservices-communication


- Name: django-microservices-communication
- Version: 2.4.0
- Home page: https://github.com/ksinn/django-microservices-communication
- Summary: Pub/Sub for microservice on django
- Upload time: 2024-03-14 06:47:04
- Author: ksinn
- Requirements: none recorded

Easy communication for django based microservices
=======================
The library provides tools for:
- Publish/Subscribe messaging
- Sending async commands
- REST API calls

Installation
----------------
```commandline
 pip install git+https://github.com/ksinn/django-microservices-communication
```

In a _requirements.txt_ file:
```python
...
Django==4.2
git+https://github.com/ksinn/django-microservices-communication
django-cors-headers==3.14.0
...
```
*Installation in Docker*

If `pip install` runs inside a Docker build, git must be installed in the image, because the package is installed from a git URL.


Add 'services_communication' to your INSTALLED_APPS setting.
```python
INSTALLED_APPS = [
    ...
    'services_communication',
]
```

Any global settings are kept in a single configuration dictionary named MICROSERVICES_COMMUNICATION_SETTINGS. 
Start off by adding the following to your settings.py module:
```python
import pika  # only needed for the optional 'credentials' entry below

MICROSERVICES_COMMUNICATION_SETTINGS = {
    'APP_ID': 'my-service',
    'BROKER_CONNECTION_URL': 'amqp://guest:guest@localhost:5672',
    # Or pass the connection parameters as a dict instead of a URL
    'BROKER_CONNECTION_PARAMETERS': {
        'host': 'localhost',
        'port': 5672,
        'virtual_host': None,
        'username': 'guest',
        'password': 'guest',
        # Instead of username and password, you may pass a pika credentials object
        'credentials': pika.PlainCredentials('guest', 'guest'),
    },
    'QUEUE': 'my-queue',
    'EXCHANGES': [
        'my-exchange1',
        ('my-other-exchange', 'fanout'),
        'exchange3',
    ],
    'BINDS': [
        ('my-exchange1', 'event.*'),
        'my-other-exchange',
    ],
    
    'REST_API_HOST': 'http://api.example.com', 
    'REST_API_AUTH_URL': 'api/v1/login',
    
    # Body of the auth request
    'REST_API_CREDENTIAL': {
        'login': 'sarvar',
        'password': 'sarvar',
    },
    
    # Or just a username and password, if the auth fields are named 'username' and 'password'
    'REST_API_USERNAME': 'myusername',
    'REST_API_PASSWORD': '12345',
    
    # Set to True to enable future (scheduled) events
    'PUBLISHER_FUTURE_EVENT_ENABLE': False,
}
```
Defaults:
- exchange type - _topic_
- bind routing key - _'#'_
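
As a rough sketch of how these defaults apply, the helpers below expand the short string form of an `EXCHANGES` or `BINDS` entry into a full pair. The function names are hypothetical and are not part of the library; the behaviour is inferred from the example settings above, where an entry is either a plain name or a tuple.

```python
DEFAULT_EXCHANGE_TYPE = "topic"
DEFAULT_ROUTING_KEY = "#"


def normalize_exchange(entry):
    """Return (name, type) for an EXCHANGES entry given as a string or a tuple."""
    if isinstance(entry, str):
        return entry, DEFAULT_EXCHANGE_TYPE
    name, exchange_type = entry
    return name, exchange_type


def normalize_bind(entry):
    """Return (exchange, routing_key) for a BINDS entry given as a string or a tuple."""
    if isinstance(entry, str):
        return entry, DEFAULT_ROUTING_KEY
    exchange, routing_key = entry
    return exchange, routing_key


# The same entries as in the settings example
exchanges = [normalize_exchange(e) for e in ["my-exchange1", ("my-other-exchange", "fanout"), "exchange3"]]
binds = [normalize_bind(b) for b in [("my-exchange1", "event.*"), "my-other-exchange"]]
print(exchanges)
print(binds)
```

With these assumptions, `'exchange3'` is declared as a topic exchange and the bind to `'my-other-exchange'` uses the routing key `'#'`.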


*Async communication*
---------------------------------

Consuming
----------------

Run the consumer:
```commandline
python manage.py runconsumer
```

Put your consumer functions in a 'consumers.py' file inside each Django app:
```
some_project/
    | some_project/
        | settings.py
        | urls.py
    | some_app/
        | __init__.py
        | admin.py
        | apps.py
        | consumers.py  <---- 
        | models.py
        | tests.py
        | views.py
    | some_other_app/
        | __init__.py
        | admin.py
        | apps.py
        | consumers.py  <----
        | models.py
        | tests.py
        | views.py
```

Each consumer function must be registered with the message router.
A basic consumer function must accept two positional arguments: the _routing key_ and the _message body_.

Example consumers.py file:
```python
from services_communication.consumer import message_router


@message_router.consumer('my-exchange1', 'event.update')
@message_router.consumer('my-exchange1', 'event.create')
@message_router.consumer('my-other-exchange')  # receives every routing key of this exchange
@message_router.consumer()  # receives messages from every exchange (default consumer)
def stupid_consume_function(routing_key, body):
    print(routing_key, body)


@message_router.default_consumer  # receives messages not routed to any other consumer
def stupid_default_consume_function(routing_key, body):
    print(routing_key, body)
```

To consume an aggregate event, add the _@event_consumer_ decorator. The consumer function must then accept a single positional argument, the _event payload_, and receives the other event data as _kwargs_.

Example consumers.py file:
```python
from services_communication.consumer import message_router
# event_consumer must be imported as well; its import path is not given here


@message_router.consumer('my-exchange1', 'event.update')
@message_router.consumer('my-exchange1', 'event.create')
@message_router.consumer('my-other-exchange')  # receives every routing key of this exchange
@event_consumer
def stupid_consume_function(payload, **kwargs):
    print(payload)
```


Alternatively, use _devconsumer_ instead of _runconsumer_ to reload automatically when files change.

Publishing
--------------

*Publishing in transaction*

To publish an event that happened to an [aggregate](https://microservices.io/patterns/data/aggregate.html) as part of a transaction, use publish_aggregate_event:
```python
from services_communication.publisher import publish_aggregate_event

def update_user_name(user, new_name):
    user.name = new_name
    user.save()
    publish_aggregate_event(
        aggregate='user',
        event_type='update.base',
        payload=build_user_data(user),  # build_user_data: your own serializer, not shown here
    )
```

This function saves the event data in a database table.
The publisher process then reads the event from the table and publishes it to the broker, using the aggregate name as the _exchange_, the event type as the _routing key_, and the following body:
```json
{
    "eventId": "2",
    "eventTime": "2023-06-02T10:58:58.340174Z",
    "eventType": "update.base",
    "aggregate": "user",
    "payload": {
        ...
    }
}
```
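
To make the mapping concrete, here is a minimal sketch that assembles a body with the same keys as the JSON above and derives the exchange and routing key from it. `build_event_body` is a hypothetical helper written for this illustration, not a function of the library, and the payload fields are made up.

```python
import json
from datetime import datetime, timezone


def build_event_body(event_id, aggregate, event_type, payload, event_time=None):
    """Assemble a message body with the same keys as the JSON example."""
    event_time = event_time or datetime.now(timezone.utc)
    return {
        "eventId": str(event_id),
        # ISO 8601 in UTC, with the "+00:00" offset written as "Z"
        "eventTime": event_time.isoformat().replace("+00:00", "Z"),
        "eventType": event_type,
        "aggregate": aggregate,
        "payload": payload,
    }


body = build_event_body(
    event_id=2,
    aggregate="user",
    event_type="update.base",
    payload={"id": 1, "name": "New name"},  # illustrative payload only
    event_time=datetime(2023, 6, 2, 10, 58, 58, 340174, tzinfo=timezone.utc),
)

# Exchange = aggregate name, routing key = event type
exchange, routing_key = body["aggregate"], body["eventType"]
print(exchange, routing_key)
print(json.dumps(body, indent=4))
```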

*Scheduling future event (in transaction)*

Future events work the same way as regular events, but you can specify a time (in the future) at which the event will be sent.
Tags can be used to cancel an event later. Tag values are cast to strings, so the value 1 matches '1' but does not match True.
This feature requires the _PUBLISHER_FUTURE_EVENT_ENABLE_ setting to be True.

```python
from datetime import timedelta

from django.utils.timezone import now

from services_communication.publisher import publish_future_aggregate_event, cancel_future_aggregate_event


def registrate_user(user, delay):
    ...  # registration logic elided
    user.save()

    # Remind the user to set a photo after `delay` seconds
    publish_future_aggregate_event(
        aggregate='user',
        event_type='remind.photo',
        event_time=now() + timedelta(seconds=delay),
        payload=build_user_data(user),
        tags={'user_id': user.id, 'any_other_tag': 'any_value'},
    )


def set_user_photo(user, photo):
    user.photo = photo
    user.save()

    # Cancel the reminder
    cancel_future_aggregate_event(
        aggregate='user',
        event_type='remind.photo',
        tags={'user_id': user.id},
    )
```
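
The note about tag values being cast to strings can be illustrated with a small standalone sketch. The helpers below are hypothetical and only mimic the comparison rule; in particular, treating the cancel tags as a subset that must all match is an assumption, not documented behaviour.

```python
def normalize_tags(tags):
    """Cast every tag value to a string, as the text describes."""
    return {key: str(value) for key, value in tags.items()}


def tags_match(stored_tags, cancel_tags):
    """True if every tag given for cancellation equals the stored tag (assumed rule)."""
    stored = normalize_tags(stored_tags)
    wanted = normalize_tags(cancel_tags)
    return all(stored.get(key) == value for key, value in wanted.items())


stored = {"user_id": 1, "any_other_tag": "any_value"}
print(tags_match(stored, {"user_id": 1}))     # True
print(tags_match(stored, {"user_id": "1"}))   # True: 1 is compared as '1'
print(tags_match(stored, {"user_id": True}))  # False: True is compared as 'True'
```

The last case matters because in plain Python `1 == True` holds, while `str(1)` and `str(True)` differ.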

Run the publisher process:
```commandline
python manage.py runpublisher
```

Alternatively, use _devpublisher_ instead of _runpublisher_ to reload automatically when files change.
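
The split between saving events in a table and publishing them from a separate process is commonly called a transactional outbox. The sketch below shows the general idea with the standard-library `sqlite3` module; the table layout, function names, and `publish` callback are all assumptions made for illustration and do not reflect the library's actual schema or internals.

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
conn.execute(
    "CREATE TABLE outbox ("
    "id INTEGER PRIMARY KEY AUTOINCREMENT, aggregate TEXT, event_type TEXT, "
    "payload TEXT, published INTEGER DEFAULT 0)"
)
conn.execute("INSERT INTO users (id, name) VALUES (1, 'old name')")
conn.commit()


def update_user_name(user_id, new_name):
    """Change the row and record the event in one transaction."""
    with conn:  # commits on success, rolls back if an exception is raised
        conn.execute("UPDATE users SET name = ? WHERE id = ?", (new_name, user_id))
        conn.execute(
            "INSERT INTO outbox (aggregate, event_type, payload) VALUES (?, ?, ?)",
            ("user", "update.base", json.dumps({"id": user_id, "name": new_name})),
        )


def publish_pending(publish):
    """Publisher step: send unpublished rows in order, then mark each as sent."""
    rows = conn.execute(
        "SELECT id, aggregate, event_type, payload FROM outbox "
        "WHERE published = 0 ORDER BY id"
    ).fetchall()
    for row_id, aggregate, event_type, payload in rows:
        publish(exchange=aggregate, routing_key=event_type, body=payload)
        with conn:
            conn.execute("UPDATE outbox SET published = 1 WHERE id = ?", (row_id,))
    return len(rows)


sent = []  # stands in for the broker
update_user_name(1, "new name")
publish_pending(lambda **message: sent.append(message))
print(sent)
```

Because the event row is written in the same transaction as the data change, an event is recorded only if the change itself commits.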

Commands
--------------
A command tells a remote service to do something without waiting for a response from it.

To send a command immediately, without regard to transactionality, call _send_command_ with the service name and the payload as arguments.

You can set a timeout, in seconds, for command execution: the executor should not run the command if more than the specified time has passed since it was sent (that is, since _send_command_ was called).

```python
from services_communication.call import send_command

send_command(
    'sms',
    {
        'phone': '998990000000',
        'text': 'Hello world!',
    }
)
```

If the remote service handles more than one command, you may want to use the optional _command_name_ argument.
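
The timeout rule for commands can be sketched from the executor's side as follows. How the library actually transmits the send time and timeout is not described here, so `is_expired` and its parameters are hypothetical; the sketch only shows the comparison itself.

```python
from datetime import datetime, timedelta, timezone


def is_expired(sent_at, timeout_seconds, now=None):
    """Return True when more than timeout_seconds have passed since sent_at."""
    if timeout_seconds is None:
        return False  # no timeout was set, so the command never expires
    now = now or datetime.now(timezone.utc)
    return now - sent_at > timedelta(seconds=timeout_seconds)


sent_at = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
print(is_expired(sent_at, 30, now=sent_at + timedelta(seconds=10)))  # False
print(is_expired(sent_at, 30, now=sent_at + timedelta(seconds=31)))  # True
```

An executor following this rule would skip any command for which `is_expired` returns True.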


*Sync communication*
---------------------

REST API
----------
To call an endpoint, use the HTTP method functions from the rest_api package.

```python
from services_communication.rest_api import get, post, head, delete
from services_communication.rest_api.formatter import full_response

first_subject = get('api/v1/subjects/1')  # returns only the response body, as a dict

subjects_page = get(
    'api/v1/subjects',
    params={
        'page': 2,
        'size': 20,
    },
)  # sends query parameters

response = get('api/v1/subjects/1', response_formatter=full_response)  # returns the full response object

new_subject = post(
    'api/v1/subjects',
    json={
        'name': 'My new subject',
        'order': 5,
    },
)  # sends a request body
```
Every method function accepts additional keyword arguments, which are passed on to the underlying request.

To format the request and response, you can pass custom functions as the *request_formatter* and *response_formatter* keyword arguments.

*request_formatter* is applied to the other request arguments (params, json, data).

*response_formatter* is applied to the response, and its result is returned from the method.

By default:

- get, post, and delete return response.json
- head returns the full response
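
As an illustration of a custom *response_formatter*, the sketch below defines a formatter that returns the status code together with the parsed body. It assumes the formatter receives a single response object exposing `status_code` and `json()`; `FakeResponse` and `status_and_body` are hypothetical names used only to keep the example self-contained.

```python
class FakeResponse:
    """Stand-in for an HTTP response object, used only for this example."""

    def __init__(self, status_code, data):
        self.status_code = status_code
        self._data = data

    def json(self):
        return self._data


def status_and_body(response):
    """Example formatter: return (status code, parsed JSON body)."""
    return response.status_code, response.json()


fake = FakeResponse(200, {"id": 1, "name": "My subject"})
print(status_and_body(fake))
```

Under these assumptions it would be passed as `get('api/v1/subjects/1', response_formatter=status_and_body)`.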


            
