brisonus_data_collector/frame_finder.py
JingweiCui 8823ef546e [add] frame_finder.py 模块
- 用于从缓冲区中查找有效的帧消息
2025-03-21 10:38:17 +08:00

170 lines
7.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from typing import Optional, Tuple
import logging
from frame import FrameFormat
class FrameFinder:
"""帧查找器 - 在数据缓冲区中查找符合帧格式的数据"""
def __init__(self):
"""初始化帧查找器"""
self.frame_length = FrameFormat.FIXED_SIZE + FrameFormat.DATA_SIZE
self.frame_header = FrameFormat.FRAME_HEADER
self.logger = logging.getLogger(__name__)
def find_frame_in_buffer(self, buffer: bytes, start_pos: int = 0) -> Tuple[Optional[bytes], int]:
"""
在给定的缓冲区数据中查找下一个有效帧
Args:
buffer: 要搜索的缓冲区数据
start_pos: 开始搜索的位置
Returns:
Tuple[Optional[bytes], int]:
- 找到的帧数据如果没找到则为None
- 下一次搜索的起始位置(如果找到帧,则是帧后的位置;如果没找到,则是需要保留的数据起始位置)
"""
buffer_size = len(buffer)
# 检查起始位置是否有效
if start_pos >= buffer_size:
self.logger.debug(f"起始位置无效: {start_pos} >= {buffer_size},将移除所有数据")
return None, buffer_size
# 在缓冲区中查找帧头
header_pos = self._find_header(buffer, start_pos)
if header_pos is None:
self.logger.debug(f"未找到帧头,将移除所有数据 ({buffer_size} 字节)")
if buffer_size > 0:
self.logger.debug(f"移除的数据: {buffer.hex()}")
return None, buffer_size # 返回缓冲区末尾位置,表示当前数据都可以丢弃
self.logger.debug(f"找到帧头位置: {header_pos}")
if header_pos > start_pos:
removed_data = buffer[start_pos:header_pos]
self.logger.debug(f"将移除帧头前的无效数据: {removed_data.hex() if len(removed_data) > 0 else ''} ({len(removed_data)} 字节)")
# 检查是否有完整帧的数据
if buffer_size - header_pos < self.frame_length:
self.logger.debug(f"帧头后数据不足: {buffer_size - header_pos} < {self.frame_length}")
self.logger.debug(f"保留从位置 {header_pos} 开始的数据: {buffer[header_pos:].hex()}")
return None, header_pos # 返回帧头位置,表示需要保留从这个位置开始的数据
# 提取并验证帧
frame_data = buffer[header_pos:header_pos + self.frame_length]
if self._verify_frame(frame_data):
self.logger.debug(f"找到有效帧: {frame_data.hex()}")
next_pos = header_pos + self.frame_length
remaining_data = buffer[next_pos:] if next_pos < buffer_size else b''
self.logger.debug(f"处理完帧后剩余数据: {remaining_data.hex() if remaining_data else ''} ({len(remaining_data)} 字节)")
return frame_data, next_pos
else:
self.logger.debug(f"帧验证失败,从下一个位置继续查找")
# 防止在同一位置反复查找
next_search_pos = header_pos + len(self.frame_header)
if next_search_pos >= buffer_size:
self.logger.debug(f"搜索位置已到达缓冲区末尾,将移除所有数据")
return None, buffer_size
removed_data = buffer[header_pos:next_search_pos]
self.logger.debug(f"移除无效帧头数据: {removed_data.hex()} ({len(removed_data)} 字节)")
return self.find_frame_in_buffer(buffer, next_search_pos)
def _find_header(self, buffer: bytes, start_pos: int) -> Optional[int]:
"""
在缓冲区中查找帧头
Args:
buffer: 要搜索的缓冲区数据
start_pos: 开始搜索的位置
Returns:
Optional[int]: 帧头位置如果未找到则返回None
"""
header_len = len(self.frame_header)
for i in range(start_pos, len(buffer) - header_len + 1):
if buffer[i:i+header_len] == self.frame_header:
return i
return None
def _verify_frame(self, frame_data: bytes) -> bool:
"""
验证帧的有效性
Args:
frame_data: 要验证的帧数据
Returns:
bool: 帧是否有效
"""
# 使用FrameFormat验证帧
frame = FrameFormat.parse_frame(frame_data)
return frame is not None
if __name__ == "__main__":
# 使用示例
import logging
# 配置日志
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s - %(levelname)s - %(message)s'
)
# 创建帧查找器实例
finder = FrameFinder()
# 测试数据包含3个完整帧中间夹杂一些无效数据
test_data = bytes.fromhex(
# 第一个有效帧
"1CDFAAAAAAAA0200000049000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000F3"
# 一些无效数据
"FFFF1234"
# 第二个有效帧
"1CDFAAAAAAAA0300000049000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000F4"
# 不完整的帧头
"1CDF12"
# 第三个有效帧
"1CDFAAAAAAAA0400000049000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000F5"
)
print("初始测试数据:")
print("总长度:", len(test_data))
print("数据:", test_data.hex())
print("\n开始查找帧:")
# 模拟实际使用场景,从不同位置开始查找
remaining_data = test_data
frame_count = 0
while len(remaining_data) > 0:
frame, next_pos = finder.find_frame_in_buffer(remaining_data)
if frame:
frame_count += 1
print(f"\n找到第 {frame_count} 个帧:")
print(f"帧数据: {frame.hex()}")
print(f"下一个搜索位置: {next_pos}")
# 解析帧内容
parsed_frame = FrameFormat.parse_frame(frame)
if parsed_frame:
print(f"命令码: 0x{parsed_frame.command:08X}")
print(f"长度: {parsed_frame.length}")
print(f"帧IDX: {parsed_frame.frame_idx}")
print("通道数据:")
for i, value in enumerate(parsed_frame.channels):
print(f"通道 {i+1}: 0x{value:08X}")
print(f"校验和: 0x{parsed_frame.checksum:02X}")
# 更新剩余数据
remaining_data = remaining_data[next_pos:]
print(f"\n剩余数据: ({len(remaining_data)} 字节)")
if remaining_data:
print(f"数据: {remaining_data.hex()}")
else:
# 如果没有找到帧next_pos 表示需要保留的数据起始位置
if next_pos == len(remaining_data):
print("\n没有更多帧了")
break
else:
print(f"\n在当前位置未找到帧,移除 {next_pos} 字节无效数据")
remaining_data = remaining_data[next_pos:]
print(f"剩余数据: ({len(remaining_data)} 字节)")
if remaining_data:
print(f"数据: {remaining_data.hex()}")
print(f"\n总共找到 {frame_count} 个帧")