brisonus_data_collector/plot_widget.py

313 lines
12 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from PySide6.QtWidgets import QWidget, QVBoxLayout, QHBoxLayout, QLabel, QPushButton, QSpinBox, QComboBox, QGridLayout, QFrame
from PySide6.QtCore import Qt, QTimer
from PySide6.QtGui import QPalette, QColor
import pyqtgraph as pg
import numpy as np
from typing import List, Dict
import time
from frame import FrameFormat # 只导入FrameFormat类因为我们只需要使用parse_frame方法
class PlotWidget(QWidget):
"""工业风格的数据绘图控件"""
def __init__(self, parent=None):
super().__init__(parent)
self.setup_ui()
self.setup_plot()
self.setup_data()
def setup_ui(self):
"""设置UI界面"""
# 创建主布局
layout = QVBoxLayout(self)
# 创建工具栏
toolbar = QHBoxLayout()
# 添加时间尺度控制
self.time_scale_spin = QSpinBox()
self.time_scale_spin.setRange(1, 1000) # 1ms到1000ms
self.time_scale_spin.setValue(100) # 默认100ms
self.time_scale_spin.setSingleStep(10)
toolbar.addWidget(QLabel("时间尺度(ms):"))
toolbar.addWidget(self.time_scale_spin)
# 添加数据帧率控制
self.frame_rate_spin = QSpinBox()
self.frame_rate_spin.setRange(1, 10000) # 1Hz到10kHz
self.frame_rate_spin.setValue(1000) # 默认1000Hz
self.frame_rate_spin.setSingleStep(100)
toolbar.addWidget(QLabel("数据帧率(Hz):"))
toolbar.addWidget(self.frame_rate_spin)
# 自动缩放按钮
self.auto_scale_btn = QPushButton("自动缩放")
self.auto_scale_btn.clicked.connect(self.auto_scale)
toolbar.addWidget(self.auto_scale_btn)
# 暂停按钮
self.pause_btn = QPushButton("暂停")
self.pause_btn.setCheckable(True)
self.pause_btn.clicked.connect(self.toggle_pause)
toolbar.addWidget(self.pause_btn)
toolbar.addStretch()
layout.addLayout(toolbar)
# 创建网格布局用于放置多个绘图控件
plot_layout = QGridLayout()
plot_layout.setSpacing(10) # 设置绘图控件之间的间距
# 创建8个绘图控件
self.plot_widgets = []
self.curves = []
self.stats_labels = [] # 添加统计标签列表
colors = ['#ff0000', '#00ff00', '#0000ff', '#ffff00',
'#ff00ff', '#00ffff', '#ffffff', '#ff8000']
for i in range(8):
# 创建绘图控件容器
container = QFrame()
container.setFrameStyle(QFrame.Panel | QFrame.Sunken)
container_layout = QVBoxLayout(container)
# 创建标题
title_layout = QHBoxLayout()
title_label = QLabel(f"通道 {i+1}")
stats_label = QLabel("点数: 0 | 点数/秒: 0.0") # 添加统计标签
title_layout.addWidget(title_label)
title_layout.addStretch()
title_layout.addWidget(stats_label)
container_layout.addLayout(title_layout)
# 创建绘图控件
plot_widget = pg.PlotWidget()
plot_widget.setBackground('#2b2b2b')
plot_widget.showGrid(x=True, y=True)
plot_widget.setLabel('left', '数值')
plot_widget.setLabel('bottom', '时间 (秒)')
plot_widget.getAxis('left').setPen(pg.mkPen(color='#ffffff'))
plot_widget.getAxis('bottom').setPen(pg.mkPen(color='#ffffff'))
plot_widget.showGrid(x=True, y=True)
plot_widget.getViewBox().setBackgroundColor('#2b2b2b')
# 创建曲线
curve = plot_widget.plot(pen=colors[i], name=f"通道 {i+1}")
container_layout.addWidget(plot_widget)
# 将控件添加到网格布局中
plot_layout.addWidget(container, i // 2, i % 2)
self.plot_widgets.append(plot_widget)
self.curves.append(curve)
self.stats_labels.append(stats_label) # 保存统计标签引用
layout.addLayout(plot_layout)
# 创建状态栏
status_bar = QHBoxLayout()
self.status_label = QLabel("就绪")
self.fps_label = QLabel("FPS: 0")
status_bar.addWidget(self.status_label)
status_bar.addStretch()
status_bar.addWidget(self.fps_label)
layout.addLayout(status_bar)
def setup_plot(self):
"""设置绘图参数"""
# 设置pyqtgraph的性能选项
pg.setConfigOptions(antialias=False) # 关闭抗锯齿以提高性能
pg.setConfigOptions(useOpenGL=True) # 使用OpenGL加速
def setup_data(self):
"""设置数据存储"""
self.max_points = 1000000 # 增加缓冲区大小以存储更多数据
self.data_buffer = np.zeros((8, self.max_points))
self.time_buffer = np.zeros(self.max_points)
self.current_index = 0
self.is_paused = False
self.last_update_time = time.time()
self.frame_count = 0
self.start_time = None
self.last_scroll_time = time.time()
self.last_frame_time = 0 # 上次收到数据的时间
self.frame_interval = 1.0 / 1000.0 # 默认1000Hz
self.last_view_time = 0 # 上次视图更新时间
self.is_running = False # 添加运行状态标志
# 添加每个通道的点数统计
self.channel_points = [0] * 8
self.channel_points_per_second = [0.0] * 8
self.last_channel_points = [0] * 8
self.last_stats_update = time.time()
# 创建更新定时器
self.update_timer = QTimer()
self.update_timer.timeout.connect(self.update_plot)
self.update_timer.start(16) # 约60FPS
def add_data(self, frame_data: bytes):
"""添加新数据
Args:
frame_data: 原始帧数据bytes
"""
if self.is_paused:
return
# 解析帧数据
frame = FrameFormat.parse_frame(frame_data)
if frame is None:
return
if self.start_time is None:
self.start_time = time.time()
current_time = time.time() - self.start_time
self.last_frame_time = current_time # 更新最后收到数据的时间
# 更新数据缓冲区只取前8个通道
for i in range(min(8, len(frame.channels))):
# 将通道数据转换为浮点数
value = float(frame.channels[i]) / 1000.0 # 因为数据放大了1000倍
self.data_buffer[i, self.current_index] = value
self.channel_points[i] += 1
# 更新时间和索引
self.time_buffer[self.current_index] = current_time
self.current_index = (self.current_index + 1) % self.max_points
# 如果缓冲区已满,增加缓冲区大小
if self.current_index == 0:
new_size = self.max_points * 2
new_data_buffer = np.zeros((8, new_size))
new_time_buffer = np.zeros(new_size)
# 复制旧数据
new_data_buffer[:, :self.max_points] = self.data_buffer
new_time_buffer[:self.max_points] = self.time_buffer
# 更新缓冲区
self.data_buffer = new_data_buffer
self.time_buffer = new_time_buffer
self.max_points = new_size
self.current_index = self.max_points // 2 # 从中间开始存储新数据
# 更新点数/秒统计(降低更新频率)
current_stats_time = time.time()
if current_stats_time - self.last_stats_update >= 1.0:
for i in range(8):
points_diff = self.channel_points[i] - self.last_channel_points[i]
self.channel_points_per_second[i] = points_diff / (current_stats_time - self.last_stats_update)
self.last_channel_points[i] = self.channel_points[i]
# 更新统计标签
self.stats_labels[i].setText(
f"点数: {self.channel_points[i]} | 点数/秒: {self.channel_points_per_second[i]:.1f}"
)
def update_plot(self):
"""更新绘图"""
if self.is_paused:
return
current_time = time.time()
if self.start_time is not None:
current_time = current_time - self.start_time
time_scale = self.time_scale_spin.value() / 1000.0 # 转换为秒
self.frame_interval = 1.0 / self.frame_rate_spin.value() # 更新帧间隔
# 计算时间增量
time_delta = current_time - self.last_view_time
self.last_view_time = current_time
# 更新所有通道的曲线数据
for i in range(8):
if not self.is_running:
continue
# 获取当前通道的所有数据点
if self.current_index <= self.max_points:
time_data = self.time_buffer[:self.current_index]
data = self.data_buffer[i, :self.current_index]
else:
# 当缓冲区满时,只保留最新的数据
start_idx = self.current_index - self.max_points
time_data = self.time_buffer[start_idx:self.current_index]
data = self.data_buffer[i, start_idx:self.current_index]
# 如果没有数据,创建空数据
if len(time_data) == 0:
time_data = np.array([current_time])
data = np.array([0.0])
# 更新曲线数据
self.curves[i].setData(
time_data,
data,
antialias=False,
skipFiniteCheck=True
)
# 计算显示范围
current_time_value = time_data[-1]
view_start = max(0, current_time_value - time_scale)
# 平滑滚动
current_view = self.plot_widgets[i].getViewBox().state['viewRange'][0]
target_start = view_start
target_end = current_time_value
# 使用线性插值实现平滑滚动
scroll_speed = 0.1 # 滚动速度因子
new_start = current_view[0] + (target_start - current_view[0]) * scroll_speed
new_end = current_view[1] + (target_end - current_view[1]) * scroll_speed
self.plot_widgets[i].setXRange(new_start, new_end, padding=0)
# 更新FPS
self.frame_count += 1
if current_time - self.last_update_time >= 1.0:
fps = self.frame_count / (current_time - self.last_update_time)
self.fps_label.setText(f"FPS: {fps:.1f}")
self.frame_count = 0
self.last_update_time = current_time
def auto_scale(self):
"""自动缩放坐标轴"""
for plot_widget in self.plot_widgets:
plot_widget.autoRange()
def toggle_pause(self):
"""切换暂停状态"""
self.is_paused = self.pause_btn.isChecked()
self.status_label.setText("已暂停" if self.is_paused else "就绪")
def start(self):
"""开始数据采集"""
self.is_running = True
if self.start_time is None:
self.start_time = time.time()
self.status_label.setText("数据采集中")
def stop(self):
"""停止数据采集"""
self.is_running = False
self.status_label.setText("已停止")
def clear_data(self):
"""清除所有数据"""
self.data_buffer.fill(0)
self.time_buffer.fill(0)
self.current_index = 0
self.start_time = None
self.channel_points = [0] * 8 # 重置点数统计
self.channel_points_per_second = [0.0] * 8
self.last_channel_points = [0] * 8
for curve in self.curves:
curve.setData([], [])
def closeEvent(self, event):
"""关闭事件处理"""
self.update_timer.stop()
super().closeEvent(event)