# brisonus_data_collector/frame_processor.py
# (252 lines, 9.4 KiB, Python; snapshot dated 2025-11-19 14:57:37 +08:00)
import csv
import logging
import os
import time
from datetime import datetime
from queue import Empty, Queue
from typing import Dict, List, Optional, Tuple

import numpy as np

from frame import FrameFormat
from frame_finder import FrameFinder
class FrameProcessor:
    """Frame processor: extracts frames from a byte stream and keeps statistics.

    Bytes passed to :meth:`process_buffer` are appended to an internal buffer
    and scanned with ``FrameFinder``. Each located frame is checked with
    ``FrameFormat``; frames that parse are queued in ``frame_queue`` for
    :meth:`get_frame` / :meth:`dump_frames`, the rest are counted as corrupt.
    """

    # Nominal buffer size (64 KiB); adjust to the real data rate if needed.
    # NOTE(review): nothing in this class currently caps len(self.buffer);
    # this constant is not referenced by any active code path.
    BUFFER_SIZE = 64 * 1024  # 64KB

    # Class-level so it is available on every instance, including ones
    # created without running __init__.
    logger = logging.getLogger(__name__)

    def __init__(self, num_channels: int = 8):
        """Initialise an empty processor.

        Args:
            num_channels: Channels per frame (default 8); passed to
                ``FrameFormat`` whenever a frame is validated or parsed.
        """
        self.finder = FrameFinder()
        self.frame_count = 0            # frames located by the finder
        self.valid_frames = 0           # located frames that parsed successfully
        self.corrupt_frames = 0         # located frames that failed to parse
        self.invalid_bytes = 0          # bytes discarded because no frame starts there
        self.start_time = None          # set on the first process_buffer() call
        self.total_bytes_processed = 0
        self.buffer = bytearray()       # received bytes not yet consumed
        self.frame_queue = Queue()      # raw bytes of valid frames awaiting a consumer
        self.dump_start_time = None     # start time of the most recent dump_frames()
        self.num_channels = num_channels

    def process_buffer(self, data: bytes) -> Tuple[int, int, int, int]:
        """Append ``data`` to the buffer and extract every frame it completes.

        Args:
            data: Newly received bytes.

        Returns:
            Cumulative ``(total_frames, valid_frames, corrupt_frames,
            invalid_bytes)`` counted since this processor was created.
        """
        if self.start_time is None:
            self.start_time = time.time()
        self.total_bytes_processed += len(data)
        self.buffer.extend(data)

        while self.buffer:
            frame_data, next_pos = self.finder.find_frame_in_buffer(self.buffer)
            if frame_data:
                self.frame_count += 1
                # A fresh FrameFormat per frame, using the configured channel count.
                frame_format = FrameFormat(self.num_channels)
                if frame_format.parse_frame(frame_data):
                    self.valid_frames += 1
                    self.frame_queue.put(frame_data)
                else:
                    self.corrupt_frames += 1
                # next_pos is presumably the offset just past the frame.
                self.buffer = self.buffer[next_pos:]
            elif next_pos == len(self.buffer) or next_pos == 0:
                # No frame and nothing to discard: wait for more data.
                # NOTE(review): next_pos is described as "start of the data to
                # keep", which would make next_pos == len(buffer) mean "discard
                # everything" -- yet the buffer is kept. Confirm the
                # FrameFinder contract.
                break
            else:
                # Bytes before next_pos cannot start a frame; drop them.
                self.invalid_bytes += next_pos
                self.buffer = self.buffer[next_pos:]
        return self.frame_count, self.valid_frames, self.corrupt_frames, self.invalid_bytes

    def get_frame(self, timeout: Optional[float] = None) -> Optional[bytes]:
        """Take one valid frame from the queue.

        Args:
            timeout: Seconds to wait; ``None`` waits indefinitely.

        Returns:
            The raw frame bytes, or ``None`` if the timeout expired first.
        """
        try:
            return self.frame_queue.get(timeout=timeout)
        except Empty:
            # Only a timeout maps to None; interrupts and real errors propagate.
            return None

    def get_statistics(self) -> Dict:
        """Return a snapshot of the processing counters and derived rates.

        Rates are computed over the time since the first ``process_buffer``
        call; before that call, runtime and both rates are 0.
        """
        runtime = 0 if self.start_time is None else time.time() - self.start_time
        return {
            "total_bytes": self.total_bytes_processed,
            "total_frames": self.frame_count,
            "valid_frames": self.valid_frames,
            "corrupt_frames": self.corrupt_frames,
            "invalid_bytes": self.invalid_bytes,
            "runtime": runtime,
            "bytes_per_second": self.total_bytes_processed / runtime if runtime > 0 else 0,
            "frames_per_second": self.frame_count / runtime if runtime > 0 else 0,
            "queue_size": self.frame_queue.qsize(),  # frames still waiting for a consumer
        }

    def _parse_frame_data(self, frame_data: bytes) -> Optional[Dict]:
        """Parse one raw frame into ``{'index', 'timestamp', 'channels'}``.

        Each channel value is encoded as 4-byte big-endian signed (two's
        complement) hex, i.e. an 8-character string.

        Returns:
            The parsed dict, or ``None`` if parsing fails or raises (the
            error is logged).
        """
        try:
            frame = FrameFormat(self.num_channels).parse_frame(frame_data)
            if not frame:
                return None
            # to_bytes raises OverflowError for values outside signed 32-bit,
            # which is reported below like any other parse failure.
            channels = [
                ch.to_bytes(4, byteorder='big', signed=True).hex()
                for ch in frame.channels
            ]
            return {
                'index': frame.frame_idx,
                'timestamp': frame.timestamp,
                'channels': channels,
            }
        except Exception as e:
            self.logger.error(f"解析帧数据失败: {e}")
            return None

    def _iter_parsed_frames(self, max_frames: Optional[int]):
        """Dequeue frames without blocking and yield ``(raw, parsed)`` pairs.

        Stops when the queue is empty or ``max_frames`` frames have been
        yielded (``None`` means no limit). Frames that fail to parse are
        consumed from the queue but neither yielded nor counted.
        """
        yielded = 0
        while max_frames is None or yielded < max_frames:
            try:
                # Non-blocking: never hang if another consumer drains the queue.
                frame_data = self.frame_queue.get_nowait()
            except Empty:
                break
            parsed = self._parse_frame_data(frame_data)
            if parsed is None:
                continue
            yielded += 1
            yield frame_data, parsed

    def dump_frames(self, output_dir: str = 'dumps', format: str = 'dat',
                    max_frames: Optional[int] = None,
                    include_raw: bool = False) -> Dict:
        """Drain queued frames into a timestamped file under ``output_dir``.

        Exported frames are removed from ``frame_queue``; frames that fail to
        parse are dropped.

        Args:
            output_dir: Output directory (created if missing).
            format: ``'dat'`` writes a text file with one line per frame,
                ``index timestamp ch1 ... chN [raw]`` (channels as 8-char
                hex); any other value writes a ``.npy`` array with one row
                ``[index, timestamp, ch1, ..., chN]`` per frame. (The name
                shadows the builtin; kept for caller compatibility.)
            max_frames: Maximum number of frames to export; ``None`` exports
                everything currently queued, ``0`` exports nothing.
            include_raw: DAT only -- append the raw frame bytes as hex.

        Returns:
            Dict with ``total_frames`` (cumulative valid frames),
            ``exported_frames``, ``start_time``, ``end_time``, ``file_size``
            (0 when no file was written) and ``format``.

        Raises:
            Exception: Any error during export is logged and re-raised.
        """
        os.makedirs(output_dir, exist_ok=True)

        # Second resolution: two dumps within the same second share a name,
        # and the later one overwrites the earlier file.
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        extension = 'dat' if format == 'dat' else 'npy'
        filepath = os.path.join(output_dir, f'frames_{timestamp}.{extension}')

        stats = {
            'total_frames': 0,
            'exported_frames': 0,
            'start_time': None,
            'end_time': None,
            'file_size': 0,
            'format': format
        }

        try:
            self.dump_start_time = time.time()
            stats['start_time'] = self.dump_start_time
            frames = self._iter_parsed_frames(max_frames)
            wrote_file = False

            if format == 'dat':
                with open(filepath, 'w', encoding='utf-8') as f:
                    wrote_file = True
                    for frame_data, parsed in frames:
                        fields = [f"{parsed['index']}",
                                  f"{parsed['timestamp']:.6f}",
                                  *parsed['channels']]
                        if include_raw:
                            fields.append(bytes(frame_data).hex())
                        f.write(' '.join(fields) + '\n')
                        stats['exported_frames'] += 1
            else:
                # Decode the hex channels back to signed integers for the array.
                rows = [
                    [parsed['index'], parsed['timestamp'],
                     *(int.from_bytes(bytes.fromhex(ch), byteorder='big', signed=True)
                       for ch in parsed['channels'])]
                    for _frame_data, parsed in frames
                ]
                stats['exported_frames'] = len(rows)
                if rows:
                    np.save(filepath, np.array(rows))
                    wrote_file = True

            stats['end_time'] = time.time()
            stats['total_frames'] = self.valid_frames
            stats['file_size'] = os.path.getsize(filepath) if wrote_file else 0
            return stats
        except Exception as e:
            self.logger.error(f"导出数据时出错: {e}")
            raise