mqtt-rl-bridge


Name: mqtt-rl-bridge
Version: 0.1.0
Summary: Event-driven MQTT bridge for real-world reinforcement learning
Upload time: 2025-11-01 08:34:45
Requires Python: >=3.9
License: MIT
Keywords: reinforcement-learning, mqtt, iot, robotics, real-world-rl
Requirements: No requirements were recorded.
# `mqtt-rl-bridge` — Event-Driven MQTT Bridge for Real-World Reinforcement Learning

[![PyPI](https://img.shields.io/pypi/v/mqtt-rl-bridge?color=blue)](https://pypi.org/project/mqtt-rl-bridge/)
[![Python](https://img.shields.io/pypi/pyversions/mqtt-rl-bridge)](https://pypi.org/project/mqtt-rl-bridge/)
[![License: MIT](https://img.shields.io/badge/license-MIT-green)](LICENSE)
[![Tests](https://github.com/kagozi/mqtt-rl-bridge/actions/workflows/ci.yml/badge.svg)](https://github.com/kagozi/mqtt-rl-bridge/actions)

**A lightweight, framework-agnostic package that connects any MQTT-enabled sensor or robot to a reinforcement learning (RL) agent — in real time.**

Use it to train RL policies on **physical systems**:  
- HVAC controllers  
- Robotic arms  
- Drones  
- Smart agriculture  
- Industrial IoT  
- Home automation  

No Gymnasium dependency. No hardcoded examples. Just **three abstract methods** to implement.

---

## Features

- **Zero RL framework lock-in** — works with Stable-Baselines3, RLlib, CleanRL, or your custom loop
- **Event-driven MQTT** — real-time sensor updates via `paho-mqtt`
- **Thread-safe message queue** — no race conditions
- **Timeout fallbacks** — robust to network glitches (see the sketch after this list)
- **Minimal API** — subclass `MQTTRLEnv` and implement 3 methods
- **No hardware in tests** — fully unit-testable with mocks
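
Under the hood this is the standard `paho-mqtt` pattern: the client's network thread pushes decoded payloads into a thread-safe queue, and the training loop blocks on that queue with a timeout. A conceptual sketch of that pattern (not the package's actual internals; the topic, host, and helper name are illustrative):

```python
import json
import queue

import paho.mqtt.client as mqtt

messages: "queue.Queue[dict]" = queue.Queue()

def _on_message(client, userdata, msg):
    # Runs on paho-mqtt's network thread; Queue.put is thread-safe.
    messages.put(json.loads(msg.payload))

client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1)  # paho-mqtt 1.x: mqtt.Client()
client.on_message = _on_message
client.connect("127.0.0.1", 1883)
client.subscribe("sensors")
client.loop_start()

def get_latest(timeout: float = 5.0):
    """Block until a sensor message arrives, or return None on timeout."""
    try:
        return messages.get(timeout=timeout)
    except queue.Empty:
        return None  # timeout fallback: the caller decides how to recover
```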

---

## Installation

```bash
pip install mqtt-rl-bridge
```

Optional extras:

```bash
pip install "mqtt-rl-bridge[gym]"      # for Gymnasium space support
pip install "mqtt-rl-bridge[dev]"      # for testing + linting
```

---

## Quick Start

### 1. Create Your Environment

```python
# my_env.py
import numpy as np
from mqtt_rl_bridge import MQTTRLEnv

class TemperatureControlEnv(MQTTRLEnv):
    def __init__(self, **kwargs):
        super().__init__(
            sensor_topic="home/sensors",
            action_topic="home/hvac",
            timeout=8.0,
            step_delay=2.0,  # wait for HVAC to respond
            **kwargs
        )

    def _extract_observation(self, raw: dict) -> np.ndarray:
        temp = float(raw.get("temperature", 22.0))
        target = float(raw.get("setpoint", 22.0))
        return np.array([temp, target], dtype=np.float32)

    def _encode_action(self, action: int) -> dict:
        modes = ["cool", "off", "heat"]
        return {"mode": modes[action]}

    def _compute_reward(self, obs, action, next_obs) -> float:
        temp, target = next_obs
        error = abs(temp - target)
        energy = 0.1 if action != 1 else 0.0
        return -error - energy

    def _is_done(self, obs, action) -> tuple[bool, bool]:
        temp, target = obs
        return abs(temp - target) < 0.5, False
```
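
The environment above assumes a device on the other side of the broker: something that publishes JSON readings to `home/sensors` and applies whatever arrives on `home/hvac`. A minimal device-side sketch with `paho-mqtt` (illustrative only; the broker address, the fake sensor read, and the print-as-actuator are assumptions, not part of the package):

```python
# hvac_device.py — illustrative companion script, not part of mqtt-rl-bridge
import json
import random
import time

import paho.mqtt.client as mqtt

BROKER = "192.168.1.100"  # hypothetical broker address

def read_temperature() -> float:
    """Stand-in for a real sensor read; replace with your hardware driver."""
    return 21.0 + 3.0 * random.random()

def on_message(client, userdata, msg):
    """Apply the agent's action when it arrives on the action topic."""
    mode = json.loads(msg.payload).get("mode", "off")
    print(f"HVAC mode -> {mode}")  # replace with actual actuator control

client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1)  # paho-mqtt 1.x: mqtt.Client()
client.on_message = on_message
client.connect(BROKER, 1883)
client.subscribe("home/hvac")
client.loop_start()

while True:
    # Field names match _extract_observation in TemperatureControlEnv.
    payload = {"temperature": read_temperature(), "setpoint": 22.0}
    client.publish("home/sensors", json.dumps(payload))
    time.sleep(1.0)
```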

---

### 2. Use in Any RL Loop

#### Custom Training Loop

```python
from my_env import TemperatureControlEnv
import numpy as np

env = TemperatureControlEnv(broker_host="192.168.1.100")
obs = env.reset()

for _ in range(200):
    action = np.random.randint(0, 3)
    obs, reward, done, truncated, info = env.step(action)
    print(f"Temp: {obs[0]:.1f}°C → Reward: {reward:.3f}")
    if done or truncated:
        obs = env.reset()

env.close()
```

#### Stable-Baselines3 (PPO)

```python
from my_env import TemperatureControlEnv
from stable_baselines3 import PPO

env = TemperatureControlEnv(broker_host="192.168.1.100")
model = PPO("MlpPolicy", env, verbose=1, learning_rate=3e-4)
model.learn(total_timesteps=10_000)
model.save("temp_control_ppo")
env.close()
```
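
Note that Stable-Baselines3 inspects `observation_space` and `action_space` on the environment. If your installed version of `MQTTRLEnv` does not define them for you, you can declare them in your subclass with the `[gym]` extra installed; a sketch, assuming the standard Gymnasium attribute names (the bounds are illustrative):

```python
import numpy as np
from gymnasium import spaces

from my_env import TemperatureControlEnv

class TemperatureControlEnvWithSpaces(TemperatureControlEnv):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # Two floats: current temperature and setpoint, in degrees Celsius.
        self.observation_space = spaces.Box(
            low=np.array([-40.0, 0.0], dtype=np.float32),
            high=np.array([60.0, 40.0], dtype=np.float32),
            dtype=np.float32,
        )
        # Three discrete modes: cool, off, heat (matches _encode_action).
        self.action_space = spaces.Discrete(3)
```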

#### Ray RLlib

```python
import ray
from ray.rllib.algorithms.ppo import PPOConfig
from my_env import TemperatureControlEnv

ray.init()
config = PPOConfig().environment(
    TemperatureControlEnv,
    env_config={"broker_host": "192.168.1.100"}
)
algo = config.build()
algo.train()
algo.save("rllib_checkpoint")
algo.stop()
```

---

## API Reference

### `MQTTRLEnv` — Abstract Base Class

```python
class MQTTRLEnv(ABC):
    def __init__(
        self,
        broker_host: str = "127.0.0.1",
        sensor_topic: str = "sensors",
        action_topic: str = "actions",
        timeout: float = 5.0,
        step_delay: float = 0.0,
    )
```

| Parameter | Description |
|---------|-------------|
| `broker_host` | MQTT broker IP or hostname |
| `sensor_topic` | Topic where sensor data is published |
| `action_topic` | Topic where agent sends actions |
| `timeout` | Max wait time for sensor message (seconds) |
| `step_delay` | Sleep after sending action (for hardware latency) |

---

### Methods You **Must** Implement

| Method | Signature | Purpose |
|-------|---------|--------|
| `_extract_observation` | `raw: dict → np.ndarray` | Parse JSON payload into observation |
| `_encode_action` | `action: Any → dict` | Convert action to MQTT JSON |
| `_compute_reward` | `obs, action, next_obs → float` | Reward function |

---

### Optional Overrides

| Method | Default | Use Case |
|-------|--------|---------|
| `_is_done` | `(False, False)` | Terminate episode |
| `reset_hook` | `pass` | Send reset command on `reset()` |
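
For example, a subclass can end episodes at a goal, truncate them after a step budget, and use `reset_hook` to put the device into a known state before each episode. A sketch building on the `TemperatureControlEnv` from the Quick Start (it assumes `reset_hook()` takes no arguments; check the base class for the exact hook signature):

```python
from my_env import TemperatureControlEnv

class SafeTemperatureEnv(TemperatureControlEnv):
    MAX_STEPS = 500  # illustrative episode budget

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self._steps = 0

    def reset_hook(self):
        # Called during reset(); a good place to command the device back to a
        # known state (e.g. publish an "off" action) before the episode starts.
        self._steps = 0

    def _is_done(self, obs, action) -> tuple[bool, bool]:
        self._steps += 1  # counting _is_done calls as a proxy for steps
        temp, target = obs
        terminated = abs(temp - target) < 0.5      # goal reached
        truncated = self._steps >= self.MAX_STEPS  # time limit
        return terminated, truncated
```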

---

### Core Methods (Gymnasium-Compatible)

```python
obs = env.reset() → np.ndarray
obs, reward, terminated, truncated, info = env.step(action)
env.close()
```

Works with **any** RL library that expects this signature.

---

## Testing Without Hardware

```python
import numpy as np
from unittest.mock import MagicMock

from mqtt_rl_bridge import MQTTRLEnv

class MockEnv(MQTTRLEnv):
    def __init__(self):
        super().__init__(broker_host="localhost")
        self.broker.get_message = MagicMock(
            return_value={"temp": 25.0, "humidity": 60.0}
        )

    def _extract_observation(self, raw): return np.array([raw["temp"]])
    def _encode_action(self, a): return {"fan": "on" if a == 1 else "off"}
    def _compute_reward(self, o, a, no): return -abs(no[0] - 22.0)
```

Run unit tests without a broker.
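
With a mock like this, the three user-defined methods can be exercised directly with `pytest`, no broker or `step()` calls involved. A minimal sketch, assuming the `MockEnv` above lives in a `mock_env` module and that `MQTTRLEnv`'s constructor does not require a live broker (which the mock above implies):

```python
# tests/test_mock_env.py
import numpy as np

from mock_env import MockEnv  # hypothetical module holding the MockEnv above

def test_observation_extraction():
    env = MockEnv()
    obs = env._extract_observation({"temp": 25.0, "humidity": 60.0})
    assert obs.shape == (1,)
    assert obs[0] == 25.0

def test_action_encoding_and_reward():
    env = MockEnv()
    assert env._encode_action(1) == {"fan": "on"}
    assert env._encode_action(0) == {"fan": "off"}
    # Reward is the negative distance from the 22 °C comfort point.
    assert env._compute_reward(None, 1, np.array([25.0])) == -3.0
```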

---

## Project Structure

```bash
my-iot-rl-project/
├── envs/
│   └── temperature_env.py
├── models/
├── train_ppo.py
├── requirements.txt
└── tests/
```

`requirements.txt`:
```
mqtt-rl-bridge
stable-baselines3
gymnasium
```

---

## Contributing

1. Fork the repo
2. Create a branch: `git checkout -b feature/my-cool-env`
3. Install dev deps: `pip install "mqtt-rl-bridge[dev]"`
4. Write tests in `tests/`
5. Run: `pytest`
6. Submit PR

---

## Citation

```bibtex
@software{mqtt-rl-bridge-2025,
  author = {Alex Kagozi},
  title = {mqtt-rl-bridge: Event-Driven MQTT Bridge for Real-World RL},
  year = {2025},
  publisher = {GitHub},
  url = {https://github.com/kagozi/mqtt_rl_bridge}
}
```

---

## License

[MIT License](LICENSE) — free for commercial and research use.

---

## Made with Real-World RL in Mind

No toy environments. No simulation gaps.  
**Train your agent directly on the physical world.**

---

**Ready to connect your robot?**  
Just `pip install mqtt-rl-bridge` and subclass `MQTTRLEnv`.

---
Made with **love** for **real hardware** and **real learning**.

            
