brisonus_data_collector/frame_finder.py

210 lines
11 KiB
Python
Raw Permalink Normal View History

from typing import Optional, Tuple
import logging
from frame import FrameFormat
class FrameFinder:
"""帧查找器 - 在数据缓冲区中查找符合帧格式的数据"""
def __init__(self):
"""初始化帧查找器"""
self.frame_length = FrameFormat.FIXED_SIZE + FrameFormat.DATA_SIZE
self.frame_header = FrameFormat.FRAME_HEADER
self.logger = logging.getLogger(__name__)
# 设置日志级别为INFO只有在需要调试时才改为DEBUG
self.logger.setLevel(logging.INFO)
def find_frame_in_buffer(self, buffer: bytes, start_pos: int = 0) -> Tuple[Optional[bytes], int]:
"""
在给定的缓冲区数据中查找下一个有效帧
Args:
buffer: 要搜索的缓冲区数据
start_pos: 开始搜索的位置
Returns:
Tuple[Optional[bytes], int]:
- 找到的帧数据如果没找到则为None
- 下一次搜索的起始位置如果找到帧则是帧后的位置如果没找到则是需要保留的数据起始位置
"""
buffer_size = len(buffer)
current_pos = start_pos
while current_pos < buffer_size:
# 检查当前位置是否有足够的数据
if buffer_size - current_pos < len(self.frame_header):
if self.logger.isEnabledFor(logging.DEBUG):
self.logger.debug(f"剩余数据不足帧头长度: {buffer_size - current_pos} < {len(self.frame_header)}")
return None, current_pos
# 在当前位置查找帧头
header_pos = self._find_header(buffer, current_pos)
if header_pos is None:
if self.logger.isEnabledFor(logging.DEBUG):
self.logger.debug(f"未找到帧头,将移除所有数据 ({buffer_size - current_pos} 字节)")
if buffer_size - current_pos > 0:
self.logger.debug(f"移除的数据: {buffer[current_pos:].hex()}")
return None, buffer_size
if self.logger.isEnabledFor(logging.DEBUG):
self.logger.debug(f"找到帧头位置: {header_pos}")
if header_pos > current_pos:
removed_data = buffer[current_pos:header_pos]
self.logger.debug(f"将移除帧头前的无效数据: {removed_data.hex() if len(removed_data) > 0 else ''} ({len(removed_data)} 字节)")
# 检查是否有完整帧的数据
if buffer_size - header_pos < self.frame_length:
if self.logger.isEnabledFor(logging.DEBUG):
self.logger.debug(f"帧头后数据不足: {buffer_size - header_pos} < {self.frame_length}")
self.logger.debug(f"保留从位置 {header_pos} 开始的数据: {buffer[header_pos:].hex()}")
return None, header_pos
# 提取并验证帧
frame_data = buffer[header_pos:header_pos + self.frame_length]
if self._verify_frame(frame_data):
if self.logger.isEnabledFor(logging.DEBUG):
self.logger.debug(f"找到有效帧: {frame_data.hex()}")
next_pos = header_pos + self.frame_length
remaining_data = buffer[next_pos:] if next_pos < buffer_size else b''
self.logger.debug(f"处理完帧后剩余数据: {remaining_data.hex() if remaining_data else ''} ({len(remaining_data)} 字节)")
return frame_data, header_pos + self.frame_length
# 帧验证失败,从下一个位置继续查找
if self.logger.isEnabledFor(logging.DEBUG):
self.logger.debug(f"帧验证失败,从下一个位置继续查找")
# 防止在同一位置反复查找
current_pos = header_pos + 1 # 从帧头后的下一个字节开始查找
if current_pos >= buffer_size:
if self.logger.isEnabledFor(logging.DEBUG):
self.logger.debug(f"搜索位置已到达缓冲区末尾,将移除所有数据")
return None, buffer_size
if self.logger.isEnabledFor(logging.DEBUG):
removed_data = buffer[header_pos:current_pos]
self.logger.debug(f"移除无效帧头数据: {removed_data.hex()} ({len(removed_data)} 字节)")
# 如果遍历完整个缓冲区都没有找到有效帧
if self.logger.isEnabledFor(logging.DEBUG):
self.logger.debug(f"遍历完整个缓冲区未找到有效帧")
return None, buffer_size
def _find_header(self, buffer: bytes, start_pos: int) -> Optional[int]:
"""
在缓冲区中查找帧头
Args:
buffer: 要搜索的缓冲区数据
start_pos: 开始搜索的位置
Returns:
Optional[int]: 帧头位置如果未找到则返回None
"""
header_len = len(self.frame_header)
buffer_size = len(buffer)
# 确保有足够的数据来查找帧头
if buffer_size - start_pos < header_len:
if self.logger.isEnabledFor(logging.DEBUG):
self.logger.debug(f"剩余数据不足帧头长度: {buffer_size - start_pos} < {header_len}")
return None
# 从起始位置开始查找帧头
for i in range(start_pos, buffer_size - header_len + 1):
if buffer[i:i+header_len] == self.frame_header:
if self.logger.isEnabledFor(logging.DEBUG):
self.logger.debug(f"在位置 {i} 找到帧头: {buffer[i:i+header_len].hex()}")
# 显示帧头前后的数据
before = buffer[max(0, i-4):i]
after = buffer[i+header_len:i+header_len+4]
self.logger.debug(f"帧头前4字节: {before.hex() if before else ''}")
self.logger.debug(f"帧头后4字节: {after.hex() if after else ''}")
return i
if self.logger.isEnabledFor(logging.DEBUG):
self.logger.debug(f"从位置 {start_pos} 开始未找到帧头")
self.logger.debug(f"剩余数据: {buffer[start_pos:].hex()[:100]}...")
self.logger.debug(f"剩余数据长度: {buffer_size - start_pos}")
return None
def _verify_frame(self, frame_data: bytes) -> bool:
"""
验证帧的有效性
Args:
frame_data: 要验证的帧数据
Returns:
bool: 帧是否有效
"""
# 使用FrameFormat验证帧
frame = FrameFormat.parse_frame(frame_data)
if frame is None and self.logger.isEnabledFor(logging.DEBUG):
self.logger.debug(f"帧验证失败: {frame_data.hex()}")
return frame is not None
if __name__ == "__main__":
# 使用示例
import logging
# 配置日志
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s - %(levelname)s - %(message)s'
)
# 创建帧查找器实例
finder = FrameFinder()
# test_data = bytes.fromhex("aaaaaaaaa005000049000000a0050000a0050000859b0b0065410f002487f4ffacccf1ff7187ffff3188f7ff396f02006d78060006f506001da0fcffe8f2ffff3c09f7ffb45bf2ff16cbf2ff023f0d00b77707005037f5ff9c41432bd07c28bc3c692879e9365f15b7957e8f81768c4cd2ae344baaaaaaaaa105000049000000a1050000a1050000cb6604004d6af1ffe37dfaff99f6090038ecfdff79dff6ff93b103001df7f2ff96d8f4fff16c0a00f938fafffbe7faffb851f3ff8e610000859ffeff79f805007325fdff4b7b518836922cd80256d8692a62a80176b29cc069518a6d8eb15e8463b00ebeb196a7717ecadf6f41472d0ad08ea380af293b3d42b0c204b444c0cf7aaaaaaaaaa205000049000000a2050000a20500006d0d04007c20ffff76bdffff2b0af2ffcd8effff5e70f9ffa2e7f2ff26130e0078a7f3ff3e270f002175feff6130f2ffefe8020028ee08002b82010091c20200e3f60d00affe4b337aa67c3803b0a7470a531d4bb04f3bb5ab52410d953423b6ddcdc818840f2bd85ccab31cb47ab6f1eddcbbc90e86f5a67674cdb26b232bb750b2d4f25ebb94006110ea25a3abef4f42c0609d8742d76c8dc0ef13512aa0e5cdc58413aaaaaaaaa305000049000000a3050000a30500000b14feff000a0600674008005945fdffb57cf6ff2f900c00370c0d0096330200dbe00e00dae107001b9600009779faff96710a001d34f8ff48210d0024f00d0010c6f7ff6bc6c1187bf62e386277bfffde48120f7b8f6dad83c3488162304802aaaaaaaaa405000049000000a4050000a405000096f8f4ffc237f7ff50d70100f3d1f8ff01b1f3ff8e7df1ffbddff7ffd988feff6ab3ffffb152f5ff9113f7ffb996f6ffb38900004282faff6ea00c003a87f7ff567ef6ff51a7d919b0e44917abaaaaaaaaa505000049000000a5050000a50500002ec6fbff0f04f3ffc8440800d9edfbfff5620a00745a090040faf1ff3842f6ff1ca1f3ff62df050086a51d813d11d6511709d7a7e79bacfb2eeb9fb825086042471857c030d7bc384c2985472c009400e00af3056eacd84c3195d71b6025c7cb06ee7edd14e61f0fd86356dd534b335dbe27e5b75fd6e4c2fe37356726d67b0cfdf086a9d58aa38203f11e12bebd0b719193aeacb2b1aaaaaaaaa605000049000000a6050000a60500003a3b0a008f04f9ff57230700bdcc0e00f406060084a0faff0a5107004cc0f2ff33f70d000beef4fffd1e090011c4faff69400400897f0200698403003d1ef2ffc78d0a00f6")
with open("test_data/test_frames.bin", "rb") as f:
test_data = f.read()
print("初始测试数据:")
print("总长度:", len(test_data))
print("数据:", test_data.hex())
print("\n开始查找帧:")
# 模拟实际使用场景,从不同位置开始查找
remaining_data = test_data
frame_count = 0
while len(remaining_data) > 0:
frame, next_pos = finder.find_frame_in_buffer(remaining_data)
if frame:
frame_count += 1
print(f"\n找到第 {frame_count} 个帧:")
print(f"帧数据: {frame.hex()}")
print(f"下一个搜索位置: {next_pos}")
# 解析帧内容
parsed_frame = FrameFormat.parse_frame(frame)
if parsed_frame:
print(f"命令码: 0x{parsed_frame.command:08X}")
print(f"长度: {parsed_frame.length}")
print(f"帧IDX: {parsed_frame.frame_idx}")
print("通道数据:")
for i, value in enumerate(parsed_frame.channels):
print(f"通道 {i+1}: 0x{value:08X}")
print(f"校验和: 0x{parsed_frame.checksum:02X}")
# 更新剩余数据
remaining_data = remaining_data[next_pos:]
print(f"\n剩余数据: ({len(remaining_data)} 字节)")
if remaining_data:
# print(f"数据: {remaining_data.hex()}")
pass
else:
# 如果没有找到帧next_pos 表示需要保留的数据起始位置
if next_pos == len(remaining_data):
print("\n没有更多帧了")
break
elif next_pos == 0:
# 如果next_pos为0说明剩余数据不足一个帧头直接结束
print("\n剩余数据不足一个帧头,结束查找")
break
else:
# 移除无效数据
removed_data = remaining_data[:next_pos]
if removed_data:
print(f"\n移除 {len(removed_data)} 字节无效数据: {removed_data.hex()}")
remaining_data = remaining_data[next_pos:]
print(f"剩余数据: ({len(remaining_data)} 字节)")
if remaining_data:
# print(f"数据: {remaining_data.hex()}")
pass
print(f"\n总共找到 {frame_count} 个帧")