# 这是一个示例 Python 脚本。 import sys from PySide6.QtWidgets import QMainWindow, QApplication, QFileDialog, QAbstractItemView, QMessageBox, QWidget from PySide6.QtGui import QStandardItemModel, QStandardItem from PySide6.QtCore import Slot, QModelIndex, QTimer from rnc_tunning import Ui_MainWindow from br_load_table import load_table import serial.tools.list_ports from br_widget_arr_details import BrWidgetArrDetails from br_widget_var_details import BrWidgetVarDetails from param_data import VarParamData, ParamData, ParamDataArr, ParamDataVar from PySide6.QtSerialPort import QSerialPort from cmd_server import CmdServer from br_com_message import BrComMessage from datetime import datetime from br_data_manager import BrDataManager TIMER_TIMEOUT = 10 params_filepath = 'ParamsDict.yml' default_config = { 'baud_rate': 115200, 'polling_interval': 100 } class PortInfo: def __init__(self, num: str = '', baud_rate=115200, polling_interval=100): self.num = num self.baud_rate = baud_rate self.polling_interval = polling_interval # load_params_from_file('ParamsDict.yml') class MainWindow(QMainWindow, Ui_MainWindow): def __init__(self): super().__init__() self.widget = None self.model = None self.param_list = None self.serial_port = None self.setupUi(self) self.param_data_list = [] self.data_manager = BrDataManager() self.selected_row = 0 self.init_tableview() # self.tableview_load_data() self.setWindowTitle('Brisonus RNC Tunning Tool 2.0.4') self.com = QSerialPort() # com read定时器 self.read_timer: QTimer = QTimer() self.read_timer.setSingleShot(True) self.read_timer.timeout.connect(self.read_timer_stop) # 消息管理服务器 self.cmd_server = CmdServer() self.cmd_server.start() self.cmd_server.resume() self.cmd_server.signal_write.connect(self.com_write) self.portlist = [] self.port_info = PortInfo() self.init_comports() self.widget_page_arr = BrWidgetArrDetails(self.widget) self.widget_page_var = BrWidgetVarDetails(self.widget) self.widget_page_var.signal_read.connect(self.on_var_read) self.widget_page_var.signal_write.connect(self.on_var_write) # 定义widget_page_arr的slot self.widget_page_arr.signal_read.connect(self.on_arr_read) self.widget_page_arr.signal_write.connect(self.on_arr_write) # var arr 两个页面都处于隐藏状态 self.widget_page_arr.hide() self.widget_page_var.hide() # 默认加载可执行路径下 try: self.tableview_load_data('ParamsDict.yml') self.lineEdit_filename.setText('./ParamsDict.yml') except Exception as e: print(e) def on_var_read(self, p): if not self.com.isOpen(): # 弹出对话框提示用户serial port未处于open状态 self.show_msgbox('系统提示', '未打开端口!') return self.cmd_server.cmd_queue.put(p) print('var widget read signal/callback on_read has been called!') def on_var_write(self, p): print('var widget write signal/callback on_read has been called!') if not self.com.isOpen(): # 弹出对话框提示用户serial port未处于open状态 self.show_msgbox('系统提示', '未打开端口!') return self.cmd_server.cmd_queue.put(p) def on_arr_read(self, p_cmd_list): if not self.com.isOpen(): # 弹出对话框提示用户serial port未处于open状态 self.show_msgbox('系统提示', '未打开端口!') return for _cmd_item in p_cmd_list: self.cmd_server.cmd_queue.put(_cmd_item) print('arr widget signal/callback on_read has been called!') def on_arr_write(self, p_cmd_list): print('--------------------1') if not self.com.isOpen(): # 弹出对话框提示用户serial port未处于open状态 self.show_msgbox('系统提示', '未打开端口!') return for _cmd_item in p_cmd_list: self.cmd_server.cmd_queue.put(_cmd_item) print('arr widget signal/callback on_read has been called!') def init_comports(self): self.lineEdit_baudrate.setText(str(default_config['baud_rate'])) self.lineEdit_pollingInterval.setText(str(default_config['polling_interval'])) self.portlist = list(serial.tools.list_ports.comports()) for port_info in self.portlist: self.comboBox_comports.addItem(str(port_info)) def init_tableview(self): self.model = QStandardItemModel(0, 3) self.model.setHorizontalHeaderLabels(['ID', 'Name', 'Value']) self.tableView.horizontalHeader().setStretchLastSection(True) self.tableView.setModel(self.model) self.tableView.setSelectionMode(QAbstractItemView.SelectionMode.SingleSelection) self.tableView.setSelectionBehavior(QAbstractItemView.SelectionBehavior.SelectRows) # 设置不能编辑 self.tableView.setEditTriggers(QAbstractItemView.EditTrigger.NoEditTriggers) self.tableView.clicked.connect(self.on_tableview_clicked) def tableview_load_data(self, filepath): param_list = load_table(filepath) self.data_manager.param_list = [] # 将param_list转换为param_data_list for param_item in param_list: if param_item['type'] == 'VAR': self.data_manager.param_list.append(ParamDataVar( param_item['id'], param_item['addr'], param_item['name'], param_item['details'], 0 )) if param_item['type'] == 'ARRAY': self.data_manager.param_list.append(ParamDataArr( param_item['id'], param_item['addr'], param_item['name'], param_item['details'], 1, param_item['len'] )) for i in range(0, len(self.data_manager.param_list)): _param = self.data_manager.param_list[i] print(_param) self.model.setItem(i, 0, QStandardItem(str(_param.addr))) self.model.setItem(i, 1, QStandardItem(str(_param.name))) if _param.type == ParamData.VAR_TYPE_VAR: _param.item = QStandardItem(str(_param.val())) self.model.setItem(i, 2, _param.item) if _param.type == ParamData.VAR_TYPE_ARR: _param.item = QStandardItem(str('/')) self.model.setItem(i, 2, _param.item) # 设置列宽 self.tableView.resizeColumnsToContents() def switch_page(self, page_name): if page_name == 'var': self.widget_page_arr.hide() self.widget_page_var.show() if page_name == 'arr': self.widget_page_var.hide() self.widget_page_arr.show() def on_tableview_clicked(self, index: QModelIndex): print('tableview clicked!', index.row()) self.selected_row = index.row() param_index = index.row() self.update_details_page(param_index) def update_details_page(self, param_index): # 将param的详细信息更新到右侧的详情Widget中进行显示 m_param_data = self.data_manager.param_list[param_index] # 如果点击的是普通变量,则加载普通变量的控件 if m_param_data.type == VarParamData.VAR_TYPE_VAR: print(self.widget) self.update_var_details_ui(m_param_data) self.switch_page('var') # 如果点击的是数组型变量,则加载数据型变量的控件 if m_param_data.type == VarParamData.VAR_TYPE_ARR: self.widget_page_arr.update_param_info(m_param_data) self.switch_page('arr') def update_var_details_ui(self, param_data: ParamDataVar): print('update_var_details_ui') self.widget_page_var.update_param_info(param_data) @Slot() def on_pushButton_set_clicked(self): print('[设定] button clicked!') # 获取所选择的端口号 _port_index = self.comboBox_comports.currentIndex() self.port_info.num = self.portlist[_port_index][0] # 获取所输入的波特率 self.port_info.baud_rate = int(self.lineEdit_baudrate.text()) # 设置QSerialPort参数 self.com.setPortName(self.port_info.num) self.com.setBaudRate(self.port_info.baud_rate) self.com.readyRead.connect(self.com_read) self.com.open(QSerialPort.OpenModeFlag.ReadWrite) self.show_msgbox('系统提示', ''' 已打开端口: Port number: %s Baud rate: %d bps Polling interval: 100ms ''' % (self.port_info.num, self.port_info.baud_rate)) def show_msgbox(self, title, content): msg_box = QMessageBox() msg_box.setWindowTitle(title) msg_box.setText(content) msg_box.exec() def com_write(self, write_data: bytes): # 判断serial port是否处于open状态 if not self.com.isOpen(): # 弹出对话框提示用户serial port未处于open状态 self.show_msgbox('系统提示', '未打开端口!') return self.com.write(write_data) # 获取当前系统日期时间 current_datetime = datetime.now() formatted_datetime = current_datetime.strftime("[%Y-%m-%d %H:%M:%S]") write_str = "%s TX:" % formatted_datetime self.textEdit.append(str(write_str) + write_data.hex(' ')) self.statusbar.showMessage('读取中...') # 延时 def com_read(self): # 启动读取定时器 self.read_timer.start(10) def read_timer_stop(self): recv_data = self.com.readAll().data() if len(recv_data) < 9: print('ccc', recv_data) return # 对收到的消息进行解析 res_msg = BrComMessage.resolve_message(list(recv_data)) if res_msg[0]: # 更新参数的数值 _msg = res_msg[1] if _msg.operation == BrComMessage.RES_READ: self.data_manager.update_param_val_by_addr( _msg.addr, _msg.val ) # 更新var page的显示值 self.statusbar.showMessage('地址[%d]读取完成!' % _msg.addr) self.update_details_page(self.selected_row) if _msg.operation == BrComMessage.RES_WRITE: self.statusbar.showMessage('地址[%d]写入完成!' % _msg.addr) current_datetime = datetime.now() formatted_datetime = current_datetime.strftime("[%Y-%m-%d %H:%M:%S]") write_str = "%s RX:" % formatted_datetime self.textEdit.append(str(write_str) + recv_data.hex(' ')) @Slot() def on_pushButton_loadfile_clicked(self): print('[加载文件] button clicked!') dialog = QFileDialog() if dialog.exec(): filepath = dialog.selectedFiles()[0] self.lineEdit_filename.setText(filepath) self.tableview_load_data(filepath) @Slot() def on_pushButton_update_clicked(self): print("[更新] button clicked!") @Slot() def on_pushButton_copy_clicked(self): print("[复制信息] button clicked!") def closeEvent(self, event): reply = QMessageBox.question(self, '系统提示', '系统将退出,是否确认?', QMessageBox.StandardButton.Yes | QMessageBox.StandardButton.No, QMessageBox.StandardButton.No) if reply == QMessageBox.StandardButton.Yes: self.cmd_server.cancel() self.cmd_server.quit() self.cmd_server.wait() else: event.ignore() # 按装订区域中的绿色按钮以运行脚本。 if __name__ == '__main__': app = QApplication() win = MainWindow() win.show() sys.exit(app.exec())