brisonus_data_collector/main2.py

561 lines
21 KiB
Python
Raw Permalink Normal View History

2025-11-19 14:57:37 +08:00
import sys
import serial.tools.list_ports
from PySide6.QtWidgets import (QApplication, QMainWindow, QWidget, QVBoxLayout,
QHBoxLayout, QComboBox, QPushButton, QLabel,
QLineEdit, QMessageBox)
from PySide6.QtCore import Qt, QTimer
from PySide6.QtGui import QIntValidator
from serial_recorder import SerialRecorder
import os
# 定义命令常量
CMD_ON = b'\x01\x00\x01\x02'
CMD_OFF = b'\x01\x00\x00\x01'
# CMD_DATA_ON = b'\xAA\xAA\xAA\xAA\x00\x00\x00\x00\x05\x00\x00\x00\x00\x00\x00\x00\xAD'
# CMD_DATA_OFF = b'\xAA\xAA\xAA\xAA\x01\x00\x00\x00\x05\x00\x00\x00\x00\x00\x00\x00\xAE'
# 智己项目 - 开关 9字节
CMD_DATA_ON = b'\x0A\x01\x77\x3B\x00\x00\x80\x3F\x7C'
CMD_DATA_OFF = b'\x0A\x01\x77\x3B\x00\x00\x00\x00\xBD'
class SerialUI(QMainWindow):
def __init__(self):
super().__init__()
# self.setWindowTitle("Signal Collector V1.1.1 BUILD202503261405")
self.resize(600, 400)
# 设置无边框窗口
self.setWindowFlags(Qt.FramelessWindowHint)
# 设置应用样式
self.setStyleSheet("""
QMainWindow {
background-color: #2B2B2B;
border: none;
}
QWidget {
background-color: #2B2B2B;
color: #FFFFFF;
}
QLabel {
color: #CCCCCC;
font-size: 14px;
}
QComboBox {
background-color: #3C3F41;
border: 1px solid #646464;
border-radius: 3px;
color: #FFFFFF;
padding: 5px;
min-width: 100px;
}
QComboBox:hover {
border: 1px solid #FF8C00;
}
QComboBox::drop-down {
border: none;
padding-right: 8px;
}
QPushButton {
background-color: #3C3F41;
border: 1px solid #646464;
border-radius: 3px;
color: #FFFFFF;
padding: 5px 15px;
min-height: 25px;
}
QPushButton:hover {
background-color: #434749;
border: 1px solid #FF8C00;
}
QPushButton:pressed {
background-color: #333637;
}
QPushButton:checked {
background-color: #FF8C00;
border: 1px solid #FF8C00;
}
QLineEdit {
background-color: #3C3F41;
border: 1px solid #646464;
border-radius: 3px;
color: #FFFFFF;
padding: 5px;
min-width: 100px;
}
QLineEdit:focus {
border: 1px solid #FF8C00;
}
QLineEdit:disabled {
background-color: #2B2B2B;
color: #808080;
}
QStatusBar {
background-color: #3C3F41;
color: #CCCCCC;
}
QMenuBar {
background-color: #2B2B2B;
color: #FFFFFF;
}
QMenuBar::item {
background-color: #2B2B2B;
color: #FFFFFF;
}
QMenuBar::item:selected {
background-color: #3C3F41;
}
QMenu {
background-color: #2B2B2B;
color: #FFFFFF;
}
QMenu::item:selected {
background-color: #3C3F41;
}
#record_btn {
background-color: #FF8C00;
border: none;
font-weight: bold;
}
#record_btn:hover {
background-color: #FFA500;
}
#record_btn:checked {
background-color: #CD3333;
}
#titleBar {
background-color: #2B2B2B;
border-bottom: 1px solid #3C3F41;
min-height: 30px;
}
#titleLabel {
color: #FFFFFF;
font-size: 14px;
padding-left: 10px;
}
#closeButton {
background-color: transparent;
border: none;
color: #FFFFFF;
padding: 5px 10px;
}
#closeButton:hover {
background-color: #CD3333;
}
""")
# 创建中心部件和布局
central_widget = QWidget()
self.setCentralWidget(central_widget)
layout = QVBoxLayout(central_widget)
layout.setContentsMargins(0, 0, 0, 0) # 移除边距
# 创建自定义标题栏
title_bar = QWidget()
title_bar.setObjectName("titleBar")
title_bar_layout = QHBoxLayout(title_bar)
title_bar_layout.setContentsMargins(0, 0, 0, 0)
# 标题标签
title_label = QLabel("Signal Collector V1.1.2 BUILD202511141419")
title_label.setObjectName("titleLabel")
# 关闭按钮
close_button = QPushButton("×")
close_button.setObjectName("closeButton")
close_button.setFixedSize(30, 30)
close_button.clicked.connect(self.close)
title_bar_layout.addWidget(title_label)
title_bar_layout.addStretch()
title_bar_layout.addWidget(close_button)
# 添加标题栏到主布局
layout.addWidget(title_bar)
# 创建内容区域
content_widget = QWidget()
content_layout = QVBoxLayout(content_widget)
content_layout.setContentsMargins(20, 20, 20, 20)
# 串口选择区域
port_layout = QHBoxLayout()
port_label = QLabel("串口:")
self.port_combo = QComboBox()
self.refresh_btn = QPushButton("刷新")
self.refresh_btn.clicked.connect(self.refresh_ports)
port_layout.addWidget(port_label)
port_layout.addWidget(self.port_combo)
port_layout.addWidget(self.refresh_btn)
content_layout.addLayout(port_layout)
# 波特率选择区域
baud_layout = QHBoxLayout()
baud_label = QLabel("波特率:")
self.baud_input = QLineEdit()
self.baud_input.setText("4000000")
self.baud_input.setValidator(QIntValidator(1200, 4000000))
self.baud_input.setPlaceholderText("请输入波特率")
self.connect_btn = QPushButton("连接端口")
self.connect_btn.setCheckable(True)
self.connect_btn.clicked.connect(self.toggle_connection)
baud_layout.addWidget(baud_label)
baud_layout.addWidget(self.baud_input)
baud_layout.addWidget(self.connect_btn)
content_layout.addLayout(baud_layout)
# 通道数设置区域
channel_layout = QHBoxLayout()
channel_label = QLabel("通道数:")
self.channel_input = QLineEdit()
self.channel_input.setText("8")
self.channel_input.setValidator(QIntValidator(1, 32))
self.channel_input.setPlaceholderText("请输入通道数(1-32)")
self.apply_channel_btn = QPushButton("应用设置")
self.apply_channel_btn.clicked.connect(self.apply_channel_settings)
channel_layout.addWidget(channel_label)
channel_layout.addWidget(self.channel_input)
channel_layout.addWidget(self.apply_channel_btn)
content_layout.addLayout(channel_layout)
# 数据开关命令区域
data_cmd_layout = QHBoxLayout()
data_cmd_label = QLabel("数据开关:")
self.data_cmd_btn_on = QPushButton("ON")
self.data_cmd_btn_on.clicked.connect(self.on_data_cmd_btn_on)
self.data_cmd_btn_off = QPushButton("OFF")
self.data_cmd_btn_off.clicked.connect(self.on_data_cmd_btn_off)
data_cmd_layout.addWidget(data_cmd_label)
data_cmd_layout.addWidget(self.data_cmd_btn_on)
data_cmd_layout.addWidget(self.data_cmd_btn_off)
content_layout.addLayout(data_cmd_layout)
# 开关按钮区域
cmd_layout = QHBoxLayout()
cmd_label = QLabel("开关命令:")
self.cmd_btn_on = QPushButton("ON")
self.cmd_btn_on.clicked.connect(self.on_cmd_btn_on)
self.cmd_btn_off = QPushButton("OFF")
self.cmd_btn_off.clicked.connect(self.on_cmd_btn_off)
cmd_layout.addWidget(cmd_label)
cmd_layout.addWidget(self.cmd_btn_on)
cmd_layout.addWidget(self.cmd_btn_off)
content_layout.addLayout(cmd_layout)
# 录制控制区域
record_layout = QHBoxLayout()
self.record_btn = QPushButton("开始录制")
self.record_btn.setCheckable(True)
self.record_btn.clicked.connect(self.toggle_recording)
record_layout.addWidget(self.record_btn)
content_layout.addLayout(record_layout)
# 标签输入区域
tag_layout = QHBoxLayout()
self.tag_input = QLineEdit()
self.tag_input.setPlaceholderText("输入标签内容")
self.add_label_btn = QPushButton("增加标签")
self.add_label_btn.clicked.connect(self.add_tag)
tag_layout.addWidget(self.tag_input)
tag_layout.addWidget(self.add_label_btn)
content_layout.addLayout(tag_layout)
# 数据统计显示区域
stats_layout = QHBoxLayout()
self.stats_label = QLabel("总帧数: 0 | 有效帧: 0 | 损坏帧: 0")
self.stats_label.setAlignment(Qt.AlignCenter)
stats_layout.addWidget(self.stats_label)
content_layout.addLayout(stats_layout)
# 添加内容区域到主布局
layout.addWidget(content_widget)
# 添加状态栏
self.statusBar().showMessage("就绪")
# 初始化串口列表
self.refresh_ports()
# 初始化录制状态
self.is_recording = False
self.serial_recorder = None
# 设置对象名称以便样式表识别
self.record_btn.setObjectName("record_btn")
# 增加组件间距
content_layout.setSpacing(10)
# 为所有水平布局添加间距
for layout_item in [port_layout, baud_layout, channel_layout, data_cmd_layout, cmd_layout, record_layout, tag_layout]:
layout_item.setSpacing(10)
# 设置窗口最小尺寸
self.setMinimumSize(450, 250)
# 初始化时禁用录制按钮和命令按钮
self.record_btn.setEnabled(False)
self.cmd_btn_on.setEnabled(False)
self.cmd_btn_off.setEnabled(False)
self.data_cmd_btn_on.setEnabled(False)
self.data_cmd_btn_off.setEnabled(False)
# 创建记录保存目录
self.records_save_path = './records'
if not os.path.exists(self.records_save_path):
os.makedirs(self.records_save_path)
# 创建定时器用于更新统计信息
self.stats_timer = QTimer()
self.stats_timer.timeout.connect(self.update_stats_display)
self.stats_timer.start(1000) # 每秒更新一次
# 添加退出时的确认标志
self.exit_confirmed = False
# 添加鼠标事件处理
self.title_bar = title_bar
self.title_bar.mousePressEvent = self.mousePressEvent
self.title_bar.mouseMoveEvent = self.mouseMoveEvent
self.title_bar.mouseReleaseEvent = self.mouseReleaseEvent
# 窗口拖动相关变量
self.dragging = False
self.drag_position = None
def mousePressEvent(self, event):
"""鼠标按下事件"""
if event.button() == Qt.LeftButton and self.title_bar.rect().contains(event.position().toPoint()):
self.dragging = True
self.drag_position = event.globalPosition().toPoint() - self.frameGeometry().topLeft()
event.accept()
def mouseMoveEvent(self, event):
"""鼠标移动事件"""
if self.dragging:
self.move(event.globalPosition().toPoint() - self.drag_position)
event.accept()
def mouseReleaseEvent(self, event):
"""鼠标释放事件"""
if event.button() == Qt.LeftButton:
self.dragging = False
event.accept()
def on_data_cmd_btn_on(self):
"""发送数据ON命令"""
if self.serial_recorder and self.serial_recorder.serial:
try:
self.serial_recorder.serial.write(CMD_DATA_ON)
self.statusBar().showMessage("已发送数据ON命令")
except serial.serialutil.SerialTimeoutException:
QMessageBox.warning(self, "警告", "发送数据ON命令超时请检查串口连接")
self.statusBar().showMessage("发送数据ON命令失败")
except Exception as e:
QMessageBox.warning(self, "错误", f"发送数据ON命令失败: {str(e)}")
self.statusBar().showMessage("发送数据ON命令失败")
def on_data_cmd_btn_off(self):
"""发送数据OFF命令"""
if self.serial_recorder and self.serial_recorder.serial:
try:
self.serial_recorder.serial.write(CMD_DATA_OFF)
self.statusBar().showMessage("已发送数据OFF命令")
except serial.serialutil.SerialTimeoutException:
QMessageBox.warning(self, "警告", "发送数据OFF命令超时请检查串口连接")
self.statusBar().showMessage("发送数据OFF命令失败")
except Exception as e:
QMessageBox.warning(self, "错误", f"发送数据OFF命令失败: {str(e)}")
self.statusBar().showMessage("发送数据OFF命令失败")
def on_cmd_btn_on(self):
"""发送ON命令"""
if self.serial_recorder and self.serial_recorder.serial:
try:
self.serial_recorder.serial.write(CMD_ON)
self.statusBar().showMessage("已发送ON命令")
except serial.serialutil.SerialTimeoutException:
QMessageBox.warning(self, "警告", "发送ON命令超时请检查串口连接")
self.statusBar().showMessage("发送ON命令失败")
except Exception as e:
QMessageBox.warning(self, "错误", f"发送ON命令失败: {str(e)}")
self.statusBar().showMessage("发送ON命令失败")
def on_cmd_btn_off(self):
"""发送OFF命令"""
if self.serial_recorder and self.serial_recorder.serial:
try:
self.serial_recorder.serial.write(CMD_OFF)
self.statusBar().showMessage("已发送OFF命令")
except serial.serialutil.SerialTimeoutException:
QMessageBox.warning(self, "警告", "发送OFF命令超时请检查串口连接")
self.statusBar().showMessage("发送OFF命令失败")
except Exception as e:
QMessageBox.warning(self, "错误", f"发送OFF命令失败: {str(e)}")
self.statusBar().showMessage("发送OFF命令失败")
def update_stats_display(self):
"""更新统计信息显示"""
if self.serial_recorder:
stats = self.serial_recorder.frame_processor.get_statistics()
self.stats_label.setText(
f"总帧数: {stats['total_frames']} | "
f"有效帧: {stats['valid_frames']} | "
f"损坏帧: {stats['corrupt_frames']}"
)
def refresh_ports(self):
"""刷新可用串口列表"""
self.port_combo.clear()
ports = serial.tools.list_ports.comports()
if ports:
for port in ports:
# 格式化串口信息:端口 - 设备名称 (设备ID)
port_info = f"{port.device} - {port.description}"
if port.serial_number:
port_info += f" ({port.serial_number})"
self.port_combo.addItem(port_info, port.device)
else:
self.port_combo.addItem("未找到可用串口")
def toggle_connection(self):
"""切换串口连接状态"""
if not self.connect_btn.isChecked():
# 断开连接
if self.serial_recorder:
self.serial_recorder.stop()
self.serial_recorder.disconnect()
self.statusBar().showMessage("已断开连接")
self.record_btn.setEnabled(False)
self.cmd_btn_on.setEnabled(False)
self.cmd_btn_off.setEnabled(False)
self.data_cmd_btn_on.setEnabled(False)
self.data_cmd_btn_off.setEnabled(False)
self.connect_btn.setText("连接") # 更新按钮文字为"连接"
else:
# 建立连接
port = self.port_combo.currentData()
if not port:
QMessageBox.warning(self, "警告", "请选择串口")
self.connect_btn.setChecked(False)
return
try:
baudrate = int(self.baud_input.text())
num_channels = int(self.channel_input.text().strip())
if not 1 <= num_channels <= 32:
raise ValueError("通道数必须在1-32之间")
except ValueError as e:
QMessageBox.warning(self, "警告", str(e))
self.connect_btn.setChecked(False)
return
# 创建串口记录器实例
self.serial_recorder = SerialRecorder(
output_dir=self.records_save_path,
num_channels=num_channels
)
# 设置串口参数
self.serial_recorder.set_port(port, baudrate)
# 测试连接
if self.serial_recorder.test_connection():
self.statusBar().showMessage("已连接")
self.record_btn.setEnabled(True)
self.cmd_btn_on.setEnabled(True)
self.cmd_btn_off.setEnabled(True)
self.data_cmd_btn_on.setEnabled(True)
self.data_cmd_btn_off.setEnabled(True)
self.connect_btn.setText("断开") # 更新按钮文字为"断开"
else:
self.connect_btn.setChecked(False)
QMessageBox.warning(self, "错误", "连接失败")
def toggle_recording(self):
"""切换录制状态"""
if not self.record_btn.isChecked():
# 停止录制
if self.serial_recorder:
self.serial_recorder.stop()
self.is_recording = False
self.record_btn.setText("开始录制")
self.statusBar().showMessage("录制已停止")
else:
# 开始录制
if self.serial_recorder:
if self.serial_recorder.start():
self.is_recording = True
self.record_btn.setText("停止录制")
self.statusBar().showMessage("正在录制")
else:
self.record_btn.setChecked(False)
QMessageBox.warning(self, "错误", "启动录制失败")
def add_tag(self):
"""添加标签"""
tag = self.tag_input.text().strip()
if tag:
# TODO: 实现标签添加逻辑
self.tag_input.clear()
self.statusBar().showMessage(f"已添加标签: {tag}")
def apply_channel_settings(self):
"""应用通道数设置"""
try:
num_channels = int(self.channel_input.text().strip())
if not 1 <= num_channels <= 32:
raise ValueError("通道数必须在1-32之间")
if self.serial_recorder:
self.serial_recorder.set_channel_nums(num_channels)
self.statusBar().showMessage(f"已设置通道数为: {num_channels}")
else:
self.statusBar().showMessage("请先连接串口")
except ValueError as e:
QMessageBox.warning(self, "警告", str(e))
self.statusBar().showMessage("设置通道数失败")
except Exception as e:
QMessageBox.warning(self, "错误", f"设置通道数失败: {str(e)}")
self.statusBar().showMessage("设置通道数失败")
def closeEvent(self, event):
"""关闭事件处理"""
if not self.exit_confirmed:
reply = QMessageBox.question(
self, '确认退出',
'确定要退出程序吗?',
QMessageBox.Yes | QMessageBox.No,
QMessageBox.No
)
if reply == QMessageBox.Yes:
self.exit_confirmed = True
self.cleanup_resources()
event.accept()
else:
event.ignore()
else:
self.cleanup_resources()
event.accept()
def cleanup_resources(self):
"""清理资源"""
if self.serial_recorder:
if self.is_recording:
self.serial_recorder.stop() # 确保停止录制并保存数据
self.serial_recorder.disconnect()
self.stats_timer.stop()
if __name__ == '__main__':
app = QApplication(sys.argv)
window = SerialUI()
window.show()
sys.exit(app.exec())