当前位置: 首页 > news >正文

用Python和pymodbus库模拟Modbus RTU主从通信(附完整代码)

用Python和pymodbus实现Modbus RTU主从通信全流程实战

当你需要测试Modbus设备却手头没有PLC硬件时,Python的pymodbus库能帮你搭建完整的虚拟测试环境。本文将带你从零开始,用代码构建Modbus RTU主站和从站,实现工业通信协议的核心功能交互。

1. 环境搭建与基础配置

首先确保你的Python环境已安装3.7+版本。通过pip安装必要依赖:

pip install pymodbus==3.0.0 serial==0.0.97

pymodbus 3.0的重大改进包括:

  • 异步IO支持提升通信效率
  • 更完善的RTU帧处理机制
  • 简化的同步/异步API切换

创建虚拟串口的两种方案对比:

工具适用平台配置复杂度性能表现
com0comWindows中等优秀
socatLinux/macOS简单良好
虚拟串口工具跨平台复杂一般

提示:开发阶段推荐使用socat,一条命令即可创建虚拟串口对:
socat -d -d pty,raw,echo=0 pty,raw,echo=0

2. 构建Modbus从站模拟器

从站是数据提供方,我们先实现一个支持核心功能码的虚拟设备:

from pymodbus.server import StartSerialServer from pymodbus.datastore import ModbusSequentialDataBlock from pymodbus.datastore import ModbusSlaveContext, ModbusServerContext def run_slave(): # 初始化数据存储 coil_block = ModbusSequentialDataBlock(0x00, [False]*100) reg_block = ModbusSequentialDataBlock(0x00, [0]*100) store = ModbusSlaveContext( di=coil_block, co=coil_block, hr=reg_block, ir=reg_block ) context = ModbusServerContext(slaves=store, single=True) # 启动RTU从站 StartSerialServer( context=context, port='/dev/ttyS1', # 修改为你的虚拟串口 framer=ModbusRtuFramer, baudrate=19200, timeout=0.005 )

关键参数解析:

  • timeout:影响RTU帧间隔检测,典型值3.5个字符时间
  • baudrate:需与主站严格一致,常见值9600/19200/38400
  • datastore:维护四种Modbus对象的数据状态

3. 实现Modbus主站客户端

主站作为通信发起方,需要处理更复杂的超时重试机制:

from pymodbus.client import ModbusSerialClient from pymodbus.exceptions import ModbusIOException class ModbusMaster: def __init__(self, port): self.client = ModbusSerialClient( method='rtu', port=port, baudrate=19200, timeout=1, retries=3 ) def read_holding_registers(self, addr, count, unit=1): try: response = self.client.read_holding_registers( address=addr, count=count, slave=unit ) if response.isError(): raise ModbusIOException(str(response)) return response.registers except Exception as e: print(f"Read failed: {str(e)}") return None def write_register(self, addr, value, unit=1): return self.client.write_register( address=addr, value=value, slave=unit )

通信异常处理要点:

  • CRC校验失败会自动重试
  • 响应超时需业务层判断
  • 从站忙状态(0x06)需延迟重试

4. 核心功能码实战演示

4.1 03功能码:读保持寄存器

典型请求-响应过程:

主站发送: 01 03 00 00 00 02 C4 0B 从站回复: 01 03 04 00 0A 00 0B 85 84

对应Python实现:

# 主站读取寄存器0x0000-0x0001 registers = master.read_holding_registers(0, 2) print(f"Register values: {registers}") # 从站预处理方法(在StartSerialServer前添加) def process_03_request(context, client): address = client.registers[0].address values = [0x000A, 0x000B] # 模拟数据 return ModbusRegisterValues(values)

4.2 06功能码:写单个寄存器

完整数据流示例:

主站发送: 01 06 00 02 00 FF 89 F8 从站回复: 01 06 00 02 00 FF 89 F8

Python交互代码:

# 主站写入操作 result = master.write_register(2, 0x00FF) if not result.isError(): print("Write success") # 从站数据验证 assert store.getValues(3, 2)[0] == 0x00FF

5. 高级功能与调试技巧

5.1 自定义异常响应

实现从站主动返回错误码:

from pymodbus.pdu import ExceptionResponse def process_request(self, request): if request.function_code == 0x03: if request.address > 100: return ExceptionResponse( request.function_code, ModbusExceptions.IllegalAddress ) return super().process_request(request)

常见异常代码:

代码含义触发条件
0x01非法功能码未实现的功能码请求
0x02非法数据地址地址超出从站范围
0x03非法数据值数据不符合从站要求
0x04从站设备故障从站执行过程中发生错误

5.2 通信质量监测指标

通过以下参数评估通信可靠性:

# 在主站类中添加统计方法 def get_communication_metrics(self): return { 'success_rate': self.success_count / self.total_requests, 'avg_response_time': self.total_latency / self.success_count, 'crc_errors': self.crc_error_count, 'timeouts': self.timeout_count }

优化建议:

  • 调整timeout值平衡响应速度与误判率
  • 大数据量传输时适当降低波特率
  • 关键操作添加二次验证机制

6. 典型问题排查指南

6.1 常见错误现象与解决方案

现象可能原因排查步骤
通信完全无响应串口配置错误1. 检查端口号
2. 验证波特率
3. 测试串口连通性
偶发性CRC校验失败电磁干扰/波特率不匹配1. 降低波特率测试
2. 检查硬件连接
3. 添加磁环
响应数据格式错误从站数据处理逻辑异常1. 抓取原始数据帧
2. 检查从站数据存储
3. 验证功能码实现
主站收到无效从站地址响应从站地址冲突/广播问题1. 检查从站ID配置
2. 禁用广播功能
3. 网络终端电阻检查

6.2 数据帧分析工具推荐

  • Windows平台

    • Modbus Poll(主站模拟)
    • Modbus Slave(从站模拟)
    • AccessPort(串口监控)
  • Linux/macOS

    # 十六进制查看串口数据 stty -F /dev/ttyS1 19200 raw cat /dev/ttyS1 | hexdump -C
  • 跨平台方案

    # 使用pyserial嗅探数据 import serial sniffer = serial.Serial('/dev/ttyS1', 19200) while True: print(sniffer.read().hex())

在实际项目中,建议先通过虚拟串口测试所有功能逻辑,再迁移到物理设备。当遇到通信问题时,采用分治法逐步隔离问题范围——先确保物理层连通性,再验证协议层交互,最后检查业务数据处理逻辑。

http://www.zskr.cn/news/1478400.html

相关文章:

  • 告别依赖地狱:用AppImage在Ubuntu 22.04上安装最新版Neovim(附FUSE问题解决)
  • 命令行一键下载百度搜图结果,轻量Python脚本支持自定义页数和保存路径
  • 【字节跳动】SEED·C语言宏定义版(.h头文件)
  • ai赋能matlab编程:通过快马调用大模型智能生成遗传算法求解优化问题
  • 从Point A到BWP:手把手拆解5G NR物理资源分配的完整逻辑链
  • 免费Colab跑通LLaMA 2聊天机器人:4-bit量化+Gradio实战指南
  • PointMVSNet ICCV‘19可运行复现包:论文+中文详解+带注释代码+一键训练测试脚本
  • 解决ORB-SLAM3相机快速转动丢失?试试用GCNv2替换特征点提取器(Ubuntu 18.04 + CUDA 10.2实战)
  • 从安装到实战:用快马AI生成支持动态页面与数据入库的openclaw项目模板
  • 手把手教你用C++实现PL/0表达式语法分析器(附完整源码与递归下降子程序详解)
  • 大模型推理的五行养生调优术:从 FP16 大权重到 INT8/INT4 显存剪枝的“炼丹优化之道”
  • 桂林六大黄金回收同城上门报价详解 2026年6月高位变现这样最划算 - 余生黄金回收
  • 计算即组织:从生命系统到人工系统的计算新范式
  • LLM推理本质:残差流几何与高维模式匹配
  • DPDK三层转发性能测试:手把手教你用l3fwd和pktgen搭建双机测试环境(含常见参数解析)
  • 新手必看:用C++ switch和if-else两种方法搞定‘简单计算器’(附除零错误处理)
  • AWS云上NLP流水线实战:从爬虫到聚类的工业级部署指南
  • 5分钟掌握终极虚拟机检测:VMDE完整指南让您快速识别虚拟环境
  • AgentKit深度解析:轻量级LLM代理编排框架实战指南
  • 别只背单词了!从国科大英语Unit1看学术文本的5种行文结构(含真题拆解)
  • TypeScript 从零基础到精通(四):面向对象编程(类与继承)
  • 巴彦淖尔市2026年最新黄金+白银+铂金+K金回收门店及联系方式电话推荐 黄金回收店铺TOP5排行榜 - 盛世金银回收
  • 【字节跳动】本文揭示了AI大模型工业部署中的六大硬性配置规则:1) 严格的张量维度锁定,如情感分支固定768维区间触发拦截;2) 内存分页采用4KB标准页,设置512KB缓存阈值和16.7MB防溢出临
  • TLV75533PDBVR在物联网与便携医疗中的电源方案:25µA Iq的电池友好选择
  • 桂林连锁黄金回收全区县上门报价盘点 2026年6月六家品牌实测对比 - 余生黄金回收
  • 当你的Side Project有了“瓦格纳式”的野心:如何管理创意、债务与偏执
  • 桂林正规黄金回收闲置金变现避坑指南 2026年6月六家靠谱门店实测 - 余生黄金回收
  • 别再手动拼接字符串了!XXL-Job多参数传递的3种优雅方案(附JSON/Map实战代码)
  • 东莞市黄金回收店铺TOP5排行榜 2026年最新黄金+白银+铂金+K金回收门店及联系方式电话推荐 - 大熊猫898989
  • 白城市2026年最新黄金+白银+铂金+K金回收门店及联系方式电话推荐 黄金回收店铺TOP5排行榜 - 盛世金银回收