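# NOTE: the lines below look like repository web-viewer page chrome captured
# together with the file; they are kept as comments so the module can be parsed.
#
# brisonus_data_collector/main2.py
#
# 561 lines
# 21 KiB
# Python
# Raw Permalink Blame History
#
# This file contains ambiguous Unicode characters
#
# This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.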

import sys
import serial.tools.list_ports
from PySide6.QtWidgets import (QApplication, QMainWindow, QWidget, QVBoxLayout,
QHBoxLayout, QComboBox, QPushButton, QLabel,
QLineEdit, QMessageBox)
from PySide6.QtCore import Qt, QTimer
from PySide6.QtGui import QIntValidator
from serial_recorder import SerialRecorder
import os
# Command frames that are written verbatim to the serial port.
# In every frame listed here the last byte equals the low 8 bits of the sum of
# the preceding bytes (presumably a checksum -- confirm against the device
# protocol before editing any payload byte).
CMD_ON = bytes.fromhex("01 00 01 02")
CMD_OFF = bytes.fromhex("01 00 00 01")

# Earlier 17-byte data on/off frames, kept for reference only:
# CMD_DATA_ON = b'\xAA\xAA\xAA\xAA\x00\x00\x00\x00\x05\x00\x00\x00\x00\x00\x00\x00\xAD'
# CMD_DATA_OFF = b'\xAA\xAA\xAA\xAA\x01\x00\x00\x00\x05\x00\x00\x00\x00\x00\x00\x00\xAE'

# Zhiji project - data on/off switch frames, 9 bytes each.
CMD_DATA_ON = bytes.fromhex("0A 01 77 3B 00 00 80 3F 7C")
CMD_DATA_OFF = bytes.fromhex("0A 01 77 3B 00 00 00 00 BD")
class SerialUI(QMainWindow):
    """Frameless, dark-themed main window that drives a ``SerialRecorder``.

    The window lets the user pick a serial port, a baud rate and a channel
    count, connect or disconnect, send the predefined ON/OFF command frames,
    start or stop recording, and watch the frame statistics that the recorder's
    frame processor reports once per second.
    """

    # Limits used both for the input validators and when the values are read.
    MIN_BAUDRATE = 1200
    MAX_BAUDRATE = 4000000
    MIN_CHANNELS = 1
    MAX_CHANNELS = 32

    # Qt style sheet applied to the whole window (dark grey, orange accents).
    _STYLE_SHEET = """
        QMainWindow {
            background-color: #2B2B2B;
            border: none;
        }
        QWidget {
            background-color: #2B2B2B;
            color: #FFFFFF;
        }
        QLabel {
            color: #CCCCCC;
            font-size: 14px;
        }
        QComboBox {
            background-color: #3C3F41;
            border: 1px solid #646464;
            border-radius: 3px;
            color: #FFFFFF;
            padding: 5px;
            min-width: 100px;
        }
        QComboBox:hover {
            border: 1px solid #FF8C00;
        }
        QComboBox::drop-down {
            border: none;
            padding-right: 8px;
        }
        QPushButton {
            background-color: #3C3F41;
            border: 1px solid #646464;
            border-radius: 3px;
            color: #FFFFFF;
            padding: 5px 15px;
            min-height: 25px;
        }
        QPushButton:hover {
            background-color: #434749;
            border: 1px solid #FF8C00;
        }
        QPushButton:pressed {
            background-color: #333637;
        }
        QPushButton:checked {
            background-color: #FF8C00;
            border: 1px solid #FF8C00;
        }
        QLineEdit {
            background-color: #3C3F41;
            border: 1px solid #646464;
            border-radius: 3px;
            color: #FFFFFF;
            padding: 5px;
            min-width: 100px;
        }
        QLineEdit:focus {
            border: 1px solid #FF8C00;
        }
        QLineEdit:disabled {
            background-color: #2B2B2B;
            color: #808080;
        }
        QStatusBar {
            background-color: #3C3F41;
            color: #CCCCCC;
        }
        QMenuBar {
            background-color: #2B2B2B;
            color: #FFFFFF;
        }
        QMenuBar::item {
            background-color: #2B2B2B;
            color: #FFFFFF;
        }
        QMenuBar::item:selected {
            background-color: #3C3F41;
        }
        QMenu {
            background-color: #2B2B2B;
            color: #FFFFFF;
        }
        QMenu::item:selected {
            background-color: #3C3F41;
        }
        #record_btn {
            background-color: #FF8C00;
            border: none;
            font-weight: bold;
        }
        #record_btn:hover {
            background-color: #FFA500;
        }
        #record_btn:checked {
            background-color: #CD3333;
        }
        #titleBar {
            background-color: #2B2B2B;
            border-bottom: 1px solid #3C3F41;
            min-height: 30px;
        }
        #titleLabel {
            color: #FFFFFF;
            font-size: 14px;
            padding-left: 10px;
        }
        #closeButton {
            background-color: transparent;
            border: none;
            color: #FFFFFF;
            padding: 5px 10px;
        }
        #closeButton:hover {
            background-color: #CD3333;
        }
    """

    def __init__(self):
        """Create the widgets, wire their signals and start disconnected."""
        super().__init__()
        # self.setWindowTitle("Signal Collector V1.1.1 BUILD202503261405")
        self.resize(600, 400)
        # Frameless window: the custom title bar built below replaces the
        # native one, including the close button and window dragging.
        self.setWindowFlags(Qt.FramelessWindowHint)
        self.setStyleSheet(self._STYLE_SHEET)

        # State read by slots and event handlers. It is initialised before any
        # widget exists so no handler can ever observe a missing attribute.
        self.is_recording = False
        self.serial_recorder = None
        self.exit_confirmed = False  # set once the user confirmed quitting
        self.dragging = False  # True while the title bar is being dragged
        self.drag_position = None  # cursor offset from the window's top-left

        # Directory handed to SerialRecorder as its output directory.
        self.records_save_path = './records'
        os.makedirs(self.records_save_path, exist_ok=True)

        # Central widget: title bar on top, content area below, no margins.
        central_widget = QWidget()
        self.setCentralWidget(central_widget)
        layout = QVBoxLayout(central_widget)
        layout.setContentsMargins(0, 0, 0, 0)

        self.title_bar = self._build_title_bar()
        layout.addWidget(self.title_bar)

        content_widget = QWidget()
        content_layout = QVBoxLayout(content_widget)
        content_layout.setContentsMargins(20, 20, 20, 20)
        content_layout.setSpacing(10)
        self._build_port_row(content_layout)
        self._build_baud_row(content_layout)
        self._build_channel_row(content_layout)
        self._build_data_cmd_row(content_layout)
        self._build_cmd_row(content_layout)
        self._build_record_row(content_layout)
        self._build_tag_row(content_layout)
        self._build_stats_row(content_layout)
        layout.addWidget(content_widget)

        self.statusBar().showMessage("就绪")
        self.refresh_ports()
        self.setMinimumSize(450, 250)

        # Recording and command buttons only make sense once connected.
        self._set_connected_controls_enabled(False)

        # Refresh the statistics label once per second.
        self.stats_timer = QTimer(self)
        self.stats_timer.timeout.connect(self.update_stats_display)
        self.stats_timer.start(1000)

        # Route the title bar's mouse events to this window's handlers so that
        # dragging the title bar moves the window.
        self.title_bar.mousePressEvent = self.mousePressEvent
        self.title_bar.mouseMoveEvent = self.mouseMoveEvent
        self.title_bar.mouseReleaseEvent = self.mouseReleaseEvent

    # ------------------------------------------------------------------
    # UI construction helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _add_row(parent_layout, *widgets):
        """Append a horizontal row holding *widgets* to *parent_layout*."""
        row = QHBoxLayout()
        row.setSpacing(10)
        for widget in widgets:
            row.addWidget(widget)
        parent_layout.addLayout(row)
        return row

    def _build_title_bar(self):
        """Return the custom title bar: title text, stretch, close button."""
        title_bar = QWidget()
        title_bar.setObjectName("titleBar")
        title_bar_layout = QHBoxLayout(title_bar)
        title_bar_layout.setContentsMargins(0, 0, 0, 0)

        title_label = QLabel("Signal Collector V1.1.2 BUILD202511141419")
        title_label.setObjectName("titleLabel")

        close_button = QPushButton("×")
        close_button.setObjectName("closeButton")
        close_button.setFixedSize(30, 30)
        close_button.clicked.connect(self.close)

        title_bar_layout.addWidget(title_label)
        title_bar_layout.addStretch()
        title_bar_layout.addWidget(close_button)
        return title_bar

    def _build_port_row(self, parent_layout):
        """Add the serial-port selector and its refresh button."""
        port_label = QLabel("串口:")
        self.port_combo = QComboBox()
        self.refresh_btn = QPushButton("刷新")
        self.refresh_btn.clicked.connect(self.refresh_ports)
        self._add_row(parent_layout, port_label, self.port_combo, self.refresh_btn)

    def _build_baud_row(self, parent_layout):
        """Add the baud-rate input and the checkable connect button."""
        baud_label = QLabel("波特率:")
        self.baud_input = QLineEdit()
        self.baud_input.setText("4000000")
        self.baud_input.setValidator(
            QIntValidator(self.MIN_BAUDRATE, self.MAX_BAUDRATE))
        self.baud_input.setPlaceholderText("请输入波特率")
        self.connect_btn = QPushButton("连接端口")
        self.connect_btn.setCheckable(True)
        self.connect_btn.clicked.connect(self.toggle_connection)
        self._add_row(parent_layout, baud_label, self.baud_input, self.connect_btn)

    def _build_channel_row(self, parent_layout):
        """Add the channel-count input and its apply button."""
        channel_label = QLabel("通道数:")
        self.channel_input = QLineEdit()
        self.channel_input.setText("8")
        self.channel_input.setValidator(
            QIntValidator(self.MIN_CHANNELS, self.MAX_CHANNELS))
        self.channel_input.setPlaceholderText("请输入通道数(1-32)")
        self.apply_channel_btn = QPushButton("应用设置")
        self.apply_channel_btn.clicked.connect(self.apply_channel_settings)
        self._add_row(parent_layout, channel_label, self.channel_input,
                      self.apply_channel_btn)

    def _build_data_cmd_row(self, parent_layout):
        """Add the ON/OFF buttons that send the data-switch frames."""
        data_cmd_label = QLabel("数据开关:")
        self.data_cmd_btn_on = QPushButton("ON")
        self.data_cmd_btn_on.clicked.connect(self.on_data_cmd_btn_on)
        self.data_cmd_btn_off = QPushButton("OFF")
        self.data_cmd_btn_off.clicked.connect(self.on_data_cmd_btn_off)
        self._add_row(parent_layout, data_cmd_label, self.data_cmd_btn_on,
                      self.data_cmd_btn_off)

    def _build_cmd_row(self, parent_layout):
        """Add the ON/OFF buttons that send the plain switch frames."""
        cmd_label = QLabel("开关命令:")
        self.cmd_btn_on = QPushButton("ON")
        self.cmd_btn_on.clicked.connect(self.on_cmd_btn_on)
        self.cmd_btn_off = QPushButton("OFF")
        self.cmd_btn_off.clicked.connect(self.on_cmd_btn_off)
        self._add_row(parent_layout, cmd_label, self.cmd_btn_on, self.cmd_btn_off)

    def _build_record_row(self, parent_layout):
        """Add the checkable start/stop recording button."""
        self.record_btn = QPushButton("开始录制")
        # The object name is what the "#record_btn" style-sheet rules match.
        self.record_btn.setObjectName("record_btn")
        self.record_btn.setCheckable(True)
        self.record_btn.clicked.connect(self.toggle_recording)
        self._add_row(parent_layout, self.record_btn)

    def _build_tag_row(self, parent_layout):
        """Add the tag text input and its button."""
        self.tag_input = QLineEdit()
        self.tag_input.setPlaceholderText("输入标签内容")
        self.add_label_btn = QPushButton("增加标签")
        self.add_label_btn.clicked.connect(self.add_tag)
        self._add_row(parent_layout, self.tag_input, self.add_label_btn)

    def _build_stats_row(self, parent_layout):
        """Add the centred label showing total / valid / corrupt frame counts."""
        self.stats_label = QLabel("总帧数: 0 | 有效帧: 0 | 损坏帧: 0")
        self.stats_label.setAlignment(Qt.AlignCenter)
        self._add_row(parent_layout, self.stats_label)

    def _set_connected_controls_enabled(self, enabled):
        """Enable or disable every control that requires an open connection."""
        for button in (self.record_btn, self.cmd_btn_on, self.cmd_btn_off,
                       self.data_cmd_btn_on, self.data_cmd_btn_off):
            button.setEnabled(enabled)

    def _reset_recording_controls(self):
        """Put the recording flag and button back into the idle state."""
        self.is_recording = False
        self.record_btn.setChecked(False)
        self.record_btn.setText("开始录制")

    # ------------------------------------------------------------------
    # Window dragging (also installed on the title bar in __init__)
    # ------------------------------------------------------------------
    def mousePressEvent(self, event):
        """Start dragging when the left button is pressed inside the title bar."""
        if (event.button() == Qt.LeftButton
                and self.title_bar.rect().contains(event.position().toPoint())):
            self.dragging = True
            # Remember where inside the window the cursor grabbed it.
            self.drag_position = (event.globalPosition().toPoint()
                                  - self.frameGeometry().topLeft())
            event.accept()

    def mouseMoveEvent(self, event):
        """Move the window with the cursor while a drag is in progress."""
        if self.dragging and self.drag_position is not None:
            self.move(event.globalPosition().toPoint() - self.drag_position)
            event.accept()

    def mouseReleaseEvent(self, event):
        """Stop dragging when the left button is released."""
        if event.button() == Qt.LeftButton:
            self.dragging = False
            event.accept()

    # ------------------------------------------------------------------
    # Command buttons
    # ------------------------------------------------------------------
    def _send_command(self, payload, label):
        """Write *payload* to the open serial port and report the outcome.

        Does nothing when there is no recorder or the recorder has no serial
        object. *label* is the command name inserted into the user-facing
        messages (for example ``"数据ON"``).
        """
        recorder = self.serial_recorder
        if not (recorder and recorder.serial):
            return
        try:
            recorder.serial.write(payload)
        except serial.serialutil.SerialTimeoutException:
            QMessageBox.warning(self, "警告", f"发送{label}命令超时请检查串口连接")
            self.statusBar().showMessage(f"发送{label}命令失败")
        except Exception as e:
            # UI boundary: report any other failure instead of letting the
            # exception escape from the Qt slot.
            QMessageBox.warning(self, "错误", f"发送{label}命令失败: {str(e)}")
            self.statusBar().showMessage(f"发送{label}命令失败")
        else:
            self.statusBar().showMessage(f"已发送{label}命令")

    def on_data_cmd_btn_on(self):
        """Send the data ON frame (``CMD_DATA_ON``)."""
        self._send_command(CMD_DATA_ON, "数据ON")

    def on_data_cmd_btn_off(self):
        """Send the data OFF frame (``CMD_DATA_OFF``)."""
        self._send_command(CMD_DATA_OFF, "数据OFF")

    def on_cmd_btn_on(self):
        """Send the ON frame (``CMD_ON``)."""
        self._send_command(CMD_ON, "ON")

    def on_cmd_btn_off(self):
        """Send the OFF frame (``CMD_OFF``)."""
        self._send_command(CMD_OFF, "OFF")

    # ------------------------------------------------------------------
    # Statistics and port list
    # ------------------------------------------------------------------
    def update_stats_display(self):
        """Show the frame processor's current statistics in the stats label."""
        if not self.serial_recorder:
            return
        stats = self.serial_recorder.frame_processor.get_statistics()
        self.stats_label.setText(
            f"总帧数: {stats['total_frames']} | "
            f"有效帧: {stats['valid_frames']} | "
            f"损坏帧: {stats['corrupt_frames']}"
        )

    def refresh_ports(self):
        """Repopulate the port combo box from the ports pySerial reports."""
        self.port_combo.clear()
        ports = serial.tools.list_ports.comports()
        if not ports:
            # Placeholder entry without item data, so currentData() stays None
            # and toggle_connection() asks the user to pick a port.
            self.port_combo.addItem("未找到可用串口")
            return
        for port in ports:
            # Displayed as "device - description (serial number)"; the device
            # name itself is stored as the item data.
            port_info = f"{port.device} - {port.description}"
            if port.serial_number:
                port_info += f" ({port.serial_number})"
            self.port_combo.addItem(port_info, port.device)

    # ------------------------------------------------------------------
    # Input parsing
    # ------------------------------------------------------------------
    @staticmethod
    def _parse_bounded_int(text, low, high, error_message):
        """Return ``int(text)`` if it lies in [low, high].

        Raises:
            ValueError: with *error_message* when *text* is not an integer or
                is out of range, so callers can show the message directly.
        """
        try:
            value = int(text.strip())
        except ValueError:
            raise ValueError(error_message) from None
        if not low <= value <= high:
            raise ValueError(error_message)
        return value

    def _read_baudrate(self):
        """Return the baud rate typed by the user or raise ``ValueError``."""
        return self._parse_bounded_int(
            self.baud_input.text(), self.MIN_BAUDRATE, self.MAX_BAUDRATE,
            f"波特率必须在{self.MIN_BAUDRATE}-{self.MAX_BAUDRATE}之间")

    def _read_channel_count(self):
        """Return the channel count typed by the user or raise ``ValueError``."""
        return self._parse_bounded_int(
            self.channel_input.text(), self.MIN_CHANNELS, self.MAX_CHANNELS,
            f"通道数必须在{self.MIN_CHANNELS}-{self.MAX_CHANNELS}之间")

    # ------------------------------------------------------------------
    # Connection handling
    # ------------------------------------------------------------------
    def toggle_connection(self):
        """Connect or disconnect depending on the connect button's state."""
        if self.connect_btn.isChecked():
            self._connect()
        else:
            self._disconnect()

    def _disconnect(self):
        """Stop and disconnect the recorder and return the UI to idle."""
        if self.serial_recorder:
            self.serial_recorder.stop()
            self.serial_recorder.disconnect()
            # Show the final counters before the recorder is dropped.
            self.update_stats_display()
            self.serial_recorder = None
        # A recording cannot outlive its connection; without this reset the
        # record button would stay checked and out of sync after reconnecting.
        self._reset_recording_controls()
        self.statusBar().showMessage("已断开连接")
        self._set_connected_controls_enabled(False)
        self.connect_btn.setText("连接")

    def _connect(self):
        """Validate the inputs, create a recorder and test the connection.

        On any failure the connect button is unchecked again and no recorder
        is kept, so the rest of the UI keeps treating the window as
        disconnected.
        """
        port = self.port_combo.currentData()
        if not port:
            QMessageBox.warning(self, "警告", "请选择串口")
            self.connect_btn.setChecked(False)
            return
        try:
            baudrate = self._read_baudrate()
            num_channels = self._read_channel_count()
        except ValueError as e:
            QMessageBox.warning(self, "警告", str(e))
            self.connect_btn.setChecked(False)
            return

        try:
            recorder = SerialRecorder(
                output_dir=self.records_save_path,
                num_channels=num_channels
            )
            recorder.set_port(port, baudrate)
            connected = recorder.test_connection()
        except Exception as e:
            # UI boundary: a failing port must not leave the button checked.
            self.connect_btn.setChecked(False)
            QMessageBox.warning(self, "错误", f"连接失败: {str(e)}")
            return
        if not connected:
            self.connect_btn.setChecked(False)
            QMessageBox.warning(self, "错误", "连接失败")
            return

        # Only a recorder that passed the connection test is kept.
        self.serial_recorder = recorder
        self.statusBar().showMessage("已连接")
        self._set_connected_controls_enabled(True)
        self.connect_btn.setText("断开")

    # ------------------------------------------------------------------
    # Recording, tags and channel settings
    # ------------------------------------------------------------------
    def toggle_recording(self):
        """Start or stop recording depending on the record button's state."""
        if not self.record_btn.isChecked():
            if self.serial_recorder:
                self.serial_recorder.stop()
            self._reset_recording_controls()
            self.statusBar().showMessage("录制已停止")
            return

        if self.serial_recorder and self.serial_recorder.start():
            self.is_recording = True
            self.record_btn.setText("停止录制")
            self.statusBar().showMessage("正在录制")
        else:
            # Covers both "no recorder" and "start() reported failure".
            self.record_btn.setChecked(False)
            QMessageBox.warning(self, "错误", "启动录制失败")

    def add_tag(self):
        """Clear the tag input and acknowledge the tag in the status bar.

        NOTE(review): the tag is not stored or forwarded anywhere yet (see the
        TODO below), although the status message says it was added.
        """
        tag = self.tag_input.text().strip()
        if tag:
            # TODO: implement the actual tag-recording logic
            self.tag_input.clear()
            self.statusBar().showMessage(f"已添加标签: {tag}")

    def apply_channel_settings(self):
        """Apply the channel count from the input field to the recorder."""
        try:
            num_channels = self._read_channel_count()
            if self.serial_recorder:
                self.serial_recorder.set_channel_nums(num_channels)
                self.statusBar().showMessage(f"已设置通道数为: {num_channels}")
            else:
                self.statusBar().showMessage("请先连接串口")
        except ValueError as e:
            QMessageBox.warning(self, "警告", str(e))
            self.statusBar().showMessage("设置通道数失败")
        except Exception as e:
            QMessageBox.warning(self, "错误", f"设置通道数失败: {str(e)}")
            self.statusBar().showMessage("设置通道数失败")

    # ------------------------------------------------------------------
    # Shutdown
    # ------------------------------------------------------------------
    def closeEvent(self, event):
        """Ask for confirmation (once), release resources, then close."""
        if not self.exit_confirmed:
            reply = QMessageBox.question(
                self, '确认退出',
                '确定要退出程序吗?',
                QMessageBox.Yes | QMessageBox.No,
                QMessageBox.No
            )
            if reply != QMessageBox.Yes:
                event.ignore()
                return
            self.exit_confirmed = True
        self.cleanup_resources()
        event.accept()

    def cleanup_resources(self):
        """Stop any recording, disconnect the recorder and stop the timer.

        Safe to call more than once: the recorder reference is dropped after
        the first call.
        """
        recorder = self.serial_recorder
        if recorder:
            if self.is_recording:
                # Stop first, then disconnect, as the original shutdown did.
                recorder.stop()
                self.is_recording = False
            recorder.disconnect()
            self.serial_recorder = None
        self.stats_timer.stop()
# Script entry point: create the Qt application, show the main window and
# hand the event loop's return code to the interpreter as the exit status.
if __name__ == '__main__':
    qt_app = QApplication(sys.argv)
    main_window = SerialUI()
    main_window.show()
    exit_code = qt_app.exec()
    sys.exit(exit_code)