| Name | robustmq JSON |
| Version |
0.0.2
JSON |
| download |
| home_page | None |
| Summary | A robust mqtt library with message caching |
| upload_time | 2024-08-31 10:48:30 |
| maintainer | None |
| docs_url | None |
| author | None |
| requires_python | >=3.8 |
| license | None |
| keywords |
|
| VCS |
 |
| bugtrack_url |
|
| requirements |
No requirements were recorded.
|
| Travis-CI |
No Travis.
|
| coveralls test coverage |
No coveralls.
|
# RobustMQ
**RobustMQ** is a reliable on-top mqtt-module designed to support dealing with unreliable connections. This project is aimed at providing a robust, easy-to-use messaging solution.
It is mainly designed for edge devices to reliable transfer produced data even when there are power outages or longer connection troubles. The messages are cached on the file system to be reloaded again when the data transfer is possible.
## Table of Contents
- [Features](#features)
- [Installation](#installation)
- [Usage](#usage)
- [Contributing](#contributing)
- [License](#license)
## Features
- **High Reliability:** Ensures message delivery even in the event of network failures or node crashes.
- **Message Caching:** Messages, which are not received by the broker are cached on the file system for later retries
- **Caching Methods:** Cache large messages directly to pickle or leightweight to sqlite database
- **Ease of Use:** Simple API for integrating with your application, allowing you to move easily from paho.mqtt bare implementation.
## Installation
To install **RobustMQ**, you can clone the repository and install the required dependencies:
```bash
git clone https://github.com/DaqOpen/robustmq.git
cd robustmq
pip install .
```
## Usage
Here is a simple example of how to use RobustMQ in your project:
```python
import time
from pathlib import Path
from robustmq.client import RobustClient
# Create a RobustClient instance
my_robust_client = RobustClient(client_id="testclient", cache_path=Path("/tmp/mymqttcache"))
# Establish a connection to the mqtt broker
my_robust_client.connect_async(mqtt_host="localhost")
# Send some messages
for i in range(20):
my_robust_client.publish("dt/blah", f"Test Message {i:d}")
time.sleep(1)
# Stop the client process
my_robust_client.stop()
```
## How it works
First, a separate process is spawned after the call of connect_async. This is done with the multiprocessing module. For further communication with this process, two queues are created.
The main goal is, to only put message by message into the queue of the mqtt-client, when the previous was sent successfully. This may reduce performance but allows to preserve messages when there are errors.
When publishing a message with the RobustClient, it follows this flow:
1. message is queued in the multiprocessing queue
2. the worker process checks the queue and consumes **one** message if available
3. this message is then published via the underlying paho.mqtt client
4. the process waits until the message has reached its destination (on_publish callback)
1. if a timeout has been reached, the whole input queue (including the actual in publish stuck message) gets cached in the file system
2. the caching is going on until the message has been successfully published
5. When the publish was successful, this meanwhile cached message is deleted
6. at the next loop, it will be checked, if cached data is available and starts publishing that
## Contributing
I welcome contributions to **RobustMQ**! If you'd like to contribute, please fork the repository, create a new branch, and submit a pull request.
## License
This project is licensed under the MIT License - see the LICENSE file for details.
Raw data
{
"_id": null,
"home_page": null,
"name": "robustmq",
"maintainer": null,
"docs_url": null,
"requires_python": ">=3.8",
"maintainer_email": null,
"keywords": null,
"author": null,
"author_email": "Michael Oberhofer <info@pqopen.com>",
"download_url": "https://files.pythonhosted.org/packages/c2/ec/da4aaa8b59355d1741f1be05d438ea232d561462ea78696964c1a7b764de/robustmq-0.0.2.tar.gz",
"platform": null,
"description": "# RobustMQ\n\n**RobustMQ** is a reliable on-top mqtt-module designed to support dealing with unreliable connections. This project is aimed at providing a robust, easy-to-use messaging solution.\n\nIt is mainly designed for edge devices to reliable transfer produced data even when there are power outages or longer connection troubles. The messages are cached on the file system to be reloaded again when the data transfer is possible.\n\n## Table of Contents\n- [Features](#features)\n- [Installation](#installation)\n- [Usage](#usage)\n- [Contributing](#contributing)\n- [License](#license)\n\n## Features\n- **High Reliability:** Ensures message delivery even in the event of network failures or node crashes.\n- **Message Caching:** Messages, which are not received by the broker are cached on the file system for later retries\n- **Caching Methods:** Cache large messages directly to pickle or leightweight to sqlite database\n- **Ease of Use:** Simple API for integrating with your application, allowing you to move easily from paho.mqtt bare implementation.\n\n## Installation\nTo install **RobustMQ**, you can clone the repository and install the required dependencies:\n\n\n```bash\ngit clone https://github.com/DaqOpen/robustmq.git\ncd robustmq\npip install .\n```\n\n## Usage\nHere is a simple example of how to use RobustMQ in your project:\n\n```python\nimport time\nfrom pathlib import Path\nfrom robustmq.client import RobustClient\n\n# Create a RobustClient instance\nmy_robust_client = RobustClient(client_id=\"testclient\", cache_path=Path(\"/tmp/mymqttcache\"))\n# Establish a connection to the mqtt broker\nmy_robust_client.connect_async(mqtt_host=\"localhost\")\n\n# Send some messages\nfor i in range(20):\n my_robust_client.publish(\"dt/blah\", f\"Test Message {i:d}\")\n time.sleep(1)\n\n# Stop the client process\nmy_robust_client.stop()\n```\n\n\n\n## How it works\n\nFirst, a separate process is spawned after the call of connect_async. This is done with the multiprocessing module. For further communication with this process, two queues are created.\n\nThe main goal is, to only put message by message into the queue of the mqtt-client, when the previous was sent successfully. This may reduce performance but allows to preserve messages when there are errors.\n\nWhen publishing a message with the RobustClient, it follows this flow:\n\n1. message is queued in the multiprocessing queue\n2. the worker process checks the queue and consumes **one** message if available\n3. this message is then published via the underlying paho.mqtt client\n4. the process waits until the message has reached its destination (on_publish callback)\n 1. if a timeout has been reached, the whole input queue (including the actual in publish stuck message) gets cached in the file system\n 2. the caching is going on until the message has been successfully published\n5. When the publish was successful, this meanwhile cached message is deleted\n6. at the next loop, it will be checked, if cached data is available and starts publishing that\n\n\n\n## Contributing\n\nI welcome contributions to **RobustMQ**! If you'd like to contribute, please fork the repository, create a new branch, and submit a pull request.\n\n## License\n\nThis project is licensed under the MIT License - see the LICENSE file for details.\n",
"bugtrack_url": null,
"license": null,
"summary": "A robust mqtt library with message caching",
"version": "0.0.2",
"project_urls": {
"Homepage": "https://github.com/DaqOpen/robustmq",
"Issues": "https://github.com/DaqOpen/robustmq/issues"
},
"split_keywords": [],
"urls": [
{
"comment_text": "",
"digests": {
"blake2b_256": "2e096ade72655c9c180440479036b58c40588285c58c3cdeef5de13234b9905c",
"md5": "568de32ff3c8caa9aa027d2f339fd354",
"sha256": "02f697ced07ccb2818c992c05ccc3fa1835fce8f4202915716878480a63e6c24"
},
"downloads": -1,
"filename": "robustmq-0.0.2-py3-none-any.whl",
"has_sig": false,
"md5_digest": "568de32ff3c8caa9aa027d2f339fd354",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": ">=3.8",
"size": 6792,
"upload_time": "2024-08-31T10:48:28",
"upload_time_iso_8601": "2024-08-31T10:48:28.724076Z",
"url": "https://files.pythonhosted.org/packages/2e/09/6ade72655c9c180440479036b58c40588285c58c3cdeef5de13234b9905c/robustmq-0.0.2-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": "",
"digests": {
"blake2b_256": "c2ecda4aaa8b59355d1741f1be05d438ea232d561462ea78696964c1a7b764de",
"md5": "7030b9347bd55b38f06f8cf191408ac8",
"sha256": "b3251b26efb51e439b0bb33adc7ceb9f6b492369f8a75c8d98e1379dbb39461b"
},
"downloads": -1,
"filename": "robustmq-0.0.2.tar.gz",
"has_sig": false,
"md5_digest": "7030b9347bd55b38f06f8cf191408ac8",
"packagetype": "sdist",
"python_version": "source",
"requires_python": ">=3.8",
"size": 7585,
"upload_time": "2024-08-31T10:48:30",
"upload_time_iso_8601": "2024-08-31T10:48:30.742189Z",
"url": "https://files.pythonhosted.org/packages/c2/ec/da4aaa8b59355d1741f1be05d438ea232d561462ea78696964c1a7b764de/robustmq-0.0.2.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2024-08-31 10:48:30",
"github": true,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"github_user": "DaqOpen",
"github_project": "robustmq",
"travis_ci": false,
"coveralls": false,
"github_actions": false,
"lcname": "robustmq"
}