persistmq


Namepersistmq JSON
Version 0.0.4 PyPI version JSON
download
home_pageNone
SummaryA robust mqtt library with message caching
upload_time2024-09-03 16:53:56
maintainerNone
docs_urlNone
authorNone
requires_python>=3.8
licenseNone
keywords
VCS
bugtrack_url
requirements No requirements were recorded.
Travis-CI No Travis.
coveralls test coverage No coveralls.
            # PersistMQ

**PersistMQ** is a reliable on-top mqtt-module designed to support dealing with unreliable connections. This project is aimed at providing a robust, easy-to-use messaging solution.

It is mainly designed for edge devices to  reliable transfer produced data even when there are power outages or longer connection troubles. The messages are cached on the file system to be reloaded again when the data transfer is possible.

## Table of Contents
- [Features](#features)
- [Intended Use](#Intended-Use)
- [Installation](#installation)
- [Usage](#usage)
- [Contributing](#contributing)
- [License](#license)

## Features
- **High Reliability:** Ensures message delivery even in the event of network failures or node crashes.
- **Message Caching:** Messages, which are not received by the broker are cached on the file system for later retries
- **Caching Methods:** Cache large messages directly to pickle or leightweight to sqlite database
- **Ease of Use:** Simple API for integrating with your application, allowing you to move easily from paho.mqtt bare implementation.

## Intended Use

✅When you should use this library:

- Long running applications which produces cyclically data (e.g. measurement devices)
- Historic data is important, not only the most recent (provide timestamp in your payload!)
- Typical message transmission time is lower than your message period (a jam could occur)
- QoS=2 is necessary

❌When you better go with others:

- High amount of messages (more than 10 per Second)
- QoS=0 is enough

## Installation
For easy use, this package can be installed via pip from pypi:

```bash
pip install persistmq
```

As an alternative, you can clone the repository and install the required dependencies:


```bash
git clone https://github.com/DaqOpen/persistmq.git
cd persistmq
pip install .
```

## Usage
Here is a simple example of how to use PersistMQ in your project:

```python
import time
from pathlib import Path
from persistmq.client import PersistClient

# Create a PersistClient instance
my_robust_client = PersistClient(client_id="testclient", cache_path=Path("/tmp/mymqttcache"))
# Establish a connection to the mqtt broker
my_robust_client.connect_async(mqtt_host="localhost")

# Send some messages
for i in range(20):
    my_robust_client.publish("dt/blah", f"Test Message {i:d}")
    time.sleep(1)

# Stop the client process
my_robust_client.stop()
```



## How it works

First, a separate process is spawned after the call of connect_async. This is done with the multiprocessing module. For further communication with this process, two queues are created.

The main goal is, to only put message by message into the queue of the mqtt-client, when the previous was sent successfully. This may reduce performance but allows to preserve messages when there are errors.

When publishing a message with the PersistClient, it follows this flow:

1. message is queued in the multiprocessing queue
2. the worker process checks the queue and consumes **one** message if available
3. this message is then published via the underlying paho.mqtt client
4. the process waits until the message has reached its destination (on_publish callback)
   1. if a timeout has been reached, the whole input queue (including the actual in publish stuck message) gets cached in the file system
   2. the caching is going on until the message has been successfully published
5. When the publish was successful, this meanwhile cached message is deleted
6. at the next loop, it will be checked, if cached data is available and starts publishing that



## Roadmap

A quick and dirty roadmap to show what is planned for the future:

- Transmission of "Bulk" messages: In unreliable networks, the QoS=2 ping-pong of MQTT takes a long time and can generate unnecessary delay when transmitting many messages



## Contributing

I welcome contributions to **PersistMQ**! If you'd like to contribute, please fork the repository, create a new branch, and submit a pull request.

## License

This project is licensed under the MIT License - see the LICENSE file for details.

            

Raw data

            {
    "_id": null,
    "home_page": null,
    "name": "persistmq",
    "maintainer": null,
    "docs_url": null,
    "requires_python": ">=3.8",
    "maintainer_email": null,
    "keywords": null,
    "author": null,
    "author_email": "Michael Oberhofer <info@pqopen.com>",
    "download_url": "https://files.pythonhosted.org/packages/23/2e/ffa3b0d0247643ac8f378319e8002ada3cfb9ba9fad672a766f7a01f478f/persistmq-0.0.4.tar.gz",
    "platform": null,
    "description": "# PersistMQ\n\n**PersistMQ** is a reliable on-top mqtt-module designed to support dealing with unreliable connections. This project is aimed at providing a robust, easy-to-use messaging solution.\n\nIt is mainly designed for edge devices to  reliable transfer produced data even when there are power outages or longer connection troubles. The messages are cached on the file system to be reloaded again when the data transfer is possible.\n\n## Table of Contents\n- [Features](#features)\n- [Intended Use](#Intended-Use)\n- [Installation](#installation)\n- [Usage](#usage)\n- [Contributing](#contributing)\n- [License](#license)\n\n## Features\n- **High Reliability:** Ensures message delivery even in the event of network failures or node crashes.\n- **Message Caching:** Messages, which are not received by the broker are cached on the file system for later retries\n- **Caching Methods:** Cache large messages directly to pickle or leightweight to sqlite database\n- **Ease of Use:** Simple API for integrating with your application, allowing you to move easily from paho.mqtt bare implementation.\n\n## Intended Use\n\n\u2705When you should use this library:\n\n- Long running applications which produces cyclically data (e.g. measurement devices)\n- Historic data is important, not only the most recent (provide timestamp in your payload!)\n- Typical message transmission time is lower than your message period (a jam could occur)\n- QoS=2 is necessary\n\n\u274cWhen you better go with others:\n\n- High amount of messages (more than 10 per Second)\n- QoS=0 is enough\n\n## Installation\nFor easy use, this package can be installed via pip from pypi:\n\n```bash\npip install persistmq\n```\n\nAs an alternative, you can clone the repository and install the required dependencies:\n\n\n```bash\ngit clone https://github.com/DaqOpen/persistmq.git\ncd persistmq\npip install .\n```\n\n## Usage\nHere is a simple example of how to use PersistMQ in your project:\n\n```python\nimport time\nfrom pathlib import Path\nfrom persistmq.client import PersistClient\n\n# Create a PersistClient instance\nmy_robust_client = PersistClient(client_id=\"testclient\", cache_path=Path(\"/tmp/mymqttcache\"))\n# Establish a connection to the mqtt broker\nmy_robust_client.connect_async(mqtt_host=\"localhost\")\n\n# Send some messages\nfor i in range(20):\n    my_robust_client.publish(\"dt/blah\", f\"Test Message {i:d}\")\n    time.sleep(1)\n\n# Stop the client process\nmy_robust_client.stop()\n```\n\n\n\n## How it works\n\nFirst, a separate process is spawned after the call of connect_async. This is done with the multiprocessing module. For further communication with this process, two queues are created.\n\nThe main goal is, to only put message by message into the queue of the mqtt-client, when the previous was sent successfully. This may reduce performance but allows to preserve messages when there are errors.\n\nWhen publishing a message with the PersistClient, it follows this flow:\n\n1. message is queued in the multiprocessing queue\n2. the worker process checks the queue and consumes **one** message if available\n3. this message is then published via the underlying paho.mqtt client\n4. the process waits until the message has reached its destination (on_publish callback)\n   1. if a timeout has been reached, the whole input queue (including the actual in publish stuck message) gets cached in the file system\n   2. the caching is going on until the message has been successfully published\n5. When the publish was successful, this meanwhile cached message is deleted\n6. at the next loop, it will be checked, if cached data is available and starts publishing that\n\n\n\n## Roadmap\n\nA quick and dirty roadmap to show what is planned for the future:\n\n- Transmission of \"Bulk\" messages: In unreliable networks, the QoS=2 ping-pong of MQTT takes a long time and can generate unnecessary delay when transmitting many messages\n\n\n\n## Contributing\n\nI welcome contributions to **PersistMQ**! If you'd like to contribute, please fork the repository, create a new branch, and submit a pull request.\n\n## License\n\nThis project is licensed under the MIT License - see the LICENSE file for details.\n",
    "bugtrack_url": null,
    "license": null,
    "summary": "A robust mqtt library with message caching",
    "version": "0.0.4",
    "project_urls": {
        "Homepage": "https://github.com/DaqOpen/persistmq",
        "Issues": "https://github.com/DaqOpen/persistmq/issues"
    },
    "split_keywords": [],
    "urls": [
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "2c55723e13e96f7cf905f8d0962449918bf8f2ae17f5f2f67bb5c3e179db2b18",
                "md5": "6ad590d7b581e6af2b5060dd13ad1b3f",
                "sha256": "fbb7be021fc795e21acdffbcdb64b01f0559a5726957aec94d75963c1e3d2dfc"
            },
            "downloads": -1,
            "filename": "persistmq-0.0.4-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "6ad590d7b581e6af2b5060dd13ad1b3f",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": ">=3.8",
            "size": 7281,
            "upload_time": "2024-09-03T16:53:54",
            "upload_time_iso_8601": "2024-09-03T16:53:54.930480Z",
            "url": "https://files.pythonhosted.org/packages/2c/55/723e13e96f7cf905f8d0962449918bf8f2ae17f5f2f67bb5c3e179db2b18/persistmq-0.0.4-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "232effa3b0d0247643ac8f378319e8002ada3cfb9ba9fad672a766f7a01f478f",
                "md5": "85adbf1f5d75debe956176dd8010aae1",
                "sha256": "21e1e1e1297004d71d3e8b12b65d916300271cde43d4b1196d0003bcd9a34e47"
            },
            "downloads": -1,
            "filename": "persistmq-0.0.4.tar.gz",
            "has_sig": false,
            "md5_digest": "85adbf1f5d75debe956176dd8010aae1",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": ">=3.8",
            "size": 8085,
            "upload_time": "2024-09-03T16:53:56",
            "upload_time_iso_8601": "2024-09-03T16:53:56.810613Z",
            "url": "https://files.pythonhosted.org/packages/23/2e/ffa3b0d0247643ac8f378319e8002ada3cfb9ba9fad672a766f7a01f478f/persistmq-0.0.4.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2024-09-03 16:53:56",
    "github": true,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "github_user": "DaqOpen",
    "github_project": "persistmq",
    "travis_ci": false,
    "coveralls": false,
    "github_actions": false,
    "lcname": "persistmq"
}
        
Elapsed time: 0.30377s