| Field        | Value                                               |
| ------------ | --------------------------------------------------- |
| Name         | rabbitmq-utils                                      |
| Version      | 1.4.1                                               |
| Summary      | Provide easy connection to rabbitmq server.         |
| Upload time  | 2024-01-17 17:17:00                                 |
| Author       | Tahir Rafique                                       |
| License      | MIT                                                 |
| Keywords     | rabbitmq, consumer, producer, publisher, subscriber |
| Requirements | No requirements were recorded.                      |

# rabbitmq-utils
This package provides an easy way to connect to a RabbitMQ server.

## Producer

The following sample code sends a message to the desired queue.

The **RabbitMQProducer** class implements **Publisher Confirms**, so it tells you whether the message was delivered to the desired destination.

**Note: To use the default exchange, pass "" as the exchange name. With the default exchange, the routing key is the name of the queue.**

To make messages persistent, use **persistent_message=True**. This increases disk usage and latency, but the message is recovered even after the RabbitMQ server is restarted.

If you have a priority-based queue, you can pass **priority=(some int)** to **sendMessage**.

```python
from rabbitmq_utils import RabbitMQProducer
import json

# DEFINING MESSAGE
message = json.dumps({'hello': 'world'})

# SENDING
rmqp = RabbitMQProducer(
    host='localhost', port=5672, virtual_host='/', 
    username='guest', password='guest', 
    exchange='test_exc', exchange_type='topic',
    persistent_message=False
)
is_sent = rmqp.sendMessage(
    message,
    routing_key='test_key'
)

# RESULT
if is_sent:
    print('INFO: Message sent.')
else:
    print('ERROR: Unable to send to the desired routing key.')
```
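
Because **sendMessage** returns a boolean based on the publisher confirm, a caller can retry a failed send. The following is a minimal sketch, not part of the package: `send_with_retry`, `attempts`, and `delay` are hypothetical names, and it assumes only that the producer exposes `sendMessage(message, routing_key=...)` returning `True` or `False` as described above. `FakeProducer` stands in for a real **RabbitMQProducer** so the snippet runs without a server.

```python
import time


def send_with_retry(producer, message, routing_key, attempts=3, delay=0.0):
    """Call producer.sendMessage until it reports success or attempts run out.

    Hypothetical helper: it relies only on sendMessage returning True/False.
    """
    for attempt in range(1, attempts + 1):
        if producer.sendMessage(message, routing_key=routing_key):
            return True
        if attempt < attempts:
            time.sleep(delay)  # wait before the next try
    return False


class FakeProducer:
    """Stand-in for RabbitMQProducer: fails a fixed number of times, then succeeds."""

    def __init__(self, failures):
        self.failures = failures
        self.calls = 0

    def sendMessage(self, message, routing_key):
        self.calls += 1
        return self.calls > self.failures


producer = FakeProducer(failures=2)
print(send_with_retry(producer, '{"hello": "world"}', 'test_key'))
```

Retrying only helps with transient failures. If the routing key does not match any binding, every attempt is likely to fail in the same way.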

## Consumer

The **RabbitMQConsumer** class lets you declare a queue, declare an exchange, and bind the queue to the exchange using a routing key.

The queue and exchange are declared as durable. If you get an error (for example, because a queue or exchange with the same name already exists with different settings), remove the existing queue and exchange before running this code.

The callback function is called when a message is received from the RabbitMQ server. Define your callback function as in the following example:

```python
def my_callback_function(ch, method, properties, body):
    # GETTING MESSAGE
    message = body.decode()
    
    # PERFORM YOUR LOGIC HERE (myLogic is a placeholder for your own function)
    myLogic()
    
    # ACKNOWLEDGE WORK IS DONE
    ch.basic_ack(delivery_tag=method.delivery_tag)
    return None
```
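
A callback that raises before acknowledging leaves the message unacknowledged. The sketch below shows one way to handle that: acknowledge on success and reject without requeue on failure. It assumes `ch` is a pika channel (the RPC example in this README uses `pika.BasicProperties`), so `basic_ack` and `basic_nack` are available. `safe_callback`, `handle`, and `FakeChannel` are hypothetical names used only for illustration.

```python
import json
from types import SimpleNamespace


def handle(payload):
    """Placeholder for your own logic (hypothetical)."""
    if not isinstance(payload, dict) or 'hello' not in payload:
        raise ValueError('missing key: hello')


def safe_callback(ch, method, properties, body):
    """Ack on success; reject without requeue when processing fails."""
    try:
        payload = json.loads(body.decode())
        handle(payload)
    except ValueError:
        # JSONDecodeError and UnicodeDecodeError are both ValueError subclasses
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
        return None
    ch.basic_ack(delivery_tag=method.delivery_tag)
    return None


class FakeChannel:
    """Records ack/nack calls so the callback can be tried without a server."""

    def __init__(self):
        self.events = []

    def basic_ack(self, delivery_tag):
        self.events.append(('ack', delivery_tag))

    def basic_nack(self, delivery_tag, requeue=True):
        self.events.append(('nack', delivery_tag, requeue))


channel = FakeChannel()
safe_callback(channel, SimpleNamespace(delivery_tag=1), None, b'{"hello": "world"}')
safe_callback(channel, SimpleNamespace(delivery_tag=2), None, b'not json')
print(channel.events)
```

Rejecting with `requeue=False` avoids an endless redelivery loop for a message that can never be parsed. Use `requeue=True` instead if failures are expected to be temporary.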

The following sample code receives messages from the RabbitMQ server.

```python
from rabbitmq_utils import RabbitMQConsumer

# STARTING RABBITMQ CONSUMER
rmqc = RabbitMQConsumer(
    host='localhost', port=5672, virtual_host='/',
    username='guest', password='guest',
    queue_name='test_que', routing_key='test_key',
    exchange='test_exc', exchange_type='topic',
    callback_fun=my_callback_function,
    max_priority=2  # Use this if you want a priority-based queue. (Default is None.)
)
rmqc.receiveMessage()
```
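
With `max_priority=2`, the queue has priority levels 0 through 2. RabbitMQ treats a message priority above the queue maximum as the maximum, so normalizing the value on the producer side keeps the intent explicit. The helper below is a small sketch under that assumption; `normalize_priority` is a hypothetical name and not part of this package.

```python
def normalize_priority(priority, max_priority):
    """Clamp a requested priority into the range the queue supports."""
    if max_priority is None:
        return None  # queue was declared without priority support
    if priority is None:
        return 0  # messages without a priority are treated as lowest
    return max(0, min(int(priority), max_priority))


for requested in (None, -1, 1, 5):
    print(requested, '->', normalize_priority(requested, 2))

# Possible use with the producer described above:
# rmqp.sendMessage(message, routing_key='test_key',
#                  priority=normalize_priority(5, 2))
```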

## Remote Procedure Call (RPC)

RPC lets you run a function on a remote computer and wait for the result. It is a synchronous call. This package provides a simple implementation of RPC.

RPC consists of two parts: the **server**, which processes requests, and the **client**, which sends requests to the server. The following examples show an RPC implementation.

### Server

```python
from rabbitmq_utils.rpc import RPCServer

# STARTING RPC SERVER
server = RPCServer(
    host='localhost', port=5672, virtual_host='/',
    username='guest', password='guest',
    queue_name='test_que', routing_key='test_key',
    exchange='test_exc', exchange_type='topic',
    callback_fun=rpc_callback_function
)
server.receiveMessage()
```

The RPC callback function differs from the consumer callback function: in this callback, we return the result to the client.

Note: **result** must be a string. If it is not, use **json.dumps(result)** to convert it to a string.

```python
import pika

def rpc_callback_function(ch, method, properties, body):
    # GETTING MESSAGE
    message = body.decode()
    
    # PERFORM YOUR LOGIC HERE (myLogic is a placeholder; it must return a string)
    result = myLogic()
    
    # RETURNING RESPONSE
    ch.basic_publish(
        exchange='', routing_key=properties.reply_to,
        properties=pika.BasicProperties(
            correlation_id = properties.correlation_id
        ),
        body=result
    )
    
    # ACKNOWLEDGE WORK IS DONE
    ch.basic_ack(delivery_tag=method.delivery_tag)
    return None
```
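
Since the response body must be a string, the server can convert the result just before publishing. The helper below is a minimal sketch of that conversion; `to_response_body` is a hypothetical name, not part of this package.

```python
import json


def to_response_body(result):
    """Return result unchanged if it is already a str, else JSON-encode it."""
    if isinstance(result, str):
        return result
    return json.dumps(result)


print(to_response_body({'status': 'ok', 'items': [1, 2]}))
print(to_response_body('already a string'))
```

In the callback above, this would be used as `body=to_response_body(result)`. The client then needs `json.loads` to recover non-string results, so it helps to agree on one format for all responses.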

### Client

```python
from rabbitmq_utils.rpc import RPCClient
import json

# DEFINING MESSAGE
message = json.dumps({'hello': 'world'})

# SENDING
client = RPCClient(
    host='localhost', port=5672, virtual_host='/', 
    username='guest', password='guest', 
    exchange='test_exc', exchange_type='topic',
    timeout=3, # wait 3 seconds for response. default is None (infinite wait).
    persistent_message=False
)
is_sent, response = client.sendMessage(
    message,
    routing_key='test_key',
    return_response=True
)

# OUTPUT
print(f'is_sent: {is_sent} \t code: {client.getCode()} \t response: {response}')
```

The client's **sendMessage** accepts a **return_response** argument (default=False). If it is **True**, the client waits for the response for the configured **timeout** period. You can also receive the response later using the following sample code:

```python
# SEND REQUEST
is_sent = client.sendMessage(
    message,
    routing_key='test_key'
)

# PERFORM YOUR LOGIC

# RECEIVE RESPONSE
response = client.receiveResponse()
```

Always check the validity of the response using the status code. The following code shows how:

```python
status_code = client.getCode()
print(status_code)
```

The code is an **integer**. The following table shows its meanings:

| Code | Meaning                            |
| ---- | ---------------------------------- |
| 200  | Response is successfully obtained. |
| 408  | Timeout occurred.                  |
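
The two codes in the table can be turned into explicit control flow. The sketch below assumes only the documented `getCode()` method and the two documented codes. `read_response`, the constant names, and `FakeClient` are hypothetical, and any other code is treated as unexpected because the README does not document one.

```python
RESPONSE_OK = 200       # response is successfully obtained
RESPONSE_TIMEOUT = 408  # timeout occurred


def read_response(client, response):
    """Return the response only when the client's status code says it is valid."""
    code = client.getCode()
    if code == RESPONSE_OK:
        return response
    if code == RESPONSE_TIMEOUT:
        raise TimeoutError('RPC server did not respond in time')
    raise RuntimeError(f'unexpected status code: {code}')


class FakeClient:
    """Stand-in for RPCClient that reports a fixed status code."""

    def __init__(self, code):
        self._code = code

    def getCode(self):
        return self._code


print(read_response(FakeClient(200), 'pong'))
try:
    read_response(FakeClient(408), None)
except TimeoutError as error:
    print('timeout:', error)
```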

## Author

**Tahir Rafique**

## Releases

| Date      | Version | Summary                                                      |
| --------- | ------- | ------------------------------------------------------------ |
| 17-Jan-24 | v1.4.1  | Adding exception handling in default callback function of consumer. |
| 13-Dec-23 | v1.4.0  | Adding queue priority in producer and consumer.              |
| 14-Jul-23 | v1.3.0  | Adding persistent message option.                            |
| 14-Jul-23 | v1.2.1  | Correcting documentation.                                    |
| 21-Jun-23 | v1.2.0  | Adding RPC to module.                                        |
| 27-Apr-23 | v1.0.1  | Improving default callback function.                         |
| 27-Apr-23 | v1.0.0  | Initial build                                                |


            

Raw data

{
    "_id": null,
    "home_page": "",
    "name": "rabbitmq-utils",
    "maintainer": "",
    "docs_url": null,
    "requires_python": "",
    "maintainer_email": "",
    "keywords": "rabbitmq,consumer,producer,publisher,subscriber",
    "author": "Tahir Rafique",
    "author_email": "tahirrafiqueasad@gmail.com",
    "download_url": "https://files.pythonhosted.org/packages/a8/f4/2765658a8ae6a2ee5a1724cbab0ec29155010cf1d756a96d6f7d1d3c403f/rabbitmq-utils-1.4.1.tar.gz",
    "platform": null,
    "bugtrack_url": null,
    "license": "MIT",
    "summary": "Provide easy connection to rabbitmq server.",
    "version": "1.4.1",
    "project_urls": null,
    "split_keywords": [
        "rabbitmq",
        "consumer",
        "producer",
        "publisher",
        "subscriber"
    ],
    "urls": [
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "aa9367131d4a44af5f96f99000b024460d57defec844ce2035b10223559b8e5d",
                "md5": "18619eb693611b43893d9b71d56568ce",
                "sha256": "fdfaf06b77b227bee0a855a74f4c1682ed92cd3e16c8afc9bb1b4a6122936ece"
            },
            "downloads": -1,
            "filename": "rabbitmq_utils-1.4.1-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "18619eb693611b43893d9b71d56568ce",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": null,
            "size": 9466,
            "upload_time": "2024-01-17T17:16:58",
            "upload_time_iso_8601": "2024-01-17T17:16:58.337019Z",
            "url": "https://files.pythonhosted.org/packages/aa/93/67131d4a44af5f96f99000b024460d57defec844ce2035b10223559b8e5d/rabbitmq_utils-1.4.1-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "a8f42765658a8ae6a2ee5a1724cbab0ec29155010cf1d756a96d6f7d1d3c403f",
                "md5": "038fc84427faba5ede97dc0eb965d137",
                "sha256": "5839271cb6d43a9ea38336bc11b624e95b89647bb8fa428e7bcd646a7a010a72"
            },
            "downloads": -1,
            "filename": "rabbitmq-utils-1.4.1.tar.gz",
            "has_sig": false,
            "md5_digest": "038fc84427faba5ede97dc0eb965d137",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": null,
            "size": 8746,
            "upload_time": "2024-01-17T17:17:00",
            "upload_time_iso_8601": "2024-01-17T17:17:00.486238Z",
            "url": "https://files.pythonhosted.org/packages/a8/f4/2765658a8ae6a2ee5a1724cbab0ec29155010cf1d756a96d6f7d1d3c403f/rabbitmq-utils-1.4.1.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2024-01-17 17:17:00",
    "github": false,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "lcname": "rabbitmq-utils"
}