Name | mqroute JSON |
Version |
0.3.0
JSON |
| download |
home_page | None |
Summary | MQRoute is a Python library designed to simplify working with MQTT |
upload_time | 2025-01-05 12:25:27 |
maintainer | None |
docs_url | None |
author | ehyde74 |
requires_python | >=3.13 |
license | MIT License |
keywords |
mqtt
asyncio
decorators
|
VCS |
|
bugtrack_url |
|
requirements |
No requirements were recorded.
|
Travis-CI |
No Travis.
|
coveralls test coverage |
No coveralls.
|
# **MQRoute**
`MQRoute` is a Python MQTT routing library designed to simplify working with MQTT topics by abstracting complexity. It supports advanced topic matching (including wildcards and parameterized topics), allows easy registration of callbacks using decorators, and provides scalable asynchronous callback handling.
Whether you're building an IoT platform or a messaging service, `MQRoute` makes it easy to manage MQTT subscriptions, streamline message processing, and publish messages to MQTT topics with minimal effort.
---
## **Features**
- **Flexible Publishing:**
Easily publish messages to MQTT topics with simple method calls. Supports both JSON and raw payloads.
- **Dynamic Topic Matching:**
Supports `+` and `#` MQTT wildcards, as well as parameterized topics (`+parameter_name+`) for extracting parameters from topic strings.
- **Asynchronous by Design:**
Built with `asyncio` for responsive handling of incoming MQTT messages and user-defined asynchronous callbacks.
- **Decorator-Based Callbacks:**
Subscribe to MQTT topics effortlessly using Python decorators.
- **Type Safety:**
Includes type hints and validation with the `typeguard` library.
- **Extensive Logging and Debugging:**
Detailed logs for easy troubleshooting of MQTT operations and callbacks.
- **Customizable Payload Handling:**
Easy-to-use mechanisms for handling raw or JSON-formatted payloads.
---
## **Installation**
You can install MQRoute simply by using pip:
```shell
pip install mqroute
```
You may also download it from GitHub, for example, when local modifications are needed. That's your call!
---
## **Getting Started**
### Publish Messages
`MQRoute` makes it simple to publish messages to any topic, and it supports JSON encoding by default. Use the `publish` method for synchronous publishing or `publish_async` for asynchronous needs.
Below are the steps to start using `MQRoute`. For more advanced usage, refer to detailed examples in [the `testclient.py`](./testclient.py).
### Initialize the MQTT Client
Use the `MQTTClient` class to connect to the MQTT broker, subscribe to topics, and handle messages.
```python
import asyncio
from mqroute.mqtt_client import MQTTClient
mqtt = MQTTClient(host="test.mosquitto.org", port=1883)
asyncio.run(mqtt.run()) # Establishes connection and starts listening
```
### Subscribe to Topics
Use the `@mqtt.subscribe` decorator to register a specific callback for a topic. The library supports `+` and `#` MQTT wildcards and parameterized topics.
```python
@mqtt.subscribe(topic="devices/+/status")
async def handle_device_status(topic, msg, params):
print(f"Device {params[0]} status: {msg.message}")
@mqtt.subscribe(topic="sensors/+/data/+/type/#")
async def handle_sensor_data(topic, msg, params):
print(f"Sensor {params[0]} data at {params[1]}: {msg.message}")
```
### Publish to Topics
Use the `@mqtt.publish` decorator to register a specific method to publish message to topic. The return value
that is either dict or str will be send to this topic. For more complex cases the functional interface
is also available.
```python
async def publish_reset_command(*args, **kwargs):
# any parameters can be added to the signature
await mqtt.async_publish_message(topic="devices/thing/command", payload="do_reset")
def publish_restart_command(*args, **kwargs):
# any parameters can be added to the signature
mqtt.publish_message(topic="devices/thing/command", payload="do_restart")
@mqtt.publish(topic="devices/thing/command")
def publish_send_status_command(*args, **kwargs):
# any parameters can be added to the signature
return "send_status"
@mqtt.publish(topic="devices/thing/command")
async def publish_do_factory_reset(passkey: str):
# any parameters can be added to the signature
return f"do_factory_reset {passkey}"
```
### Handle JSON Payloads Automatically
JSON payloads are converted automatically to dictionaries. If this behavior is not desired,
set the `raw_payload` parameter in the decorator to `True` to receive raw data in the callback instead.
The value of `raw_payload` defaults to `False`. Callbacks can also be marked as fallback, meaning
they are only called if a topic doesn't match any non-fallback subscriptions. Note: Multiple fallback methods
can be defined, and multiple fallbacks may match and thus be called.
```python
@mqtt.subscribe(topic="config/update/json")
async def handle_config_update1(topic, msg, params):
# Access the payload as a Python dictionary
config_data_as_dict = msg.message
print(f"Received config update: {config_data_as_dict}")
@mqtt.subscribe(topic="config/update/raw", raw_payload=True)
async def handle_config_update2(topic, msg, params):
# Access the payload as a raw string
config_data_as_raw = msg.message
print(f"Received config update: {config_data_as_raw}")
@mqtt.subscribe(topic="config/#", raw_payload=True, fallback=True)
async def handle_config_update3(topic, msg, params):
# Access the payload as a raw string
config_data_as_raw = msg.message
print(f"Received config update: {config_data_as_raw}")
```
---
### Custom signal handling for terminating application
Custom termination logic can be applied by using decorator sigstop:
```python
@mqtt.sigstop
async def sigstop_handler():
# termination requested
print(f"Received request to terminate application.")
```
---
## **Example: Full Client Code**
Below is an updated example that demonstrates how to use `MQRoute`:
The updated example below demonstrates how to use `MQRoute` for subscribing to and publishing MQTT messages:
```python
import asyncio
from mqroute.client import MQTTClient
from mqroute.mqtt_client import MQTTClient
mqtt = MQTTClient(host="mqtt.example.com", port=1883)
@mqtt.subscribe(topic="devices/+/status")
async def handle_device_status(topic, msg, params):
print(f"Device {params[0]} status: {msg.message}")
@mqtt.subscribe(topic="sensors/+/status", raw_payload=True)
async def handle_sensor_status(topic, msg, params):
sensor_id = params[0]
print(f"Sensor {sensor_id} received raw status: {msg.message}")
@mqtt.subscribe(topic="sensors/#", fallback=True)
async def handle_sensor_data(topic, msg, params):
print(f"Sensor data received on topic {topic}: {msg.message}")
@mqtt.sigstop
async def sigstop_handler():
# termination requested
print(f"Received request to terminate application.")
async def handle_green_light_status(topic, msg, params):
print(f"Green sensor status: {msg.message}")
async def handle_green_light_status(topic, msg, params):
print(f"Green sensor status: {msg.message}")
async def publish_example():
# Publish using JSON payload and more explicit functional interface
await mqtt.publish(topic="devices/control", payload={"command": "restart", "timeout": 5})
@mqtt.publish(topic="raw/commands")
async def publish_example2(data: str):
# Publish using raw payload and simpler decorated interface
return f"Raw data string with parameter '{data}'"
async def main():
# callback can also be added using functional interface.
mqtt.add_subscription(handle_green_light_status,
topic="sensors/green/status")
await mqtt.run()
# Include publishing example
await publish_example()
await publish_example2("this data")
# Keep the client running
while mqtt.running:
await asyncio.sleep(0.1)
if __name__ == "__main__":
asyncio.run(main())
```
---
## **Advanced Features**
### 1. **Parameterized Topics**
Extract dynamic portions of a topic using parameterized syntax:
```python
@mqtt.subscribe(topic="room/+room+/device/+device+/status")
async def handle_parametrized(topic, msg, params):
print(f"Device {params['device']} in room {params['room']} has status: {msg.message}")
```
### 2. **Custom Callback Runner**
For advanced use cases, directly manage callbacks using the `CallbackRunner` class.
---
## **Testing**
Integration and unit testing can be performed using `pytest`. Sample test cases are provided in the repository.
Run the tests:
```bash
pytest tests/
```
---
## **Planned Improvements**
- **Customization and extendability:** Allow easy means to support for example custom payload formats
- **Demo environment**: Demo environment with mqtt router and two mqroute clients talking. This would allow
demo client to not depend on test.mosquitto.org
demo client to not depend on third-party MQTT brokers.
## **Contributing**
Contributions and feedback are welcome! If you'd like to contribute, please follow these steps:
1. Fork the repository.
2. Create a feature branch (`git checkout -b feature-name`).
3. Commit your changes (`git commit -m 'Add new feature'`).
4. Push to the branch (`git push origin feature-name`).
5. Submit a pull request.
For major changes, please open an issue first to discuss what you'd like to improve.
---
## **License**
This project is licensed under the MIT License. See the [LICENSE](./LICENSE) file for details.
---
## **Additional Notes**
- For complete functionality and advanced examples, refer to the `testclient.py` file provided in the repository.
- MQRoute is in active development. Feel free to report bugs.
Raw data
{
"_id": null,
"home_page": null,
"name": "mqroute",
"maintainer": null,
"docs_url": null,
"requires_python": ">=3.13",
"maintainer_email": null,
"keywords": "mqtt, asyncio, decorators",
"author": "ehyde74",
"author_email": null,
"download_url": "https://files.pythonhosted.org/packages/39/0f/90a2bcedc14cd940fcf305bb995cd7a3c30efdc58331a4378f389cd29e0f/mqroute-0.3.0.tar.gz",
"platform": null,
"description": "# **MQRoute**\r\n\r\n`MQRoute` is a Python MQTT routing library designed to simplify working with MQTT topics by abstracting complexity. It supports advanced topic matching (including wildcards and parameterized topics), allows easy registration of callbacks using decorators, and provides scalable asynchronous callback handling.\r\n\r\nWhether you're building an IoT platform or a messaging service, `MQRoute` makes it easy to manage MQTT subscriptions, streamline message processing, and publish messages to MQTT topics with minimal effort.\r\n\r\n---\r\n\r\n## **Features**\r\n- **Flexible Publishing:** \r\n Easily publish messages to MQTT topics with simple method calls. Supports both JSON and raw payloads.\r\n\r\n- **Dynamic Topic Matching:** \r\n Supports `+` and `#` MQTT wildcards, as well as parameterized topics (`+parameter_name+`) for extracting parameters from topic strings.\r\n\r\n- **Asynchronous by Design:** \r\n Built with `asyncio` for responsive handling of incoming MQTT messages and user-defined asynchronous callbacks.\r\n\r\n- **Decorator-Based Callbacks:** \r\n Subscribe to MQTT topics effortlessly using Python decorators.\r\n\r\n- **Type Safety:** \r\n Includes type hints and validation with the `typeguard` library.\r\n\r\n- **Extensive Logging and Debugging:** \r\n Detailed logs for easy troubleshooting of MQTT operations and callbacks.\r\n\r\n- **Customizable Payload Handling:** \r\n Easy-to-use mechanisms for handling raw or JSON-formatted payloads.\r\n\r\n---\r\n\r\n## **Installation**\r\n\r\nYou can install MQRoute simply by using pip:\r\n\r\n```shell\r\npip install mqroute\r\n```\r\n\r\nYou may also download it from GitHub, for example, when local modifications are needed. That's your call!\r\n---\r\n\r\n## **Getting Started**\r\n### Publish Messages\r\n`MQRoute` makes it simple to publish messages to any topic, and it supports JSON encoding by default. Use the `publish` method for synchronous publishing or `publish_async` for asynchronous needs.\r\n\r\nBelow are the steps to start using `MQRoute`. For more advanced usage, refer to detailed examples in [the `testclient.py`](./testclient.py).\r\n\r\n### Initialize the MQTT Client\r\n\r\nUse the `MQTTClient` class to connect to the MQTT broker, subscribe to topics, and handle messages.\r\n\r\n```python\r\nimport asyncio\r\nfrom mqroute.mqtt_client import MQTTClient\r\n\r\nmqtt = MQTTClient(host=\"test.mosquitto.org\", port=1883)\r\nasyncio.run(mqtt.run()) # Establishes connection and starts listening\r\n```\r\n\r\n### Subscribe to Topics\r\n\r\nUse the `@mqtt.subscribe` decorator to register a specific callback for a topic. The library supports `+` and `#` MQTT wildcards and parameterized topics.\r\n\r\n```python\r\n@mqtt.subscribe(topic=\"devices/+/status\")\r\nasync def handle_device_status(topic, msg, params):\r\n print(f\"Device {params[0]} status: {msg.message}\")\r\n\r\n@mqtt.subscribe(topic=\"sensors/+/data/+/type/#\")\r\nasync def handle_sensor_data(topic, msg, params):\r\n print(f\"Sensor {params[0]} data at {params[1]}: {msg.message}\")\r\n```\r\n\r\n### Publish to Topics\r\n\r\nUse the `@mqtt.publish` decorator to register a specific method to publish message to topic. The return value \r\nthat is either dict or str will be send to this topic. For more complex cases the functional interface\r\nis also available.\r\n\r\n```python\r\nasync def publish_reset_command(*args, **kwargs):\r\n # any parameters can be added to the signature\r\n await mqtt.async_publish_message(topic=\"devices/thing/command\", payload=\"do_reset\")\r\n \r\ndef publish_restart_command(*args, **kwargs):\r\n # any parameters can be added to the signature\r\n mqtt.publish_message(topic=\"devices/thing/command\", payload=\"do_restart\")\r\n\r\n@mqtt.publish(topic=\"devices/thing/command\")\r\ndef publish_send_status_command(*args, **kwargs):\r\n # any parameters can be added to the signature\r\n return \"send_status\"\r\n\r\n@mqtt.publish(topic=\"devices/thing/command\")\r\nasync def publish_do_factory_reset(passkey: str):\r\n # any parameters can be added to the signature\r\n return f\"do_factory_reset {passkey}\"\r\n\r\n\r\n```\r\n\r\n### Handle JSON Payloads Automatically\r\n\r\nJSON payloads are converted automatically to dictionaries. If this behavior is not desired,\r\n set the `raw_payload` parameter in the decorator to `True` to receive raw data in the callback instead.\r\n The value of `raw_payload` defaults to `False`. Callbacks can also be marked as fallback, meaning\r\n they are only called if a topic doesn't match any non-fallback subscriptions. Note: Multiple fallback methods\r\n can be defined, and multiple fallbacks may match and thus be called.\r\n\r\n```python\r\n@mqtt.subscribe(topic=\"config/update/json\")\r\nasync def handle_config_update1(topic, msg, params):\r\n # Access the payload as a Python dictionary\r\n config_data_as_dict = msg.message\r\n print(f\"Received config update: {config_data_as_dict}\")\r\n\r\n@mqtt.subscribe(topic=\"config/update/raw\", raw_payload=True)\r\nasync def handle_config_update2(topic, msg, params):\r\n # Access the payload as a raw string\r\n config_data_as_raw = msg.message\r\n print(f\"Received config update: {config_data_as_raw}\")\r\n \r\n@mqtt.subscribe(topic=\"config/#\", raw_payload=True, fallback=True)\r\nasync def handle_config_update3(topic, msg, params):\r\n # Access the payload as a raw string\r\n config_data_as_raw = msg.message\r\n print(f\"Received config update: {config_data_as_raw}\")\r\n\r\n```\r\n\r\n---\r\n\r\n### Custom signal handling for terminating application\r\nCustom termination logic can be applied by using decorator sigstop:\r\n\r\n```python\r\n@mqtt.sigstop\r\nasync def sigstop_handler():\r\n # termination requested\r\n print(f\"Received request to terminate application.\")\r\n```\r\n---\r\n\r\n## **Example: Full Client Code**\r\nBelow is an updated example that demonstrates how to use `MQRoute`:\r\nThe updated example below demonstrates how to use `MQRoute` for subscribing to and publishing MQTT messages:\r\n```python\r\nimport asyncio\r\nfrom mqroute.client import MQTTClient\r\nfrom mqroute.mqtt_client import MQTTClient\r\n\r\nmqtt = MQTTClient(host=\"mqtt.example.com\", port=1883)\r\n\r\n\r\n@mqtt.subscribe(topic=\"devices/+/status\")\r\nasync def handle_device_status(topic, msg, params):\r\n print(f\"Device {params[0]} status: {msg.message}\")\r\n\r\n\r\n@mqtt.subscribe(topic=\"sensors/+/status\", raw_payload=True)\r\nasync def handle_sensor_status(topic, msg, params):\r\n sensor_id = params[0]\r\n print(f\"Sensor {sensor_id} received raw status: {msg.message}\")\r\n\r\n@mqtt.subscribe(topic=\"sensors/#\", fallback=True)\r\nasync def handle_sensor_data(topic, msg, params):\r\n print(f\"Sensor data received on topic {topic}: {msg.message}\")\r\n \r\n@mqtt.sigstop\r\nasync def sigstop_handler():\r\n # termination requested\r\n print(f\"Received request to terminate application.\")\r\n\r\n \r\nasync def handle_green_light_status(topic, msg, params):\r\n print(f\"Green sensor status: {msg.message}\")\r\n\r\nasync def handle_green_light_status(topic, msg, params):\r\n print(f\"Green sensor status: {msg.message}\")\r\n\r\nasync def publish_example():\r\n # Publish using JSON payload and more explicit functional interface\r\n await mqtt.publish(topic=\"devices/control\", payload={\"command\": \"restart\", \"timeout\": 5})\r\n\r\n@mqtt.publish(topic=\"raw/commands\") \r\nasync def publish_example2(data: str):\r\n # Publish using raw payload and simpler decorated interface\r\n return f\"Raw data string with parameter '{data}'\"\r\n\r\n\r\nasync def main(): \r\n # callback can also be added using functional interface.\r\n mqtt.add_subscription(handle_green_light_status,\r\n topic=\"sensors/green/status\")\r\n await mqtt.run()\r\n \r\n # Include publishing example\r\n await publish_example()\r\n await publish_example2(\"this data\")\r\n \r\n # Keep the client running\r\n while mqtt.running:\r\n await asyncio.sleep(0.1)\r\n\r\n\r\nif __name__ == \"__main__\":\r\n asyncio.run(main())\r\n```\r\n\r\n---\r\n\r\n## **Advanced Features**\r\n\r\n### 1. **Parameterized Topics**\r\nExtract dynamic portions of a topic using parameterized syntax:\r\n```python\r\n@mqtt.subscribe(topic=\"room/+room+/device/+device+/status\")\r\nasync def handle_parametrized(topic, msg, params):\r\n print(f\"Device {params['device']} in room {params['room']} has status: {msg.message}\")\r\n```\r\n\r\n### 2. **Custom Callback Runner**\r\nFor advanced use cases, directly manage callbacks using the `CallbackRunner` class.\r\n\r\n---\r\n\r\n## **Testing**\r\n\r\nIntegration and unit testing can be performed using `pytest`. Sample test cases are provided in the repository.\r\n\r\nRun the tests:\r\n```bash\r\npytest tests/\r\n```\r\n\r\n---\r\n\r\n## **Planned Improvements**\r\n- **Customization and extendability:** Allow easy means to support for example custom payload formats\r\n- **Demo environment**: Demo environment with mqtt router and two mqroute clients talking. This would allow\r\n demo client to not depend on test.mosquitto.org\r\n demo client to not depend on third-party MQTT brokers.\r\n\r\n## **Contributing**\r\n\r\nContributions and feedback are welcome! If you'd like to contribute, please follow these steps:\r\n\r\n1. Fork the repository.\r\n2. Create a feature branch (`git checkout -b feature-name`).\r\n3. Commit your changes (`git commit -m 'Add new feature'`).\r\n4. Push to the branch (`git push origin feature-name`).\r\n5. Submit a pull request.\r\n\r\nFor major changes, please open an issue first to discuss what you'd like to improve.\r\n\r\n---\r\n\r\n## **License**\r\n\r\nThis project is licensed under the MIT License. See the [LICENSE](./LICENSE) file for details.\r\n\r\n---\r\n\r\n## **Additional Notes**\r\n\r\n- For complete functionality and advanced examples, refer to the `testclient.py` file provided in the repository.\r\n- MQRoute is in active development. Feel free to report bugs.\r\n",
"bugtrack_url": null,
"license": "MIT License",
"summary": "MQRoute is a Python library designed to simplify working with MQTT",
"version": "0.3.0",
"project_urls": {
"Homepage": "https://github.com/ehyde74/mqroute"
},
"split_keywords": [
"mqtt",
" asyncio",
" decorators"
],
"urls": [
{
"comment_text": "",
"digests": {
"blake2b_256": "29dec489907faf942571d27190c5bc901c346a75c592fc0816a71173c772caaf",
"md5": "3b153e3c44e606d4f65ce4d51d31cba2",
"sha256": "f47bcb40a6cc8f6a9ff8b37d1640dc335a6041cc8f073abaed5898b5ed380455"
},
"downloads": -1,
"filename": "mqroute-0.3.0-py3-none-any.whl",
"has_sig": false,
"md5_digest": "3b153e3c44e606d4f65ce4d51d31cba2",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": ">=3.13",
"size": 32739,
"upload_time": "2025-01-05T12:25:24",
"upload_time_iso_8601": "2025-01-05T12:25:24.143797Z",
"url": "https://files.pythonhosted.org/packages/29/de/c489907faf942571d27190c5bc901c346a75c592fc0816a71173c772caaf/mqroute-0.3.0-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": "",
"digests": {
"blake2b_256": "390f90a2bcedc14cd940fcf305bb995cd7a3c30efdc58331a4378f389cd29e0f",
"md5": "4ba374805d21cf6a197a36fcddf3acff",
"sha256": "b549a7d484978c4b4413bc3b09d742559bc520ea905e0336f1da6dd1ac7d9c06"
},
"downloads": -1,
"filename": "mqroute-0.3.0.tar.gz",
"has_sig": false,
"md5_digest": "4ba374805d21cf6a197a36fcddf3acff",
"packagetype": "sdist",
"python_version": "source",
"requires_python": ">=3.13",
"size": 30547,
"upload_time": "2025-01-05T12:25:27",
"upload_time_iso_8601": "2025-01-05T12:25:27.305130Z",
"url": "https://files.pythonhosted.org/packages/39/0f/90a2bcedc14cd940fcf305bb995cd7a3c30efdc58331a4378f389cd29e0f/mqroute-0.3.0.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2025-01-05 12:25:27",
"github": true,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"github_user": "ehyde74",
"github_project": "mqroute",
"travis_ci": false,
"coveralls": false,
"github_actions": true,
"requirements": [],
"lcname": "mqroute"
}