From 8823ef546ea9020a1fa69755abdb7dba0ead7053 Mon Sep 17 00:00:00 2001 From: JingweiCui Date: Fri, 21 Mar 2025 10:38:17 +0800 Subject: [PATCH] =?UTF-8?q?[add]=20frame=5Ffinder.py=20=E6=A8=A1=E5=9D=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 用于从缓冲区中查找有效的帧消息 --- frame_finder.py | 170 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 170 insertions(+) create mode 100644 frame_finder.py diff --git a/frame_finder.py b/frame_finder.py new file mode 100644 index 0000000..fd43db6 --- /dev/null +++ b/frame_finder.py @@ -0,0 +1,170 @@ +from typing import Optional, Tuple +import logging +from frame import FrameFormat + +class FrameFinder: + """帧查找器 - 在数据缓冲区中查找符合帧格式的数据""" + + def __init__(self): + """初始化帧查找器""" + self.frame_length = FrameFormat.FIXED_SIZE + FrameFormat.DATA_SIZE + self.frame_header = FrameFormat.FRAME_HEADER + self.logger = logging.getLogger(__name__) + + def find_frame_in_buffer(self, buffer: bytes, start_pos: int = 0) -> Tuple[Optional[bytes], int]: + """ + 在给定的缓冲区数据中查找下一个有效帧 + Args: + buffer: 要搜索的缓冲区数据 + start_pos: 开始搜索的位置 + Returns: + Tuple[Optional[bytes], int]: + - 找到的帧数据(如果没找到则为None) + - 下一次搜索的起始位置(如果找到帧,则是帧后的位置;如果没找到,则是需要保留的数据起始位置) + """ + buffer_size = len(buffer) + + # 检查起始位置是否有效 + if start_pos >= buffer_size: + self.logger.debug(f"起始位置无效: {start_pos} >= {buffer_size},将移除所有数据") + return None, buffer_size + + # 在缓冲区中查找帧头 + header_pos = self._find_header(buffer, start_pos) + if header_pos is None: + self.logger.debug(f"未找到帧头,将移除所有数据 ({buffer_size} 字节)") + if buffer_size > 0: + self.logger.debug(f"移除的数据: {buffer.hex()}") + return None, buffer_size # 返回缓冲区末尾位置,表示当前数据都可以丢弃 + + self.logger.debug(f"找到帧头位置: {header_pos}") + if header_pos > start_pos: + removed_data = buffer[start_pos:header_pos] + self.logger.debug(f"将移除帧头前的无效数据: {removed_data.hex() if len(removed_data) > 0 else '无'} ({len(removed_data)} 字节)") + + # 检查是否有完整帧的数据 + if buffer_size - header_pos < self.frame_length: + self.logger.debug(f"帧头后数据不足: {buffer_size - header_pos} < {self.frame_length}") + self.logger.debug(f"保留从位置 {header_pos} 开始的数据: {buffer[header_pos:].hex()}") + return None, header_pos # 返回帧头位置,表示需要保留从这个位置开始的数据 + + # 提取并验证帧 + frame_data = buffer[header_pos:header_pos + self.frame_length] + if self._verify_frame(frame_data): + self.logger.debug(f"找到有效帧: {frame_data.hex()}") + next_pos = header_pos + self.frame_length + remaining_data = buffer[next_pos:] if next_pos < buffer_size else b'' + self.logger.debug(f"处理完帧后剩余数据: {remaining_data.hex() if remaining_data else '无'} ({len(remaining_data)} 字节)") + return frame_data, next_pos + else: + self.logger.debug(f"帧验证失败,从下一个位置继续查找") + # 防止在同一位置反复查找 + next_search_pos = header_pos + len(self.frame_header) + if next_search_pos >= buffer_size: + self.logger.debug(f"搜索位置已到达缓冲区末尾,将移除所有数据") + return None, buffer_size + + removed_data = buffer[header_pos:next_search_pos] + self.logger.debug(f"移除无效帧头数据: {removed_data.hex()} ({len(removed_data)} 字节)") + return self.find_frame_in_buffer(buffer, next_search_pos) + + def _find_header(self, buffer: bytes, start_pos: int) -> Optional[int]: + """ + 在缓冲区中查找帧头 + Args: + buffer: 要搜索的缓冲区数据 + start_pos: 开始搜索的位置 + Returns: + Optional[int]: 帧头位置,如果未找到则返回None + """ + header_len = len(self.frame_header) + for i in range(start_pos, len(buffer) - header_len + 1): + if buffer[i:i+header_len] == self.frame_header: + return i + return None + + def _verify_frame(self, frame_data: bytes) -> bool: + """ + 验证帧的有效性 + Args: + frame_data: 要验证的帧数据 + Returns: + bool: 帧是否有效 + """ + # 使用FrameFormat验证帧 + frame = FrameFormat.parse_frame(frame_data) + return frame is not None + +if __name__ == "__main__": + # 使用示例 + import logging + + # 配置日志 + logging.basicConfig( + level=logging.DEBUG, + format='%(asctime)s - %(levelname)s - %(message)s' + ) + + # 创建帧查找器实例 + finder = FrameFinder() + + # 测试数据:包含3个完整帧,中间夹杂一些无效数据 + test_data = bytes.fromhex( + # 第一个有效帧 + "1CDFAAAAAAAA0200000049000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000F3" + # 一些无效数据 + "FFFF1234" + # 第二个有效帧 + "1CDFAAAAAAAA0300000049000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000F4" + # 不完整的帧头 + "1CDF12" + # 第三个有效帧 + "1CDFAAAAAAAA0400000049000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000F5" + ) + + print("初始测试数据:") + print("总长度:", len(test_data)) + print("数据:", test_data.hex()) + print("\n开始查找帧:") + + # 模拟实际使用场景,从不同位置开始查找 + remaining_data = test_data + frame_count = 0 + + while len(remaining_data) > 0: + frame, next_pos = finder.find_frame_in_buffer(remaining_data) + if frame: + frame_count += 1 + print(f"\n找到第 {frame_count} 个帧:") + print(f"帧数据: {frame.hex()}") + print(f"下一个搜索位置: {next_pos}") + + # 解析帧内容 + parsed_frame = FrameFormat.parse_frame(frame) + if parsed_frame: + print(f"命令码: 0x{parsed_frame.command:08X}") + print(f"长度: {parsed_frame.length}") + print(f"帧IDX: {parsed_frame.frame_idx}") + print("通道数据:") + for i, value in enumerate(parsed_frame.channels): + print(f"通道 {i+1}: 0x{value:08X}") + print(f"校验和: 0x{parsed_frame.checksum:02X}") + + # 更新剩余数据 + remaining_data = remaining_data[next_pos:] + print(f"\n剩余数据: ({len(remaining_data)} 字节)") + if remaining_data: + print(f"数据: {remaining_data.hex()}") + else: + # 如果没有找到帧,next_pos 表示需要保留的数据起始位置 + if next_pos == len(remaining_data): + print("\n没有更多帧了") + break + else: + print(f"\n在当前位置未找到帧,移除 {next_pos} 字节无效数据") + remaining_data = remaining_data[next_pos:] + print(f"剩余数据: ({len(remaining_data)} 字节)") + if remaining_data: + print(f"数据: {remaining_data.hex()}") + + print(f"\n总共找到 {frame_count} 个帧") \ No newline at end of file