import serial import time import threading from queue import Queue from collections import deque from typing import Optional import logging import os from datetime import datetime from buffer_process import BufferProcessor class SerialRecorder: """串口数据记录器 - 记录原始数据并进行帧处理""" def __init__(self, port: str, baudrate: int, buffer_size: int = 4*1024*1024): """ 初始化串口记录器 Args: port: 串口名称 baudrate: 波特率 buffer_size: 缓冲区大小(字节) """ self.port = port self.baudrate = baudrate self.buffer_size = buffer_size # 初始化主缓冲区 self.buffer = deque(maxlen=buffer_size) # 主接收缓冲区 self.buffer_lock = threading.Lock() # 主缓冲区锁 # 串口和线程相关 self.serial: Optional[serial.Serial] = None self.is_running = False self.read_thread: Optional[threading.Thread] = None self.save_thread: Optional[threading.Thread] = None self.bytes_received = 0 self.start_time = None self.last_data_time = None self.error_count = 0 self.max_errors = 5 # Ping-pong buffer 设置 self.save_buffer_size = 4 * 1024 * 1024 # 4MB self.save_buffer_a = deque(maxlen=self.save_buffer_size) self.save_buffer_b = deque(maxlen=self.save_buffer_size) self.current_save_buffer = self.save_buffer_a self.save_buffer_full = False self.save_buffer_event = threading.Event() self.save_buffer_lock = threading.Lock() # 保存缓冲区锁 # 帧处理器设置 self.frame_queue = Queue() self.buffer_processor = BufferProcessor( buffer=self.buffer, buffer_lock=self.buffer_lock, frame_queue=self.frame_queue, process_interval=0.001 # 1ms处理间隔 ) # 配置日志 log_dir = "logs" if not os.path.exists(log_dir): os.makedirs(log_dir) log_file = os.path.join(log_dir, f"recorder_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log") logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler(log_file), logging.StreamHandler() ] ) self.logger = logging.getLogger(__name__) self.logger.info(f"初始化完成,主缓冲区大小: {self.buffer_size} 字节") def connect(self) -> bool: """连接串口""" try: self.serial = serial.Serial( port=self.port, baudrate=self.baudrate, timeout=0.001, # 减小超时时间 write_timeout=1, # 添加写入超时 bytesize=serial.EIGHTBITS, # 8位数据位 parity=serial.PARITY_NONE, # 无校验 stopbits=serial.STOPBITS_ONE, # 1位停止位 rtscts=True # 启用硬件流控 ) # 设置接收缓冲区大小 self.serial.set_buffer_size(rx_size=1024*1024) # 1MB接收缓冲区 self.logger.info(f"成功连接到串口 {self.port}") return True except Exception as e: self.logger.error(f"连接串口失败: {e}") return False def disconnect(self): """断开串口连接""" if self.serial and self.serial.is_open: self.serial.close() self.logger.info("已断开串口连接") def start(self) -> bool: """启动数据记录""" if not self.serial or not self.serial.is_open: if not self.connect(): return False self.start_time = time.time() self.last_data_time = time.time() self.bytes_received = 0 self.buffer.clear() self.save_buffer_a.clear() self.save_buffer_b.clear() self.current_save_buffer = self.save_buffer_a self.save_buffer_full = False self.save_buffer_event.clear() self.is_running = True self.read_thread = threading.Thread(target=self._read_loop) self.save_thread = threading.Thread(target=self._save_loop) self.read_thread.daemon = True self.save_thread.daemon = True self.read_thread.start() self.save_thread.start() # 启动帧处理器 self.buffer_processor.start() self.logger.info("开始记录数据") return True def stop(self): """停止数据记录""" self.is_running = False self.save_buffer_event.set() # 触发保存线程处理剩余数据 # 停止帧处理器 self.buffer_processor.stop() if self.save_thread: self.save_thread.join(timeout=1.0) if self.read_thread: self.read_thread.join(timeout=1.0) self.disconnect() # 记录统计信息 if self.start_time is not None: total_time = time.time() - self.start_time speed = self.bytes_received / total_time / 1024 if total_time > 0 else 0 self.logger.info(f"数据记录完成") self.logger.info(f"总时间: {total_time:.2f} 秒") self.logger.info(f"平均速度: {speed:.1f} KB/s") self.logger.info(f"接收字节数: {self.bytes_received}") self.logger.info(f"缓冲区使用: {len(self.buffer)}/{self.buffer_size}") self.logger.info(f"找到帧数: {self.frame_queue.qsize()}") else: self.logger.info("数据记录未正常启动") def _save_loop(self): """数据保存循环""" while self.is_running or self.save_buffer_event.is_set(): try: # 等待缓冲区满或程序结束 if not self.save_buffer_full and self.is_running: time.sleep(0.1) continue with self.save_buffer_lock: # 确定要保存的缓冲区 buffer_to_save = self.save_buffer_a if self.current_save_buffer == self.save_buffer_b else self.save_buffer_b if len(buffer_to_save) > 0: # 创建保存目录 records_dir = "records" if not os.path.exists(records_dir): os.makedirs(records_dir) # 生成文件名 filename = f"received_data_{datetime.now().strftime('%Y%m%d_%H%M%S')}.bin" filepath = os.path.join(records_dir, filename) # 保存数据 with open(filepath, 'wb') as f: data = bytes(buffer_to_save) f.write(data) self.logger.info(f"数据已保存到文件: {filepath}") self.logger.info(f"保存数据大小: {len(data)} 字节") # 清空已保存的缓冲区 buffer_to_save.clear() self.logger.info("已清空保存缓冲区") # 重置状态 self.save_buffer_full = False self.save_buffer_event.clear() except Exception as e: self.logger.error(f"保存数据错误: {e}") time.sleep(0.1) def _read_loop(self): """数据读取循环""" CHUNK_SIZE = 4096 last_progress_time = time.time() PROGRESS_INTERVAL = 1.0 self.error_count = 0 while self.is_running: try: if self.serial.in_waiting: # 读取数据 chunk = self.serial.read(min(CHUNK_SIZE, self.serial.in_waiting)) chunk_size = len(chunk) if chunk_size > 0: self.bytes_received += chunk_size self.last_data_time = time.time() self.error_count = 0 # 更新主缓冲区 with self.buffer_lock: # 如果主缓冲区接近满,清空它 if len(self.buffer) > self.buffer_size * 0.9: self.logger.warning("主缓冲区接近满,执行清空操作") self.buffer.clear() self.buffer.extend(chunk) # 更新保存缓冲区 with self.save_buffer_lock: # 检查当前缓冲区是否已满 if len(self.current_save_buffer) >= self.save_buffer_size: # 等待保存线程处理完上一个缓冲区 while self.save_buffer_full and self.is_running: time.sleep(0.01) if not self.is_running: break # 切换到另一个缓冲区 self.current_save_buffer = self.save_buffer_b if self.current_save_buffer == self.save_buffer_a else self.save_buffer_a self.save_buffer_full = True self.save_buffer_event.set() self.logger.info(f"切换到新的保存缓冲区,当前缓冲区大小: {len(self.current_save_buffer)}") self.current_save_buffer.extend(chunk) # 降低进度报告频率 current_time = time.time() if current_time - last_progress_time >= PROGRESS_INTERVAL: elapsed_time = current_time - self.start_time speed = self.bytes_received / elapsed_time / 1024 if elapsed_time > 0 else 0 self.logger.info(f"接收进度: {self.bytes_received} 字节, " f"速度: {speed:.1f} KB/s, " f"缓冲区使用: {len(self.buffer)}/{self.buffer_size}, " f"保存缓冲区使用: {len(self.current_save_buffer)}/{self.save_buffer_size}, " f"找到帧数: {self.frame_queue.qsize()}") last_progress_time = current_time else: time.sleep(0.001) except Exception as e: self.error_count += 1 self.logger.error(f"读取数据错误: {e}, 错误次数: {self.error_count}") # 如果错误次数过多,尝试重新连接 if self.error_count >= self.max_errors: self.logger.error("错误次数过多,尝试重新连接串口") self.disconnect() if not self.connect(): self.logger.error("重新连接失败,停止记录") self.is_running = False break self.error_count = 0 time.sleep(0.1) def get_statistics(self) -> dict: """获取统计信息""" current_time = time.time() running_time = current_time - self.start_time if self.start_time else 0 stats = { "bytes_received": self.bytes_received, "buffer_usage": len(self.buffer), "buffer_size": self.buffer_size, "last_data_time": self.last_data_time, "time_since_last_data": time.time() - self.last_data_time if self.last_data_time else None, "frames_found": self.frame_queue.qsize(), "save_buffer_usage": len(self.current_save_buffer), "save_buffer_size": self.save_buffer_size, "receive_rate": f"{self.bytes_received / running_time:.1f}" if running_time > 0 else "N/A" } # 添加帧处理器的统计信息 if hasattr(self, 'buffer_processor'): processor_stats = self.buffer_processor.get_stats() stats.update(processor_stats) return stats def get_data(self, size: int = None) -> bytes: """从缓冲区获取数据 Args: size: 要获取的数据大小,如果为None则获取所有数据 Returns: bytes: 获取的数据 """ with self.buffer_lock: # 使用锁保护缓冲区操作 if size is None: data = bytes(self.buffer) self.buffer.clear() return data else: data = bytes(list(self.buffer)[:size]) for _ in range(min(size, len(self.buffer))): self.buffer.popleft() return data if __name__ == "__main__": # 使用示例 recorder = SerialRecorder(port="COM51", baudrate=4000000) try: if recorder.start(): print("开始记录数据...") last_stats_time = time.time() STATS_INTERVAL = 1.0 # 统计信息显示间隔 while True: current_time = time.time() # 每秒显示一次统计信息 if current_time - last_stats_time >= STATS_INTERVAL: stats = recorder.get_statistics() print(f"\n当前状态:") print(f"已接收字节: {stats['bytes_received']:,}") print(f"主缓冲区: {stats['buffer_usage']:,}/{recorder.buffer_size:,} 字节") print(f"接收速率: {stats['receive_rate']} 字节/秒") if stats['time_since_last_data'] is not None: print(f"距离上次数据: {stats['time_since_last_data']:.1f}秒") else: print("等待数据...") last_stats_time = current_time time.sleep(0.1) # 降低主循环频率 except KeyboardInterrupt: print("\n停止记录...") finally: recorder.stop()