# ModbusLink
<div align="center">
**A Modern, High-Performance Python Modbus Library**
*Industrial-grade • Developer-friendly • Production-ready*
[English](#) | [中文版](README-zh_CN.md) | [Documentation](https://miraitowa-la.github.io/ModbusLink/en/index.html) | [Examples](#examples)
</div>
---
## 🚀 Why ModbusLink?
ModbusLink is a next-generation Python Modbus library designed for **industrial automation**, **IoT applications**, and **SCADA systems**. Built with modern Python practices, it provides unparalleled ease of use while maintaining enterprise-grade reliability.
### ✨ Key Features
| Feature | Description | Benefit |
|---------|-------------|----------|
| 🏗️ **Layered Architecture** | Clean separation of concerns | Easy maintenance & extension |
| 🔌 **Universal Transports** | TCP, RTU, ASCII support | Works with any Modbus device |
| ⚡ **Async Performance** | Native asyncio support | Handle 1000+ concurrent connections |
| 🛠️ **Developer Experience** | Intuitive APIs & full typing | Faster development & fewer bugs |
| 📊 **Rich Data Types** | float32, int32, strings & more | Handle complex industrial data |
| 🔍 **Advanced Debugging** | Protocol-level monitoring | Rapid troubleshooting |
| 🖥️ **Complete Server** | Full server implementation | Build custom Modbus devices |
| 🎯 **Production Ready** | Comprehensive error handling | Deploy with confidence |
## 🚀 Quick Start
### Installation
```bash
# Install from PyPI
pip install modbuslink
# Or install with development dependencies
pip install modbuslink[dev]
```
### 30-Second Demo
```python
from modbuslink import ModbusClient, TcpTransport
# Connect to Modbus TCP device
transport = TcpTransport(host='192.168.1.100', port=502)
client = ModbusClient(transport)
with client:
    # Read temperature from holding registers
    temp = client.read_float32(slave_id=1, start_address=100)
    print(f"Temperature: {temp:.1f}°C")
    # Control pump via coil
    client.write_single_coil(slave_id=1, address=0, value=True)
    print("Pump started!")
```
## 📚 Complete Usage Guide
### TCP Client (Ethernet)
Perfect for **PLCs**, **HMIs**, and **Ethernet-based devices**:
```python
from modbuslink import ModbusClient, TcpTransport
# Connect to PLC via Ethernet
transport = TcpTransport(
    host='192.168.1.10',
    port=502,
    timeout=5.0
)
client = ModbusClient(transport)
with client:
    # Read production counter
    counter = client.read_int32(slave_id=1, start_address=1000)
    print(f"Production count: {counter}")
    # Read sensor array
    sensors = client.read_holding_registers(slave_id=1, start_address=2000, quantity=10)
    print(f"Sensor values: {sensors}")
    # Update setpoint
    client.write_float32(slave_id=1, start_address=3000, value=75.5)
```
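On the wire, a Modbus TCP request is a PDU (function code plus data) prefixed by a 7-byte MBAP header. The sketch below builds the frame that would correspond to the sensor-array read above. `build_tcp_frame` is a hypothetical helper written for illustration and is not part of ModbusLink's API; the transaction ID of 1 is an arbitrary choice.

```python
import struct

def build_tcp_frame(transaction_id: int, unit_id: int, pdu: bytes) -> bytes:
    """Prefix a Modbus PDU with an MBAP header (illustrative helper, not ModbusLink API)."""
    protocol_id = 0          # always 0 for Modbus
    length = len(pdu) + 1    # the length field counts the unit ID plus the PDU
    header = struct.pack(">HHHB", transaction_id, protocol_id, length, unit_id)
    return header + pdu

# PDU for function 0x03 (Read Holding Registers): start address 2000, quantity 10
pdu = struct.pack(">BHH", 0x03, 2000, 10)
frame = build_tcp_frame(transaction_id=1, unit_id=1, pdu=pdu)
print(frame.hex(" "))  # 00 01 00 00 00 06 01 03 07 d0 00 0a
```

Seeing the raw bytes is mainly useful when comparing against a packet capture or the protocol-level debug log.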
### RTU Client (Serial RS485/RS232)
Ideal for **field instruments**, **sensors**, and **legacy devices**:
```python
from modbuslink import ModbusClient, RtuTransport
# Connect to field device via RS485
transport = RtuTransport(
    port='COM3',        # Linux: '/dev/ttyUSB0'
    baudrate=9600,
    parity='N',         # 'N' (none), 'E' (even) or 'O' (odd)
    stopbits=1,
    timeout=2.0
)
client = ModbusClient(transport)
with client:
    # Read flow meter
    flow_rate = client.read_float32(slave_id=5, start_address=0)
    print(f"Flow rate: {flow_rate:.2f} L/min")
    # Read pressure transmitter
    pressure = client.read_input_registers(slave_id=6, start_address=0, quantity=1)[0]
    pressure_bar = pressure / 100.0  # Convert to bar
    print(f"Pressure: {pressure_bar:.2f} bar")
```
### ASCII Client (Serial Text Protocol)
Suited to **special-purpose devices** and **debugging**, since frames are human-readable text:
```python
from modbuslink import ModbusClient, AsciiTransport
# ASCII mode for special devices
transport = AsciiTransport(
    port='COM1',
    baudrate=9600,
    bytesize=7,         # 7-bit ASCII
    parity='E',         # Even parity
    timeout=3.0
)
client = ModbusClient(transport)
with client:
    # Read laboratory instrument
    temperature = client.read_float32(slave_id=2, start_address=100)
    print(f"Lab temperature: {temperature:.3f}°C")
```
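A Modbus ASCII frame is the hex-encoded message wrapped in `:` and CRLF, protected by an LRC checksum instead of a CRC. The following is a minimal sketch of that framing using only the standard library; `lrc` and `build_ascii_frame` are hypothetical names for illustration, not ModbusLink functions.

```python
def lrc(payload: bytes) -> int:
    """Longitudinal redundancy check: two's complement of the byte sum, modulo 256."""
    return (-sum(payload)) & 0xFF

def build_ascii_frame(slave_id: int, pdu: bytes) -> bytes:
    """Wrap slave ID + PDU + LRC as ':' + uppercase hex + CRLF."""
    payload = bytes([slave_id]) + pdu
    body = (payload + bytes([lrc(payload)])).hex().upper()
    return f":{body}\r\n".encode("ascii")

# Read 10 holding registers starting at address 0 from slave 1
frame = build_ascii_frame(1, bytes.fromhex("030000000A"))
print(frame)  # b':01030000000AF2\r\n'
```

Because every byte travels as two printable characters, such frames can be read directly in a serial terminal, which is what makes ASCII mode convenient for debugging.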
### High-Performance Async Operations
**Handle multiple devices simultaneously** with async/await:
```python
import asyncio
from modbuslink import AsyncModbusClient, AsyncTcpTransport
async def read_multiple_devices():
    """Read from multiple PLCs concurrently"""
    # Create connections to different PLCs
    plc1 = AsyncModbusClient(AsyncTcpTransport('192.168.1.10', 502))
    plc2 = AsyncModbusClient(AsyncTcpTransport('192.168.1.11', 502))
    plc3 = AsyncModbusClient(AsyncTcpTransport('192.168.1.12', 502))
    async with plc1, plc2, plc3:
        # Read all PLCs simultaneously
        tasks = [
            plc1.read_holding_registers(1, 0, 10),  # Production line 1
            plc2.read_holding_registers(1, 0, 10),  # Production line 2
            plc3.read_holding_registers(1, 0, 10),  # Production line 3
        ]
        results = await asyncio.gather(*tasks)
        for i, data in enumerate(results, 1):
            print(f"PLC {i} data: {data}")
# Run async example
asyncio.run(read_multiple_devices())
```
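With a plain `asyncio.gather(*tasks)`, the first failing read raises and the other results are lost. One common way to keep polling robust is `return_exceptions=True`, sketched here with a stand-in coroutine (`fake_read` is hypothetical and sleeps instead of talking to a device) so the example runs without hardware.

```python
import asyncio

async def fake_read(name: str, delay: float, fail: bool = False) -> list:
    """Stand-in for a client read; sleeps instead of doing network I/O."""
    await asyncio.sleep(delay)
    if fail:
        raise ConnectionError(f"{name} unreachable")
    return [0] * 10

async def poll_all() -> dict:
    names = ["plc1", "plc2", "plc3"]
    outcomes = await asyncio.gather(
        fake_read("plc1", 0.01),
        fake_read("plc2", 0.01, fail=True),
        fake_read("plc3", 0.01),
        return_exceptions=True,  # failures come back as values instead of raising
    )
    return dict(zip(names, outcomes))

results = asyncio.run(poll_all())
for name, value in results.items():
    if isinstance(value, Exception):
        print(f"{name}: error ({value})")
    else:
        print(f"{name}: {len(value)} registers")
```

The same pattern should apply to real client coroutines, assuming they raise ordinary exceptions on failure.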
## 🖥️ Modbus Server Implementation
**Build your own Modbus devices** with ModbusLink's powerful server capabilities:
### TCP Server (Multi-Client Support)
Create **HMI simulators**, **device emulators**, or **data concentrators**:
```python
from modbuslink import AsyncTcpModbusServer, ModbusDataStore
import asyncio
async def industrial_tcp_server():
    """Simulate a complete industrial control system"""
    # Create data store for 1000 points each
    data_store = ModbusDataStore(
        coils_size=1000,              # Digital outputs (pumps, valves)
        discrete_inputs_size=1000,    # Digital inputs (sensors, switches)
        holding_registers_size=1000,  # Analog outputs (setpoints)
        input_registers_size=1000     # Analog inputs (measurements)
    )
    # Initialize industrial data
    # Pump and valve controls
    data_store.write_coils(0, [True, False, True, False])
    # Process setpoints (temperatures, pressures)
    data_store.write_holding_registers(0, [750, 1200, 850, 600])  # °C * 10
    # Sensor readings (simulated)
    data_store.write_input_registers(0, [748, 1195, 847, 598])  # Current values
    # Safety interlocks and limit switches
    data_store.write_discrete_inputs(0, [True, True, False, True])
    # Create multi-client TCP server
    server = AsyncTcpModbusServer(
        host="0.0.0.0",       # Accept connections from any IP
        port=502,             # Standard Modbus port (ports below 1024 may need elevated privileges)
        data_store=data_store,
        slave_id=1,
        max_connections=50    # Support up to 50 HMI clients
    )
    print("Starting Industrial Control System Simulator...")
    print("Connect your HMI to: <your_ip>:502")
    print("Slave ID: 1")
    simulation_task = None
    try:
        await server.start()
        # Start background data simulation
        simulation_task = asyncio.create_task(simulate_process_data(data_store))
        # Run server forever
        await server.serve_forever()
    except KeyboardInterrupt:
        print("\nShutting down server...")
    finally:
        # Cancel the simulation even if startup failed or the task was cancelled
        if simulation_task is not None:
            simulation_task.cancel()
        await server.stop()
async def simulate_process_data(data_store):
    """Simulate changing process values"""
    import random
    while True:
        # Simulate temperature fluctuations
        temps = [random.randint(740, 760) for _ in range(4)]
        data_store.write_input_registers(0, temps)
        # Simulate pressure changes
        pressures = [random.randint(1180, 1220) for _ in range(4)]
        data_store.write_input_registers(10, pressures)
        await asyncio.sleep(1.0)  # Update every second
# Run the server
asyncio.run(industrial_tcp_server())
```
### RTU Server (Serial Field Device)
Emulate **field instruments** and **smart sensors**:
```python
from modbuslink import AsyncRtuModbusServer, ModbusDataStore
import asyncio
async def smart_sensor_rtu():
    """Simulate a smart temperature/pressure sensor"""
    data_store = ModbusDataStore(
        holding_registers_size=100,  # Configuration registers
        input_registers_size=100     # Measurement data
    )
    # Device configuration
    data_store.write_holding_registers(0, [
        250,            # Temperature alarm high (°C * 10)
        -50 & 0xFFFF,   # Temperature alarm low (-5.0 °C as 16-bit two's complement)
        1500,           # Pressure alarm high (mbar)
        500             # Pressure alarm low
    ])
    # Create RTU field device
    server = AsyncRtuModbusServer(
        port="COM3",    # Serial port
        baudrate=9600,
        parity="N",
        data_store=data_store,
        slave_id=15,    # Field device address
        timeout=2.0
    )
    print("Smart Sensor RTU Device Starting...")
    print("Port: COM3, Baudrate: 9600, Slave ID: 15")
    sensor_task = None
    try:
        await server.start()
        # Start sensor simulation
        sensor_task = asyncio.create_task(simulate_sensor_readings(data_store))
        await server.serve_forever()
    except KeyboardInterrupt:
        print("\nSensor offline")
    finally:
        # Cancel the simulation even if startup failed or the task was cancelled
        if sensor_task is not None:
            sensor_task.cancel()
        await server.stop()
async def simulate_sensor_readings(data_store):
    """Simulate realistic sensor behavior"""
    import random, math, time
    start_time = time.time()
    while True:
        elapsed = time.time() - start_time
        # Simulate a slow periodic temperature variation (one full cycle per hour)
        base_temp = 200 + 50 * math.sin(2 * math.pi * elapsed / 3600)
        temp = int(base_temp + random.uniform(-5, 5))  # Add noise
        # Simulate correlated pressure
        pressure = int(1000 + temp * 0.5 + random.uniform(-10, 10))
        # Update input registers
        data_store.write_input_registers(0, [temp, pressure])
        await asyncio.sleep(5.0)  # Update every 5 seconds
# Run the sensor
asyncio.run(smart_sensor_rtu())
```
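Modbus registers are unsigned 16-bit words, so scaled or negative values such as the alarm limits above have to be encoded before they are stored. A minimal sketch of that conversion follows; the helper names are hypothetical, and whether `ModbusDataStore` accepts negative integers directly is not confirmed here, which is why the encoding is shown explicitly.

```python
def to_register(value: int) -> int:
    """Encode a signed 16-bit integer as an unsigned register value (two's complement)."""
    if not -32768 <= value <= 32767:
        raise ValueError("value does not fit in a signed 16-bit register")
    return value & 0xFFFF

def from_register(raw: int) -> int:
    """Decode an unsigned register value back to a signed 16-bit integer."""
    return raw - 0x10000 if raw & 0x8000 else raw

def celsius_to_register(temp_c: float) -> int:
    """Apply the '°C * 10' scaling used in the examples, then encode."""
    return to_register(round(temp_c * 10))

def register_to_celsius(raw: int) -> float:
    return from_register(raw) / 10

print(celsius_to_register(-5.0))   # 65486
print(register_to_celsius(65486))  # -5.0
```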
### Multi-Server Deployment
**Run multiple servers simultaneously** for complex applications:
```python
from modbuslink import (
    AsyncTcpModbusServer,
    AsyncRtuModbusServer,
    AsyncAsciiModbusServer,
    ModbusDataStore
)
import asyncio
async def multi_protocol_gateway():
    """Create a multi-protocol Modbus gateway"""
    # Shared data store for all protocols
    shared_data = ModbusDataStore(
        coils_size=1000,
        discrete_inputs_size=1000,
        holding_registers_size=1000,
        input_registers_size=1000
    )
    # Initialize gateway data
    shared_data.write_holding_registers(0, list(range(100, 200)))
    # Create multiple servers
    tcp_server = AsyncTcpModbusServer(
        host="0.0.0.0", port=502,
        data_store=shared_data, slave_id=1
    )
    rtu_server = AsyncRtuModbusServer(
        port="COM3", baudrate=9600,
        data_store=shared_data, slave_id=1
    )
    ascii_server = AsyncAsciiModbusServer(
        port="COM4", baudrate=9600,
        data_store=shared_data, slave_id=1
    )
    # Start all servers
    servers = [tcp_server, rtu_server, ascii_server]
    try:
        # Start servers concurrently
        await asyncio.gather(
            *[server.start() for server in servers]
        )
        print("Multi-Protocol Gateway Online:")
        print(" • TCP: 0.0.0.0:502")
        print(" • RTU: COM3@9600")
        print(" • ASCII: COM4@9600")
        # Run all servers
        await asyncio.gather(
            *[server.serve_forever() for server in servers]
        )
    except KeyboardInterrupt:
        print("\nShutting down gateway...")
    finally:
        await asyncio.gather(
            *[server.stop() for server in servers]
        )
# Run the gateway
asyncio.run(multi_protocol_gateway())
```
## 📊 Advanced Data Types & Industrial Applications
### Working with Industrial Data
ModbusLink provides **native support** for common industrial data formats:
```python
from modbuslink import ModbusClient, TcpTransport
client = ModbusClient(TcpTransport(host='192.168.1.100', port=502))
with client:
    # ✨ 32-bit IEEE 754 Floating Point
    # Perfect for: Temperature, Pressure, Flow rates, Analog measurements
    client.write_float32(slave_id=1, start_address=100, value=25.67)  # Temperature °C
    temperature = client.read_float32(slave_id=1, start_address=100)
    print(f"Process temperature: {temperature:.2f}°C")
    # 🔢 32-bit Signed Integer
    # Perfect for: Counters, Production counts, Encoder positions
    client.write_int32(slave_id=1, start_address=102, value=-123456)
    position = client.read_int32(slave_id=1, start_address=102)
    print(f"Encoder position: {position} pulses")
    # 📝 String Data
    # Perfect for: Device names, Alarm messages, Part numbers
    client.write_string(slave_id=1, start_address=110, value="PUMP_001")
    device_name = client.read_string(slave_id=1, start_address=110, length=10)
    print(f"Device: {device_name}")
    # 🔄 Byte Order Control (Critical for multi-vendor compatibility)
    # Handle different PLC manufacturers
    # Siemens style: Big endian, high word first
    client.write_float32(
        slave_id=1, start_address=200, value=3.14159,
        byte_order="big", word_order="high"
    )
    # Schneider style: Little endian, low word first
    client.write_float32(
        slave_id=1, start_address=202, value=3.14159,
        byte_order="little", word_order="low"
    )
```
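To make the word-order option concrete, the sketch below shows how a 32-bit value can be split across two 16-bit registers using the standard `struct` module. It illustrates the general technique rather than ModbusLink's internal implementation; the function names are hypothetical, and only word order is modelled (bytes inside each word stay big-endian).

```python
import struct

def float32_to_registers(value: float, word_order: str = "high") -> list:
    """Split an IEEE 754 float32 into two 16-bit registers."""
    high, low = struct.unpack(">HH", struct.pack(">f", value))
    return [high, low] if word_order == "high" else [low, high]

def registers_to_float32(registers, word_order: str = "high") -> float:
    """Rebuild a float32 from two registers written with the same word order."""
    high, low = registers if word_order == "high" else reversed(registers)
    return struct.unpack(">f", struct.pack(">HH", high, low))[0]

def int32_to_registers(value: int, word_order: str = "high") -> list:
    """Split a signed 32-bit integer into two 16-bit registers."""
    high, low = struct.unpack(">HH", struct.pack(">i", value))
    return [high, low] if word_order == "high" else [low, high]

print(float32_to_registers(1.0))                    # [16256, 0]
print(float32_to_registers(1.0, word_order="low"))  # [0, 16256]
print(int32_to_registers(-123456))                  # [65534, 7616]
```

Reading a value with the wrong word order does not raise an error; it silently yields a different number, which is why the setting has to match the device documentation.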
### Real-World Industrial Example
```python
from modbuslink import ModbusClient, TcpTransport
import time
def monitor_production_line():
    """Complete production line monitoring system"""
    transport = TcpTransport(host='192.168.1.50', port=502, timeout=3.0)
    client = ModbusClient(transport)
    with client:
        print("🏭 Production Line Monitor Started")
        print("=" * 50)
        while True:
            try:
                # Read critical process parameters
                # Temperature control loop (PID setpoint & process value)
                temp_setpoint = client.read_float32(1, 1000)  # Setpoint
                temp_actual = client.read_float32(1, 1002)    # Process value
                # Production counter (32-bit integer)
                parts_produced = client.read_int32(1, 2000)
                # Quality metrics (holding registers)
                quality_data = client.read_holding_registers(1, 3000, 5)
                reject_count = quality_data[0]
                efficiency = quality_data[1] / 100.0  # Convert to percentage
                # System status (coils)
                status_coils = client.read_coils(1, 0, 8)
                line_running = status_coils[0]
                emergency_stop = status_coils[1]
                # Display real-time data
                print(f"\r🌡️ Temp: {temp_actual:6.1f}°C (SP: {temp_setpoint:.1f}) "
                      f"🔢 Parts: {parts_produced:6d} "
                      f"🏆 Efficiency: {efficiency:5.1f}% "
                      f"🚨 Status: {'RUN' if line_running else 'STOP'}", end="")
                # Automatic quality control
                if efficiency < 85.0:
                    print("\n⚠️ Low efficiency detected - adjusting parameters...")
                    # Adjust temperature setpoint
                    new_setpoint = temp_setpoint + 0.5
                    client.write_float32(1, 1000, new_setpoint)
                # Safety check
                if temp_actual > 85.0:
                    print("\n🔥 OVERTEMPERATURE ALARM!")
                    # Emergency shutdown
                    client.write_single_coil(1, 0, False)  # Stop line
                    break
                time.sleep(1.0)
            except KeyboardInterrupt:
                print("\n🛑 Monitor stopped by user")
                break
            except Exception as e:
                print(f"\n❌ Communication error: {e}")
                time.sleep(5.0)  # Retry after 5 seconds
# Run the monitoring system
monitor_production_line()
```
## 🛡️ Production-Ready Features
### Comprehensive Error Handling
**Never lose production data** with robust error management:
```python
# Note: ConnectionError and TimeoutError imported here shadow the Python built-ins of the same name
from modbuslink import (
    ModbusClient, TcpTransport,
    ConnectionError, TimeoutError, ModbusException, CRCError
)
import time
def resilient_data_collector():
    """Production-grade data collection with full error handling"""
    transport = TcpTransport(host='192.168.1.100', port=502)
    client = ModbusClient(transport)
    retry_count = 0
    max_retries = 3
    while retry_count < max_retries:
        try:
            with client:
                # Critical data collection
                production_data = client.read_holding_registers(1, 1000, 50)
                print(f"✅ Data collected successfully: {len(production_data)} points")
                return production_data
        except ConnectionError as e:
            print(f"🔌 Network connection failed: {e}")
            print(" • Check network cables")
            print(" • Verify device IP address")
            print(" • Check firewall settings")
        except TimeoutError as e:
            print(f"⏱️ Operation timed out: {e}")
            print(" • Network may be congested")
            print(" • Device may be overloaded")
        except CRCError as e:
            print(f"📊 Data corruption detected: {e}")
            print(" • Check serial cable integrity")
            print(" • Verify baud rate settings")
        except ModbusException as e:
            print(f"📝 Protocol error: {e}")
            print(" • Invalid slave address")
            print(" • Register address out of range")
            print(" • Function not supported")
        except Exception as e:
            print(f"❌ Unknown error: {e}")
        # Exponential backoff retry (no wait after the final attempt)
        retry_count += 1
        if retry_count < max_retries:
            wait_time = 2 ** retry_count
            print(f"🔄 Retrying in {wait_time} seconds... ({retry_count}/{max_retries})")
            time.sleep(wait_time)
    print("❌ Failed to collect data after all retries")
    return None
# Use in production
data = resilient_data_collector()
if data is not None:
    print("Processing data...")
else:
    print("Activating backup data source...")
```
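The retry loop above can be factored into a reusable helper. The following is a sketch under stated assumptions: `retry_with_backoff` is a hypothetical name, it catches the built-in `OSError` family (which includes Python's own `ConnectionError` and `TimeoutError`), and the `sleep` parameter is injectable so the example runs instantly. ModbusLink's own exception classes would need to be listed explicitly if they do not derive from `OSError`.

```python
import time

def retry_with_backoff(operation, max_retries=3, base_delay=1.0, sleep=time.sleep):
    """Call operation(); on failure wait base_delay * 2**attempt, then try again."""
    for attempt in range(max_retries):
        try:
            return operation()
        except OSError:
            if attempt == max_retries - 1:
                raise  # out of attempts: let the caller see the last error
            sleep(base_delay * 2 ** attempt)

# Demo with a simulated device that fails twice before answering
calls = {"count": 0}
delays = []

def flaky_read():
    calls["count"] += 1
    if calls["count"] < 3:
        raise TimeoutError("simulated timeout")
    return [1, 2, 3]

data = retry_with_backoff(flaky_read, sleep=delays.append)
print(data, delays)  # [1, 2, 3] [1.0, 2.0]
```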
### Advanced Logging & Debugging
**Debug communication issues** with protocol-level monitoring:
```python
from modbuslink.utils import ModbusLogger
import logging
# Setup comprehensive logging
ModbusLogger.setup_logging(
    level=logging.DEBUG,
    enable_debug=True,
    log_file='modbus_debug.log',
    console_output=True
)
# Enable packet-level debugging
ModbusLogger.enable_protocol_debug()
# Now all Modbus communications are logged:
# 2024-08-30 10:15:23 [DEBUG] Sending: 01 03 00 00 00 0A C5 CD
# 2024-08-30 10:15:23 [DEBUG] Received: 01 03 14 00 64 00 C8 01 2C 01 90 01 F4 02 58 02 BC 03 20 03 84 E5 C6
```
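The last two bytes of each RTU frame in such a log are the CRC-16, transmitted low byte first. When a logged frame looks suspicious, it can be checked by hand with the standard Modbus CRC algorithm, sketched here independently of ModbusLink's own `crc.py` (the function names are illustrative).

```python
def crc16_modbus(data: bytes) -> int:
    """CRC-16/MODBUS: initial value 0xFFFF, reflected polynomial 0xA001."""
    crc = 0xFFFF
    for byte in data:
        crc ^= byte
        for _ in range(8):
            crc = (crc >> 1) ^ 0xA001 if crc & 1 else crc >> 1
    return crc

def frame_is_valid(frame: bytes) -> bool:
    """Compare the trailing two bytes with the CRC of everything before them."""
    body, received = frame[:-2], frame[-2:]
    return crc16_modbus(body).to_bytes(2, "little") == received

request = bytes.fromhex("01 03 00 00 00 0A C5 CD")  # the request frame from the log above
print(hex(crc16_modbus(request[:-2])))  # 0xcdc5
print(frame_is_valid(request))          # True
```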
### Performance Monitoring
```python
import asyncio
import time
from modbuslink import AsyncModbusClient, AsyncTcpTransport
async def performance_benchmark():
    """Measure ModbusLink performance"""
    client = AsyncModbusClient(AsyncTcpTransport('192.168.1.100'))
    async with client:
        # Benchmark concurrent operations
        start_time = time.perf_counter()  # monotonic clock, suited to timing
        # 100 concurrent read operations
        tasks = [
            client.read_holding_registers(1, i*10, 10)
            for i in range(100)
        ]
        results = await asyncio.gather(*tasks)
        duration = time.perf_counter() - start_time
        print("🚀 Performance Results:")
        print(f" • Operations: {len(tasks)}")
        print(f" • Total time: {duration:.2f} seconds")
        print(f" • Operations/sec: {len(tasks)/duration:.1f}")
        print(f" • Avg time per operation: {duration*1000/len(tasks):.1f} ms")
# Run benchmark
asyncio.run(performance_benchmark())
```
## 📈 Supported Modbus Functions
Complete **Modbus specification** implementation:
| Function Code | Name | Description | Use Case |
|---------------|------|-------------|----------|
| **0x01** | Read Coils | Read 1-2000 coil status | Digital outputs (pumps, valves, motors) |
| **0x02** | Read Discrete Inputs | Read 1-2000 input status | Digital sensors (limit switches, buttons) |
| **0x03** | Read Holding Registers | Read 1-125 register values | Analog outputs (setpoints, parameters) |
| **0x04** | Read Input Registers | Read 1-125 input values | Analog inputs (temperature, pressure) |
| **0x05** | Write Single Coil | Write one coil | Control single device (start pump) |
| **0x06** | Write Single Register | Write one register | Set single parameter (temperature setpoint) |
| **0x0F** | Write Multiple Coils | Write 1-1968 coils | Batch control (production sequence) |
| **0x10** | Write Multiple Registers | Write 1-123 registers | Batch parameters (recipe download) |
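
The quantity limits in the table come from the protocol's maximum PDU size, so larger transfers have to be split into several requests. A small sketch of that splitting is shown below; `MAX_QUANTITY` and `split_request` are hypothetical helpers for illustration, and it is not stated in this README whether ModbusLink chunks oversized requests automatically.

```python
# Per-request quantity limits, keyed by function code (values from the table above)
MAX_QUANTITY = {0x01: 2000, 0x02: 2000, 0x03: 125, 0x04: 125, 0x0F: 1968, 0x10: 123}

def validate_quantity(function_code: int, quantity: int) -> None:
    """Raise ValueError if a single request would exceed the protocol limit."""
    limit = MAX_QUANTITY[function_code]
    if not 1 <= quantity <= limit:
        raise ValueError(f"quantity must be 1..{limit} for function 0x{function_code:02X}")

def split_request(start_address: int, total: int, function_code: int = 0x03) -> list:
    """Split a large transfer into (address, quantity) chunks within the limit."""
    limit = MAX_QUANTITY[function_code]
    return [
        (start_address + offset, min(limit, total - offset))
        for offset in range(0, total, limit)
    ]

print(split_request(0, 300))  # [(0, 125), (125, 125), (250, 50)]
```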
### Transport Layer Architecture
ModbusLink's **layered design** supports all major Modbus variants:
#### Synchronous Transports
- 🌐 **TcpTransport**: Ethernet Modbus TCP/IP (IEEE 802.3)
- 📞 **RtuTransport**: Serial Modbus RTU (RS232/RS485)
- 📜 **AsciiTransport**: Serial Modbus ASCII (7-bit text)
#### Asynchronous Transports
- ⚡ **AsyncTcpTransport**: High-performance TCP (1000+ concurrent connections)
- ⚡ **AsyncRtuTransport**: Non-blocking serial RTU
- ⚡ **AsyncAsciiTransport**: Non-blocking serial ASCII
### Key Performance Metrics
| Metric | Sync Client | Async Client | Async Server |
|--------|-------------|--------------|-------------|
| **Throughput** | 100 ops/sec | 1000+ ops/sec | 5000+ ops/sec |
| **Connections** | 1 | 1000+ | 1000+ |
| **Memory Usage** | Low | Medium | Medium |
| **CPU Usage** | Low | Very Low | Low |
| **Latency** | 10-50ms | 5-20ms | 1-10ms |
## 📁 Project Architecture
**Clean, maintainable, and extensible** codebase structure:
```
ModbusLink/
├── src/modbuslink/
│   ├── client/                           # 📱 Client Layer
│   │   ├── sync_client.py                # Synchronous Modbus client
│   │   └── async_client.py               # Asynchronous client with callbacks
│   │
│   ├── server/                           # 🖥️ Server Layer
│   │   ├── data_store.py                 # Thread-safe data storage
│   │   ├── async_base_server.py          # Server base class
│   │   ├── async_tcp_server.py           # Multi-client TCP server
│   │   ├── async_rtu_server.py           # Serial RTU server
│   │   └── async_ascii_server.py         # Serial ASCII server
│   │
│   ├── transport/                        # 🚚 Transport Layer
│   │   ├── base.py                       # Sync transport interface
│   │   ├── async_base.py                 # Async transport interface
│   │   ├── tcp.py                        # TCP/IP implementation
│   │   ├── rtu.py                        # RTU serial implementation
│   │   ├── ascii.py                      # ASCII serial implementation
│   │   ├── async_tcp.py                  # Async TCP with connection pooling
│   │   ├── async_rtu.py                  # Async RTU with frame detection
│   │   └── async_ascii.py                # Async ASCII with message parsing
│   │
│   ├── utils/                            # 🔧 Utility Layer
│   │   ├── crc.py                        # CRC16 validation (RTU)
│   │   ├── coder.py                      # Data type conversion
│   │   └── logging.py                    # Advanced logging system
│   │
│   └── common/                           # 🛠️ Common Components
│       └── exceptions.py                 # Custom exception hierarchy
│
├── examples/                             # 📚 Usage Examples
│   ├── sync_tcp_example.py               # Basic TCP client
│   ├── async_tcp_example.py              # High-performance async client
│   ├── sync_rtu_example.py               # Serial RTU communication
│   ├── async_rtu_example.py              # Async RTU with error recovery
│   ├── sync_ascii_example.py             # ASCII mode debugging
│   ├── async_ascii_example.py            # Async ASCII communication
│   ├── async_tcp_server_example.py       # Multi-client TCP server
│   ├── async_rtu_server_example.py       # RTU field device simulator
│   ├── async_ascii_server_example.py     # ASCII device emulator
│   └── multi_server_example.py           # Multi-protocol gateway
│
└── docs/                                 # 📜 Documentation
    ├── api/                              # API reference
    ├── guides/                           # User guides
    └── examples/                         # Advanced examples
```
## 📚 Examples
Explore **real-world scenarios** in the [examples](examples/) directory:
### 🔄 Synchronous Examples
- **Industrial Control**: Basic sync operations for PLCs and field devices
- **Data Acquisition**: Reliable data collection from sensors
- **Device Configuration**: Parameter setup and calibration
### ⚡ Asynchronous Examples
- **SCADA Systems**: High-performance monitoring of multiple devices
- **IoT Gateways**: Concurrent communication with hundreds of sensors
- **Real-time Control**: Low-latency response applications
### 🖥️ Server Examples
- **Device Simulators**: Test HMI applications without physical hardware
- **Protocol Gateways**: Bridge different Modbus variants
- **Training Systems**: Educational Modbus lab setups
### 🎆 Advanced Features
- **Multi-Protocol**: Run TCP, RTU, and ASCII servers simultaneously
- **Error Recovery**: Automatic reconnection and retry logic
- **Performance Tuning**: Optimize for your specific use case
- **Production Deployment**: Best practices for 24/7 operation
## ⚙️ System Requirements
### Core Requirements
- **Python**: 3.9+ (the published package metadata declares `requires_python >= 3.9`)
- **Operating System**: Windows, Linux, macOS
- **Memory**: Minimum 64MB RAM
- **Network**: TCP/IP stack for Modbus TCP
- **Serial Ports**: RS232/RS485 for RTU/ASCII
### Dependencies
```bash
# Core dependencies (automatically installed)
pyserial >= 3.5 # Serial port communication
pyserial-asyncio >= 0.6 # Async serial support
typing_extensions >= 4.0.0 # Enhanced type hints
# Development dependencies (optional)
pytest >= 7.0 # Unit testing
pytest-mock >= 3.0 # Test mocking
black >= 22.0 # Code formatting
ruff >= 0.1.0 # Code linting
mypy >= 1.0 # Type checking
```
### Performance Recommendations
- **CPU**: Multi-core recommended for async servers (2+ cores)
- **Network**: Gigabit Ethernet for high-throughput TCP applications
- **Serial**: USB-to-RS485 converters with FTDI chipsets
- **Python**: Use CPython for best performance (avoid PyPy for serial I/O)
## 📜 License & Contributing
**MIT License** - Free for commercial use. See [LICENSE.txt](LICENSE.txt) for details.
### Contributing Guidelines
**We welcome contributions!** Please:
1. 🍴 **Fork** the repository
2. 🌱 **Create** a feature branch
3. ✨ **Add** tests for new functionality
4. 📝 **Update** documentation
5. 🚀 **Submit** a pull request
**Areas where we need help:**
- Additional Modbus function codes (0x14, 0x15, 0x16, 0x17)
- Performance optimizations
- Additional transport protocols (Modbus Plus, etc.)
- Documentation improvements
- Real-world testing and bug reports
### Community & Support
- 💬 **GitHub Issues**: Bug reports and feature requests
- 📧 **Email Support**: Technical questions and consulting
- 📚 **Documentation**: Comprehensive guides and API reference
- 🎆 **Examples**: Production-ready code samples
---
<div align="center">
**Built with ❤️ for the Industrial Automation Community**
*ModbusLink - Connecting Industrial Systems with Modern Python*
</div>
Raw data
{
"_id": null,
"home_page": null,
"name": "modbuslink",
"maintainer": null,
"docs_url": null,
"requires_python": ">=3.9",
"maintainer_email": null,
"keywords": "modbus, industrial, automation, communication, protocol, tcp, rtu, ascii, serial, plc, scada, iot",
"author": null,
"author_email": "Miraitowa-la <2056978412@qq.com>",
"download_url": "https://files.pythonhosted.org/packages/9a/a3/92f0c19d776051684d127968042b6c9f1cef65eb62c48926b1392088510e/modbuslink-1.2.0.tar.gz",
"platform": null,
"description": "# ModbusLink\r\n\r\n<div align=\"center\">\r\n\r\n[](https://pepy.tech/projects/modbuslink)\r\n[](https://badge.fury.io/py/modbuslink)\r\n[](https://pypi.org/project/modbuslink/)\r\n[](LICENSE.txt)\r\n\r\n**A Modern, High-Performance Python Modbus Library**\r\n\r\n*Industrial-grade \u2022 Developer-friendly \u2022 Production-ready*\r\n\r\n[English](#) | [\u4e2d\u6587\u7248](README-zh_CN.md) | [Documentation](https://miraitowa-la.github.io/ModbusLink/en/index.html) | [Examples](#examples)\r\n\r\n</div>\r\n\r\n---\r\n\r\n## \ud83d\ude80 Why ModbusLink?\r\n\r\nModbusLink is a next-generation Python Modbus library designed for **industrial automation**, **IoT applications**, and **SCADA systems**. Built with modern Python practices, it provides unparalleled ease of use while maintaining enterprise-grade reliability.\r\n\r\n### \u2728 Key Features\r\n\r\n| Feature | Description | Benefit |\r\n|---------|-------------|----------|\r\n| \ud83c\udfd7\ufe0f **Layered Architecture** | Clean separation of concerns | Easy maintenance & extension |\r\n| \ud83d\udd0c **Universal Transports** | TCP, RTU, ASCII support | Works with any Modbus device |\r\n| \u26a1 **Async Performance** | Native asyncio support | Handle 1000+ concurrent connections |\r\n| \ud83d\udee0\ufe0f **Developer Experience** | Intuitive APIs & full typing | Faster development & fewer bugs |\r\n| \ud83d\udcca **Rich Data Types** | float32, int32, strings & more | Handle complex industrial data |\r\n| \ud83d\udd0d **Advanced Debugging** | Protocol-level monitoring | Rapid troubleshooting |\r\n| \ud83d\udda5\ufe0f **Complete Server** | Full server implementation | Build custom Modbus devices |\r\n| \ud83c\udfaf **Production Ready** | Comprehensive error handling | Deploy with confidence |\r\n\r\n## \ud83d\ude80 Quick Start\r\n\r\n### Installation\r\n\r\n```bash\r\n# Install from PyPI\r\npip install modbuslink\r\n\r\n# Or install with development dependencies\r\npip install 
modbuslink[dev]\r\n```\r\n\r\n### 30-Second Demo\r\n\r\n```python\r\nfrom modbuslink import ModbusClient, TcpTransport\r\n\r\n# Connect to Modbus TCP device\r\ntransport = TcpTransport(host='192.168.1.100', port=502)\r\nclient = ModbusClient(transport)\r\n\r\nwith client:\r\n # Read temperature from holding registers\r\n temp = client.read_float32(slave_id=1, start_address=100)\r\n print(f\"Temperature: {temp:.1f}\u00b0C\")\r\n \r\n # Control pump via coil\r\n client.write_single_coil(slave_id=1, address=0, value=True)\r\n print(\"Pump started!\")\r\n```\r\n\r\n## \ud83d\udcda Complete Usage Guide\r\n\r\n### TCP Client (Ethernet)\r\n\r\nPerfect for **PLCs**, **HMIs**, and **Ethernet-based devices**:\r\n\r\n```python\r\nfrom modbuslink import ModbusClient, TcpTransport\r\n\r\n# Connect to PLC via Ethernet\r\ntransport = TcpTransport(\r\n host='192.168.1.10',\r\n port=502,\r\n timeout=5.0\r\n)\r\nclient = ModbusClient(transport)\r\n\r\nwith client:\r\n # Read production counter\r\n counter = client.read_int32(slave_id=1, start_address=1000)\r\n print(f\"Production count: {counter}\")\r\n \r\n # Read sensor array\r\n sensors = client.read_holding_registers(slave_id=1, start_address=2000, quantity=10)\r\n print(f\"Sensor values: {sensors}\")\r\n \r\n # Update setpoint\r\n client.write_float32(slave_id=1, start_address=3000, value=75.5)\r\n```\r\n\r\n### RTU Client (Serial RS485/RS232)\r\n\r\nIdeal for **field instruments**, **sensors**, and **legacy devices**:\r\n\r\n```python\r\nfrom modbuslink import ModbusClient, RtuTransport\r\n\r\n# Connect to field device via RS485\r\ntransport = RtuTransport(\r\n port='COM3', # Linux: '/dev/ttyUSB0'\r\n baudrate=9600,\r\n parity='N', # None, Even, Odd\r\n stopbits=1,\r\n timeout=2.0\r\n)\r\nclient = ModbusClient(transport)\r\n\r\nwith client:\r\n # Read flow meter\r\n flow_rate = client.read_float32(slave_id=5, start_address=0)\r\n print(f\"Flow rate: {flow_rate:.2f} L/min\")\r\n \r\n # Read pressure transmitter\r\n pressure = 
client.read_input_registers(slave_id=6, start_address=0, quantity=1)[0]\r\n pressure_bar = pressure / 100.0 # Convert to bar\r\n print(f\"Pressure: {pressure_bar:.2f} bar\")\r\n```\r\n\r\n### ASCII Client (Serial Text Protocol)\r\n\r\nSpecial applications and **debugging**:\r\n\r\n```python\r\nfrom modbuslink import ModbusClient, AsciiTransport\r\n\r\n# ASCII mode for special devices\r\ntransport = AsciiTransport(\r\n port='COM1',\r\n baudrate=9600,\r\n bytesize=7, # 7-bit ASCII\r\n parity='E', # Even parity\r\n timeout=3.0\r\n)\r\nclient = ModbusClient(transport)\r\n\r\nwith client:\r\n # Read laboratory instrument\r\n temperature = client.read_float32(slave_id=2, start_address=100)\r\n print(f\"Lab temperature: {temperature:.3f}\u00b0C\")\r\n```\r\n\r\n### High-Performance Async Operations\r\n\r\n**Handle multiple devices simultaneously** with async/await:\r\n\r\n```python\r\nimport asyncio\r\nfrom modbuslink import AsyncModbusClient, AsyncTcpTransport\r\n\r\nasync def read_multiple_devices():\r\n \"\"\"Read from multiple PLCs concurrently\"\"\"\r\n \r\n # Create connections to different PLCs\r\n plc1 = AsyncModbusClient(AsyncTcpTransport('192.168.1.10', 502))\r\n plc2 = AsyncModbusClient(AsyncTcpTransport('192.168.1.11', 502))\r\n plc3 = AsyncModbusClient(AsyncTcpTransport('192.168.1.12', 502))\r\n \r\n async with plc1, plc2, plc3:\r\n # Read all PLCs simultaneously\r\n tasks = [\r\n plc1.read_holding_registers(1, 0, 10), # Production line 1\r\n plc2.read_holding_registers(1, 0, 10), # Production line 2\r\n plc3.read_holding_registers(1, 0, 10), # Production line 3\r\n ]\r\n \r\n results = await asyncio.gather(*tasks)\r\n \r\n for i, data in enumerate(results, 1):\r\n print(f\"PLC {i} data: {data}\")\r\n\r\n# Run async example\r\nasyncio.run(read_multiple_devices())\r\n```\r\n\r\n## \ud83d\udda5\ufe0f Modbus Server Implementation\r\n\r\n**Build your own Modbus devices** with ModbusLink's powerful server capabilities:\r\n\r\n### TCP Server (Multi-Client 
Support)\r\n\r\nCreate **HMI simulators**, **device emulators**, or **data concentrators**:\r\n\r\n```python\r\nfrom modbuslink import AsyncTcpModbusServer, ModbusDataStore\r\nimport asyncio\r\n\r\nasync def industrial_tcp_server():\r\n \"\"\"Simulate a complete industrial control system\"\"\"\r\n \r\n # Create data store for 1000 points each\r\n data_store = ModbusDataStore(\r\n coils_size=1000, # Digital outputs (pumps, valves)\r\n discrete_inputs_size=1000, # Digital inputs (sensors, switches)\r\n holding_registers_size=1000, # Analog outputs (setpoints)\r\n input_registers_size=1000 # Analog inputs (measurements)\r\n )\r\n \r\n # Initialize industrial data\r\n # Pump and valve controls\r\n data_store.write_coils(0, [True, False, True, False])\r\n \r\n # Process setpoints (temperatures, pressures)\r\n data_store.write_holding_registers(0, [750, 1200, 850, 600]) # \u00b0C * 10\r\n \r\n # Sensor readings (simulated)\r\n data_store.write_input_registers(0, [748, 1195, 847, 598]) # Current values\r\n \r\n # Safety interlocks and limit switches\r\n data_store.write_discrete_inputs(0, [True, True, False, True])\r\n \r\n # Create multi-client TCP server\r\n server = AsyncTcpModbusServer(\r\n host=\"0.0.0.0\", # Accept connections from any IP\r\n port=502, # Standard Modbus port\r\n data_store=data_store,\r\n slave_id=1,\r\n max_connections=50 # Support up to 50 HMI clients\r\n )\r\n \r\n print(\"Starting Industrial Control System Simulator...\")\r\n print(\"Connect your HMI to: <your_ip>:502\")\r\n print(\"Slave ID: 1\")\r\n \r\n try:\r\n await server.start()\r\n \r\n # Start background data simulation\r\n simulation_task = asyncio.create_task(simulate_process_data(data_store))\r\n \r\n # Run server forever\r\n await server.serve_forever()\r\n \r\n except KeyboardInterrupt:\r\n print(\"\\nShutting down server...\")\r\n simulation_task.cancel()\r\n finally:\r\n await server.stop()\r\n\r\nasync def simulate_process_data(data_store):\r\n \"\"\"Simulate changing process 
values\"\"\"\r\n import random\r\n \r\n while True:\r\n # Simulate temperature fluctuations\r\n temps = [random.randint(740, 760) for _ in range(4)]\r\n data_store.write_input_registers(0, temps)\r\n \r\n # Simulate pressure changes\r\n pressures = [random.randint(1180, 1220) for _ in range(4)]\r\n data_store.write_input_registers(10, pressures)\r\n \r\n await asyncio.sleep(1.0) # Update every second\r\n\r\n# Run the server\r\nasyncio.run(industrial_tcp_server())\r\n```\r\n\r\n### RTU Server (Serial Field Device)\r\n\r\nEmulate **field instruments** and **smart sensors**:\r\n\r\n```python\r\nfrom modbuslink import AsyncRtuModbusServer, ModbusDataStore\r\nimport asyncio\r\n\r\nasync def smart_sensor_rtu():\r\n \"\"\"Simulate a smart temperature/pressure sensor\"\"\"\r\n \r\n data_store = ModbusDataStore(\r\n holding_registers_size=100, # Configuration registers\r\n input_registers_size=100 # Measurement data\r\n )\r\n \r\n # Device configuration\r\n data_store.write_holding_registers(0, [\r\n 250, # Temperature alarm high (\u00b0C * 10)\r\n -50, # Temperature alarm low\r\n 1500, # Pressure alarm high (mbar)\r\n 500 # Pressure alarm low\r\n ])\r\n \r\n # Create RTU field device\r\n server = AsyncRtuModbusServer(\r\n port=\"COM3\", # Serial port\r\n baudrate=9600,\r\n parity=\"N\",\r\n data_store=data_store,\r\n slave_id=15, # Field device address\r\n timeout=2.0\r\n )\r\n \r\n print(\"Smart Sensor RTU Device Starting...\")\r\n print(f\"Port: COM3, Baudrate: 9600, Slave ID: 15\")\r\n \r\n try:\r\n await server.start()\r\n \r\n # Start sensor simulation\r\n sensor_task = asyncio.create_task(simulate_sensor_readings(data_store))\r\n \r\n await server.serve_forever()\r\n \r\n except KeyboardInterrupt:\r\n print(\"\\nSensor offline\")\r\n sensor_task.cancel()\r\n finally:\r\n await server.stop()\r\n\r\nasync def simulate_sensor_readings(data_store):\r\n \"\"\"Simulate realistic sensor behavior\"\"\"\r\n import random, math, time\r\n \r\n start_time = time.time()\r\n \r\n 
while True:\r\n elapsed = time.time() - start_time\r\n \r\n # Simulate daily temperature variation\r\n base_temp = 200 + 50 * math.sin(elapsed / 3600) # Hourly cycle\r\n temp = int(base_temp + random.uniform(-5, 5)) # Add noise\r\n \r\n # Simulate correlated pressure\r\n pressure = int(1000 + temp * 0.5 + random.uniform(-10, 10))\r\n \r\n # Update input registers\r\n data_store.write_input_registers(0, [temp, pressure])\r\n \r\n await asyncio.sleep(5.0) # Update every 5 seconds\r\n\r\n# Run the sensor\r\nasyncio.run(smart_sensor_rtu())\r\n```\r\n\r\n### Multi-Server Deployment\r\n\r\n**Run multiple servers simultaneously** for complex applications:\r\n\r\n```python\r\nfrom modbuslink import (\r\n AsyncTcpModbusServer,\r\n AsyncRtuModbusServer, \r\n AsyncAsciiModbusServer,\r\n ModbusDataStore\r\n)\r\nimport asyncio\r\n\r\nasync def multi_protocol_gateway():\r\n \"\"\"Create a multi-protocol Modbus gateway\"\"\"\r\n \r\n # Shared data store for all protocols\r\n shared_data = ModbusDataStore(\r\n coils_size=1000,\r\n discrete_inputs_size=1000,\r\n holding_registers_size=1000,\r\n input_registers_size=1000\r\n )\r\n \r\n # Initialize gateway data\r\n shared_data.write_holding_registers(0, list(range(100, 200)))\r\n \r\n # Create multiple servers\r\n tcp_server = AsyncTcpModbusServer(\r\n host=\"0.0.0.0\", port=502,\r\n data_store=shared_data, slave_id=1\r\n )\r\n \r\n rtu_server = AsyncRtuModbusServer(\r\n port=\"COM3\", baudrate=9600,\r\n data_store=shared_data, slave_id=1\r\n )\r\n \r\n ascii_server = AsyncAsciiModbusServer(\r\n port=\"COM4\", baudrate=9600,\r\n data_store=shared_data, slave_id=1\r\n )\r\n \r\n # Start all servers\r\n servers = [tcp_server, rtu_server, ascii_server]\r\n \r\n try:\r\n # Start servers concurrently\r\n await asyncio.gather(\r\n *[server.start() for server in servers]\r\n )\r\n \r\n print(\"Multi-Protocol Gateway Online:\")\r\n print(\" \u2022 TCP: 0.0.0.0:502\")\r\n print(\" \u2022 RTU: COM3@9600\")\r\n print(\" \u2022 ASCII: 
COM4@9600\")\r\n \r\n # Run all servers\r\n await asyncio.gather(\r\n *[server.serve_forever() for server in servers]\r\n )\r\n \r\n except KeyboardInterrupt:\r\n print(\"\\nShutting down gateway...\")\r\n finally:\r\n await asyncio.gather(\r\n *[server.stop() for server in servers]\r\n )\r\n\r\n# Run the gateway\r\nasyncio.run(multi_protocol_gateway())\r\n```\r\n\r\n## \ud83d\udcca Advanced Data Types & Industrial Applications\r\n\r\n### Working with Industrial Data\r\n\r\nModbusLink provides **native support** for common industrial data formats:\r\n\r\n```python\r\nwith client:\r\n # \u2728 32-bit IEEE 754 Floating Point\r\n # Perfect for: Temperature, Pressure, Flow rates, Analog measurements\r\n client.write_float32(slave_id=1, start_address=100, value=25.67) # Temperature \u00b0C\r\n temperature = client.read_float32(slave_id=1, start_address=100)\r\n print(f\"Process temperature: {temperature:.2f}\u00b0C\")\r\n \r\n # \ud83d\udd22 32-bit Signed Integer \r\n # Perfect for: Counters, Production counts, Encoder positions\r\n client.write_int32(slave_id=1, start_address=102, value=-123456)\r\n position = client.read_int32(slave_id=1, start_address=102)\r\n print(f\"Encoder position: {position} pulses\")\r\n \r\n # \ud83d\udcdd String Data\r\n # Perfect for: Device names, Alarm messages, Part numbers\r\n client.write_string(slave_id=1, start_address=110, value=\"PUMP_001\")\r\n device_name = client.read_string(slave_id=1, start_address=110, length=10)\r\n print(f\"Device: {device_name}\")\r\n \r\n # \ud83d\udd04 Byte Order Control (Critical for multi-vendor compatibility)\r\n # Handle different PLC manufacturers\r\n \r\n # Siemens style: Big endian, high word first\r\n client.write_float32(\r\n slave_id=1, start_address=200, value=3.14159,\r\n byte_order=\"big\", word_order=\"high\"\r\n )\r\n \r\n # Schneider style: Little endian, low word first \r\n client.write_float32(\r\n slave_id=1, start_address=202, value=3.14159,\r\n byte_order=\"little\", 
word_order=\"low\"\r\n )\r\n```\r\n\r\n### Real-World Industrial Example\r\n\r\n```python\r\nfrom modbuslink import ModbusClient, TcpTransport\r\nimport time\r\n\r\ndef monitor_production_line():\r\n \"\"\"Complete production line monitoring system\"\"\"\r\n \r\n transport = TcpTransport(host='192.168.1.50', port=502, timeout=3.0)\r\n client = ModbusClient(transport)\r\n \r\n with client:\r\n print(\"\ud83c\udfed Production Line Monitor Started\")\r\n print(\"=\" * 50)\r\n \r\n while True:\r\n try:\r\n # Read critical process parameters\r\n # Temperature control loop (PID setpoint & process value)\r\n temp_setpoint = client.read_float32(1, 1000) # Setpoint\r\n temp_actual = client.read_float32(1, 1002) # Process value\r\n \r\n # Production counter (32-bit integer)\r\n parts_produced = client.read_int32(1, 2000)\r\n \r\n # Quality metrics (holding registers)\r\n quality_data = client.read_holding_registers(1, 3000, 5)\r\n reject_count = quality_data[0]\r\n efficiency = quality_data[1] / 100.0 # Convert to percentage\r\n \r\n # System status (coils)\r\n status_coils = client.read_coils(1, 0, 8)\r\n line_running = status_coils[0]\r\n emergency_stop = status_coils[1]\r\n \r\n # Display real-time data\r\n print(f\"\\r\ud83c\udf21\ufe0f Temp: {temp_actual:6.1f}\u00b0C (SP: {temp_setpoint:.1f}) \"\r\n f\"\ud83d\udd22 Parts: {parts_produced:6d} \"\r\n f\"\ud83c\udfc6 Efficiency: {efficiency:5.1f}% \"\r\n f\"\ud83d\udea8 Status: {'RUN' if line_running else 'STOP'}\", end=\"\")\r\n \r\n # Automatic quality control\r\n if efficiency < 85.0:\r\n print(\"\\n\u26a0\ufe0f Low efficiency detected - adjusting parameters...\")\r\n # Adjust temperature setpoint\r\n new_setpoint = temp_setpoint + 0.5\r\n client.write_float32(1, 1000, new_setpoint)\r\n \r\n # Safety check\r\n if temp_actual > 85.0:\r\n print(\"\\n\ud83d\udd25 OVERTEMPERATURE ALARM!\")\r\n # Emergency shutdown\r\n client.write_single_coil(1, 0, False) # Stop line\r\n break\r\n \r\n time.sleep(1.0)\r\n \r\n except 
KeyboardInterrupt:\r\n print(\"\\n\ud83d\udef1 Monitor stopped by user\")\r\n break\r\n except Exception as e:\r\n print(f\"\\n\u274c Communication error: {e}\")\r\n time.sleep(5.0) # Retry after 5 seconds\r\n\r\n# Run the monitoring system\r\nmonitor_production_line()\r\n```\r\n\r\n## \ud83d\udee1\ufe0f Production-Ready Features\r\n\r\n### Comprehensive Error Handling\r\n\r\n**Never lose production data** with robust error management:\r\n\r\n```python\r\nfrom modbuslink import (\r\n ModbusClient, TcpTransport,\r\n ConnectionError, TimeoutError, ModbusException, CRCError\r\n)\r\nimport time\r\n\r\ndef resilient_data_collector():\r\n \"\"\"Production-grade data collection with full error handling\"\"\"\r\n \r\n transport = TcpTransport(host='192.168.1.100', port=502)\r\n client = ModbusClient(transport)\r\n \r\n retry_count = 0\r\n max_retries = 3\r\n \r\n while retry_count < max_retries:\r\n try:\r\n with client:\r\n # Critical data collection\r\n production_data = client.read_holding_registers(1, 1000, 50)\r\n print(f\"\u2705 Data collected successfully: {len(production_data)} points\")\r\n return production_data\r\n \r\n except ConnectionError as e:\r\n print(f\"\ud83d\udd0c Network connection failed: {e}\")\r\n print(\" \u2022 Check network cables\")\r\n print(\" \u2022 Verify device IP address\")\r\n print(\" \u2022 Check firewall settings\")\r\n \r\n except TimeoutError as e:\r\n print(f\"\u23f1\ufe0f Operation timed out: {e}\")\r\n print(\" \u2022 Network may be congested\")\r\n print(\" \u2022 Device may be overloaded\")\r\n \r\n except CRCError as e:\r\n print(f\"\ud83d\udcca Data corruption detected: {e}\")\r\n print(\" \u2022 Check serial cable integrity\")\r\n print(\" \u2022 Verify baud rate settings\")\r\n \r\n except ModbusException as e:\r\n print(f\"\ud83d\udcdd Protocol error: {e}\")\r\n print(\" \u2022 Invalid slave address\")\r\n print(\" \u2022 Register address out of range\")\r\n print(\" \u2022 Function not supported\")\r\n \r\n except 
Exception as e:\r\n print(f\"\u274c Unknown error: {e}\")\r\n \r\n # Exponential backoff retry\r\n retry_count += 1\r\n wait_time = 2 ** retry_count\r\n print(f\"\ud83d\udd04 Retrying in {wait_time} seconds... ({retry_count}/{max_retries})\")\r\n time.sleep(wait_time)\r\n \r\n print(\"\u274c Failed to collect data after all retries\")\r\n return None\r\n\r\n# Use in production\r\ndata = resilient_data_collector()\r\nif data:\r\n print(\"Processing data...\")\r\nelse:\r\n print(\"Activating backup data source...\")\r\n```\r\n\r\n### Advanced Logging & Debugging\r\n\r\n**Debug communication issues** with protocol-level monitoring:\r\n\r\n```python\r\nfrom modbuslink.utils import ModbusLogger\r\nimport logging\r\n\r\n# Setup comprehensive logging\r\nModbusLogger.setup_logging(\r\n level=logging.DEBUG,\r\n enable_debug=True,\r\n log_file='modbus_debug.log',\r\n console_output=True\r\n)\r\n\r\n# Enable packet-level debugging\r\nModbusLogger.enable_protocol_debug()\r\n\r\n# Now all Modbus communications are logged:\r\n# 2024-08-30 10:15:23 [DEBUG] Sending: 01 03 00 00 00 0A C5 CD\r\n# 2024-08-30 10:15:23 [DEBUG] Received: 01 03 14 00 64 00 C8 01 2C 01 90 01 F4 02 58 02 BC 03 20 03 84 E5 C6\r\n```\r\n\r\n### Performance Monitoring\r\n\r\n```python\r\nimport asyncio\r\nimport time\r\nfrom modbuslink import AsyncModbusClient, AsyncTcpTransport\r\n\r\nasync def performance_benchmark():\r\n \"\"\"Measure ModbusLink performance\"\"\"\r\n \r\n client = AsyncModbusClient(AsyncTcpTransport('192.168.1.100'))\r\n \r\n async with client:\r\n # Benchmark concurrent operations\r\n start_time = time.time()\r\n \r\n # 100 concurrent read operations\r\n tasks = [\r\n client.read_holding_registers(1, i*10, 10) \r\n for i in range(100)\r\n ]\r\n \r\n results = await asyncio.gather(*tasks)\r\n \r\n end_time = time.time()\r\n duration = end_time - start_time\r\n \r\n print(f\"\ud83d\ude80 Performance Results:\")\r\n print(f\" \u2022 Operations: {len(tasks)}\")\r\n print(f\" \u2022 Total 
time: {duration:.2f} seconds\")\r\n print(f\" \u2022 Operations/sec: {len(tasks)/duration:.1f}\")\r\n print(f\" \u2022 Avg response time: {duration*1000/len(tasks):.1f} ms\")\r\n\r\n# Run benchmark\r\nasyncio.run(performance_benchmark())\r\n```\r\n\r\n## \ud83d\udcc8 Supported Modbus Functions\r\n\r\nComplete **Modbus specification** implementation:\r\n\r\n| Function Code | Name | Description | Use Case |\r\n|---------------|------|-------------|----------|\r\n| **0x01** | Read Coils | Read 1-2000 coil status | Digital outputs (pumps, valves, motors) |\r\n| **0x02** | Read Discrete Inputs | Read 1-2000 input status | Digital sensors (limit switches, buttons) |\r\n| **0x03** | Read Holding Registers | Read 1-125 register values | Analog outputs (setpoints, parameters) |\r\n| **0x04** | Read Input Registers | Read 1-125 input values | Analog inputs (temperature, pressure) |\r\n| **0x05** | Write Single Coil | Write one coil | Control single device (start pump) |\r\n| **0x06** | Write Single Register | Write one register | Set single parameter (temperature setpoint) |\r\n| **0x0F** | Write Multiple Coils | Write 1-1968 coils | Batch control (production sequence) |\r\n| **0x10** | Write Multiple Registers | Write 1-123 registers | Batch parameters (recipe download) |\r\n\r\n### Transport Layer Architecture\r\n\r\nModbusLink's **layered design** supports all major Modbus variants:\r\n\r\n#### Synchronous Transports\r\n- \ud83c\udf10 **TcpTransport**: Ethernet Modbus TCP/IP (IEEE 802.3)\r\n- \ud83d\udcde **RtuTransport**: Serial Modbus RTU (RS232/RS485)\r\n- \ud83d\udcdc **AsciiTransport**: Serial Modbus ASCII (7-bit text)\r\n\r\n#### Asynchronous Transports \r\n- \u26a1 **AsyncTcpTransport**: High-performance TCP (1000+ concurrent connections)\r\n- \u26a1 **AsyncRtuTransport**: Non-blocking serial RTU\r\n- \u26a1 **AsyncAsciiTransport**: Non-blocking serial ASCII\r\n\r\n### Key Performance Metrics\r\n\r\n| Metric | Sync Client | Async Client | Async Server 
|\r\n|--------|-------------|--------------|-------------|\r\n| **Throughput** | 100 ops/sec | 1000+ ops/sec | 5000+ ops/sec |\r\n| **Connections** | 1 | 1000+ | 1000+ |\r\n| **Memory Usage** | Low | Medium | Medium |\r\n| **CPU Usage** | Low | Very Low | Low |\r\n| **Latency** | 10-50ms | 5-20ms | 1-10ms |\r\n\r\n## \ud83d\udcc1 Project Architecture\r\n\r\n**Clean, maintainable, and extensible** codebase structure:\r\n\r\n```\r\nModbusLink/\r\n\u251c\u2500\u2500 src/modbuslink/\r\n\u2502 \u251c\u2500\u2500 client/ # \ud83d\udcf1 Client Layer\r\n\u2502 \u2502 \u251c\u2500\u2500 sync_client.py # Synchronous Modbus client\r\n\u2502 \u2502 \u2514\u2500\u2500 async_client.py # Asynchronous client with callbacks\r\n\u2502 \u2502\r\n\u2502 \u251c\u2500\u2500 server/ # \ud83d\udda5\ufe0f Server Layer \r\n\u2502 \u2502 \u251c\u2500\u2500 data_store.py # Thread-safe data storage\r\n\u2502 \u2502 \u251c\u2500\u2500 async_base_server.py # Server base class\r\n\u2502 \u2502 \u251c\u2500\u2500 async_tcp_server.py # Multi-client TCP server\r\n\u2502 \u2502 \u251c\u2500\u2500 async_rtu_server.py # Serial RTU server\r\n\u2502 \u2502 \u2514\u2500\u2500 async_ascii_server.py # Serial ASCII server\r\n\u2502 \u2502\r\n\u2502 \u251c\u2500\u2500 transport/ # \ud83d\ude9a Transport Layer\r\n\u2502 \u2502 \u251c\u2500\u2500 base.py # Sync transport interface\r\n\u2502 \u2502 \u251c\u2500\u2500 async_base.py # Async transport interface\r\n\u2502 \u2502 \u251c\u2500\u2500 tcp.py # TCP/IP implementation\r\n\u2502 \u2502 \u251c\u2500\u2500 rtu.py # RTU serial implementation\r\n\u2502 \u2502 \u251c\u2500\u2500 ascii.py # ASCII serial implementation\r\n\u2502 \u2502 \u251c\u2500\u2500 async_tcp.py # Async TCP with connection pooling\r\n\u2502 \u2502 \u251c\u2500\u2500 async_rtu.py # Async RTU with frame detection\r\n\u2502 \u2502 \u2514\u2500\u2500 async_ascii.py # Async ASCII with message parsing\r\n\u2502 \u2502\r\n\u2502 \u251c\u2500\u2500 utils/ # \ud83d\udd27 Utility Layer\r\n\u2502 \u2502 
\u251c\u2500\u2500 crc.py # CRC16 validation (RTU)\r\n\u2502 \u2502 \u251c\u2500\u2500 coder.py # Data type conversion\r\n\u2502 \u2502 \u2514\u2500\u2500 logging.py # Advanced logging system\r\n\u2502 \u2502\r\n\u2502 \u2514\u2500\u2500 common/ # \ud83d\udee0\ufe0f Common Components\r\n\u2502 \u2514\u2500\u2500 exceptions.py # Custom exception hierarchy\r\n\u2502\r\n\u251c\u2500\u2500 examples/ # \ud83d\udcda Usage Examples\r\n\u2502 \u251c\u2500\u2500 sync_tcp_example.py # Basic TCP client\r\n\u2502 \u251c\u2500\u2500 async_tcp_example.py # High-performance async client\r\n\u2502 \u251c\u2500\u2500 sync_rtu_example.py # Serial RTU communication \r\n\u2502 \u251c\u2500\u2500 async_rtu_example.py # Async RTU with error recovery\r\n\u2502 \u251c\u2500\u2500 sync_ascii_example.py # ASCII mode debugging\r\n\u2502 \u251c\u2500\u2500 async_ascii_example.py # Async ASCII communication\r\n\u2502 \u251c\u2500\u2500 async_tcp_server_example.py # Multi-client TCP server\r\n\u2502 \u251c\u2500\u2500 async_rtu_server_example.py # RTU field device simulator\r\n\u2502 \u251c\u2500\u2500 async_ascii_server_example.py # ASCII device emulator\r\n\u2502 \u2514\u2500\u2500 multi_server_example.py # Multi-protocol gateway\r\n\u2502\r\n\u2514\u2500\u2500 docs/ # \ud83d\udcdc Documentation\r\n \u251c\u2500\u2500 api/ # API reference\r\n \u251c\u2500\u2500 guides/ # User guides\r\n \u2514\u2500\u2500 examples/ # Advanced examples\r\n```\r\n\r\n## \ud83d\udcda Examples\r\n\r\nExplore **real-world scenarios** in the [examples](examples/) directory:\r\n\r\n### \ud83d\udd04 Synchronous Examples\r\n- **Industrial Control**: Basic sync operations for PLCs and field devices\r\n- **Data Acquisition**: Reliable data collection from sensors\r\n- **Device Configuration**: Parameter setup and calibration\r\n\r\n### \u26a1 Asynchronous Examples \r\n- **SCADA Systems**: High-performance monitoring of multiple devices\r\n- **IoT Gateways**: Concurrent communication with hundreds of sensors\r\n- 
**Real-time Control**: Low-latency response applications\r\n\r\n### \ud83d\udda5\ufe0f Server Examples\r\n- **Device Simulators**: Test HMI applications without physical hardware\r\n- **Protocol Gateways**: Bridge different Modbus variants\r\n- **Training Systems**: Educational Modbus lab setups\r\n\r\n### \ud83c\udf86 Advanced Features\r\n- **Multi-Protocol**: Run TCP, RTU, and ASCII servers simultaneously \r\n- **Error Recovery**: Automatic reconnection and retry logic\r\n- **Performance Tuning**: Optimize for your specific use case\r\n- **Production Deployment**: Best practices for 24/7 operation\r\n\r\n## \u2699\ufe0f System Requirements\r\n\r\n### Core Requirements\r\n- **Python**: 3.9+\r\n- **Operating System**: Windows, Linux, macOS\r\n- **Memory**: Minimum 64MB RAM\r\n- **Network**: TCP/IP stack for Modbus TCP\r\n- **Serial Ports**: RS232/RS485 for RTU/ASCII\r\n\r\n### Dependencies\r\n```bash\r\n# Core dependencies (automatically installed)\r\npyserial >= 3.5 # Serial port communication\r\npyserial-asyncio >= 0.6 # Async serial support \r\ntyping_extensions >= 4.0.0 # Enhanced type hints\r\n\r\n# Development dependencies (optional)\r\npytest >= 7.0 # Unit testing\r\npytest-mock >= 3.0 # Test mocking\r\nblack >= 22.0 # Code formatting\r\nruff >= 0.1.0 # Code linting\r\nmypy >= 1.0 # Type checking\r\n```\r\n\r\n### Performance Recommendations\r\n- **CPU**: Multi-core recommended for async servers (2+ cores)\r\n- **Network**: Gigabit Ethernet for high-throughput TCP applications\r\n- **Serial**: USB-to-RS485 converters with FTDI chipsets\r\n- **Python**: Use CPython for best performance (avoid PyPy for serial I/O)\r\n\r\n## \ud83d\udcdc License & Contributing\r\n\r\n**MIT License** - Free for commercial use. See [LICENSE.txt](LICENSE.txt) for details.\r\n\r\n### Contributing Guidelines\r\n\r\n**We welcome contributions!** Please:\r\n\r\n1. \ud83c\udf7f **Fork** the repository\r\n2. 
\ud83c\udf31 **Create** a feature branch\r\n3. \u2728 **Add** tests for new functionality\r\n4. \ud83d\udcdd **Update** documentation\r\n5. \ud83d\ude80 **Submit** a pull request\r\n\r\n**Areas where we need help:**\r\n- Additional Modbus function codes (0x14, 0x15, 0x16, 0x17)\r\n- Performance optimizations\r\n- Additional transport protocols (Modbus Plus, etc.)\r\n- Documentation improvements\r\n- Real-world testing and bug reports\r\n\r\n### Community & Support\r\n\r\n- \ud83d\udcac **GitHub Issues**: Bug reports and feature requests\r\n- \ud83d\udce7 **Email Support**: Technical questions and consulting\r\n- \ud83d\udcda **Documentation**: Comprehensive guides and API reference\r\n- \ud83c\udf86 **Examples**: Production-ready code samples\r\n\r\n---\r\n\r\n<div align=\"center\">\r\n\r\n**Built with \u2764\ufe0f for the Industrial Automation Community**\r\n\r\n*ModbusLink - Connecting Industrial Systems with Modern Python*\r\n\r\n</div>\r\n",
"bugtrack_url": null,
"license": null,
"summary": "Modern, powerful, developer-friendly and highly scalable Python Modbus library",
"version": "1.2.0",
"project_urls": {
"Homepage": "https://github.com/Miraitowa-la/ModbusLink",
"Issues": "https://github.com/Miraitowa-la/ModbusLink/issues",
"Repository": "https://github.com/Miraitowa-la/ModbusLink"
},
"split_keywords": [
"modbus",
" industrial",
" automation",
" communication",
" protocol",
" tcp",
" rtu",
" ascii",
" serial",
" plc",
" scada",
" iot"
],
"urls": [
{
"comment_text": null,
"digests": {
"blake2b_256": "dbfe10bf3c743a66d5226e1cb64cdaf7571dff65ee8d2d2a9b1355a2191c67ea",
"md5": "ac790b76f72aa8117bc8aeeaa5086440",
"sha256": "c3e105725e1f5bc4c65a717f77697035b654abfe578912cf39cb98071e141073"
},
"downloads": -1,
"filename": "modbuslink-1.2.0-py3-none-any.whl",
"has_sig": false,
"md5_digest": "ac790b76f72aa8117bc8aeeaa5086440",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": ">=3.9",
"size": 74978,
"upload_time": "2025-08-30T06:29:19",
"upload_time_iso_8601": "2025-08-30T06:29:19.903689Z",
"url": "https://files.pythonhosted.org/packages/db/fe/10bf3c743a66d5226e1cb64cdaf7571dff65ee8d2d2a9b1355a2191c67ea/modbuslink-1.2.0-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": null,
"digests": {
"blake2b_256": "9aa392f0c19d776051684d127968042b6c9f1cef65eb62c48926b1392088510e",
"md5": "915ef5aa1865fa84083bc0045a76a6fb",
"sha256": "9c6db9e4fe0ad86458fbfb970bc15caa10c20a509340953bd688e9197c2c635b"
},
"downloads": -1,
"filename": "modbuslink-1.2.0.tar.gz",
"has_sig": false,
"md5_digest": "915ef5aa1865fa84083bc0045a76a6fb",
"packagetype": "sdist",
"python_version": "source",
"requires_python": ">=3.9",
"size": 71281,
"upload_time": "2025-08-30T06:29:21",
"upload_time_iso_8601": "2025-08-30T06:29:21.654255Z",
"url": "https://files.pythonhosted.org/packages/9a/a3/92f0c19d776051684d127968042b6c9f1cef65eb62c48926b1392088510e/modbuslink-1.2.0.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2025-08-30 06:29:21",
"github": true,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"github_user": "Miraitowa-la",
"github_project": "ModbusLink",
"travis_ci": false,
"coveralls": false,
"github_actions": true,
"lcname": "modbuslink"
}