231 lines
8.5 KiB
Python
231 lines
8.5 KiB
Python
|
|
from PySide6.QtWidgets import (QMainWindow, QWidget, QVBoxLayout, QHBoxLayout,
|
||
|
|
QPushButton, QLabel, QComboBox, QLineEdit, QMessageBox)
|
||
|
|
from PySide6.QtCore import Qt, QTimer
|
||
|
|
from PySide6.QtGui import QIntValidator
|
||
|
|
from plot_widget import PlotWidget
|
||
|
|
from frame_processor import FrameProcessor
|
||
|
|
from serial_recorder import SerialRecorder
|
||
|
|
from frame import FrameFormat
|
||
|
|
import serial.tools.list_ports
|
||
|
|
import time
|
||
|
|
|
||
|
|
|
||
|
|
class MainWindow(QMainWindow):
|
||
|
|
"""主窗口"""
|
||
|
|
|
||
|
|
def __init__(self):
|
||
|
|
super().__init__()
|
||
|
|
self.setup_ui()
|
||
|
|
self.setup_recorder()
|
||
|
|
|
||
|
|
def setup_ui(self):
|
||
|
|
"""设置UI界面"""
|
||
|
|
self.setWindowTitle("数据采集与显示系统")
|
||
|
|
self.setMinimumSize(1200, 800)
|
||
|
|
|
||
|
|
# 创建中央窗口
|
||
|
|
central_widget = QWidget()
|
||
|
|
self.setCentralWidget(central_widget)
|
||
|
|
|
||
|
|
# 创建主布局
|
||
|
|
layout = QVBoxLayout(central_widget)
|
||
|
|
|
||
|
|
# 创建串口控制区域
|
||
|
|
serial_control = QHBoxLayout()
|
||
|
|
|
||
|
|
# 串口选择
|
||
|
|
self.port_combo = QComboBox()
|
||
|
|
self.refresh_btn = QPushButton("刷新")
|
||
|
|
self.refresh_btn.clicked.connect(self.refresh_ports)
|
||
|
|
serial_control.addWidget(QLabel("串口:"))
|
||
|
|
serial_control.addWidget(self.port_combo)
|
||
|
|
serial_control.addWidget(self.refresh_btn)
|
||
|
|
|
||
|
|
# 波特率设置
|
||
|
|
self.baud_input = QLineEdit()
|
||
|
|
self.baud_input.setText("4000000")
|
||
|
|
self.baud_input.setValidator(QIntValidator(1200, 4000000))
|
||
|
|
serial_control.addWidget(QLabel("波特率:"))
|
||
|
|
serial_control.addWidget(self.baud_input)
|
||
|
|
|
||
|
|
# 连接按钮
|
||
|
|
self.connect_btn = QPushButton("连接")
|
||
|
|
self.connect_btn.setCheckable(True)
|
||
|
|
self.connect_btn.clicked.connect(self.toggle_connection)
|
||
|
|
serial_control.addWidget(self.connect_btn)
|
||
|
|
|
||
|
|
serial_control.addStretch()
|
||
|
|
layout.addLayout(serial_control)
|
||
|
|
|
||
|
|
# 创建控制按钮区域
|
||
|
|
control_layout = QHBoxLayout()
|
||
|
|
|
||
|
|
# 开始/停止按钮
|
||
|
|
self.start_btn = QPushButton("开始")
|
||
|
|
self.start_btn.setCheckable(True)
|
||
|
|
self.start_btn.clicked.connect(self.toggle_acquisition)
|
||
|
|
self.start_btn.setEnabled(False) # 初始禁用
|
||
|
|
control_layout.addWidget(self.start_btn)
|
||
|
|
|
||
|
|
# 清除按钮
|
||
|
|
self.clear_btn = QPushButton("清除")
|
||
|
|
self.clear_btn.clicked.connect(self.clear_data)
|
||
|
|
control_layout.addWidget(self.clear_btn)
|
||
|
|
|
||
|
|
control_layout.addStretch()
|
||
|
|
layout.addLayout(control_layout)
|
||
|
|
|
||
|
|
# 创建绘图控件
|
||
|
|
self.plot_widget = PlotWidget()
|
||
|
|
layout.addWidget(self.plot_widget)
|
||
|
|
|
||
|
|
# 创建状态栏
|
||
|
|
self.statusBar().showMessage("就绪")
|
||
|
|
|
||
|
|
# 初始化串口列表
|
||
|
|
self.refresh_ports()
|
||
|
|
|
||
|
|
# 创建数据处理定时器
|
||
|
|
self.process_timer = QTimer()
|
||
|
|
self.process_timer.timeout.connect(self.process_data)
|
||
|
|
self.process_timer.start(10) # 10ms 处理一次
|
||
|
|
|
||
|
|
def setup_recorder(self):
|
||
|
|
"""设置串口记录器"""
|
||
|
|
self.recorder = SerialRecorder()
|
||
|
|
self.is_connected = False
|
||
|
|
self.is_running = False
|
||
|
|
|
||
|
|
def refresh_ports(self):
|
||
|
|
"""刷新可用串口列表"""
|
||
|
|
self.port_combo.clear()
|
||
|
|
ports = serial.tools.list_ports.comports()
|
||
|
|
|
||
|
|
if ports:
|
||
|
|
for port in ports:
|
||
|
|
# 格式化串口信息:端口 - 设备名称 (设备ID)
|
||
|
|
port_info = f"{port.device} - {port.description}"
|
||
|
|
if port.serial_number:
|
||
|
|
port_info += f" ({port.serial_number})"
|
||
|
|
self.port_combo.addItem(port_info, port.device)
|
||
|
|
else:
|
||
|
|
self.statusBar().showMessage("未找到可用串口")
|
||
|
|
|
||
|
|
def toggle_connection(self):
|
||
|
|
"""切换串口连接状态"""
|
||
|
|
if self.connect_btn.isChecked():
|
||
|
|
port = self.port_combo.currentData()
|
||
|
|
try:
|
||
|
|
baudrate = int(self.baud_input.text())
|
||
|
|
self.recorder.set_port(port, baudrate)
|
||
|
|
|
||
|
|
if self.recorder.test_connection():
|
||
|
|
self.connect_btn.setText("断开")
|
||
|
|
self.statusBar().showMessage("串口已连接")
|
||
|
|
# 禁用串口设置
|
||
|
|
self.port_combo.setEnabled(False)
|
||
|
|
self.baud_input.setEnabled(False)
|
||
|
|
self.refresh_btn.setEnabled(False)
|
||
|
|
# 启用开始按钮
|
||
|
|
self.start_btn.setEnabled(True)
|
||
|
|
self.is_connected = True
|
||
|
|
else:
|
||
|
|
raise Exception("串口连接失败")
|
||
|
|
except ValueError:
|
||
|
|
QMessageBox.critical(self, "错误", "请输入有效的波特率")
|
||
|
|
self.connect_btn.setChecked(False)
|
||
|
|
except Exception as e:
|
||
|
|
QMessageBox.critical(self, "错误", f"无法连接串口: {str(e)}")
|
||
|
|
self.connect_btn.setChecked(False)
|
||
|
|
else:
|
||
|
|
if self.recorder:
|
||
|
|
if self.recorder.disconnect():
|
||
|
|
self.connect_btn.setText("连接")
|
||
|
|
self.statusBar().showMessage("串口已断开")
|
||
|
|
# 启用串口设置
|
||
|
|
self.port_combo.setEnabled(True)
|
||
|
|
self.baud_input.setEnabled(True)
|
||
|
|
self.refresh_btn.setEnabled(True)
|
||
|
|
# 禁用开始按钮
|
||
|
|
self.start_btn.setEnabled(False)
|
||
|
|
self.is_connected = False
|
||
|
|
else:
|
||
|
|
QMessageBox.warning(self, "警告", "断开连接时发生错误")
|
||
|
|
|
||
|
|
def toggle_acquisition(self):
|
||
|
|
"""切换数据采集状态"""
|
||
|
|
if not self.is_connected:
|
||
|
|
QMessageBox.warning(self, "警告", "请先连接串口")
|
||
|
|
self.start_btn.setChecked(False)
|
||
|
|
return
|
||
|
|
|
||
|
|
self.is_running = self.start_btn.isChecked()
|
||
|
|
self.start_btn.setText("停止" if self.is_running else "开始")
|
||
|
|
|
||
|
|
if self.is_running:
|
||
|
|
# 开始采集
|
||
|
|
if self.recorder.start():
|
||
|
|
self.statusBar().showMessage("数据采集已开始")
|
||
|
|
self.connect_btn.setEnabled(False) # 禁用连接按钮
|
||
|
|
else:
|
||
|
|
QMessageBox.critical(self, "错误", "启动数据采集失败")
|
||
|
|
self.start_btn.setChecked(False)
|
||
|
|
self.is_running = False
|
||
|
|
else:
|
||
|
|
# 停止采集
|
||
|
|
if self.recorder.stop():
|
||
|
|
self.statusBar().showMessage("数据采集已停止")
|
||
|
|
self.connect_btn.setEnabled(True) # 启用连接按钮
|
||
|
|
else:
|
||
|
|
QMessageBox.warning(self, "警告", "停止采集时发生错误")
|
||
|
|
|
||
|
|
def process_data(self):
|
||
|
|
"""处理接收到的数据"""
|
||
|
|
if not self.is_running:
|
||
|
|
return
|
||
|
|
|
||
|
|
# 处理所有待处理的帧
|
||
|
|
while True:
|
||
|
|
# 从帧处理器获取帧(非阻塞)
|
||
|
|
frame_data = self.recorder.frame_processor.get_frame(timeout=0)
|
||
|
|
if frame_data is None:
|
||
|
|
break
|
||
|
|
|
||
|
|
# 更新绘图
|
||
|
|
self.plot_widget.add_data(frame_data)
|
||
|
|
|
||
|
|
# 更新状态栏信息
|
||
|
|
stats = self.recorder.frame_processor.get_statistics()
|
||
|
|
status_text = (
|
||
|
|
f"总帧数: {stats['total_frames']} | "
|
||
|
|
f"有效帧: {stats['valid_frames']} | "
|
||
|
|
f"损坏帧: {stats['corrupt_frames']} | "
|
||
|
|
f"无效字节: {stats['invalid_bytes']}"
|
||
|
|
)
|
||
|
|
self.statusBar().showMessage(status_text)
|
||
|
|
|
||
|
|
def clear_data(self):
|
||
|
|
"""清除所有数据"""
|
||
|
|
self.plot_widget.clear_data()
|
||
|
|
self.statusBar().showMessage("数据已清除")
|
||
|
|
|
||
|
|
def closeEvent(self, event):
|
||
|
|
"""关闭事件处理"""
|
||
|
|
if self.is_running:
|
||
|
|
reply = QMessageBox.question(
|
||
|
|
self,
|
||
|
|
'确认退出',
|
||
|
|
'数据采集正在进行中,确定要退出吗?',
|
||
|
|
QMessageBox.Yes | QMessageBox.No,
|
||
|
|
QMessageBox.No
|
||
|
|
)
|
||
|
|
|
||
|
|
if reply == QMessageBox.Yes:
|
||
|
|
self.recorder.stop()
|
||
|
|
self.process_timer.stop()
|
||
|
|
event.accept()
|
||
|
|
else:
|
||
|
|
event.ignore()
|
||
|
|
else:
|
||
|
|
self.process_timer.stop()
|
||
|
|
event.accept()
|