当前位置: 首页 > news >正文

串口关键字抓取

带环形缓冲区

import serial
import time
from datetime import datetime  # 用于获取精确到秒的系统时间class RingBuffer:"""自定义15字符长度的环形缓冲区"""def __init__(self, capacity=8):self.capacity = capacity  # 缓冲区容量固定为15self.buffer = []  # 存储字符的列表self.last_valid_time = None  # 记录上一次有效接收(U-Boot)的时间戳def add_char(self, char):"""添加字符到缓冲区,超出容量则移除最旧字符"""if len(self.buffer) >= self.capacity:self.buffer.pop(0)  # 移除头部最旧字符self.buffer.append(char)  # 新增字符到尾部def get_buffer_str(self):"""返回缓冲区拼接后的字符串"""return ''.join(self.buffer)def reset(self):"""清空缓冲区"""self.buffer = []def serial_monitor_demo(port="COM3", bps=38400, timeout=5,  # 串口单次读取超时(与wait_time保持一致)wait_time=100000,  # 每次接收的最大等待时间target_str="U-Boot",  # 指定接收的目标ASCII字符串
):ser = Nonei = 0  # 成功接收目标字符串计数j = 0  # 超时计数k = 0  # 接收非目标字符串计数total_tests = 0  # 总测试数(i+j+k)# 初始化15字符长度的环形缓冲区ring_buffer = RingBuffer(capacity=15)try:# 1. 初始化并打开串口(仅打开一次,持续保持连接)ser = serial.Serial(port=port, baudrate=bps, timeout=wait_time,parity=serial.PARITY_NONE,stopbits=serial.STOPBITS_ONE,bytesize=serial.EIGHTBITS,xonxoff=False,  # 关闭软件流控rtscts=False    # 关闭硬件流控)if ser.isOpen():open_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")print(f"RMC_AC压测_测试复位引脚台阶_程序\n[{open_time}] 串口 {port} 打开成功(波特率:{bps}),持续等待接收目标ASCII字符串:{target_str}(单次超时时间设定为:{wait_time}s)\n")# 2. 持续等待接收数据(串口保持打开,循环接收)while True:received_data = b""  # 存储单次接收的字节数据start_time = time.time()  # 记录本次接收开始时间has_received_data = False  # 标记是否接收到任何数据# 单次接收等待逻辑(超时时间=wait_time)while (time.time() - start_time) < wait_time:if ser.in_waiting > 0:received_data += ser.read(ser.in_waiting)has_received_data = True  # 标记已接收到数据# ASCII解码(忽略非ASCII无效字符)received_str = received_data.decode("ascii", errors="ignore")# 将接收字符逐个加入环形缓冲区for char in received_str:ring_buffer.add_char(char)# 检查缓冲区中是否包含目标字符串buffer_str = ring_buffer.get_buffer_str()if target_str in buffer_str:i += 1total_tests = i + j + kcurrent_time = datetime.now()success_time = current_time.strftime("%Y-%m-%d %H:%M:%S")# 计算与上一次有效接收的时间间隔time_interval = 0if ring_buffer.last_valid_time is not None:time_interval = (current_time - ring_buffer.last_valid_time).total_seconds()# 更新上一次有效接收时间ring_buffer.last_valid_time = current_time# 打印有效接收信息(含时间间隔)print(f"[{success_time}] RMC成功拉起,拉起数:{i} 总测试数:{total_tests} | 与上一次有效接收间隔:{time_interval:.1f}秒")ring_buffer.reset()  # 清空环形缓冲区ser.flushInput()  # 清空串口缓冲区#time.sleep(0.3)  # 隔离尾段字符break  # 退出本次等待,进入下一次接收循环# 3. 判断是否接收到非目标字符串if has_received_data and target_str not in received_data.decode("ascii", errors="ignore"):k += 1total_tests = i + j + knon_target_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")print(f"[{non_target_time}] ---非目标字符串---,非目标数:{k} 超时数:{j} 总测试数:{total_tests}")# 4. 判断本次接收是否超时(仅无任何数据时计数超时)elapsed_time = time.time() - start_timeif elapsed_time >= wait_time - 0.01 and not has_received_data:j += 1total_tests = i + j + ktimeout_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")print(f"[{timeout_time}] ---超时警告---:本次等待{wait_time}s未接收到任何数据,超时数:{j} 非目标字符串数:{k} 总测试数:{total_tests}")#time.sleep(0.1)  # 降低CPU占用except KeyboardInterrupt:# 手动终止程序(Ctrl+C)stop_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")print(f"\n[{stop_time}] ---程序手动终止---")print(f"最终统计:成功拉起数:{i},非目标字符串数:{k},超时数:{j},总测试数:{total_tests}")# 新增:计算各场景占比(便于分析)if total_tests > 0:print(f"成功率:{i/total_tests:.2%},非目标占比:{k/total_tests:.2%},超时占比:{j/total_tests:.2%}")except Exception as e:# 串口操作异常exception_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")print(f"\n[{exception_time}] ---串口操作异常---:{str(e)}")print(f"异常时统计:成功拉起数:{i},非目标字符串数:{k},超时数:{j} 总测试数:{total_tests}")finally:# 程序终止时必关串口(释放资源)if ser and ser.isOpen():ser.close()final_close_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")print(f"[{final_close_time}] 串口 {port} 已关闭")# 执行函数
if __name__ == "__main__":serial_monitor_demo()

 

延时等待:

import serial
import time
from datetime import datetime  # 用于获取精确到秒的系统时间def serial_monitor_demo(port="COM3", bps=38400, timeout=5,  # 串口单次读取超时(与wait_time保持一致)wait_time=0.3,  # 每次接收的最大等待时间(1秒)target_str="U-Boot",  # 指定接收的目标ASCII字符串(可修改)#write_data=""  # 初始写入数据(不需要可注释)
):ser = Nonei = 0  # 成功接收目标字符串计数j = 0  # 超时计数k = 0  # 接收非目标字符串计数(新增)total_tests = 0  # 总测试数(i+j+k)try:# 1. 初始化并打开串口(仅打开一次,持续保持连接)ser = serial.Serial(port=port, baudrate=bps, timeout=wait_time,parity=serial.PARITY_NONE,stopbits=serial.STOPBITS_ONE,bytesize=serial.EIGHTBITS,xonxoff=False,  # 关闭软件流控rtscts=False    # 关闭硬件流控)if ser.isOpen():open_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")print(f"RMC_AC压测_测试复位引脚台阶_程序\n[{open_time}] 串口 {port} 打开成功(波特率:{bps}),持续等待接收目标ASCII字符串:{target_str}(单次超时时间设定为:{wait_time}s)\n")# 可选:仅发送一次初始数据(不需要可注释)# write_bytes = write_data.encode("gbk")  # 中文用gbk,ASCII用encode("ascii")# ser.write(write_bytes)# send_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")# print(f"[{send_time}] 已发送初始数据:{write_data}\n")# 2. 持续等待接收数据(串口保持打开,循环接收)while True:received_data = b""  # 存储单次接收的字节数据start_time = time.time()  # 记录本次接收开始时间has_received_data = False  # 标记是否接收到任何数据(新增)# 单次接收等待逻辑(超时时间=wait_time)while (time.time() - start_time) < wait_time:if ser.in_waiting > 0:received_data += ser.read(ser.in_waiting)has_received_data = True  # 标记已接收到数据# ASCII解码(忽略非ASCII无效字符)received_str = received_data.decode("ascii", errors="ignore")# 检查目标字符串if target_str in received_str:i += 1total_tests = i + j + ksuccess_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")print(f"[{success_time}] RMC成功拉起,拉起数:{i} 非目标字符串数:{k} 超时数:{j} 总测试数:{total_tests}")ser.flushInput()  # 清空缓冲区,准备下一次接收time.sleep(0.3)  # 隔离尾段字符break  # 退出本次等待,进入下一次接收循环# 3. 新增:判断是否接收到非目标字符串if has_received_data and target_str not in received_data.decode("ascii", errors="ignore"):k += 1total_tests = i + j + knon_target_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")# 打印非目标字符串详情(含实际接收内容,便于调试)received_str = received_data.decode("ascii", errors="ignore")#print(f"[{non_target_time}] ---非目标字符串---:接收内容:{received_str.strip()},非目标数:{k} 超时数:{j} 总测试数:{total_tests}")print(f"[{non_target_time}] ---非目标字符串---,非目标数:{k} 超时数:{j} 总测试数:{total_tests}")# 4. 判断本次接收是否超时(仅无任何数据时计数超时)elapsed_time = time.time() - start_timeif elapsed_time >= wait_time - 0.01 and not has_received_data:  # 新增:仅无数据时算超时j += 1total_tests = i + j + ktimeout_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")print(f"[{timeout_time}] ---超时警告---:本次等待{wait_time}s未接收到任何数据,超时数:{j} 非目标字符串数:{k} 总测试数:{total_tests}")time.sleep(0.1)  # 降低CPU占用,不影响接收响应except KeyboardInterrupt:# 手动终止程序(Ctrl+C)stop_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")print(f"\n[{stop_time}] ---程序手动终止---")print(f"最终统计:成功拉起数:{i},非目标字符串数:{k},超时数:{j},总测试数:{total_tests}")# 新增:计算各场景占比(便于分析)if total_tests > 0:print(f"成功率:{i/total_tests:.2%},非目标占比:{k/total_tests:.2%},超时占比:{j/total_tests:.2%}")except Exception as e:# 串口操作异常exception_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")print(f"\n[{exception_time}] ---串口操作异常---:{str(e)}")print(f"异常时统计:成功拉起数:{i},非目标字符串数:{k},超时数:{j},总测试数:{total_tests}")finally:# 程序终止时必关串口(释放资源)if ser and ser.isOpen():ser.close()final_close_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")print(f"[{final_close_time}] 串口 {port} 已关闭")# 执行函数
if __name__ == "__main__":serial_monitor_demo()

 

http://www.zskr.cn/news/71398.html

相关文章:

  • 奇奇怪怪的特性
  • 2025年12月振动时效机TOP3实力厂商新盘点:技术适配与服务特色双视角
  • 2025年广东回收基恩士测量仪品牌权威榜单:广东回收基恩士光电开关/广东回收基恩士控制器/广东回收传感器渠道精选
  • 深入解析:2025-11-07 ZYZ28-NOIP模拟赛-Round3 hetao1733837的record
  • 2025年12月人行通道闸机厂家最新实力TOP榜:速通门、摆闸、转闸、单向门选择指南
  • 叶轮加工行业十大头部企业市场占有率排名
  • 2025年武汉装修辅材平台排行榜,伟星大口径管/知名管/系列
  • 星模拟器研发领先厂家推荐,助力航天与科研创新
  • 松鼠Ai人工智能教育怎么样?
  • 【IEEE出版 | 上海理工大学主办】第六届机械自动化与智能制造国际学术会议(MAIM 2025)
  • 激光切割设备2025年顶尖厂商综合实力权威推荐榜单
  • 2025年汽车海外营销代运营公司推荐:B2B外贸企业Facebook、LinkedIn、TikTok、INS、Google一站式出海营销服务商精选(12月新版)
  • 2025年中国压力开关五大厂家推荐:看哪家技术水平高
  • 2025年五大全防护门窗品牌排行榜,亿合全防护门窗详细介绍及
  • C语言随堂笔记-2
  • 通过ZigBee技术来实现智能家居控制器的设计方案
  • 2025年成品户外泳池实力厂家权威盘点:成品泳池/室内成品泳池/冲浪成品泳池源头企业精选
  • 2025年中国伤口清创机设备行业市场分析报告及头部生产企业汇总
  • 南京留学中介排名TOP10发布!表现突出的不踩雷
  • 2025年纸咖啡杯机厂家及设备选购全指南
  • 2025 年 LAPP 电缆源头厂家最新推荐榜,聚焦企业产品品质与服务能力深度解析lapp供应商,lapp公司,lapp加工厂推荐
  • 2025年四川省发电机组生产实力排行榜
  • 二分环浏览器更新图标制作
  • 为什么国内许多著名开源项目经常虎头蛇尾?
  • 2025年成都殡葬公司权威推荐榜单:殡葬‌/公墓‌/殡仪一条龙‌源头公司精选
  • 2025年12月急救箱权威推荐榜单:院前/拉杆/医疗/应急/内科/外科/综合/便捷式/气管插管急救箱,全方位安全守护首选!
  • Spring配置接口WebMvcConfigurer
  • AI+安全:深信服以技术革新重塑网络安全新范式
  • 基于5G下行信号的模糊函数分析matlab仿真,对比速度模糊函数和距离模糊函数
  • 2025年防静电地坪公司权威推荐榜单:PVC防静电地板/聚氨酯防静电自流平地坪/NFJCC不发火防静电防爆地坪厂家精选