brisonus_data_collector/buffer_process.py
JingweiCui 04612ce16d [update] 实现了frame_finder功能
- 从数据buffer中查找frames
2025-03-25 09:17:43 +08:00

141 lines
4.8 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import logging
import os
import struct
from typing import Optional

from frame import FrameFormat
# Configure root logging: INFO and above, each record rendered as
# "<timestamp> - <level name> - <message>".
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
class BufferProcessor:
    """Incrementally extract frames from a byte stream and keep statistics.

    Chunks passed to :meth:`process_data` are appended to an internal buffer.
    The buffer is scanned for ``FrameFormat.FRAME_HEADER``; bytes in front of
    a header are discarded, and every complete frame is consumed and counted
    as valid or corrupt depending on ``FrameFormat.parse_frame``.
    """

    def __init__(self):
        """Create an empty buffer and reset every counter."""
        # Bytes received but not yet consumed.
        self.buffer = bytearray()
        # Every byte ever handed to process_data().
        self.total_bytes_processed = 0
        # Frames extracted so far (valid + corrupt).
        self.total_frames_found = 0
        # Bytes thrown away while searching for a header.
        self.total_invalid_bytes = 0
        self.total_corrupt_frames = 0
        self.total_valid_frames = 0
        # Length of each valid frame, in arrival order.
        self.frame_sizes = []
        # For every valid frame after the first: number of bytes that were
        # skipped immediately before its header.
        self.frame_intervals = []
        # Stream offset just past the most recent valid frame (0 = none yet).
        self.last_frame_pos = 0

    def process_data(self, data: bytes) -> None:
        """Append ``data`` to the buffer and consume every complete frame.

        Processing stops as soon as the buffer holds fewer than
        ``FrameFormat.FIXED_SIZE`` bytes, the frame length cannot be
        determined yet, or the current frame is still incomplete; the
        leftover bytes stay buffered for the next call.

        Args:
            data: Newly received bytes.
        """
        self.buffer.extend(data)
        self.total_bytes_processed += len(data)

        while len(self.buffer) >= FrameFormat.FIXED_SIZE:
            header_pos = self._find_header()
            if header_pos is None:
                # No complete header anywhere. Discard the garbage, but keep
                # a trailing partial header: the marker may be split across
                # two chunks and the rest will arrive with the next call.
                keep = self._partial_header_suffix()
                dropped = len(self.buffer) - keep
                self.total_invalid_bytes += dropped
                del self.buffer[:dropped]
                break

            # Drop the invalid bytes in front of the header.
            if header_pos > 0:
                self.total_invalid_bytes += header_pos
                del self.buffer[:header_pos]

            frame_length = FrameFormat.get_frame_length(self.buffer)
            if frame_length is None:
                # Not enough data to read the length yet; wait for more.
                break

            if frame_length <= 0:
                # A non-positive length would consume nothing and spin this
                # loop forever. Treat the header as a corrupt frame, skip one
                # byte so the same header cannot match again, and resync.
                self.total_corrupt_frames += 1
                self.total_frames_found += 1
                self.total_invalid_bytes += 1
                del self.buffer[:1]
                continue

            if len(self.buffer) < frame_length:
                # The frame is not complete yet; wait for more data.
                break

            # Cut the frame out of the buffer.
            frame_data = self.buffer[:frame_length]
            del self.buffer[:frame_length]

            if FrameFormat.parse_frame(frame_data):
                self.total_valid_frames += 1
                self.frame_sizes.append(frame_length)
                # Only record a gap once an earlier valid frame exists.
                if self.last_frame_pos > 0:
                    self.frame_intervals.append(header_pos)
                self.last_frame_pos = (
                    self.total_bytes_processed - len(self.buffer)
                )
            else:
                self.total_corrupt_frames += 1
            self.total_frames_found += 1

    def _find_header(self) -> Optional[int]:
        """Locate the first frame header in the buffer.

        Returns:
            Index of the first occurrence of ``FrameFormat.FRAME_HEADER``,
            or ``None`` when the buffer is shorter than
            ``FrameFormat.HEADER_SIZE`` or contains no header.
        """
        if len(self.buffer) < FrameFormat.HEADER_SIZE:
            return None
        # Assumes FRAME_HEADER is a bytes-like marker of HEADER_SIZE bytes,
        # which the slice comparison this replaces already required in order
        # to ever match.
        position = self.buffer.find(FrameFormat.FRAME_HEADER)
        return None if position < 0 else position

    def _partial_header_suffix(self) -> int:
        """Return how many trailing buffer bytes could start a header.

        The result is the length of the longest buffer suffix that equals a
        proper prefix of ``FrameFormat.FRAME_HEADER`` (0 when there is none).
        """
        header = FrameFormat.FRAME_HEADER
        longest = min(len(header) - 1, len(self.buffer))
        for size in range(longest, 0, -1):
            if self.buffer.endswith(header[:size]):
                return size
        return 0

    @staticmethod
    def _mean(values) -> float:
        """Return the arithmetic mean of ``values``, or 0 when empty."""
        return sum(values) / len(values) if values else 0

    def get_statistics(self) -> dict:
        """Return a snapshot of the processing counters.

        Returns:
            dict: The running totals, the current buffer length under
            ``"buffer_size"``, and the mean of ``frame_sizes`` and
            ``frame_intervals`` (0 when the respective list is empty).
        """
        return {
            "total_bytes_processed": self.total_bytes_processed,
            "total_frames_found": self.total_frames_found,
            "total_invalid_bytes": self.total_invalid_bytes,
            "total_corrupt_frames": self.total_corrupt_frames,
            "total_valid_frames": self.total_valid_frames,
            "buffer_size": len(self.buffer),
            "average_frame_size": self._mean(self.frame_sizes),
            "average_frame_interval": self._mean(self.frame_intervals),
        }
def process_file(file_path: str, chunk_size: int = 1024) -> None:
    """Feed a binary file through a ``BufferProcessor`` and print its statistics.

    The file is read in chunks of ``chunk_size`` bytes, each chunk is passed
    to ``BufferProcessor.process_data``, and the final counters are printed
    to standard output.

    Args:
        file_path: Path of the file to read (opened in binary mode).
        chunk_size: Number of bytes requested per read. Defaults to 1024,
            the value that used to be hard-coded.

    Raises:
        ValueError: If ``chunk_size`` is less than 1.
        OSError: If the file cannot be opened or read.
    """
    # read(0) would return b"" immediately (nothing processed) and a negative
    # size would read the whole file at once, so reject both explicitly.
    if chunk_size < 1:
        raise ValueError(
            f"chunk_size must be a positive integer, got {chunk_size!r}"
        )

    processor = BufferProcessor()
    with open(file_path, 'rb') as f:
        # iter() with a sentinel stops at the first empty read (end of file).
        for data in iter(lambda: f.read(chunk_size), b''):
            processor.process_data(data)

    # Print the statistics summary.
    stats = processor.get_statistics()
    print("\n处理完成!")
    print(f"总处理字节数: {stats['total_bytes_processed']:,}")
    print(f"总帧数: {stats['total_frames_found']}")
    print(f"有效帧数: {stats['total_valid_frames']}")
    print(f"损坏帧数: {stats['total_corrupt_frames']}")
    print(f"无效字节数: {stats['total_invalid_bytes']}")
    print(f"剩余缓冲区大小: {stats['buffer_size']}")
    print(f"平均帧大小: {stats['average_frame_size']:.1f} 字节")
    print(f"平均帧间隔: {stats['average_frame_interval']:.1f} 字节")
if __name__ == "__main__":
    # Script entry point: run the processor over the test-data file.
    test_file = "test_data/test_frames.bin"
    if not os.path.exists(test_file):
        print(f"错误:测试数据文件 {test_file} 不存在")
        # The bare ``exit`` helper is injected by the ``site`` module for
        # interactive use and is missing under ``python -S`` or in frozen
        # builds; raising SystemExit gives the same exit status everywhere.
        raise SystemExit(1)
    process_file(test_file)