rnc_tuning/main.py

315 lines
12 KiB
Python
Raw Normal View History

2025-02-14 11:11:41 +08:00
# 这是一个示例 Python 脚本。
import sys
from PySide6.QtWidgets import QMainWindow, QApplication, QFileDialog, QAbstractItemView, QMessageBox, QWidget
from PySide6.QtGui import QStandardItemModel, QStandardItem
from PySide6.QtCore import Slot, QModelIndex, QTimer
from rnc_tunning import Ui_MainWindow
from br_load_table import load_table
import serial.tools.list_ports
from br_widget_arr_details import BrWidgetArrDetails
from br_widget_var_details import BrWidgetVarDetails
from param_data import VarParamData, ParamData, ParamDataArr, ParamDataVar
from PySide6.QtSerialPort import QSerialPort
from cmd_server import CmdServer
from br_com_message import BrComMessage
from datetime import datetime
from br_data_manager import BrDataManager
TIMER_TIMEOUT = 10
params_filepath = 'ParamsDict.yml'
default_config = {
'baud_rate': 115200,
'polling_interval': 100
}
class PortInfo:
def __init__(self, num: str = '', baud_rate=115200, polling_interval=100):
self.num = num
self.baud_rate = baud_rate
self.polling_interval = polling_interval
# load_params_from_file('ParamsDict.yml')
class MainWindow(QMainWindow, Ui_MainWindow):
def __init__(self):
super().__init__()
self.widget = None
self.model = None
self.param_list = None
self.serial_port = None
self.setupUi(self)
self.param_data_list = []
self.data_manager = BrDataManager()
self.selected_row = 0
self.init_tableview()
# self.tableview_load_data()
self.setWindowTitle('Brisonus RNC Tunning Tool 2.0.4')
self.com = QSerialPort()
# com read定时器
self.read_timer: QTimer = QTimer()
self.read_timer.setSingleShot(True)
self.read_timer.timeout.connect(self.read_timer_stop)
# 消息管理服务器
self.cmd_server = CmdServer()
self.cmd_server.start()
self.cmd_server.resume()
self.cmd_server.signal_write.connect(self.com_write)
self.portlist = []
self.port_info = PortInfo()
self.init_comports()
self.widget_page_arr = BrWidgetArrDetails(self.widget)
self.widget_page_var = BrWidgetVarDetails(self.widget)
self.widget_page_var.signal_read.connect(self.on_var_read)
self.widget_page_var.signal_write.connect(self.on_var_write)
# 定义widget_page_arr的slot
self.widget_page_arr.signal_read.connect(self.on_arr_read)
# var arr 两个页面都处于隐藏状态
self.widget_page_arr.hide()
self.widget_page_var.hide()
# 默认加载可执行路径下
try:
self.tableview_load_data('ParamsDict.yml')
self.lineEdit_filename.setText('./ParamsDict.yml')
except Exception as e:
print(e)
def on_var_read(self, p):
if not self.com.isOpen():
# 弹出对话框提示用户serial port未处于open状态
self.show_msgbox('系统提示', '未打开端口!')
return
self.cmd_server.cmd_queue.put(p)
print('var widget read signal/callback on_read has been called!')
def on_var_write(self, p):
print('var widget write signal/callback on_read has been called!')
if not self.com.isOpen():
# 弹出对话框提示用户serial port未处于open状态
self.show_msgbox('系统提示', '未打开端口!')
return
self.cmd_server.cmd_queue.put(p)
def on_arr_read(self, p_cmd_list):
if not self.com.isOpen():
# 弹出对话框提示用户serial port未处于open状态
self.show_msgbox('系统提示', '未打开端口!')
return
for _cmd_item in p_cmd_list:
self.cmd_server.cmd_queue.put(_cmd_item)
print('arr widget signal/callback on_read has been called!')
def init_comports(self):
self.lineEdit_baudrate.setText(str(default_config['baud_rate']))
self.lineEdit_pollingInterval.setText(str(default_config['polling_interval']))
self.portlist = list(serial.tools.list_ports.comports())
for port_info in self.portlist:
self.comboBox_comports.addItem(str(port_info))
def init_tableview(self):
self.model = QStandardItemModel(0, 3)
self.model.setHorizontalHeaderLabels(['ID', 'Name', 'Value'])
self.tableView.horizontalHeader().setStretchLastSection(True)
self.tableView.setModel(self.model)
self.tableView.setSelectionMode(QAbstractItemView.SelectionMode.SingleSelection)
self.tableView.setSelectionBehavior(QAbstractItemView.SelectionBehavior.SelectRows)
# 设置不能编辑
self.tableView.setEditTriggers(QAbstractItemView.EditTrigger.NoEditTriggers)
self.tableView.clicked.connect(self.on_tableview_clicked)
def tableview_load_data(self, filepath):
param_list = load_table(filepath)
self.data_manager.param_list = []
# 将param_list转换为param_data_list
for param_item in param_list:
if param_item['type'] == 'VAR':
self.data_manager.param_list.append(ParamDataVar(
param_item['id'],
param_item['addr'],
param_item['name'],
param_item['details'],
0
))
if param_item['type'] == 'ARRAY':
self.data_manager.param_list.append(ParamDataArr(
param_item['id'],
param_item['addr'],
param_item['name'],
param_item['details'],
1,
param_item['len']
))
for i in range(0, len(self.data_manager.param_list)):
_param = self.data_manager.param_list[i]
print(_param)
self.model.setItem(i, 0, QStandardItem(str(_param.addr)))
self.model.setItem(i, 1, QStandardItem(str(_param.name)))
if _param.type == ParamData.VAR_TYPE_VAR:
_param.item = QStandardItem(str(_param.val()))
self.model.setItem(i, 2, _param.item)
if _param.type == ParamData.VAR_TYPE_ARR:
_param.item = QStandardItem(str('/'))
self.model.setItem(i, 2, _param.item)
def switch_page(self, page_name):
if page_name == 'var':
self.widget_page_arr.hide()
self.widget_page_var.show()
if page_name == 'arr':
self.widget_page_var.hide()
self.widget_page_arr.show()
def on_tableview_clicked(self, index: QModelIndex):
print('tableview clicked!', index.row())
self.selected_row = index.row()
param_index = index.row()
self.update_details_page(param_index)
def update_details_page(self, param_index):
# 将param的详细信息更新到右侧的详情Widget中进行显示
m_param_data = self.data_manager.param_list[param_index]
# 如果点击的是普通变量,则加载普通变量的控件
if m_param_data.type == VarParamData.VAR_TYPE_VAR:
print(self.widget)
self.update_var_details_ui(m_param_data)
self.switch_page('var')
# 如果点击的是数组型变量,则加载数据型变量的控件
if m_param_data.type == VarParamData.VAR_TYPE_ARR:
self.widget_page_arr.update_param_info(m_param_data)
self.switch_page('arr')
def update_var_details_ui(self, param_data: ParamDataVar):
print('update_var_details_ui')
self.widget_page_var.update_param_info(param_data)
@Slot()
def on_pushButton_set_clicked(self):
print('[设定] button clicked!')
# 获取所选择的端口号
_port_index = self.comboBox_comports.currentIndex()
self.port_info.num = self.portlist[_port_index][0]
# 获取所输入的波特率
self.port_info.baud_rate = int(self.lineEdit_baudrate.text())
# 设置QSerialPort参数
self.com.setPortName(self.port_info.num)
self.com.setBaudRate(self.port_info.baud_rate)
self.com.readyRead.connect(self.com_read)
self.com.open(QSerialPort.OpenModeFlag.ReadWrite)
self.show_msgbox('系统提示', '''
已打开端口:
Port number: %s
Baud rate: %d bps
Polling interval: 100ms
''' % (self.port_info.num, self.port_info.baud_rate))
def show_msgbox(self, title, content):
msg_box = QMessageBox()
msg_box.setWindowTitle(title)
msg_box.setText(content)
msg_box.exec()
def com_write(self, write_data: bytes):
# 判断serial port是否处于open状态
if not self.com.isOpen():
# 弹出对话框提示用户serial port未处于open状态
self.show_msgbox('系统提示', '未打开端口!')
return
self.com.write(write_data)
# 获取当前系统日期时间
current_datetime = datetime.now()
formatted_datetime = current_datetime.strftime("[%Y-%m-%d %H:%M:%S]")
write_str = "%s TX:" % formatted_datetime
self.textEdit.append(str(write_str) + write_data.hex(' '))
self.statusbar.showMessage('读取中...')
# 延时
def com_read(self):
# 启动读取定时器
self.read_timer.start(10)
def read_timer_stop(self):
recv_data = self.com.readAll().data()
if len(recv_data) < 9:
print('ccc', recv_data)
return
# 对收到的消息进行解析
res_msg = BrComMessage.resolve_message(list(recv_data))
if res_msg[0]:
# 更新参数的数值
_msg = res_msg[1]
if _msg.operation == BrComMessage.RES_READ:
self.data_manager.update_param_val_by_addr(
_msg.addr,
_msg.val
)
# 更新var page的显示值
self.statusbar.showMessage('地址[%d]读取完成!' % _msg.addr)
self.update_details_page(self.selected_row)
if _msg.operation == BrComMessage.RES_WRITE:
self.statusbar.showMessage('地址[%d]写入完成!' % _msg.addr)
current_datetime = datetime.now()
formatted_datetime = current_datetime.strftime("[%Y-%m-%d %H:%M:%S]")
write_str = "%s RX:" % formatted_datetime
self.textEdit.append(str(write_str) + recv_data.hex(' '))
@Slot()
def on_pushButton_loadfile_clicked(self):
print('[加载文件] button clicked!')
dialog = QFileDialog()
if dialog.exec():
filepath = dialog.selectedFiles()[0]
self.lineEdit_filename.setText(filepath)
self.tableview_load_data(filepath)
@Slot()
def on_pushButton_update_clicked(self):
print("[更新] button clicked!")
@Slot()
def on_pushButton_copy_clicked(self):
print("[复制信息] button clicked!")
def closeEvent(self, event):
reply = QMessageBox.question(self, '系统提示', '系统将退出,是否确认?',
QMessageBox.StandardButton.Yes | QMessageBox.StandardButton.No,
QMessageBox.StandardButton.No)
if reply == QMessageBox.StandardButton.Yes:
self.cmd_server.cancel()
self.cmd_server.quit()
self.cmd_server.wait()
else:
event.ignore()
# 按装订区域中的绿色按钮以运行脚本。
if __name__ == '__main__':
app = QApplication()
win = MainWindow()
win.show()
sys.exit(app.exec())