用Python和pymodbus库模拟Modbus RTU主从通信(附完整代码)
用Python和pymodbus实现Modbus RTU主从通信全流程实战
当你需要测试Modbus设备却手头没有PLC硬件时,Python的pymodbus库能帮你搭建完整的虚拟测试环境。本文将带你从零开始,用代码构建Modbus RTU主站和从站,实现工业通信协议的核心功能交互。
1. 环境搭建与基础配置
首先确保你的Python环境已安装3.7+版本。通过pip安装必要依赖:
pip install pymodbus==3.0.0 serial==0.0.97pymodbus 3.0的重大改进包括:
- 异步IO支持提升通信效率
- 更完善的RTU帧处理机制
- 简化的同步/异步API切换
创建虚拟串口的两种方案对比:
| 工具 | 适用平台 | 配置复杂度 | 性能表现 |
|---|---|---|---|
| com0com | Windows | 中等 | 优秀 |
| socat | Linux/macOS | 简单 | 良好 |
| 虚拟串口工具 | 跨平台 | 复杂 | 一般 |
提示:开发阶段推荐使用socat,一条命令即可创建虚拟串口对:
socat -d -d pty,raw,echo=0 pty,raw,echo=0
2. 构建Modbus从站模拟器
从站是数据提供方,我们先实现一个支持核心功能码的虚拟设备:
from pymodbus.server import StartSerialServer from pymodbus.datastore import ModbusSequentialDataBlock from pymodbus.datastore import ModbusSlaveContext, ModbusServerContext def run_slave(): # 初始化数据存储 coil_block = ModbusSequentialDataBlock(0x00, [False]*100) reg_block = ModbusSequentialDataBlock(0x00, [0]*100) store = ModbusSlaveContext( di=coil_block, co=coil_block, hr=reg_block, ir=reg_block ) context = ModbusServerContext(slaves=store, single=True) # 启动RTU从站 StartSerialServer( context=context, port='/dev/ttyS1', # 修改为你的虚拟串口 framer=ModbusRtuFramer, baudrate=19200, timeout=0.005 )关键参数解析:
timeout:影响RTU帧间隔检测,典型值3.5个字符时间baudrate:需与主站严格一致,常见值9600/19200/38400datastore:维护四种Modbus对象的数据状态
3. 实现Modbus主站客户端
主站作为通信发起方,需要处理更复杂的超时重试机制:
from pymodbus.client import ModbusSerialClient from pymodbus.exceptions import ModbusIOException class ModbusMaster: def __init__(self, port): self.client = ModbusSerialClient( method='rtu', port=port, baudrate=19200, timeout=1, retries=3 ) def read_holding_registers(self, addr, count, unit=1): try: response = self.client.read_holding_registers( address=addr, count=count, slave=unit ) if response.isError(): raise ModbusIOException(str(response)) return response.registers except Exception as e: print(f"Read failed: {str(e)}") return None def write_register(self, addr, value, unit=1): return self.client.write_register( address=addr, value=value, slave=unit )通信异常处理要点:
- CRC校验失败会自动重试
- 响应超时需业务层判断
- 从站忙状态(0x06)需延迟重试
4. 核心功能码实战演示
4.1 03功能码:读保持寄存器
典型请求-响应过程:
主站发送: 01 03 00 00 00 02 C4 0B 从站回复: 01 03 04 00 0A 00 0B 85 84对应Python实现:
# 主站读取寄存器0x0000-0x0001 registers = master.read_holding_registers(0, 2) print(f"Register values: {registers}") # 从站预处理方法(在StartSerialServer前添加) def process_03_request(context, client): address = client.registers[0].address values = [0x000A, 0x000B] # 模拟数据 return ModbusRegisterValues(values)4.2 06功能码:写单个寄存器
完整数据流示例:
主站发送: 01 06 00 02 00 FF 89 F8 从站回复: 01 06 00 02 00 FF 89 F8Python交互代码:
# 主站写入操作 result = master.write_register(2, 0x00FF) if not result.isError(): print("Write success") # 从站数据验证 assert store.getValues(3, 2)[0] == 0x00FF5. 高级功能与调试技巧
5.1 自定义异常响应
实现从站主动返回错误码:
from pymodbus.pdu import ExceptionResponse def process_request(self, request): if request.function_code == 0x03: if request.address > 100: return ExceptionResponse( request.function_code, ModbusExceptions.IllegalAddress ) return super().process_request(request)常见异常代码:
| 代码 | 含义 | 触发条件 |
|---|---|---|
| 0x01 | 非法功能码 | 未实现的功能码请求 |
| 0x02 | 非法数据地址 | 地址超出从站范围 |
| 0x03 | 非法数据值 | 数据不符合从站要求 |
| 0x04 | 从站设备故障 | 从站执行过程中发生错误 |
5.2 通信质量监测指标
通过以下参数评估通信可靠性:
# 在主站类中添加统计方法 def get_communication_metrics(self): return { 'success_rate': self.success_count / self.total_requests, 'avg_response_time': self.total_latency / self.success_count, 'crc_errors': self.crc_error_count, 'timeouts': self.timeout_count }优化建议:
- 调整timeout值平衡响应速度与误判率
- 大数据量传输时适当降低波特率
- 关键操作添加二次验证机制
6. 典型问题排查指南
6.1 常见错误现象与解决方案
| 现象 | 可能原因 | 排查步骤 |
|---|---|---|
| 通信完全无响应 | 串口配置错误 | 1. 检查端口号 2. 验证波特率 3. 测试串口连通性 |
| 偶发性CRC校验失败 | 电磁干扰/波特率不匹配 | 1. 降低波特率测试 2. 检查硬件连接 3. 添加磁环 |
| 响应数据格式错误 | 从站数据处理逻辑异常 | 1. 抓取原始数据帧 2. 检查从站数据存储 3. 验证功能码实现 |
| 主站收到无效从站地址响应 | 从站地址冲突/广播问题 | 1. 检查从站ID配置 2. 禁用广播功能 3. 网络终端电阻检查 |
6.2 数据帧分析工具推荐
Windows平台:
- Modbus Poll(主站模拟)
- Modbus Slave(从站模拟)
- AccessPort(串口监控)
Linux/macOS:
# 十六进制查看串口数据 stty -F /dev/ttyS1 19200 raw cat /dev/ttyS1 | hexdump -C跨平台方案:
# 使用pyserial嗅探数据 import serial sniffer = serial.Serial('/dev/ttyS1', 19200) while True: print(sniffer.read().hex())
在实际项目中,建议先通过虚拟串口测试所有功能逻辑,再迁移到物理设备。当遇到通信问题时,采用分治法逐步隔离问题范围——先确保物理层连通性,再验证协议层交互,最后检查业务数据处理逻辑。
