"""Qt window for configuring a serial port and driving a ``SerialDataLogger``."""

import os
import sys

import serial.tools.list_ports
from PySide6.QtCore import Qt
from PySide6.QtGui import QIntValidator
from PySide6.QtWidgets import (QApplication, QMainWindow, QWidget, QVBoxLayout,
                               QHBoxLayout, QComboBox, QPushButton, QLabel,
                               QLineEdit, QMessageBox)

from data_collector import SerialDataLogger

# Byte sequences handed to ``SerialDataLogger.send_cmd`` by the ON / OFF buttons.
CMD_ON = b'\x01\x00\x01\x02'
CMD_OFF = b'\x01\x00\x00\x01'

# Directory created at start-up.
# NOTE(review): presumably where the logger writes its recordings -- confirm
# against data_collector.
_RECORDS_DIR = './records'

# Application-wide dark theme. ``#record_btn`` matches the object name that
# ``SerialUI`` assigns to the record button.
_STYLE_SHEET = """
    QMainWindow {
        background-color: #2B2B2B;
    }
    QWidget {
        background-color: #2B2B2B;
        color: #FFFFFF;
    }
    QLabel {
        color: #CCCCCC;
        font-size: 14px;
    }
    QComboBox {
        background-color: #3C3F41;
        border: 1px solid #646464;
        border-radius: 3px;
        color: #FFFFFF;
        padding: 5px;
        min-width: 100px;
    }
    QComboBox:hover {
        border: 1px solid #FF8C00;
    }
    QComboBox::drop-down {
        border: none;
        padding-right: 8px;
    }
    QPushButton {
        background-color: #3C3F41;
        border: 1px solid #646464;
        border-radius: 3px;
        color: #FFFFFF;
        padding: 5px 15px;
        min-height: 25px;
    }
    QPushButton:hover {
        background-color: #434749;
        border: 1px solid #FF8C00;
    }
    QPushButton:pressed {
        background-color: #333637;
    }
    QPushButton:checked {
        background-color: #FF8C00;
        border: 1px solid #FF8C00;
    }
    QLineEdit {
        background-color: #3C3F41;
        border: 1px solid #646464;
        border-radius: 3px;
        color: #FFFFFF;
        padding: 5px;
        min-width: 100px;
    }
    QLineEdit:focus {
        border: 1px solid #FF8C00;
    }
    QLineEdit:disabled {
        background-color: #2B2B2B;
        color: #808080;
    }
    QStatusBar {
        background-color: #3C3F41;
        color: #CCCCCC;
    }
    #record_btn {
        background-color: #FF8C00;
        border: none;
        font-weight: bold;
    }
    #record_btn:hover {
        background-color: #FFA500;
    }
    #record_btn:checked {
        background-color: #CD3333;
    }
"""


class SerialUI(QMainWindow):
    """Main window: port selection, connection, recording and tagging controls.

    All device work is delegated to a ``SerialDataLogger`` instance stored in
    ``self.serial_logger``; this class only wires widgets to it and mirrors
    its state in the UI.
    """

    def __init__(self):
        """Build the window, populate the port list and create the logger."""
        super().__init__()
        self.setWindowTitle("Signal Collector V1.0.6 BUILD202503041320")
        self.resize(400, 300)
        self.setStyleSheet(_STYLE_SHEET)

        # UI-side state flags. They are initialised before any slot can run
        # so that handlers never hit a missing attribute.
        self.is_recording = False
        self.is_connected = False
        # Set once the user has confirmed the exit dialog.
        self.exit_confirmed = False
        self.serial_logger = None

        self._build_ui()

        self.statusBar().showMessage("就绪")
        self.refresh_ports()

        self.setMinimumSize(450, 250)
        # Recording stays unavailable until a port is connected.
        self.record_btn.setEnabled(False)

        self._init_logger()

    def _build_ui(self) -> None:
        """Create every widget and layout of the central area."""
        central_widget = QWidget()
        self.setCentralWidget(central_widget)
        layout = QVBoxLayout(central_widget)

        # Serial port selection row.
        port_layout = QHBoxLayout()
        port_label = QLabel("串口:")
        self.port_combo = QComboBox()
        self.refresh_btn = QPushButton("刷新")
        self.refresh_btn.clicked.connect(self.refresh_ports)
        port_layout.addWidget(port_label)
        port_layout.addWidget(self.port_combo)
        port_layout.addWidget(self.refresh_btn)
        layout.addLayout(port_layout)

        # Baud rate row with the connect / disconnect toggle.
        baud_layout = QHBoxLayout()
        baud_label = QLabel("波特率:")
        self.baud_input = QLineEdit()
        self.baud_input.setText("4000000")
        self.baud_input.setValidator(QIntValidator(1200, 4000000))
        self.baud_input.setPlaceholderText("请输入波特率")
        self.connect_btn = QPushButton("连接端口")
        self.connect_btn.setCheckable(True)
        self.connect_btn.clicked.connect(self.toggle_connection)
        baud_layout.addWidget(baud_label)
        baud_layout.addWidget(self.baud_input)
        baud_layout.addWidget(self.connect_btn)
        layout.addLayout(baud_layout)

        # ON / OFF command row.
        cmd_layout = QHBoxLayout()
        cmd_label = QLabel("开关命令:")
        self.cmd_btn_on = QPushButton("ON")
        self.cmd_btn_on.clicked.connect(self.on_cmd_btn_on)
        self.cmd_btn_off = QPushButton("OFF")
        self.cmd_btn_off.clicked.connect(self.on_cmd_btn_off)
        # This row gets its own label; re-adding the baud label here would
        # pull it out of the baud row.
        cmd_layout.addWidget(cmd_label)
        cmd_layout.addWidget(self.cmd_btn_on)
        cmd_layout.addWidget(self.cmd_btn_off)
        layout.addLayout(cmd_layout)

        # Recording control row.
        record_layout = QHBoxLayout()
        self.record_btn = QPushButton("开始录制")
        self.record_btn.setCheckable(True)
        self.record_btn.clicked.connect(self.toggle_recording)
        # The object name is what the "#record_btn" stylesheet rules match.
        self.record_btn.setObjectName("record_btn")
        record_layout.addWidget(self.record_btn)
        layout.addLayout(record_layout)

        # Channel count row.
        channel_layout = QHBoxLayout()
        self.channel_input = QLineEdit()
        self.channel_input.setText("0")
        self.channel_input.setPlaceholderText("输入标签内容")
        self.set_channel_btn = QPushButton("设置通道")
        self.set_channel_btn.clicked.connect(self.confirm_channel_setting)
        channel_layout.addWidget(self.channel_input)
        channel_layout.addWidget(self.set_channel_btn)
        layout.addLayout(channel_layout)

        # Tag input row.
        tag_layout = QHBoxLayout()
        self.tag_input = QLineEdit()
        self.tag_input.setText("0")
        self.tag_input.setPlaceholderText("输入标签内容")
        self.add_label_btn = QPushButton("增加标签")
        self.add_label_btn.clicked.connect(self.add_tag)
        tag_layout.addWidget(self.tag_input)
        tag_layout.addWidget(self.add_label_btn)
        layout.addLayout(tag_layout)

        # Recorded-frame counter row.
        count_layout = QHBoxLayout()
        self.count_label = QLabel("当前记录数量:")
        self.count_label.setAlignment(Qt.AlignCenter)
        count_layout.addWidget(self.count_label)
        self.count_value_label = QLabel("0")
        self.count_value_label.setAlignment(Qt.AlignCenter)
        count_layout.addWidget(self.count_value_label)
        layout.addLayout(count_layout)

        # Spacing and margins.
        layout.setSpacing(10)
        layout.setContentsMargins(20, 20, 20, 20)
        for row in (port_layout, baud_layout, record_layout, tag_layout):
            row.setSpacing(10)

    def _init_logger(self) -> None:
        """Create the records directory and the ``SerialDataLogger``."""
        os.makedirs(_RECORDS_DIR, exist_ok=True)

        self.serial_logger = SerialDataLogger()
        # Show the logger's current channel count in the channel field.
        self.channel_input.setText(str(self.serial_logger.CH_NUM))
        self.serial_logger.signal_some_frames_processed.connect(
            self.update_count_display)

    def on_cmd_btn_on(self) -> None:
        """Send ``CMD_ON`` through the logger."""
        if self.serial_logger:
            self.serial_logger.send_cmd(CMD_ON)

    def on_cmd_btn_off(self) -> None:
        """Send ``CMD_OFF`` through the logger."""
        if self.serial_logger:
            self.serial_logger.send_cmd(CMD_OFF)

    def update_count_display(self) -> None:
        """Refresh the counter label from the logger's recorded frame count."""
        if self.serial_logger:
            count = self.serial_logger.recorded_frames_count
            self.count_value_label.setText(str(count))

    def get_recorder_tag(self) -> str:
        """Return the current text of the tag input field."""
        return self.tag_input.text()

    def refresh_ports(self) -> None:
        """Repopulate the port combo box with the available serial ports.

        Each item displays "<device> - <description>" (plus the serial number
        when there is one) and carries the bare device name as item data.
        """
        self.port_combo.clear()
        ports = serial.tools.list_ports.comports()
        if not ports:
            self.statusBar().showMessage("未找到可用串口")
            return

        for port in ports:
            port_info = f"{port.device} - {port.description}"
            if port.serial_number:
                port_info += f" ({port.serial_number})"
            # Display the full description, keep the real port name as data.
            self.port_combo.addItem(port_info, port.device)

    def confirm_channel_setting(self) -> None:
        """Validate the channel count (1-64) and apply it to the logger.

        On success the channel controls are locked. The record button is
        enabled only when a port is connected, in line with
        ``toggle_connection`` disabling it on disconnect.
        """
        try:
            channels = int(self.channel_input.text())
        except ValueError:
            QMessageBox.warning(self, "警告", "请输入有效的通道数")
            return

        if not 1 <= channels <= 64:
            QMessageBox.warning(self, "警告", "请输入1-64之间的通道数")
            return

        if self.serial_logger:
            self.serial_logger.set_channels(channels)
            self.statusBar().showMessage(f"已设置通道数: {channels}")
            # Lock the channel controls once applied.
            self.channel_input.setEnabled(False)
            self.set_channel_btn.setEnabled(False)
            self.record_btn.setEnabled(self.is_connected)

    def toggle_connection(self) -> None:
        """Connect or disconnect according to the connect button's state."""
        # Refuse new connections once the user has confirmed exiting.
        if self.exit_confirmed and self.connect_btn.isChecked():
            self.connect_btn.setChecked(False)
            return

        if self.connect_btn.isChecked():
            self._connect_port()
        else:
            self._disconnect_port()

    def _connect_port(self) -> None:
        """Configure the logger with the selected port and test the link."""
        port = self.port_combo.currentData()
        try:
            baudrate = int(self.baud_input.text())
            self.serial_logger.set_port(port, baudrate)
            if not self.serial_logger.test_connection():
                raise Exception("端口连接失败")
        except ValueError:
            QMessageBox.critical(self, "错误", "请输入有效的波特率")
            self.connect_btn.setChecked(False)
        except Exception as e:
            QMessageBox.critical(self, "错误", f"无法连接端口: {str(e)}")
            self.connect_btn.setChecked(False)
        else:
            self.connect_btn.setText("断开连接")
            self.statusBar().showMessage("端口已连接")
            # Lock port and baud rate selection while connected.
            self.port_combo.setEnabled(False)
            self.baud_input.setEnabled(False)
            self.refresh_btn.setEnabled(False)
            self.record_btn.setEnabled(True)
            self.is_connected = True

    def _disconnect_port(self) -> None:
        """Ask the logger to disconnect and unlock the port controls."""
        if not self.serial_logger:
            return

        if self.serial_logger.disconnect():
            self.connect_btn.setText("连接端口")
            self.statusBar().showMessage("端口已断开")
            # Unlock port and baud rate selection.
            self.port_combo.setEnabled(True)
            self.baud_input.setEnabled(True)
            self.refresh_btn.setEnabled(True)
            self.record_btn.setEnabled(False)
            self.is_connected = False
        else:
            QMessageBox.warning(self, "警告", "断开连接时发生错误")

    def toggle_recording(self) -> None:
        """Start or stop recording according to the record button's state.

        ``self.is_recording`` is updated on every successful transition so
        that ``add_tag`` can tell whether a recording is in progress.
        """
        if not self.is_connected:
            QMessageBox.warning(self, "警告", "请先连接端口")
            self.record_btn.setChecked(False)
            return

        # Do not start a new recording while the application is exiting.
        if self.exit_confirmed:
            self.record_btn.setChecked(False)
            return

        if self.record_btn.isChecked():
            # NOTE(review): the result is ignored; presumably this call
            # (re)opens the port before start() -- confirm in data_collector.
            self.serial_logger.test_connection()
            if self.serial_logger.start():
                self.is_recording = True
                count = self.serial_logger.recorded_frames_count
                self.count_value_label.setText(str(count))
                self.record_btn.setText("停止录制")
                self.statusBar().showMessage("正在录制...")
                # The connection must not be toggled while recording.
                self.connect_btn.setEnabled(False)
            else:
                QMessageBox.critical(self, "错误", "启动数据记录失败")
                self.record_btn.setChecked(False)
        else:
            if self.serial_logger.stop():
                self.is_recording = False
                self.record_btn.setText("开始录制")
                self.statusBar().showMessage("录制已停止")
                self.record_btn.setEnabled(True)
                self.connect_btn.setEnabled(True)
            else:
                QMessageBox.warning(self, "警告", "停止录制时发生错误")

    def add_tag(self) -> None:
        """Store the integer in the tag field as the logger's ``RECORDER_TAG``.

        Does nothing when the field is empty. Warns when no recording is in
        progress or when the text is not an integer.
        """
        label_text = self.tag_input.text().strip()
        if not label_text:
            return

        if not (self.serial_logger and self.is_recording):
            QMessageBox.warning(self, "警告", "请先开始录制")
            return

        try:
            tag_value = int(label_text)
        except ValueError:
            QMessageBox.warning(self, "警告", "请输入有效的标签(整数)")
            return

        self.serial_logger.RECORDER_TAG = tag_value
        self.statusBar().showMessage(f"已添加标签: {label_text}")
        self.tag_input.clear()

    def closeEvent(self, event):
        """Ask for confirmation, then stop recording and disconnect."""
        if self.exit_confirmed:
            event.accept()
            return

        reply = QMessageBox.question(
            self,
            '确认退出',
            '确定要退出程序吗?',
            QMessageBox.Yes | QMessageBox.No,
            QMessageBox.No
        )
        if reply != QMessageBox.Yes:
            event.ignore()
            return

        # Stop the update timer if one was ever created.
        if hasattr(self, 'update_timer'):
            self.update_timer.stop()

        # Stop an active recording first: un-check the button and reuse the
        # normal toggle path.
        if self.record_btn.isChecked():
            self.record_btn.setChecked(False)
            self.toggle_recording()

        # Then drop the connection the same way.
        if self.connect_btn.isChecked():
            self.connect_btn.setChecked(False)
            self.toggle_connection()

        # Set only after teardown: the toggle handlers bail out early once
        # this flag is True.
        self.exit_confirmed = True
        event.accept()

    def cleanup_resources(self) -> None:
        """Best-effort teardown of the logger; errors are printed, not raised.

        Clears the logger's running flag, closes its serial handle, drains
        its processing queue and finally drops the logger reference.
        """
        try:
            if self.serial_logger:
                try:
                    # Set the stop flag directly.
                    self.serial_logger.is_running = False

                    # Close the serial connection.
                    if hasattr(self.serial_logger, 'ser') and self.serial_logger.ser:
                        self.serial_logger.ser.close()

                    # Drain whatever is left in the processing queue.
                    if hasattr(self.serial_logger, 'process_queue'):
                        while not self.serial_logger.process_queue.empty():
                            try:
                                self.serial_logger.process_queue.get_nowait()
                            except Exception:
                                # The queue may empty between the check and
                                # the read; just stop draining.
                                break

                    self.serial_logger = None
                except Exception as e:
                    print(f"清理串口记录器时出错: {e}")
        except Exception as e:
            print(f"清理资源时出错: {e}")


if __name__ == '__main__':
    app = QApplication(sys.argv)
    window = SerialUI()
    window.show()
    sys.exit(app.exec())