import serial import time from collections import deque import threading from queue import Queue, Empty from PySide6.QtCore import Signal, QObject class Frame: START = b'\xAA\xAA\xAA\xAA' CMD_POS = (4, 8) LEN_POS = (8, 12) ID_POS = (12, 16) # CHANNEL_DATA_POS = (16, 16 + 4*16) # CHKSUM_POS = (16 + 4*16, 16 + 4*16 + 1) def __init__(self, ch_num, frame_data): self.frame_data = frame_data self.ch_num = ch_num self.CHANNEL_DATA_POS = (16, 16 + 4 * ch_num) self.CHKSUM_POS = (16 + 4 * ch_num, 16 + 4 * ch_num + 1) self.extract_frame(frame_data) def get_chksum(self, data): chksum = 0 for byte in data: chksum += byte return chksum & 0xFF def extract_frame(self, frame_data): """提取数据帧""" self.index = frame_data[self.ID_POS[0]:self.ID_POS[1]] self.cmd = frame_data[self.CMD_POS[0]:self.CMD_POS[1]] self.checksum = frame_data[self.CHKSUM_POS[0]:self.CHKSUM_POS[1]] self.datas = [] for i in range(0, self.ch_num): channel_data_bytes = frame_data[self.CHANNEL_DATA_POS[0]+i*4:self.CHANNEL_DATA_POS[0] + (i + 1) * 4] self.datas.append(channel_data_bytes) def verify(self): """验证数据帧""" self.chksum = self.get_chksum(self.frame_data[0:self.CHANNEL_DATA_POS[1]]) if self.checksum[0] != self.chksum: return False else: return True class SerialDataLogger(QObject): signal_some_frames_processed = Signal(int) START_CMD = b'\xAA\xAA\xAA\xAA\x00\x00\x00\x00\x05\x00\x00\x00\x00\x00\x00\x00\xAD' END_CMD = b'\xAA\xAA\xAA\xAA\x01\x00\x00\x00\x05\x00\x00\x00\x00\x00\x00\x00\xAE' CH_NUM = 24 def __init__(self): super().__init__() # 初始化参数 self.CHUNK_SIZE = 256 self.BUFFER_SIZE = 1024*50 # 约81KB的缓存区 self.buffer = deque(maxlen=self.BUFFER_SIZE) self.output_file = "serial_data.dat" self.process_queue = Queue() self.is_running = False self.collect_thread = None self.port = None self.baudrate = None self.ser = None self.data_remainder = b'' self.RECORDER_TAG = 0 self.recorded_frames_count = 0 # 创建处理线程 self.process_thread = threading.Thread(target=self.process_thread_func) self.process_thread.daemon = True self.process_thread.start() def set_channels(self, channel_num): """设置通道数 Args: channels (int): 通道数量 """ self.CH_NUM = channel_num def set_port(self, port, baudrate): self.port = port self.baudrate = baudrate def read_chunk(self): """读取一个数据块""" try: chunk = self.ser.read(self.CHUNK_SIZE) return chunk except Exception as e: print(f"读取数据出错: {e}") return None def process_thread_func(self): """数据处理线程的主函数""" while True: try: if not self.is_running and self.process_queue.empty(): continue # 从队列中获取数据,设置0.1秒超时 data = self.process_queue.get(timeout=0.1) # print('process_thread_func processing') self.process_buffer(data) self.signal_some_frames_processed.emit(self.recorded_frames_count) except Empty: continue except Exception as e: print(f"处理线程发生错误: {e}") def process_buffer(self, data): """处理缓存区数据,识别有效数据帧""" # 查找数据帧的起始标志(假设数据帧以0xAA 0x55开始) frame_start = b'\xAA\xAA\xAA\xAA' valid_frames = [] data = self.data_remainder + data # self.save_data(data) pos = 0 while pos < len(data): # 查找帧起始标志 start_pos = data.find(frame_start, pos) if start_pos == -1: break # 假设每个数据帧的长度是81字节(根据实际协议修改) FRAME_LENGTH = 17 + self.CH_NUM * 4 if start_pos + FRAME_LENGTH <= len(data): frame = data[start_pos:start_pos + FRAME_LENGTH] # 验证数据帧的有效性(根据实际协议添加校验) if Frame(self.CH_NUM, frame).verify(): valid_frames.append(frame) pos = start_pos + FRAME_LENGTH else: # 将剩余的数据保留 self.data_remainder = data[start_pos:] # print(self.data_remainder) break # 将有效数据帧写入文件 self.save_frames(valid_frames) def save_frames(self, frames): """保存有效数据帧到文件""" self.recorded_frames_count += len(frames) try: with open(self.output_file, 'a+') as f: # 写入数据段标签 # 获取时间戳 # 获取当前时间戳(秒,带微秒) timestamp = time.time() # 转换为微秒时间戳 microsecond_timestamp = int(timestamp * 1_000) _tag_string = f':{self.RECORDER_TAG:02X} {microsecond_timestamp:08X}\n' f.write(_tag_string) for frame in frames: _format_string = str(self.format_frame(Frame(self.CH_NUM, frame))) f.write(_format_string) f.write('\n') # 添加换行符分隔不同帧 except Exception as e: print(f"保存数据出错: {e}") def add_tag(self, tag, data): """添加标签""" # 时间戳 TAG标记 pass def format_frame(self, frame: Frame): """格式化数据帧""" """_summary_ 起始符号 INDEX 通道数量 通道1数据 通道2数据 通道3数据 ... """ _format_string = f':{frame.index.hex().upper()} {frame.ch_num:02X}' for data in frame.datas: _format_string += f' {data.hex().upper()}' return _format_string def save_data(self, data): """保存有效数据帧到文件""" try: with open(self.output_file, 'ab') as f: f.write(data) except Exception as e: print(f"保存数据出错: {e}") def send_cmd(self, cmd): """发送命令""" try: self.ser.write(cmd) except Exception as e: print(f"发送命令出错: {e}") def test_connection(self): """测试串口连接 Returns: bool: 连接是否成功 """ try: if self.ser and self.ser.is_open: self.ser.write(self.START_CMD) return True else: self.ser = serial.Serial( port=self.port, baudrate=self.baudrate, timeout=1 ) self.ser.write(self.START_CMD) return True except Exception as e: print(f"连接测试失败: {e}") return False def disconnect(self): """断开串口连接 Returns: bool: 断开是否成功 """ try: if self.ser and self.ser.is_open: self.ser.close() return True except Exception as e: print(f"断开连接失败: {e}") return False def start(self): """启动数据采集""" try: if not self.ser or not self.ser.is_open: return False # 设置运行标志 self.is_running = True # 创建并启动采集线程 self.collect_thread = threading.Thread(target=self._run) # self.collect_thread.daemon = True self.collect_thread.start() print("recorder start") self.recorded_frames_count = 0 # 当前时间格式化为本地时间 formatted = time.strftime("%Y_%m_%d_%H_%M_%S", time.localtime()) self.output_file = f"./records/signal_record_{formatted}.dat" return True except Exception as e: print(f"启动失败: {e}") self.is_running = False return False def stop(self): """停止数据采集""" try: # 先设置停止标志 self.is_running = False # 等待采集线程结束 if self.collect_thread and self.collect_thread.is_alive(): self.collect_thread.join(timeout=0.1) # 确保串口仍然打开时才处理剩余数据 if self.ser and self.ser.is_open: # 处理剩余的数据 if len(self.buffer) > 0: self.process_queue.put(bytes(list(self.buffer))) self.buffer.clear() # 等待处理队列清空 while not self.process_queue.empty(): time.sleep(0.1) print('stop !') return True except Exception as e: print(f"停止失败: {e}") return False def _run(self): """内部运行方法,由start()调用""" print("开始接收数据...") try_counter = 0 try: while self.is_running and self.ser and self.ser.is_open: # 添加串口检查 chunk = self.read_chunk() if chunk: # 将数据块添加到缓存区 for byte in chunk: self.buffer.append(byte) # 如果缓存区满,将数据发送到处理队列 if len(self.buffer) >= self.BUFFER_SIZE: print("缓存区已满,加入处理队列...") self.process_queue.put(bytes(list(self.buffer))) self.buffer.clear() try_counter = 0 else: try_counter += 1 if try_counter > 2: if len(self.buffer) > 0: self.process_queue.put(bytes(list(self.buffer))) self.buffer.clear() try_counter = 0 except Exception as e: print(f"数据采集错误: {e}") finally: # 确保关闭串口 if self.ser and self.ser.is_open: try: self.ser.close() except: pass print("数据采集结束") def cleanup(self): """完全清理所有资源""" try: # 停止运行 self.is_running = False # 等待采集线程结束 if hasattr(self, 'collect_thread') and self.collect_thread: try: self.collect_thread.join(timeout=2.0) except: pass # 处理剩余数据 if hasattr(self, 'buffer') and len(self.buffer) > 0: try: self.process_queue.put(bytes(list(self.buffer))) self.buffer.clear() except: pass # 等待处理队列清空 if hasattr(self, 'process_queue'): timeout = time.time() + 2.0 while not self.process_queue.empty() and time.time() < timeout: try: self.process_queue.get_nowait() except: break # 关闭串口 if hasattr(self, 'ser') and self.ser: try: if self.ser.is_open: self.ser.close() except: pass # 等待处理线程结束 if hasattr(self, 'process_thread') and self.process_thread: try: self.process_thread.join(timeout=2.0) except: pass except Exception as e: print(f"清理资源时出错: {e}") if __name__ == "__main__": # 创建实例并运行 logger = SerialDataLogger(port='COM50', baudrate=4000000) logger.start()