from PySide6.QtCore import QObject, Signal, Slot, QThread, QMutex, QWaitCondition from PySide6.QtSerialPort import QSerialPort, QSerialPortInfo from br_com_message import BrComMessage import time import queue class SerialReaderThread(QThread): """串口数据读取线程""" def __init__(self, serial_port, buffer_queue): super().__init__() self.serial_port = serial_port self.buffer_queue = buffer_queue self.running = True self.buffer = bytearray() self.max_buffer_size = 50 * 1024 * 1024 # 50MB self.buffer_mutex = QMutex() def run(self): while self.running: if not self.serial_port.isOpen(): time.sleep(0.1) continue if self.serial_port.waitForReadyRead(100): # 等待100ms data = self.serial_port.readAll() if data: self.buffer_mutex.lock() try: # 检查缓冲区大小 if len(self.buffer) >= self.max_buffer_size: print("Warning: Buffer overflow, clearing buffer") self.buffer.clear() # 将数据添加到缓冲区 self.buffer.extend(data.data()) # 将完整的数据块放入队列 self.buffer_queue.put(self.buffer) # 清空缓冲区,准备接收下一块数据 self.buffer = bytearray() finally: self.buffer_mutex.unlock() else: # 如果没有数据,短暂休眠 time.sleep(0.01) def stop(self): self.running = False class MessageProcessorThread(QThread): """消息处理线程""" def __init__(self, buffer_queue, message_queue): super().__init__() self.buffer_queue = buffer_queue self.message_queue = message_queue self.running = True self.accumulated_data = bytearray() self.max_accumulated_size = 50 * 1024 * 1024 # 50MB self.data_mutex = QMutex() def run(self): while self.running: try: # 从缓冲区队列获取数据,设置超时 data = self.buffer_queue.get(timeout=0.1) if data: self.data_mutex.lock() try: # 检查累积数据大小 if len(self.accumulated_data) >= self.max_accumulated_size: print("Warning: Accumulated data overflow, clearing data") self.accumulated_data.clear() # 将新数据添加到累积数据中 self.accumulated_data.extend(data) # 处理累积的数据,查找完整的消息 while len(self.accumulated_data) > 0: # 查找消息头(假设消息头是0xAA) start_index = self.accumulated_data.find(0xAA) if start_index == -1: # 没有找到消息头,清空数据 self.accumulated_data.clear() break # 移除消息头之前的数据 self.accumulated_data = self.accumulated_data[start_index:] # 检查是否有足够的数据来获取消息长度 if len(self.accumulated_data) < 4: # 假设消息头+长度至少4字节 break # 获取消息长度(假设长度字段在消息头后的2个字节) message_length = (self.accumulated_data[1] << 8) | self.accumulated_data[2] # 检查消息长度是否合理 if message_length > self.max_accumulated_size: print(f"Warning: Invalid message length {message_length}, skipping") self.accumulated_data = self.accumulated_data[1:] # 跳过当前字节 continue # 检查是否有完整的消息 if len(self.accumulated_data) < message_length: break # 提取完整的消息 message_data = self.accumulated_data[:message_length] self.accumulated_data = self.accumulated_data[message_length:] # 将消息放入消息队列 self.message_queue.put(message_data) finally: self.data_mutex.unlock() except queue.Empty: # 队列超时,继续循环 continue def stop(self): self.running = False class BrComSerial(QObject): signal_connected = Signal() signal_disconnected = Signal() signal_error = Signal(str) signal_message = Signal(BrComMessage) def __init__(self, parent=None): super().__init__(parent) self.serial_port = QSerialPort() self.serial_port.errorOccurred.connect(self.on_error) # 创建数据缓冲队列和消息队列,设置最大大小 self.buffer_queue = queue.Queue(maxsize=1000) # 限制队列大小 self.message_queue = queue.Queue(maxsize=1000) # 限制队列大小 # 创建并启动数据读取线程 self.reader_thread = SerialReaderThread(self.serial_port, self.buffer_queue) self.reader_thread.start() # 创建并启动消息处理线程 self.processor_thread = MessageProcessorThread(self.buffer_queue, self.message_queue) self.processor_thread.start() # 启动消息处理定时器 self.timer = QTimer() self.timer.timeout.connect(self.process_messages) self.timer.start(10) # 每10ms检查一次消息队列 def process_messages(self): """处理消息队列中的消息""" while not self.message_queue.empty(): try: message_data = self.message_queue.get_nowait() # 处理消息(这里可以添加具体的消息处理逻辑) msg = BrComMessage.from_bytes(message_data) if msg: self.signal_message.emit(msg) except queue.Empty: break except Exception as e: print(f"Error processing message: {e}") def on_error(self, error): """处理串口错误""" if error == QSerialPort.NoError: return error_str = f"Serial port error: {error}" print(error_str) self.signal_error.emit(error_str) if self.serial_port.isOpen(): self.serial_port.close() self.signal_disconnected.emit() def close(self): """关闭串口和线程""" self.reader_thread.stop() self.processor_thread.stop() self.reader_thread.wait() self.processor_thread.wait() if self.serial_port.isOpen(): self.serial_port.close()