# `mqtt-rl-bridge` — Event-Driven MQTT Bridge for Real-World Reinforcement Learning
[PyPI](https://pypi.org/project/mqtt-rl-bridge/) · [License](LICENSE) · [CI](https://github.com/kagozi/mqtt-rl-bridge/actions)
**A lightweight, framework-agnostic package that connects any MQTT-enabled sensor or robot to a reinforcement learning (RL) agent — in real time.**
Use it to train RL policies on **physical systems**:
- HVAC controllers
- Robotic arms
- Drones
- Smart agriculture
- Industrial IoT
- Home automation
No Gymnasium dependency. No hardcoded examples. Just **three abstract methods** to implement.
---
## Features
- **Zero RL framework lock-in** — works with Stable-Baselines3, RLlib, CleanRL, or your custom loop
- **Event-driven MQTT** — real-time sensor updates via `paho-mqtt`
- **Thread-safe message queue** — no race conditions
- **Timeout fallbacks** — robust to network glitches
- **Minimal API** — subclass `MQTTRLEnv` and implement 3 methods
- **No hardware in tests** — fully unit-testable with mocks
---
## Installation
```bash
pip install mqtt-rl-bridge
```
Optional extras:
```bash
pip install "mqtt-rl-bridge[gym]" # for Gymnasium space support
pip install "mqtt-rl-bridge[dev]" # for testing + linting
```
---
## Quick Start
### 1. Create Your Environment
```python
# my_env.py
import numpy as np
from mqtt_rl_bridge import MQTTRLEnv
class TemperatureControlEnv(MQTTRLEnv):
    def __init__(self, **kwargs):
        super().__init__(
            sensor_topic="home/sensors",
            action_topic="home/hvac",
            timeout=8.0,
            step_delay=2.0,  # wait for HVAC to respond
            **kwargs,
        )

    def _extract_observation(self, raw: dict) -> np.ndarray:
        temp = float(raw.get("temperature", 22.0))
        target = float(raw.get("setpoint", 22.0))
        return np.array([temp, target], dtype=np.float32)

    def _encode_action(self, action: int) -> dict:
        modes = ["cool", "off", "heat"]
        return {"mode": modes[action]}

    def _compute_reward(self, obs, action, next_obs) -> float:
        temp, target = next_obs
        error = abs(temp - target)
        energy = 0.1 if action != 1 else 0.0
        return -error - energy

    def _is_done(self, obs, action) -> tuple[bool, bool]:
        temp, target = obs
        return abs(temp - target) < 0.5, False
```
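The environment above expects JSON payloads on `home/sensors` and publishes JSON actions to `home/hvac`. For reference, here is a minimal device-side sketch (not part of `mqtt-rl-bridge`) that simulates such a device with plain `paho-mqtt`; the toy thermal model and broker address are illustrative, and the snippet assumes paho-mqtt 2.x.

```python
# device_sim.py — illustrative device-side counterpart (assumes paho-mqtt >= 2.0)
import json
import random
import time

import paho.mqtt.client as mqtt

state = {"temperature": 24.0, "setpoint": 22.0, "mode": "off"}

def on_message(client, userdata, msg):
    # Actions arrive as JSON on home/hvac, e.g. {"mode": "cool"}
    state["mode"] = json.loads(msg.payload).get("mode", "off")

client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
client.on_message = on_message
client.connect("192.168.1.100")  # same broker the training script uses
client.subscribe("home/hvac")
client.loop_start()

while True:
    # Toy dynamics: cooling/heating drifts the temperature; "off" drifts up slowly
    drift = {"cool": -0.3, "heat": 0.3, "off": 0.05}[state["mode"]]
    state["temperature"] += drift + random.uniform(-0.05, 0.05)
    client.publish("home/sensors", json.dumps({
        "temperature": state["temperature"],
        "setpoint": state["setpoint"],
    }))
    time.sleep(1.0)
```

Any real sensor or controller that speaks the same topics and JSON keys can take the place of this simulator.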
---
### 2. Use in Any RL Loop
#### Custom Training Loop
```python
from my_env import TemperatureControlEnv
import numpy as np
env = TemperatureControlEnv(broker_host="192.168.1.100")
obs = env.reset()
for _ in range(200):
    action = np.random.randint(0, 3)
    obs, reward, done, truncated, info = env.step(action)
    print(f"Temp: {obs[0]:.1f}°C → Reward: {reward:.3f}")
    if done or truncated:
        obs = env.reset()
env.close()
```
#### Stable-Baselines3 (PPO)
```python
from my_env import TemperatureControlEnv
from stable_baselines3 import PPO
env = TemperatureControlEnv(broker_host="192.168.1.100")
model = PPO("MlpPolicy", env, verbose=1, learning_rate=3e-4)
model.learn(total_timesteps=10_000)
model.save("temp_control_ppo")
env.close()
```
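Once trained, the saved policy can be run against the live system with Stable-Baselines3's standard load/predict API; the loop below is a sketch reusing the Quick Start environment.

```python
from my_env import TemperatureControlEnv
from stable_baselines3 import PPO

env = TemperatureControlEnv(broker_host="192.168.1.100")
model = PPO.load("temp_control_ppo")

obs = env.reset()
for _ in range(100):
    action, _ = model.predict(obs, deterministic=True)
    obs, reward, terminated, truncated, info = env.step(action)
    if terminated or truncated:
        obs = env.reset()
env.close()
```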
#### Ray RLlib
```python
import ray
from ray.rllib.algorithms.ppo import PPOConfig
from my_env import TemperatureControlEnv
ray.init()
config = PPOConfig().environment(
    TemperatureControlEnv,
    env_config={"broker_host": "192.168.1.100"},
)
algo = config.build()
algo.train()
algo.save("rllib_checkpoint")
algo.stop()
```
---
## API Reference
### `MQTTRLEnv` — Abstract Base Class
```python
class MQTTRLEnv(ABC):
    def __init__(
        self,
        broker_host: str = "127.0.0.1",
        sensor_topic: str = "sensors",
        action_topic: str = "actions",
        timeout: float = 5.0,
        step_delay: float = 0.0,
    )
```
| Parameter | Description |
|---------|-------------|
| `broker_host` | MQTT broker IP or hostname |
| `sensor_topic` | Topic where sensor data is published |
| `action_topic` | Topic where agent sends actions |
| `timeout` | Max wait time for sensor message (seconds) |
| `step_delay` | Sleep after sending action (for hardware latency) |
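
On slow or lossy links, the `timeout` and `step_delay` parameters above can be raised when constructing the environment; the values below are purely illustrative.

```python
env = TemperatureControlEnv(
    broker_host="192.168.1.100",
    timeout=15.0,    # wait up to 15 s for the next sensor message
    step_delay=5.0,  # let the HVAC settle before reading the next observation
)
```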
---
### Methods You **Must** Implement
| Method | Signature | Purpose |
|-------|---------|--------|
| `_extract_observation` | `raw: dict → np.ndarray` | Parse JSON payload into observation |
| `_encode_action` | `action: Any → dict` | Convert action to MQTT JSON |
| `_compute_reward` | `obs, action, next_obs → float` | Reward function |
---
### Optional Overrides
| Method | Default | Use Case |
|-------|--------|---------|
| `_is_done` | `(False, False)` | Terminate episode |
| `reset_hook` | `pass` | Send reset command on `reset()` |
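
A sketch of both overrides: the episode-end rule mirrors the Quick Start, while the reset command shown is hypothetical — how it is actually published depends on your setup and the bridge's internals.

```python
class TemperatureControlEnv(MQTTRLEnv):
    ...  # required methods as in the Quick Start

    def _is_done(self, obs, action) -> tuple[bool, bool]:
        temp, target = obs
        terminated = abs(temp - target) < 0.5  # close enough to the setpoint
        truncated = False                      # no time limit in this sketch
        return terminated, truncated

    def reset_hook(self):
        # Hypothetical: ask the device to return to a neutral state before a
        # new episode, e.g. by publishing {"mode": "off"} on the action topic.
        pass
```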
---
### Core Methods (Gymnasium-Compatible)
```python
obs = env.reset() → np.ndarray
obs, reward, terminated, truncated, info = env.step(action)
env.close()
```
Works with **any** RL library that expects this signature.
---
## Testing Without Hardware
```python
import numpy as np
from unittest.mock import MagicMock

from mqtt_rl_bridge import MQTTRLEnv

class MockEnv(MQTTRLEnv):
    def __init__(self):
        super().__init__(broker_host="localhost")
        self.broker.get_message = MagicMock(
            return_value={"temp": 25.0, "humidity": 60.0}
        )

    def _extract_observation(self, raw): return np.array([raw["temp"]])
    def _encode_action(self, a): return {"fan": "on" if a == 1 else "off"}
    def _compute_reward(self, o, a, no): return -abs(no[0] - 22.0)
```
Run unit tests without a broker.
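
The pure mapping methods can be exercised directly with `pytest`; the test below assumes the `MockEnv` above is importable (the module path is illustrative) and that constructing it does not require a reachable broker.

```python
# test_mock_env.py — minimal pytest sketch
import numpy as np
from tests.mock_env import MockEnv  # hypothetical module path

def test_mapping_methods():
    env = MockEnv()
    obs = env._extract_observation({"temp": 25.0, "humidity": 60.0})
    assert obs.shape == (1,)
    # Reward is negative distance from 22 °C, so the setpoint scores best.
    assert env._compute_reward(obs, 1, np.array([22.0])) == 0.0
    assert env._compute_reward(obs, 1, np.array([25.0])) < 0.0
    assert env._encode_action(1) == {"fan": "on"}
```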
---
## Project Structure
```bash
my-iot-rl-project/
├── envs/
│   └── temperature_env.py
├── models/
├── train_ppo.py
├── requirements.txt
└── tests/
```
`requirements.txt`:
```
mqtt-rl-bridge
stable-baselines3
gymnasium
```
---
## Contributing
1. Fork the repo
2. Create a branch: `git checkout -b feature/my-cool-env`
3. Install dev deps: `pip install "mqtt-rl-bridge[dev]"`
4. Write tests in `tests/`
5. Run: `pytest`
6. Submit PR
---
## Citation
```bibtex
@software{mqtt-rl-bridge-2025,
  author    = {Alex Kagozi},
  title     = {mqtt-rl-bridge: Event-Driven MQTT Bridge for Real-World RL},
  year      = {2025},
  publisher = {GitHub},
  url       = {https://github.com/kagozi/mqtt_rl_bridge}
}
```
---
## License
[MIT License](LICENSE) — free for commercial and research use.
---
## Made with Real-World RL in Mind
No toy environments. No simulation gaps.
**Train your agent directly on the physical world.**
---
**Ready to connect your robot?**
Just `pip install mqtt-rl-bridge` and subclass `MQTTRLEnv`.
---
Made with **love** for **real hardware** and **real learning**.