356 lines
15 KiB
Python
356 lines
15 KiB
Python
import serial
|
||
import time
|
||
import threading
|
||
from queue import Queue
|
||
from collections import deque
|
||
from typing import Optional
|
||
import logging
|
||
import os
|
||
from datetime import datetime
|
||
from buffer_process import BufferProcessor
|
||
|
||
class SerialRecorder:
|
||
"""串口数据记录器 - 记录原始数据并进行帧处理"""
|
||
def __init__(self, port: str, baudrate: int, buffer_size: int = 4*1024*1024):
|
||
"""
|
||
初始化串口记录器
|
||
Args:
|
||
port: 串口名称
|
||
baudrate: 波特率
|
||
buffer_size: 缓冲区大小(字节)
|
||
"""
|
||
self.port = port
|
||
self.baudrate = baudrate
|
||
self.buffer_size = buffer_size
|
||
|
||
# 初始化主缓冲区
|
||
self.buffer = deque(maxlen=buffer_size) # 主接收缓冲区
|
||
self.buffer_lock = threading.Lock() # 主缓冲区锁
|
||
|
||
# 串口和线程相关
|
||
self.serial: Optional[serial.Serial] = None
|
||
self.is_running = False
|
||
self.read_thread: Optional[threading.Thread] = None
|
||
self.save_thread: Optional[threading.Thread] = None
|
||
self.bytes_received = 0
|
||
self.start_time = None
|
||
self.last_data_time = None
|
||
self.error_count = 0
|
||
self.max_errors = 5
|
||
|
||
# Ping-pong buffer 设置
|
||
self.save_buffer_size = 4 * 1024 * 1024 # 4MB
|
||
self.save_buffer_a = deque(maxlen=self.save_buffer_size)
|
||
self.save_buffer_b = deque(maxlen=self.save_buffer_size)
|
||
self.current_save_buffer = self.save_buffer_a
|
||
self.save_buffer_full = False
|
||
self.save_buffer_event = threading.Event()
|
||
self.save_buffer_lock = threading.Lock() # 保存缓冲区锁
|
||
|
||
# 帧处理器设置
|
||
self.frame_queue = Queue()
|
||
self.buffer_processor = BufferProcessor(
|
||
buffer=self.buffer,
|
||
buffer_lock=self.buffer_lock,
|
||
frame_queue=self.frame_queue,
|
||
process_interval=0.001 # 1ms处理间隔
|
||
)
|
||
|
||
# 配置日志
|
||
log_dir = "logs"
|
||
if not os.path.exists(log_dir):
|
||
os.makedirs(log_dir)
|
||
|
||
log_file = os.path.join(log_dir, f"recorder_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log")
|
||
logging.basicConfig(
|
||
level=logging.INFO,
|
||
format='%(asctime)s - %(levelname)s - %(message)s',
|
||
handlers=[
|
||
logging.FileHandler(log_file),
|
||
logging.StreamHandler()
|
||
]
|
||
)
|
||
self.logger = logging.getLogger(__name__)
|
||
self.logger.info(f"初始化完成,主缓冲区大小: {self.buffer_size} 字节")
|
||
|
||
def connect(self) -> bool:
|
||
"""连接串口"""
|
||
try:
|
||
self.serial = serial.Serial(
|
||
port=self.port,
|
||
baudrate=self.baudrate,
|
||
timeout=0.001, # 减小超时时间
|
||
write_timeout=1, # 添加写入超时
|
||
bytesize=serial.EIGHTBITS, # 8位数据位
|
||
parity=serial.PARITY_NONE, # 无校验
|
||
stopbits=serial.STOPBITS_ONE, # 1位停止位
|
||
rtscts=True # 启用硬件流控
|
||
)
|
||
# 设置接收缓冲区大小
|
||
self.serial.set_buffer_size(rx_size=1024*1024) # 1MB接收缓冲区
|
||
self.logger.info(f"成功连接到串口 {self.port}")
|
||
return True
|
||
except Exception as e:
|
||
self.logger.error(f"连接串口失败: {e}")
|
||
return False
|
||
|
||
def disconnect(self):
|
||
"""断开串口连接"""
|
||
if self.serial and self.serial.is_open:
|
||
self.serial.close()
|
||
self.logger.info("已断开串口连接")
|
||
|
||
def start(self) -> bool:
|
||
"""启动数据记录"""
|
||
if not self.serial or not self.serial.is_open:
|
||
if not self.connect():
|
||
return False
|
||
|
||
self.start_time = time.time()
|
||
self.last_data_time = time.time()
|
||
self.bytes_received = 0
|
||
self.buffer.clear()
|
||
self.save_buffer_a.clear()
|
||
self.save_buffer_b.clear()
|
||
self.current_save_buffer = self.save_buffer_a
|
||
self.save_buffer_full = False
|
||
self.save_buffer_event.clear()
|
||
|
||
self.is_running = True
|
||
self.read_thread = threading.Thread(target=self._read_loop)
|
||
self.save_thread = threading.Thread(target=self._save_loop)
|
||
self.read_thread.daemon = True
|
||
self.save_thread.daemon = True
|
||
self.read_thread.start()
|
||
self.save_thread.start()
|
||
|
||
# 启动帧处理器
|
||
self.buffer_processor.start()
|
||
|
||
self.logger.info("开始记录数据")
|
||
return True
|
||
|
||
def stop(self):
|
||
"""停止数据记录"""
|
||
self.is_running = False
|
||
self.save_buffer_event.set() # 触发保存线程处理剩余数据
|
||
|
||
# 停止帧处理器
|
||
self.buffer_processor.stop()
|
||
|
||
if self.save_thread:
|
||
self.save_thread.join(timeout=1.0)
|
||
if self.read_thread:
|
||
self.read_thread.join(timeout=1.0)
|
||
|
||
self.disconnect()
|
||
|
||
# 记录统计信息
|
||
if self.start_time is not None:
|
||
total_time = time.time() - self.start_time
|
||
speed = self.bytes_received / total_time / 1024 if total_time > 0 else 0
|
||
|
||
self.logger.info(f"数据记录完成")
|
||
self.logger.info(f"总时间: {total_time:.2f} 秒")
|
||
self.logger.info(f"平均速度: {speed:.1f} KB/s")
|
||
self.logger.info(f"接收字节数: {self.bytes_received}")
|
||
self.logger.info(f"缓冲区使用: {len(self.buffer)}/{self.buffer_size}")
|
||
self.logger.info(f"找到帧数: {self.frame_queue.qsize()}")
|
||
else:
|
||
self.logger.info("数据记录未正常启动")
|
||
|
||
def _save_loop(self):
|
||
"""数据保存循环"""
|
||
while self.is_running or self.save_buffer_event.is_set():
|
||
try:
|
||
# 等待缓冲区满或程序结束
|
||
if not self.save_buffer_full and self.is_running:
|
||
time.sleep(0.1)
|
||
continue
|
||
|
||
with self.save_buffer_lock:
|
||
# 确定要保存的缓冲区
|
||
buffer_to_save = self.save_buffer_a if self.current_save_buffer == self.save_buffer_b else self.save_buffer_b
|
||
|
||
if len(buffer_to_save) > 0:
|
||
# 创建保存目录
|
||
records_dir = "records"
|
||
if not os.path.exists(records_dir):
|
||
os.makedirs(records_dir)
|
||
|
||
# 生成文件名
|
||
filename = f"received_data_{datetime.now().strftime('%Y%m%d_%H%M%S')}.bin"
|
||
filepath = os.path.join(records_dir, filename)
|
||
|
||
# 保存数据
|
||
with open(filepath, 'wb') as f:
|
||
data = bytes(buffer_to_save)
|
||
f.write(data)
|
||
|
||
self.logger.info(f"数据已保存到文件: {filepath}")
|
||
self.logger.info(f"保存数据大小: {len(data)} 字节")
|
||
|
||
# 清空已保存的缓冲区
|
||
buffer_to_save.clear()
|
||
self.logger.info("已清空保存缓冲区")
|
||
|
||
# 重置状态
|
||
self.save_buffer_full = False
|
||
self.save_buffer_event.clear()
|
||
|
||
except Exception as e:
|
||
self.logger.error(f"保存数据错误: {e}")
|
||
time.sleep(0.1)
|
||
|
||
def _read_loop(self):
|
||
"""数据读取循环"""
|
||
CHUNK_SIZE = 4096
|
||
last_progress_time = time.time()
|
||
PROGRESS_INTERVAL = 1.0
|
||
self.error_count = 0
|
||
|
||
while self.is_running:
|
||
try:
|
||
if self.serial.in_waiting:
|
||
# 读取数据
|
||
chunk = self.serial.read(min(CHUNK_SIZE, self.serial.in_waiting))
|
||
chunk_size = len(chunk)
|
||
|
||
if chunk_size > 0:
|
||
self.bytes_received += chunk_size
|
||
self.last_data_time = time.time()
|
||
self.error_count = 0
|
||
|
||
# 更新主缓冲区
|
||
with self.buffer_lock:
|
||
# 如果主缓冲区接近满,清空它
|
||
if len(self.buffer) > self.buffer_size * 0.9:
|
||
self.logger.warning("主缓冲区接近满,执行清空操作")
|
||
self.buffer.clear()
|
||
self.buffer.extend(chunk)
|
||
|
||
# 更新保存缓冲区
|
||
with self.save_buffer_lock:
|
||
# 检查当前缓冲区是否已满
|
||
if len(self.current_save_buffer) >= self.save_buffer_size:
|
||
# 等待保存线程处理完上一个缓冲区
|
||
while self.save_buffer_full and self.is_running:
|
||
time.sleep(0.01)
|
||
|
||
if not self.is_running:
|
||
break
|
||
|
||
# 切换到另一个缓冲区
|
||
self.current_save_buffer = self.save_buffer_b if self.current_save_buffer == self.save_buffer_a else self.save_buffer_a
|
||
self.save_buffer_full = True
|
||
self.save_buffer_event.set()
|
||
self.logger.info(f"切换到新的保存缓冲区,当前缓冲区大小: {len(self.current_save_buffer)}")
|
||
|
||
self.current_save_buffer.extend(chunk)
|
||
|
||
# 降低进度报告频率
|
||
current_time = time.time()
|
||
if current_time - last_progress_time >= PROGRESS_INTERVAL:
|
||
elapsed_time = current_time - self.start_time
|
||
speed = self.bytes_received / elapsed_time / 1024 if elapsed_time > 0 else 0
|
||
self.logger.info(f"接收进度: {self.bytes_received} 字节, "
|
||
f"速度: {speed:.1f} KB/s, "
|
||
f"缓冲区使用: {len(self.buffer)}/{self.buffer_size}, "
|
||
f"保存缓冲区使用: {len(self.current_save_buffer)}/{self.save_buffer_size}, "
|
||
f"找到帧数: {self.frame_queue.qsize()}")
|
||
last_progress_time = current_time
|
||
else:
|
||
time.sleep(0.001)
|
||
|
||
except Exception as e:
|
||
self.error_count += 1
|
||
self.logger.error(f"读取数据错误: {e}, 错误次数: {self.error_count}")
|
||
|
||
# 如果错误次数过多,尝试重新连接
|
||
if self.error_count >= self.max_errors:
|
||
self.logger.error("错误次数过多,尝试重新连接串口")
|
||
self.disconnect()
|
||
if not self.connect():
|
||
self.logger.error("重新连接失败,停止记录")
|
||
self.is_running = False
|
||
break
|
||
self.error_count = 0
|
||
|
||
time.sleep(0.1)
|
||
|
||
def get_statistics(self) -> dict:
|
||
"""获取统计信息"""
|
||
current_time = time.time()
|
||
running_time = current_time - self.start_time if self.start_time else 0
|
||
|
||
stats = {
|
||
"bytes_received": self.bytes_received,
|
||
"buffer_usage": len(self.buffer),
|
||
"buffer_size": self.buffer_size,
|
||
"last_data_time": self.last_data_time,
|
||
"time_since_last_data": time.time() - self.last_data_time if self.last_data_time else None,
|
||
"frames_found": self.frame_queue.qsize(),
|
||
"save_buffer_usage": len(self.current_save_buffer),
|
||
"save_buffer_size": self.save_buffer_size,
|
||
"receive_rate": f"{self.bytes_received / running_time:.1f}" if running_time > 0 else "N/A"
|
||
}
|
||
|
||
# 添加帧处理器的统计信息
|
||
if hasattr(self, 'buffer_processor'):
|
||
processor_stats = self.buffer_processor.get_stats()
|
||
stats.update(processor_stats)
|
||
|
||
return stats
|
||
|
||
def get_data(self, size: int = None) -> bytes:
|
||
"""从缓冲区获取数据
|
||
Args:
|
||
size: 要获取的数据大小,如果为None则获取所有数据
|
||
Returns:
|
||
bytes: 获取的数据
|
||
"""
|
||
with self.buffer_lock: # 使用锁保护缓冲区操作
|
||
if size is None:
|
||
data = bytes(self.buffer)
|
||
self.buffer.clear()
|
||
return data
|
||
else:
|
||
data = bytes(list(self.buffer)[:size])
|
||
for _ in range(min(size, len(self.buffer))):
|
||
self.buffer.popleft()
|
||
return data
|
||
|
||
if __name__ == "__main__":
|
||
# 使用示例
|
||
recorder = SerialRecorder(port="COM51", baudrate=4000000)
|
||
|
||
try:
|
||
if recorder.start():
|
||
print("开始记录数据...")
|
||
|
||
last_stats_time = time.time()
|
||
STATS_INTERVAL = 1.0 # 统计信息显示间隔
|
||
|
||
while True:
|
||
current_time = time.time()
|
||
|
||
# 每秒显示一次统计信息
|
||
if current_time - last_stats_time >= STATS_INTERVAL:
|
||
stats = recorder.get_statistics()
|
||
print(f"\n当前状态:")
|
||
print(f"已接收字节: {stats['bytes_received']:,}")
|
||
print(f"主缓冲区: {stats['buffer_usage']:,}/{recorder.buffer_size:,} 字节")
|
||
print(f"接收速率: {stats['receive_rate']} 字节/秒")
|
||
if stats['time_since_last_data'] is not None:
|
||
print(f"距离上次数据: {stats['time_since_last_data']:.1f}秒")
|
||
else:
|
||
print("等待数据...")
|
||
|
||
last_stats_time = current_time
|
||
|
||
time.sleep(0.1) # 降低主循环频率
|
||
|
||
except KeyboardInterrupt:
|
||
print("\n停止记录...")
|
||
finally:
|
||
recorder.stop() |