440 lines
16 KiB
Python
440 lines
16 KiB
Python
import sys
|
||
import serial.tools.list_ports
|
||
from PySide6.QtWidgets import (QApplication, QMainWindow, QWidget, QVBoxLayout,
|
||
QHBoxLayout, QComboBox, QPushButton, QLabel,
|
||
QLineEdit, QMessageBox)
|
||
from PySide6.QtCore import Qt
|
||
from PySide6.QtGui import QIntValidator
|
||
from data_collector import SerialDataLogger
|
||
import os
|
||
|
||
|
||
# 4-byte command frames written to the device by the ON / OFF buttons.
# NOTE(review): in both frames the last byte equals the sum of the first
# three, which looks like a checksum -- confirm against the device protocol.
CMD_ON = bytes((0x01, 0x00, 0x01, 0x02))
CMD_OFF = bytes((0x01, 0x00, 0x00, 0x01))
|
||
|
||
|
||
|
||
class SerialUI(QMainWindow):
    """Main window of the serial signal collector.

    The window lets the user pick a serial port and baud rate, connect to
    it, send the ON/OFF command frames, set the channel count, start and
    stop recording through a ``SerialDataLogger`` and set the recorder tag
    while a recording is running.

    State attributes:
        is_connected: True while the port reported a successful connection.
        is_recording: True between a successful ``start()`` and ``stop()``.
        exit_confirmed: True once the user confirmed closing the window.
        serial_logger: the ``SerialDataLogger`` instance, or None after
            ``cleanup_resources``.
    """

    # Dark theme applied to the whole window; ``#record_btn`` rules target
    # the record button through its object name.
    _STYLE_SHEET = """
        QMainWindow {
            background-color: #2B2B2B;
        }
        QWidget {
            background-color: #2B2B2B;
            color: #FFFFFF;
        }
        QLabel {
            color: #CCCCCC;
            font-size: 14px;
        }
        QComboBox {
            background-color: #3C3F41;
            border: 1px solid #646464;
            border-radius: 3px;
            color: #FFFFFF;
            padding: 5px;
            min-width: 100px;
        }
        QComboBox:hover {
            border: 1px solid #FF8C00;
        }
        QComboBox::drop-down {
            border: none;
            padding-right: 8px;
        }
        QPushButton {
            background-color: #3C3F41;
            border: 1px solid #646464;
            border-radius: 3px;
            color: #FFFFFF;
            padding: 5px 15px;
            min-height: 25px;
        }
        QPushButton:hover {
            background-color: #434749;
            border: 1px solid #FF8C00;
        }
        QPushButton:pressed {
            background-color: #333637;
        }
        QPushButton:checked {
            background-color: #FF8C00;
            border: 1px solid #FF8C00;
        }
        QLineEdit {
            background-color: #3C3F41;
            border: 1px solid #646464;
            border-radius: 3px;
            color: #FFFFFF;
            padding: 5px;
            min-width: 100px;
        }
        QLineEdit:focus {
            border: 1px solid #FF8C00;
        }
        QLineEdit:disabled {
            background-color: #2B2B2B;
            color: #808080;
        }
        QStatusBar {
            background-color: #3C3F41;
            color: #CCCCCC;
        }
        #record_btn {
            background-color: #FF8C00;
            border: none;
            font-weight: bold;
        }
        #record_btn:hover {
            background-color: #FFA500;
        }
        #record_btn:checked {
            background-color: #CD3333;
        }
    """

    def __init__(self):
        """Build the widgets, create the data logger and set initial state."""
        super().__init__()
        self.setWindowTitle("Signal Collector V1.0.6 BUILD202503041320")
        self.resize(400, 300)
        self.setStyleSheet(self._STYLE_SHEET)

        # State flags are created before any slot can run so that no handler
        # ever hits a missing attribute.
        self.is_connected = False
        self.is_recording = False
        self.exit_confirmed = False
        self.serial_logger = None

        # Central widget with one vertical layout holding one row per feature.
        central_widget = QWidget()
        self.setCentralWidget(central_widget)
        layout = QVBoxLayout(central_widget)
        layout.setSpacing(10)
        layout.setContentsMargins(20, 20, 20, 20)

        self._add_port_row(layout)
        self._add_baud_row(layout)
        self._add_command_row(layout)
        self._add_record_row(layout)
        self._add_channel_row(layout)
        self._add_tag_row(layout)
        self._add_count_row(layout)

        self.statusBar().showMessage("就绪")

        # Fill the port combo box (may replace the status message when no
        # port is found).
        self.refresh_ports()

        self.setMinimumSize(450, 250)

        # Make sure the output folder for recordings exists.
        os.makedirs('./records', exist_ok=True)

        self.serial_logger = SerialDataLogger()
        self.channel_input.setText(str(self.serial_logger.CH_NUM))
        self.serial_logger.signal_some_frames_processed.connect(
            self.update_count_display)

    @staticmethod
    def _new_row(parent_layout):
        """Append an empty horizontal row (spacing 10) to *parent_layout*."""
        row = QHBoxLayout()
        row.setSpacing(10)
        parent_layout.addLayout(row)
        return row

    def _add_port_row(self, parent_layout):
        """Add the serial-port row: label, port combo box, refresh button."""
        row = self._new_row(parent_layout)
        self.port_combo = QComboBox()
        self.refresh_btn = QPushButton("刷新")
        self.refresh_btn.clicked.connect(self.refresh_ports)
        row.addWidget(QLabel("串口:"))
        row.addWidget(self.port_combo)
        row.addWidget(self.refresh_btn)

    def _add_baud_row(self, parent_layout):
        """Add the baud-rate row: label, integer input, connect toggle."""
        row = self._new_row(parent_layout)
        self.baud_input = QLineEdit()
        self.baud_input.setText("4000000")
        self.baud_input.setValidator(QIntValidator(1200, 4000000))
        self.baud_input.setPlaceholderText("请输入波特率")
        self.connect_btn = QPushButton("连接端口")
        self.connect_btn.setCheckable(True)
        self.connect_btn.clicked.connect(self.toggle_connection)
        row.addWidget(QLabel("波特率:"))
        row.addWidget(self.baud_input)
        row.addWidget(self.connect_btn)

    def _add_command_row(self, parent_layout):
        """Add the ON/OFF command row with its own label."""
        row = self._new_row(parent_layout)
        self.cmd_btn_on = QPushButton("ON")
        self.cmd_btn_on.clicked.connect(self.on_cmd_btn_on)
        self.cmd_btn_off = QPushButton("OFF")
        self.cmd_btn_off.clicked.connect(self.on_cmd_btn_off)
        # The command label belongs here; adding the baud-rate label again
        # would move it out of the baud row.
        row.addWidget(QLabel("开关命令:"))
        row.addWidget(self.cmd_btn_on)
        row.addWidget(self.cmd_btn_off)

    def _add_record_row(self, parent_layout):
        """Add the record toggle button, disabled until a port is connected."""
        row = self._new_row(parent_layout)
        self.record_btn = QPushButton("开始录制")
        # Object name lets the "#record_btn" style-sheet rules match.
        self.record_btn.setObjectName("record_btn")
        self.record_btn.setCheckable(True)
        self.record_btn.setEnabled(False)
        self.record_btn.clicked.connect(self.toggle_recording)
        row.addWidget(self.record_btn)

    def _add_channel_row(self, parent_layout):
        """Add the channel-count input and its confirm button."""
        row = self._new_row(parent_layout)
        self.channel_input = QLineEdit()
        self.channel_input.setText("0")
        # Placeholder describes the channel count, not the tag field.
        self.channel_input.setPlaceholderText("输入通道数")
        self.set_channel_btn = QPushButton("设置通道")
        self.set_channel_btn.clicked.connect(self.confirm_channel_setting)
        row.addWidget(self.channel_input)
        row.addWidget(self.set_channel_btn)

    def _add_tag_row(self, parent_layout):
        """Add the tag input and the add-tag button."""
        row = self._new_row(parent_layout)
        self.tag_input = QLineEdit()
        self.tag_input.setText("0")
        self.tag_input.setPlaceholderText("输入标签内容")
        self.add_label_btn = QPushButton("增加标签")
        self.add_label_btn.clicked.connect(self.add_tag)
        row.addWidget(self.tag_input)
        row.addWidget(self.add_label_btn)

    def _add_count_row(self, parent_layout):
        """Add the recorded-frame counter (caption plus value label)."""
        row = self._new_row(parent_layout)
        self.count_label = QLabel("当前记录数量:")
        self.count_label.setAlignment(Qt.AlignCenter)
        row.addWidget(self.count_label)
        self.count_value_label = QLabel("0")
        self.count_value_label.setAlignment(Qt.AlignCenter)
        row.addWidget(self.count_value_label)

    def on_cmd_btn_on(self):
        """Send the ON command frame through the logger, if one exists."""
        if self.serial_logger:
            self.serial_logger.send_cmd(CMD_ON)

    def on_cmd_btn_off(self):
        """Send the OFF command frame through the logger, if one exists."""
        if self.serial_logger:
            self.serial_logger.send_cmd(CMD_OFF)

    def update_count_display(self):
        """Show the logger's current ``recorded_frames_count`` in the UI."""
        if self.serial_logger:
            count = self.serial_logger.recorded_frames_count
            self.count_value_label.setText(str(count))

    def get_recorder_tag(self):
        """Return the current text of the tag input field."""
        return self.tag_input.text()

    def refresh_ports(self):
        """Reload the combo box with the serial ports currently available.

        Each entry displays "device - description (serial number)" and stores
        the bare device name as item data.
        """
        self.port_combo.clear()
        ports = serial.tools.list_ports.comports()
        if not ports:
            self.statusBar().showMessage("未找到可用串口")
            return

        for port in ports:
            port_info = f"{port.device} - {port.description}"
            if port.serial_number:
                port_info += f" ({port.serial_number})"
            # Display the full description, keep the device name as data.
            self.port_combo.addItem(port_info, port.device)

    def confirm_channel_setting(self):
        """Validate the channel count (1-64) and pass it to the logger.

        On success the channel controls are locked and the record button is
        enabled; otherwise a warning dialog is shown.
        """
        try:
            channels = int(self.channel_input.text())
        except ValueError:
            QMessageBox.warning(self, "警告", "请输入有效的通道数")
            return

        if not 1 <= channels <= 64:
            QMessageBox.warning(self, "警告", "请输入1-64之间的通道数")
            return

        if self.serial_logger:
            self.serial_logger.set_channels(channels)
        self.statusBar().showMessage(f"已设置通道数: {channels}")
        # Lock the channel controls once a value has been accepted.
        self.channel_input.setEnabled(False)
        self.set_channel_btn.setEnabled(False)
        self.record_btn.setEnabled(True)

    def toggle_connection(self):
        """Connect or disconnect according to the connect button's state.

        Refuses to open a new connection once the user confirmed exit.
        """
        print("toggle_connection")
        wants_connect = self.connect_btn.isChecked()
        if self.exit_confirmed and wants_connect:
            self.connect_btn.setChecked(False)
            return

        if wants_connect:
            self._connect_port()
        else:
            self._disconnect_port()

    def _connect_port(self):
        """Open the selected port; on failure report it and uncheck the button."""
        port = self.port_combo.currentData()
        try:
            baudrate = int(self.baud_input.text())
            self.serial_logger.set_port(port, baudrate)
            if not self.serial_logger.test_connection():
                raise ConnectionError("端口连接失败")
        except ValueError:
            QMessageBox.critical(self, "错误", "请输入有效的波特率")
            self.connect_btn.setChecked(False)
            return
        except Exception as e:
            # UI boundary: report any connection failure instead of crashing.
            QMessageBox.critical(self, "错误", f"无法连接端口: {str(e)}")
            self.connect_btn.setChecked(False)
            return

        self.connect_btn.setText("断开连接")
        self.statusBar().showMessage("端口已连接")
        # Port settings cannot change while connected.
        self.port_combo.setEnabled(False)
        self.baud_input.setEnabled(False)
        self.refresh_btn.setEnabled(False)
        self.record_btn.setEnabled(True)
        self.is_connected = True

    def _disconnect_port(self):
        """Close the port and re-enable the port settings."""
        if not self.serial_logger:
            return

        if not self.serial_logger.disconnect():
            QMessageBox.warning(self, "警告", "断开连接时发生错误")
            # Keep the button in step with the real connection state.
            self.connect_btn.setChecked(self.is_connected)
            return

        self.connect_btn.setText("连接端口")
        self.statusBar().showMessage("端口已断开")
        self.port_combo.setEnabled(True)
        self.baud_input.setEnabled(True)
        self.refresh_btn.setEnabled(True)
        self.record_btn.setEnabled(False)
        self.is_connected = False

    def toggle_recording(self):
        """Start or stop recording according to the record button's state.

        Requires an open connection and refuses to start once the user
        confirmed exit. ``is_recording`` tracks the outcome so that
        ``add_tag`` can tell whether a recording is running.
        """
        if not self.is_connected:
            QMessageBox.warning(self, "警告", "请先连接端口")
            self.record_btn.setChecked(False)
            return

        # No new recording while the application is shutting down.
        if self.exit_confirmed:
            self.record_btn.setChecked(False)
            return

        if self.record_btn.isChecked():
            self.serial_logger.test_connection()
            if not self.serial_logger.start():
                QMessageBox.critical(self, "错误", "启动数据记录失败")
                self.record_btn.setChecked(False)
                return
            self.is_recording = True
            self.update_count_display()
            self.record_btn.setText("停止录制")
            self.statusBar().showMessage("正在录制...")
            # The connection must stay open while recording.
            self.connect_btn.setEnabled(False)
            return

        if not self.serial_logger.stop():
            QMessageBox.warning(self, "警告", "停止录制时发生错误")
            # Still recording: keep the button checked so it can be retried.
            self.record_btn.setChecked(True)
            return

        self.is_recording = False
        self.record_btn.setText("开始录制")
        self.statusBar().showMessage("录制已停止")
        self.record_btn.setEnabled(True)
        self.connect_btn.setEnabled(True)
        print(self.serial_logger)

    def add_tag(self):
        """Set the logger's ``RECORDER_TAG`` from the tag input field.

        Does nothing for an empty field, warns when no recording is running
        or when the text is not an integer, and clears the field on success.
        """
        tag_text = self.tag_input.text().strip()
        if not tag_text:
            return

        if not (self.serial_logger and self.is_recording):
            QMessageBox.warning(self, "警告", "请先开始录制")
            return

        try:
            tag_value = int(tag_text)
        except ValueError:
            QMessageBox.warning(self, "警告", "标签必须是整数")
            return

        self.serial_logger.RECORDER_TAG = tag_value
        self.statusBar().showMessage(f"已添加标签: {tag_text}")
        self.tag_input.clear()

    def closeEvent(self, event):
        """Ask for confirmation, then stop recording and disconnect.

        The event is ignored when the user declines, accepted otherwise.
        """
        if self.exit_confirmed:
            event.accept()
            return

        reply = QMessageBox.question(
            self,
            '确认退出',
            '确定要退出程序吗?',
            QMessageBox.Yes | QMessageBox.No,
            QMessageBox.No,
        )
        if reply != QMessageBox.Yes:
            event.ignore()
            return

        # Stop the update timer when one has been attached to the window.
        if hasattr(self, 'update_timer'):
            self.update_timer.stop()

        # Stop a running recording first ...
        if self.record_btn.isChecked():
            self.record_btn.setChecked(False)
            self.toggle_recording()

        # ... then close the port.
        if self.connect_btn.isChecked():
            self.connect_btn.setChecked(False)
            self.toggle_connection()

        self.exit_confirmed = True
        event.accept()

    def cleanup_resources(self):
        """Best-effort release of the logger: stop flag, port, queue.

        Errors are printed rather than raised so that cleanup never blocks
        shutdown. ``serial_logger`` is set to None afterwards.
        """
        try:
            logger = self.serial_logger
            if not logger:
                return
            try:
                # Set the stop flag directly.
                logger.is_running = False

                # Close the serial connection when one is open.
                if hasattr(logger, 'ser') and logger.ser:
                    logger.ser.close()

                # Drain whatever is still waiting in the processing queue.
                if hasattr(logger, 'process_queue'):
                    while not logger.process_queue.empty():
                        try:
                            logger.process_queue.get_nowait()
                        except Exception:
                            break

                self.serial_logger = None
            except Exception as e:
                print(f"清理串口记录器时出错: {e}")
        except Exception as e:
            print(f"清理资源时出错: {e}")
|
||
|
||
if __name__ == '__main__':
    # Script entry point: create the Qt application, show the main window
    # and hand the event loop's return code back to the shell.
    qt_app = QApplication(sys.argv)
    main_window = SerialUI()
    main_window.show()
    exit_code = qt_app.exec()
    sys.exit(exit_code)
|