brisonus_data_collector/serial_recorder.py
JingweiCui 04612ce16d [update] 实现了frame_finder功能
- 从数据buffer中查找frames
2025-03-25 09:17:43 +08:00

356 lines
15 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import serial
import time
import threading
from queue import Queue
from collections import deque
from typing import Optional
import logging
import os
from datetime import datetime
from buffer_process import BufferProcessor
class SerialRecorder:
"""串口数据记录器 - 记录原始数据并进行帧处理"""
def __init__(self, port: str, baudrate: int, buffer_size: int = 4*1024*1024):
"""
初始化串口记录器
Args:
port: 串口名称
baudrate: 波特率
buffer_size: 缓冲区大小(字节)
"""
self.port = port
self.baudrate = baudrate
self.buffer_size = buffer_size
# 初始化主缓冲区
self.buffer = deque(maxlen=buffer_size) # 主接收缓冲区
self.buffer_lock = threading.Lock() # 主缓冲区锁
# 串口和线程相关
self.serial: Optional[serial.Serial] = None
self.is_running = False
self.read_thread: Optional[threading.Thread] = None
self.save_thread: Optional[threading.Thread] = None
self.bytes_received = 0
self.start_time = None
self.last_data_time = None
self.error_count = 0
self.max_errors = 5
# Ping-pong buffer 设置
self.save_buffer_size = 4 * 1024 * 1024 # 4MB
self.save_buffer_a = deque(maxlen=self.save_buffer_size)
self.save_buffer_b = deque(maxlen=self.save_buffer_size)
self.current_save_buffer = self.save_buffer_a
self.save_buffer_full = False
self.save_buffer_event = threading.Event()
self.save_buffer_lock = threading.Lock() # 保存缓冲区锁
# 帧处理器设置
self.frame_queue = Queue()
self.buffer_processor = BufferProcessor(
buffer=self.buffer,
buffer_lock=self.buffer_lock,
frame_queue=self.frame_queue,
process_interval=0.001 # 1ms处理间隔
)
# 配置日志
log_dir = "logs"
if not os.path.exists(log_dir):
os.makedirs(log_dir)
log_file = os.path.join(log_dir, f"recorder_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log")
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(log_file),
logging.StreamHandler()
]
)
self.logger = logging.getLogger(__name__)
self.logger.info(f"初始化完成,主缓冲区大小: {self.buffer_size} 字节")
def connect(self) -> bool:
"""连接串口"""
try:
self.serial = serial.Serial(
port=self.port,
baudrate=self.baudrate,
timeout=0.001, # 减小超时时间
write_timeout=1, # 添加写入超时
bytesize=serial.EIGHTBITS, # 8位数据位
parity=serial.PARITY_NONE, # 无校验
stopbits=serial.STOPBITS_ONE, # 1位停止位
rtscts=True # 启用硬件流控
)
# 设置接收缓冲区大小
self.serial.set_buffer_size(rx_size=1024*1024) # 1MB接收缓冲区
self.logger.info(f"成功连接到串口 {self.port}")
return True
except Exception as e:
self.logger.error(f"连接串口失败: {e}")
return False
def disconnect(self):
"""断开串口连接"""
if self.serial and self.serial.is_open:
self.serial.close()
self.logger.info("已断开串口连接")
def start(self) -> bool:
"""启动数据记录"""
if not self.serial or not self.serial.is_open:
if not self.connect():
return False
self.start_time = time.time()
self.last_data_time = time.time()
self.bytes_received = 0
self.buffer.clear()
self.save_buffer_a.clear()
self.save_buffer_b.clear()
self.current_save_buffer = self.save_buffer_a
self.save_buffer_full = False
self.save_buffer_event.clear()
self.is_running = True
self.read_thread = threading.Thread(target=self._read_loop)
self.save_thread = threading.Thread(target=self._save_loop)
self.read_thread.daemon = True
self.save_thread.daemon = True
self.read_thread.start()
self.save_thread.start()
# 启动帧处理器
self.buffer_processor.start()
self.logger.info("开始记录数据")
return True
def stop(self):
"""停止数据记录"""
self.is_running = False
self.save_buffer_event.set() # 触发保存线程处理剩余数据
# 停止帧处理器
self.buffer_processor.stop()
if self.save_thread:
self.save_thread.join(timeout=1.0)
if self.read_thread:
self.read_thread.join(timeout=1.0)
self.disconnect()
# 记录统计信息
if self.start_time is not None:
total_time = time.time() - self.start_time
speed = self.bytes_received / total_time / 1024 if total_time > 0 else 0
self.logger.info(f"数据记录完成")
self.logger.info(f"总时间: {total_time:.2f}")
self.logger.info(f"平均速度: {speed:.1f} KB/s")
self.logger.info(f"接收字节数: {self.bytes_received}")
self.logger.info(f"缓冲区使用: {len(self.buffer)}/{self.buffer_size}")
self.logger.info(f"找到帧数: {self.frame_queue.qsize()}")
else:
self.logger.info("数据记录未正常启动")
def _save_loop(self):
"""数据保存循环"""
while self.is_running or self.save_buffer_event.is_set():
try:
# 等待缓冲区满或程序结束
if not self.save_buffer_full and self.is_running:
time.sleep(0.1)
continue
with self.save_buffer_lock:
# 确定要保存的缓冲区
buffer_to_save = self.save_buffer_a if self.current_save_buffer == self.save_buffer_b else self.save_buffer_b
if len(buffer_to_save) > 0:
# 创建保存目录
records_dir = "records"
if not os.path.exists(records_dir):
os.makedirs(records_dir)
# 生成文件名
filename = f"received_data_{datetime.now().strftime('%Y%m%d_%H%M%S')}.bin"
filepath = os.path.join(records_dir, filename)
# 保存数据
with open(filepath, 'wb') as f:
data = bytes(buffer_to_save)
f.write(data)
self.logger.info(f"数据已保存到文件: {filepath}")
self.logger.info(f"保存数据大小: {len(data)} 字节")
# 清空已保存的缓冲区
buffer_to_save.clear()
self.logger.info("已清空保存缓冲区")
# 重置状态
self.save_buffer_full = False
self.save_buffer_event.clear()
except Exception as e:
self.logger.error(f"保存数据错误: {e}")
time.sleep(0.1)
def _read_loop(self):
"""数据读取循环"""
CHUNK_SIZE = 4096
last_progress_time = time.time()
PROGRESS_INTERVAL = 1.0
self.error_count = 0
while self.is_running:
try:
if self.serial.in_waiting:
# 读取数据
chunk = self.serial.read(min(CHUNK_SIZE, self.serial.in_waiting))
chunk_size = len(chunk)
if chunk_size > 0:
self.bytes_received += chunk_size
self.last_data_time = time.time()
self.error_count = 0
# 更新主缓冲区
with self.buffer_lock:
# 如果主缓冲区接近满,清空它
if len(self.buffer) > self.buffer_size * 0.9:
self.logger.warning("主缓冲区接近满,执行清空操作")
self.buffer.clear()
self.buffer.extend(chunk)
# 更新保存缓冲区
with self.save_buffer_lock:
# 检查当前缓冲区是否已满
if len(self.current_save_buffer) >= self.save_buffer_size:
# 等待保存线程处理完上一个缓冲区
while self.save_buffer_full and self.is_running:
time.sleep(0.01)
if not self.is_running:
break
# 切换到另一个缓冲区
self.current_save_buffer = self.save_buffer_b if self.current_save_buffer == self.save_buffer_a else self.save_buffer_a
self.save_buffer_full = True
self.save_buffer_event.set()
self.logger.info(f"切换到新的保存缓冲区,当前缓冲区大小: {len(self.current_save_buffer)}")
self.current_save_buffer.extend(chunk)
# 降低进度报告频率
current_time = time.time()
if current_time - last_progress_time >= PROGRESS_INTERVAL:
elapsed_time = current_time - self.start_time
speed = self.bytes_received / elapsed_time / 1024 if elapsed_time > 0 else 0
self.logger.info(f"接收进度: {self.bytes_received} 字节, "
f"速度: {speed:.1f} KB/s, "
f"缓冲区使用: {len(self.buffer)}/{self.buffer_size}, "
f"保存缓冲区使用: {len(self.current_save_buffer)}/{self.save_buffer_size}, "
f"找到帧数: {self.frame_queue.qsize()}")
last_progress_time = current_time
else:
time.sleep(0.001)
except Exception as e:
self.error_count += 1
self.logger.error(f"读取数据错误: {e}, 错误次数: {self.error_count}")
# 如果错误次数过多,尝试重新连接
if self.error_count >= self.max_errors:
self.logger.error("错误次数过多,尝试重新连接串口")
self.disconnect()
if not self.connect():
self.logger.error("重新连接失败,停止记录")
self.is_running = False
break
self.error_count = 0
time.sleep(0.1)
def get_statistics(self) -> dict:
"""获取统计信息"""
current_time = time.time()
running_time = current_time - self.start_time if self.start_time else 0
stats = {
"bytes_received": self.bytes_received,
"buffer_usage": len(self.buffer),
"buffer_size": self.buffer_size,
"last_data_time": self.last_data_time,
"time_since_last_data": time.time() - self.last_data_time if self.last_data_time else None,
"frames_found": self.frame_queue.qsize(),
"save_buffer_usage": len(self.current_save_buffer),
"save_buffer_size": self.save_buffer_size,
"receive_rate": f"{self.bytes_received / running_time:.1f}" if running_time > 0 else "N/A"
}
# 添加帧处理器的统计信息
if hasattr(self, 'buffer_processor'):
processor_stats = self.buffer_processor.get_stats()
stats.update(processor_stats)
return stats
def get_data(self, size: int = None) -> bytes:
"""从缓冲区获取数据
Args:
size: 要获取的数据大小如果为None则获取所有数据
Returns:
bytes: 获取的数据
"""
with self.buffer_lock: # 使用锁保护缓冲区操作
if size is None:
data = bytes(self.buffer)
self.buffer.clear()
return data
else:
data = bytes(list(self.buffer)[:size])
for _ in range(min(size, len(self.buffer))):
self.buffer.popleft()
return data
if __name__ == "__main__":
# 使用示例
recorder = SerialRecorder(port="COM51", baudrate=4000000)
try:
if recorder.start():
print("开始记录数据...")
last_stats_time = time.time()
STATS_INTERVAL = 1.0 # 统计信息显示间隔
while True:
current_time = time.time()
# 每秒显示一次统计信息
if current_time - last_stats_time >= STATS_INTERVAL:
stats = recorder.get_statistics()
print(f"\n当前状态:")
print(f"已接收字节: {stats['bytes_received']:,}")
print(f"主缓冲区: {stats['buffer_usage']:,}/{recorder.buffer_size:,} 字节")
print(f"接收速率: {stats['receive_rate']} 字节/秒")
if stats['time_since_last_data'] is not None:
print(f"距离上次数据: {stats['time_since_last_data']:.1f}")
else:
print("等待数据...")
last_stats_time = current_time
time.sleep(0.1) # 降低主循环频率
except KeyboardInterrupt:
print("\n停止记录...")
finally:
recorder.stop()