# MQTT Application
A comprehensive asynchronous MQTT client library for Python, designed for building robust IoT applications and message processing systems.
## Features
- **Asynchronous Architecture**: Built with `asyncio` for high-performance concurrent operations
- **Automatic Reconnection**: Robust connection handling with configurable retry logic
- **Message Processing**: Worker-based system for concurrent message handling
- **Status Publishing**: Periodic device status reporting
- **Command Handling**: Built-in command processing with acknowledgment system
- **Configuration Management**: YAML-based configuration with environment variable support
- **Logging Integration**: Seamless integration with mqtt-logger for distributed logging
## Installation
```bash
pip install muxu-io-mqtt-application
```
## Installation from Source
If you're working with the source code from this repository, you'll need to install the dependencies in the correct order:
```bash
cd ~/projects/icsia/dummy-icsia
# Create and activate virtual environment (recommended)
python3 -m venv .venv
source .venv/bin/activate # On Linux/macOS
# or
.venv\Scripts\activate # On Windows
# Install the mqtt-logger package
pushd ../mqtt-logger && pip install -e "." && popd
# Install the mqtt-application package with dev dependencies
pushd ../mqtt-application && pip install -e "." && popd
```
## Dependencies
This package requires the following external modules:
- `mqtt-logger`: For MQTT-enabled logging capabilities
- `muxu-io-mqtt-connector`: For low-level MQTT connection management (PyPI package)
## Quick Start
### Simplified Usage (Recommended)
The easiest way to use the library is with the `MqttApplication` class that handles everything automatically:
```python
import asyncio
from mqtt_application import MqttApplication
async def main():
# Everything configured from config.yaml
async with MqttApplication() as app:
await app.run()
if __name__ == "__main__":
asyncio.run(main())
```
Or even simpler for standalone usage:
```python
from mqtt_application import MqttApplication
# One-liner to run the application
if __name__ == "__main__":
MqttApplication.run_from_config()
```
### Custom Command Handlers
To add custom business logic, register command handlers:
```python
import asyncio
from mqtt_application import MqttApplication
async def my_custom_command(data):
"""Handle custom command."""
# Your business logic here
print(f"Processing custom command: {data}")
return {"status": "completed", "result": "success"}
async def main():
async with MqttApplication() as app:
# Register custom commands
app.register_command("my_command", my_custom_command)
await app.run()
if __name__ == "__main__":
asyncio.run(main())
```
### Message Subscriptions
Subscribe to MQTT messages using config-based subscriptions or programmatic registration:
#### Config-Based Subscriptions (Recommended)
```yaml
# config.yaml
subscriptions:
status_messages:
topic_pattern: "icsia/+/status/current"
callback_method: "_on_status_message"
ack_messages:
topic_pattern: "icsia/+/status/ack"
callback_method: "_on_ack_message"
```
```python
class MyApplication:
def __init__(self):
# Pass self as callback_context so config can find your methods
self.app = MqttApplication(callback_context=self)
async def _on_status_message(self, topic: str, payload: str, properties):
"""Handle status messages from any device."""
print(f"Status from {topic}: {payload}")
async def _on_ack_message(self, topic: str, payload: str, properties):
"""Handle acknowledgment messages."""
print(f"ACK from {topic}: {payload}")
async def run(self):
async with self.app:
await self.app.run()
```
#### Programmatic Registration
```python
async def my_handler(topic: str, payload: str, properties):
print(f"Message on {topic}: {payload}")
async def main():
async with MqttApplication() as app:
# Register handler for config-based subscriptions
app.register_callback_handler("my_handler", my_handler)
await app.run()
```
## Configuration
Create a `config.yaml` file in your project root:
```yaml
---
# MQTT Broker settings
mqtt:
broker: "localhost"
port: 1883
reconnect_interval: 5
max_reconnect_attempts: -1 # -1 means infinite attempts
throttle_interval: 0.1
# Device configuration
device:
device_id: "my_device_01"
namespace: "icsia" # Configurable namespace for topic patterns
status_publish_interval: 30.0
# Auto-generated topic patterns from namespace + device_id:
# {namespace}/+/cmd/#
# {namespace}/{device_id}/logs
# {namespace}/{device_id}/status/ack
# {namespace}/{device_id}/status/completion
# {namespace}/{device_id}/status/current
# Logger settings
logger:
log_file: "{device_id}.log"
log_level: "INFO"
# Worker configuration
workers:
count: 3
```
## API Reference
### AsyncMqttClient
The main MQTT client class for connecting to brokers and handling messages.
```python
AsyncMqttClient(
broker_address: str,
port: int,
topics: list[str],
message_queue: asyncio.Queue,
logger: MqttLogger,
reconnect_interval: int = 5,
max_reconnect_attempts: int = -1
)
```
### AsyncCommandHandler
Handles command processing and acknowledgments.
```python
AsyncCommandHandler(
logger: MqttLogger,
mqtt_broker: Optional[str] = None,
mqtt_port: Optional[int] = None,
ack_topic_pattern: str = "devices/{device_id}/status/ack",
completion_topic_pattern: str = "devices/{device_id}/status/completion"
)
```
### PeriodicStatusPublisher
Publishes device status at regular intervals.
```python
PeriodicStatusPublisher(
device_id: str,
logger: MqttLogger,
mqtt_broker: str,
mqtt_port: int,
publish_interval: float = 30.0,
status_topic_pattern: str = "devices/{device_id}/status/current"
)
```
### Config
Configuration management with YAML support.
```python
from mqtt_application import config
# Get configuration values
mqtt_config = config.get_mqtt_config()
device_id = config.get("device.device_id", "default")
log_level = config.get_log_level()
```
## Command Line Usage
You can also run the library as a standalone application:
```bash
mqtt-application # Uses config.yaml in current directory
```
## Development Setup
### Virtual Environment Setup
It's recommended to use a virtual environment for development:
```bash
# Create virtual environment
python3 -m venv .venv
# Activate virtual environment
source .venv/bin/activate # On Linux/macOS
# or
.venv\Scripts\activate # On Windows
# Install development dependencies
pip install -e ".[dev]"
```
### Running Integration Tests
This project uses comprehensive integration tests that validate real-world functionality with actual MQTT brokers and network connections.
**Prerequisites**: Ensure your virtual environment is activated and development dependencies are installed:
```bash
# Activate virtual environment
source .venv/bin/activate # On Linux/macOS
.venv\Scripts\activate # On Windows
# Install development dependencies
pip install -e ".[dev]"
```
**Note**: If you encounter issues with `python` command not finding pytest, use the virtual environment directly:
```bash
# Direct virtual environment usage (Linux/macOS)
.venv/bin/python -m pytest
# Direct virtual environment usage (Windows)
.venv\Scripts\python -m pytest
```
### Running Tests
This project uses comprehensive integration tests that validate real-world functionality with actual MQTT brokers and network connections.
#### Prerequisites
Make sure your virtual environment is activated and development dependencies are installed:
```bash
# Activate virtual environment
source .venv/bin/activate # On Linux/macOS
.venv\Scripts\activate # On Windows
# Install development dependencies
pip install -e ".[dev]"
```
**Note**: If you encounter `No module named pytest` errors, make sure your virtual environment is activated:
```bash
# Activate virtual environment
source .venv/bin/activate # On Linux/macOS
.venv\Scripts\activate # On Windows
# Install development dependencies
pip install -e ".[dev]"
```
**Alternative**: Use the virtual environment directly without activation:
```bash
# Direct virtual environment usage (Linux/macOS)
.venv/bin/python -m pytest
# Direct virtual environment usage (Windows)
.venv\Scripts\python -m pytest
```
#### Basic Test Commands
```bash
# Run all tests (integration and unit tests)
python -m pytest
# Run only integration tests
python -m pytest -m integration
# Run only unit tests (non-integration)
python -m pytest -m "not integration"
# Run with verbose output
python -m pytest -v
# Run specific test file
python -m pytest tests/test_integration.py
# Run specific test class
python -m pytest tests/test_integration.py::TestMqttIntegration
# Run specific test method
python -m pytest tests/test_integration.py::TestMqttIntegration::test_mqtt_logger_connection
# Run tests matching a pattern
python -m pytest -k "mqtt_logger"
```
#### Advanced Testing Options
```bash
# Skip slow tests during development
python -m pytest -m "integration and not slow"
# Run tests with coverage reporting (requires pytest-cov)
python -m pytest --cov=src/mqtt_application --cov-report=html --cov-report=term
# Generate XML coverage report for CI/CD
python -m pytest --cov=src/mqtt_application --cov-report=xml
# Run tests in parallel (requires pytest-xdist)
python -m pytest -n auto
# Stop on first failure (useful during development)
python -m pytest -x
# Run last failed tests only
python -m pytest --lf
# Show local variables in tracebacks for debugging
python -m pytest -l
# Run in quiet mode (less verbose output)
python -m pytest -q
# Show test durations (identify slow tests)
python -m pytest --durations=10
```
#### Common Development Workflows
```bash
# Quick feedback during development (unit tests only)
python -m pytest -m "not integration" -x
# Fast integration tests only (skip slow network tests)
python -m pytest -m "integration and not slow"
# Full test suite before committing
python -m pytest -v
# Debug a specific failing test with maximum verbosity
python -m pytest tests/test_integration.py::TestMqttIntegration::test_mqtt_logger_connection -vvv -s
# Test specific functionality you're working on
python -m pytest -k "command_handler" -v
# Continuous testing during development (requires pytest-watch)
ptw -- -m "not integration"
```
#### Test Organization
This project uses pytest markers to categorize tests:
- `@pytest.mark.integration`: Tests requiring network access and real MQTT connections
- `@pytest.mark.slow`: Tests that take longer to run (network resilience, retry mechanisms)
View all available markers:
```bash
python -m pytest --markers
```
#### CI/CD Integration
```bash
# Full test suite with coverage for CI
python -m pytest -v --cov=src/mqtt_application --cov-report=xml --cov-report=term
# Integration tests only for production validation
python -m pytest -v -m integration
# Quick validation (unit tests + fast integration tests)
python -m pytest -m "not slow" -v
```
**Note**: Integration tests use real MQTT connections to `test.mosquitto.org` and test actual component interactions. This provides more reliable validation than mock-based unit tests, but requires network access.
#### Troubleshooting
**Virtual Environment Issues:**
```bash
# If 'python' command doesn't work, use the virtual environment directly
.venv/bin/python -m pytest # Linux/macOS
.venv\Scripts\python -m pytest # Windows
# Check you're using the right Python version (3.8+)
python --version
which python # Should point to .venv/bin/python (Linux/macOS)
```
**Common Issues and Solutions:**
1. **SyntaxError with pytest**: Ensure you're using Python 3.8+ from your virtual environment, not system Python 2.7
2. **ModuleNotFoundError**: Install development dependencies with `pip install -e ".[dev]"`
3. **Network timeouts**: Integration tests require internet access to `test.mosquitto.org`. Use `-m "not integration"` to skip them
4. **Permission denied**: Use the full path to the virtual environment's Python executable
5. **Tests hang**: Some integration tests make real network connections. Use `Ctrl+C` to interrupt and check your network connection
**Getting Help:**
```bash
# Show pytest help
python -m pytest --help
# Show available fixtures
python -m pytest --fixtures
# Show available markers
python -m pytest --markers
# Collect tests without running them
python -m pytest --collect-only
```
```bash
python -m pytest --markers | grep @pytest.mark
```
The integration test suite covers:
- Real MQTT broker connections and message publishing/subscribing
- End-to-end command processing workflows
- Status publishing and periodic operations
- Network resilience and error handling
- Malformed message handling
- Connection retry mechanisms
### Code Formatting
```bash
black .
ruff check .
```
## Architecture
The library is designed with a modular architecture:
- **AsyncMqttClient**: Core MQTT connectivity and message routing
- **AsyncCommandHandler**: Command processing with built-in acknowledgment system
- **PeriodicStatusPublisher**: Regular status reporting functionality
- **Workers**: Concurrent message processing system
- **Config**: Centralized configuration management
## Dependencies
- **External modules** (must be installed separately):
- `mqtt-logger`: MQTT-enabled logging
- `muxu-io-mqtt-connector`: Low-level MQTT operations
- **Standard dependencies**:
- `aiomqtt`: Async MQTT client
- `pyyaml`: YAML configuration parsing
## License
MIT License - see LICENSE file for details.
## Contributing
1. Fork the repository
2. Create a feature branch
3. Set up development environment:
```bash
python3 -m venv .venv
source .venv/bin/activate # On Linux/macOS
pip install -e ".[dev]"
```
4. Make your changes
5. Add tests for new functionality
6. Run the test suite:
```bash
# Run all tests
python -m pytest
# Or run quick tests during development
python -m pytest -m "not integration" -x
```
7. Submit a pull request
## Support
For issues and questions:
- GitHub Issues: [Project Issues](https://github.com/muxu-io/mqtt-application/issues)
- Email: alex@muxu.io
Raw data
{
"_id": null,
"home_page": null,
"name": "muxu-io-mqtt-application",
"maintainer": null,
"docs_url": null,
"requires_python": ">=3.9",
"maintainer_email": null,
"keywords": "mqtt, async, iot, messaging, client",
"author": null,
"author_email": "Alex Gonzalez <alex@muxu.io>",
"download_url": "https://files.pythonhosted.org/packages/f7/3e/1c1ebbec2cbc20001b6e54be2e21a250d9c491371c4f021feb3a3289060c/muxu_io_mqtt_application-1.0.4.tar.gz",
"platform": null,
"description": "# MQTT Application\n\nA comprehensive asynchronous MQTT client library for Python, designed for building robust IoT applications and message processing systems.\n\n## Features\n\n- **Asynchronous Architecture**: Built with `asyncio` for high-performance concurrent operations\n- **Automatic Reconnection**: Robust connection handling with configurable retry logic\n- **Message Processing**: Worker-based system for concurrent message handling\n- **Status Publishing**: Periodic device status reporting\n- **Command Handling**: Built-in command processing with acknowledgment system\n- **Configuration Management**: YAML-based configuration with environment variable support\n- **Logging Integration**: Seamless integration with mqtt-logger for distributed logging\n\n## Installation\n\n```bash\npip install muxu-io-mqtt-application\n```\n\n## Installation from Source\n\nIf you're working with the source code from this repository, you'll need to install the dependencies in the correct order:\n\n```bash\ncd ~/projects/icsia/dummy-icsia\n\n# Create and activate virtual environment (recommended)\npython3 -m venv .venv\nsource .venv/bin/activate # On Linux/macOS\n# or\n.venv\\Scripts\\activate # On Windows\n\n# Install the mqtt-logger package\npushd ../mqtt-logger && pip install -e \".\" && popd\n\n# Install the mqtt-application package with dev dependencies\npushd ../mqtt-application && pip install -e \".\" && popd\n```\n\n## Dependencies\n\nThis package requires the following external modules:\n- `mqtt-logger`: For MQTT-enabled logging capabilities\n- `muxu-io-mqtt-connector`: For low-level MQTT connection management (PyPI package)\n\n## Quick Start\n\n### Simplified Usage (Recommended)\n\nThe easiest way to use the library is with the `MqttApplication` class that handles everything automatically:\n\n```python\nimport asyncio\nfrom mqtt_application import MqttApplication\n\nasync def main():\n # Everything configured from config.yaml\n async with MqttApplication() as app:\n await app.run()\n\nif __name__ == \"__main__\":\n asyncio.run(main())\n```\n\nOr even simpler for standalone usage:\n\n```python\nfrom mqtt_application import MqttApplication\n\n# One-liner to run the application\nif __name__ == \"__main__\":\n MqttApplication.run_from_config()\n```\n\n### Custom Command Handlers\n\nTo add custom business logic, register command handlers:\n\n```python\nimport asyncio\nfrom mqtt_application import MqttApplication\n\nasync def my_custom_command(data):\n \"\"\"Handle custom command.\"\"\"\n # Your business logic here\n print(f\"Processing custom command: {data}\")\n return {\"status\": \"completed\", \"result\": \"success\"}\n\nasync def main():\n async with MqttApplication() as app:\n # Register custom commands\n app.register_command(\"my_command\", my_custom_command)\n await app.run()\n\nif __name__ == \"__main__\":\n asyncio.run(main())\n```\n\n### Message Subscriptions\n\nSubscribe to MQTT messages using config-based subscriptions or programmatic registration:\n\n#### Config-Based Subscriptions (Recommended)\n\n```yaml\n# config.yaml\nsubscriptions:\n status_messages:\n topic_pattern: \"icsia/+/status/current\"\n callback_method: \"_on_status_message\"\n ack_messages:\n topic_pattern: \"icsia/+/status/ack\"\n callback_method: \"_on_ack_message\"\n```\n\n```python\nclass MyApplication:\n def __init__(self):\n # Pass self as callback_context so config can find your methods\n self.app = MqttApplication(callback_context=self)\n\n async def _on_status_message(self, topic: str, payload: str, properties):\n \"\"\"Handle status messages from any device.\"\"\"\n print(f\"Status from {topic}: {payload}\")\n\n async def _on_ack_message(self, topic: str, payload: str, properties):\n \"\"\"Handle acknowledgment messages.\"\"\"\n print(f\"ACK from {topic}: {payload}\")\n\n async def run(self):\n async with self.app:\n await self.app.run()\n```\n\n#### Programmatic Registration\n\n```python\nasync def my_handler(topic: str, payload: str, properties):\n print(f\"Message on {topic}: {payload}\")\n\nasync def main():\n async with MqttApplication() as app:\n # Register handler for config-based subscriptions\n app.register_callback_handler(\"my_handler\", my_handler)\n await app.run()\n```\n\n## Configuration\n\nCreate a `config.yaml` file in your project root:\n\n```yaml\n---\n# MQTT Broker settings\nmqtt:\n broker: \"localhost\"\n port: 1883\n reconnect_interval: 5\n max_reconnect_attempts: -1 # -1 means infinite attempts\n throttle_interval: 0.1\n\n# Device configuration\ndevice:\n device_id: \"my_device_01\"\n namespace: \"icsia\" # Configurable namespace for topic patterns\n status_publish_interval: 30.0\n\n# Auto-generated topic patterns from namespace + device_id:\n# {namespace}/+/cmd/#\n# {namespace}/{device_id}/logs\n# {namespace}/{device_id}/status/ack\n# {namespace}/{device_id}/status/completion\n# {namespace}/{device_id}/status/current\n\n# Logger settings\nlogger:\n log_file: \"{device_id}.log\"\n log_level: \"INFO\"\n\n# Worker configuration\nworkers:\n count: 3\n```\n\n## API Reference\n\n### AsyncMqttClient\n\nThe main MQTT client class for connecting to brokers and handling messages.\n\n```python\nAsyncMqttClient(\n broker_address: str,\n port: int,\n topics: list[str],\n message_queue: asyncio.Queue,\n logger: MqttLogger,\n reconnect_interval: int = 5,\n max_reconnect_attempts: int = -1\n)\n```\n\n### AsyncCommandHandler\n\nHandles command processing and acknowledgments.\n\n```python\nAsyncCommandHandler(\n logger: MqttLogger,\n mqtt_broker: Optional[str] = None,\n mqtt_port: Optional[int] = None,\n ack_topic_pattern: str = \"devices/{device_id}/status/ack\",\n completion_topic_pattern: str = \"devices/{device_id}/status/completion\"\n)\n```\n\n### PeriodicStatusPublisher\n\nPublishes device status at regular intervals.\n\n```python\nPeriodicStatusPublisher(\n device_id: str,\n logger: MqttLogger,\n mqtt_broker: str,\n mqtt_port: int,\n publish_interval: float = 30.0,\n status_topic_pattern: str = \"devices/{device_id}/status/current\"\n)\n```\n\n### Config\n\nConfiguration management with YAML support.\n\n```python\nfrom mqtt_application import config\n\n# Get configuration values\nmqtt_config = config.get_mqtt_config()\ndevice_id = config.get(\"device.device_id\", \"default\")\nlog_level = config.get_log_level()\n```\n\n## Command Line Usage\n\nYou can also run the library as a standalone application:\n\n```bash\nmqtt-application # Uses config.yaml in current directory\n```\n\n## Development Setup\n\n### Virtual Environment Setup\n\nIt's recommended to use a virtual environment for development:\n\n```bash\n# Create virtual environment\npython3 -m venv .venv\n\n# Activate virtual environment\nsource .venv/bin/activate # On Linux/macOS\n# or\n.venv\\Scripts\\activate # On Windows\n\n# Install development dependencies\npip install -e \".[dev]\"\n```\n\n### Running Integration Tests\n\nThis project uses comprehensive integration tests that validate real-world functionality with actual MQTT brokers and network connections.\n\n**Prerequisites**: Ensure your virtual environment is activated and development dependencies are installed:\n\n```bash\n# Activate virtual environment\nsource .venv/bin/activate # On Linux/macOS\n.venv\\Scripts\\activate # On Windows\n\n# Install development dependencies\npip install -e \".[dev]\"\n```\n\n**Note**: If you encounter issues with `python` command not finding pytest, use the virtual environment directly:\n```bash\n# Direct virtual environment usage (Linux/macOS)\n.venv/bin/python -m pytest\n\n# Direct virtual environment usage (Windows)\n.venv\\Scripts\\python -m pytest\n```\n\n### Running Tests\n\nThis project uses comprehensive integration tests that validate real-world functionality with actual MQTT brokers and network connections.\n\n#### Prerequisites\n\nMake sure your virtual environment is activated and development dependencies are installed:\n\n```bash\n# Activate virtual environment\nsource .venv/bin/activate # On Linux/macOS\n.venv\\Scripts\\activate # On Windows\n\n# Install development dependencies\npip install -e \".[dev]\"\n```\n\n**Note**: If you encounter `No module named pytest` errors, make sure your virtual environment is activated:\n\n```bash\n# Activate virtual environment\nsource .venv/bin/activate # On Linux/macOS\n.venv\\Scripts\\activate # On Windows\n\n# Install development dependencies\npip install -e \".[dev]\"\n```\n\n**Alternative**: Use the virtual environment directly without activation:\n```bash\n# Direct virtual environment usage (Linux/macOS)\n.venv/bin/python -m pytest\n\n# Direct virtual environment usage (Windows)\n.venv\\Scripts\\python -m pytest\n```\n\n#### Basic Test Commands\n\n```bash\n# Run all tests (integration and unit tests)\npython -m pytest\n\n# Run only integration tests\npython -m pytest -m integration\n\n# Run only unit tests (non-integration)\npython -m pytest -m \"not integration\"\n\n# Run with verbose output\npython -m pytest -v\n\n# Run specific test file\npython -m pytest tests/test_integration.py\n\n# Run specific test class\npython -m pytest tests/test_integration.py::TestMqttIntegration\n\n# Run specific test method\npython -m pytest tests/test_integration.py::TestMqttIntegration::test_mqtt_logger_connection\n\n# Run tests matching a pattern\npython -m pytest -k \"mqtt_logger\"\n```\n\n#### Advanced Testing Options\n\n```bash\n# Skip slow tests during development\npython -m pytest -m \"integration and not slow\"\n\n# Run tests with coverage reporting (requires pytest-cov)\npython -m pytest --cov=src/mqtt_application --cov-report=html --cov-report=term\n\n# Generate XML coverage report for CI/CD\npython -m pytest --cov=src/mqtt_application --cov-report=xml\n\n# Run tests in parallel (requires pytest-xdist)\npython -m pytest -n auto\n\n# Stop on first failure (useful during development)\npython -m pytest -x\n\n# Run last failed tests only\npython -m pytest --lf\n\n# Show local variables in tracebacks for debugging\npython -m pytest -l\n\n# Run in quiet mode (less verbose output)\npython -m pytest -q\n\n# Show test durations (identify slow tests)\npython -m pytest --durations=10\n```\n\n#### Common Development Workflows\n\n```bash\n# Quick feedback during development (unit tests only)\npython -m pytest -m \"not integration\" -x\n\n# Fast integration tests only (skip slow network tests)\npython -m pytest -m \"integration and not slow\"\n\n# Full test suite before committing\npython -m pytest -v\n\n# Debug a specific failing test with maximum verbosity\npython -m pytest tests/test_integration.py::TestMqttIntegration::test_mqtt_logger_connection -vvv -s\n\n# Test specific functionality you're working on\npython -m pytest -k \"command_handler\" -v\n\n# Continuous testing during development (requires pytest-watch)\nptw -- -m \"not integration\"\n```\n\n#### Test Organization\n\nThis project uses pytest markers to categorize tests:\n\n- `@pytest.mark.integration`: Tests requiring network access and real MQTT connections\n- `@pytest.mark.slow`: Tests that take longer to run (network resilience, retry mechanisms)\n\nView all available markers:\n```bash\npython -m pytest --markers\n```\n\n#### CI/CD Integration\n\n```bash\n# Full test suite with coverage for CI\npython -m pytest -v --cov=src/mqtt_application --cov-report=xml --cov-report=term\n\n# Integration tests only for production validation\npython -m pytest -v -m integration\n\n# Quick validation (unit tests + fast integration tests)\npython -m pytest -m \"not slow\" -v\n```\n\n**Note**: Integration tests use real MQTT connections to `test.mosquitto.org` and test actual component interactions. This provides more reliable validation than mock-based unit tests, but requires network access.\n\n#### Troubleshooting\n\n**Virtual Environment Issues:**\n```bash\n# If 'python' command doesn't work, use the virtual environment directly\n.venv/bin/python -m pytest # Linux/macOS\n.venv\\Scripts\\python -m pytest # Windows\n\n# Check you're using the right Python version (3.8+)\npython --version\nwhich python # Should point to .venv/bin/python (Linux/macOS)\n```\n\n**Common Issues and Solutions:**\n\n1. **SyntaxError with pytest**: Ensure you're using Python 3.8+ from your virtual environment, not system Python 2.7\n2. **ModuleNotFoundError**: Install development dependencies with `pip install -e \".[dev]\"`\n3. **Network timeouts**: Integration tests require internet access to `test.mosquitto.org`. Use `-m \"not integration\"` to skip them\n4. **Permission denied**: Use the full path to the virtual environment's Python executable\n5. **Tests hang**: Some integration tests make real network connections. Use `Ctrl+C` to interrupt and check your network connection\n\n**Getting Help:**\n```bash\n# Show pytest help\npython -m pytest --help\n\n# Show available fixtures\npython -m pytest --fixtures\n\n# Show available markers\npython -m pytest --markers\n\n# Collect tests without running them\npython -m pytest --collect-only\n```\n```bash\npython -m pytest --markers | grep @pytest.mark\n```\n\nThe integration test suite covers:\n- Real MQTT broker connections and message publishing/subscribing\n- End-to-end command processing workflows\n- Status publishing and periodic operations\n- Network resilience and error handling\n- Malformed message handling\n- Connection retry mechanisms\n\n### Code Formatting\n\n```bash\nblack .\nruff check .\n```\n\n## Architecture\n\nThe library is designed with a modular architecture:\n\n- **AsyncMqttClient**: Core MQTT connectivity and message routing\n- **AsyncCommandHandler**: Command processing with built-in acknowledgment system\n- **PeriodicStatusPublisher**: Regular status reporting functionality\n- **Workers**: Concurrent message processing system\n- **Config**: Centralized configuration management\n\n## Dependencies\n\n- **External modules** (must be installed separately):\n - `mqtt-logger`: MQTT-enabled logging\n - `muxu-io-mqtt-connector`: Low-level MQTT operations\n- **Standard dependencies**:\n - `aiomqtt`: Async MQTT client\n - `pyyaml`: YAML configuration parsing\n\n## License\n\nMIT License - see LICENSE file for details.\n\n## Contributing\n\n1. Fork the repository\n2. Create a feature branch\n3. Set up development environment:\n ```bash\n python3 -m venv .venv\n source .venv/bin/activate # On Linux/macOS\n pip install -e \".[dev]\"\n ```\n4. Make your changes\n5. Add tests for new functionality\n6. Run the test suite:\n ```bash\n # Run all tests\n python -m pytest\n\n # Or run quick tests during development\n python -m pytest -m \"not integration\" -x\n ```\n7. Submit a pull request\n\n## Support\n\nFor issues and questions:\n- GitHub Issues: [Project Issues](https://github.com/muxu-io/mqtt-application/issues)\n- Email: alex@muxu.io\n",
"bugtrack_url": null,
"license": "MIT",
"summary": "A comprehensive asynchronous MQTT application library",
"version": "1.0.4",
"project_urls": {
"Homepage": "https://github.com/muxu-io/mqtt-application",
"Issues": "https://github.com/muxu-io/mqtt-application/issues",
"Repository": "https://github.com/muxu-io/mqtt-application.git"
},
"split_keywords": [
"mqtt",
" async",
" iot",
" messaging",
" client"
],
"urls": [
{
"comment_text": null,
"digests": {
"blake2b_256": "731a769da457875179401659b2f513813aa5b0a0f9fb97bd67816ba5b5a259b6",
"md5": "76d6b0d87588d1a73b9bbc4a585e3517",
"sha256": "6f289f08a3ebcbea237ae1fcdebfe2ce8ac5a3f77571115019ebe10940cf78aa"
},
"downloads": -1,
"filename": "muxu_io_mqtt_application-1.0.4-py3-none-any.whl",
"has_sig": false,
"md5_digest": "76d6b0d87588d1a73b9bbc4a585e3517",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": ">=3.9",
"size": 33533,
"upload_time": "2025-08-16T21:22:19",
"upload_time_iso_8601": "2025-08-16T21:22:19.184740Z",
"url": "https://files.pythonhosted.org/packages/73/1a/769da457875179401659b2f513813aa5b0a0f9fb97bd67816ba5b5a259b6/muxu_io_mqtt_application-1.0.4-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": null,
"digests": {
"blake2b_256": "f73e1c1ebbec2cbc20001b6e54be2e21a250d9c491371c4f021feb3a3289060c",
"md5": "effc1c1fb0e34f5137a4eeb2e3d0861c",
"sha256": "9330b55e117eb892b306a746ee8108b19cc1d2c671e30e6c86b0bb1a81948ca1"
},
"downloads": -1,
"filename": "muxu_io_mqtt_application-1.0.4.tar.gz",
"has_sig": false,
"md5_digest": "effc1c1fb0e34f5137a4eeb2e3d0861c",
"packagetype": "sdist",
"python_version": "source",
"requires_python": ">=3.9",
"size": 58108,
"upload_time": "2025-08-16T21:22:21",
"upload_time_iso_8601": "2025-08-16T21:22:21.061954Z",
"url": "https://files.pythonhosted.org/packages/f7/3e/1c1ebbec2cbc20001b6e54be2e21a250d9c491371c4f021feb3a3289060c/muxu_io_mqtt_application-1.0.4.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2025-08-16 21:22:21",
"github": true,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"github_user": "muxu-io",
"github_project": "mqtt-application",
"travis_ci": false,
"coveralls": false,
"github_actions": true,
"lcname": "muxu-io-mqtt-application"
}