diff --git a/.gitignore b/.gitignore index 9b896e8..97d8186 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,4 @@ dist build .idea +main.dist diff --git a/BrisonusRNCTunningTool_V2.0.6.spec b/BrisonusRNCTunningTool_V2.0.6.spec new file mode 100644 index 0000000..e3a14b3 --- /dev/null +++ b/BrisonusRNCTunningTool_V2.0.6.spec @@ -0,0 +1,38 @@ +# -*- mode: python ; coding: utf-8 -*- + + +a = Analysis( + ['main.py'], + pathex=[], + binaries=[], + datas=[], + hiddenimports=[], + hookspath=[], + hooksconfig={}, + runtime_hooks=[], + excludes=[], + noarchive=False, + optimize=0, +) +pyz = PYZ(a.pure) + +exe = EXE( + pyz, + a.scripts, + a.binaries, + a.datas, + [], + name='BrisonusRNCTunningTool_V2.0.6', + debug=False, + bootloader_ignore_signals=False, + strip=False, + upx=True, + upx_exclude=[], + runtime_tmpdir=None, + console=False, + disable_windowed_traceback=False, + argv_emulation=False, + target_arch=None, + codesign_identity=None, + entitlements_file=None, +) diff --git a/Brisonus_Params_2.0.5.spec b/Brisonus_Params_2.0.5.spec index ea70e11..58e4783 100644 --- a/Brisonus_Params_2.0.5.spec +++ b/Brisonus_Params_2.0.5.spec @@ -3,7 +3,7 @@ a = Analysis( ['main.py'], - pathex=['.\\venv\\Lib\\site-packages\\'], + pathex=[], binaries=[], datas=[], hiddenimports=[], @@ -29,7 +29,7 @@ exe = EXE( upx=True, upx_exclude=[], runtime_tmpdir=None, - console=True, + console=False, disable_windowed_traceback=False, argv_emulation=False, target_arch=None, diff --git a/br_com_serial.py b/br_com_serial.py new file mode 100644 index 0000000..62faf59 --- /dev/null +++ b/br_com_serial.py @@ -0,0 +1,183 @@ +from PySide6.QtCore import QObject, Signal, Slot, QThread, QMutex, QWaitCondition +from PySide6.QtSerialPort import QSerialPort, QSerialPortInfo +from br_com_message import BrComMessage +import time +import queue + +class SerialReaderThread(QThread): + """串口数据读取线程""" + def __init__(self, serial_port, buffer_queue): + super().__init__() + self.serial_port = serial_port + self.buffer_queue = buffer_queue + self.running = True + self.buffer = bytearray() + self.max_buffer_size = 50 * 1024 * 1024 # 50MB + self.buffer_mutex = QMutex() + + def run(self): + while self.running: + if not self.serial_port.isOpen(): + time.sleep(0.1) + continue + + if self.serial_port.waitForReadyRead(100): # 等待100ms + data = self.serial_port.readAll() + if data: + self.buffer_mutex.lock() + try: + # 检查缓冲区大小 + if len(self.buffer) >= self.max_buffer_size: + print("Warning: Buffer overflow, clearing buffer") + self.buffer.clear() + + # 将数据添加到缓冲区 + self.buffer.extend(data.data()) + # 将完整的数据块放入队列 + self.buffer_queue.put(self.buffer) + # 清空缓冲区,准备接收下一块数据 + self.buffer = bytearray() + finally: + self.buffer_mutex.unlock() + else: + # 如果没有数据,短暂休眠 + time.sleep(0.01) + + def stop(self): + self.running = False + +class MessageProcessorThread(QThread): + """消息处理线程""" + def __init__(self, buffer_queue, message_queue): + super().__init__() + self.buffer_queue = buffer_queue + self.message_queue = message_queue + self.running = True + self.accumulated_data = bytearray() + self.max_accumulated_size = 50 * 1024 * 1024 # 50MB + self.data_mutex = QMutex() + + def run(self): + while self.running: + try: + # 从缓冲区队列获取数据,设置超时 + data = self.buffer_queue.get(timeout=0.1) + if data: + self.data_mutex.lock() + try: + # 检查累积数据大小 + if len(self.accumulated_data) >= self.max_accumulated_size: + print("Warning: Accumulated data overflow, clearing data") + self.accumulated_data.clear() + + # 将新数据添加到累积数据中 + self.accumulated_data.extend(data) + + # 处理累积的数据,查找完整的消息 + while len(self.accumulated_data) > 0: + # 查找消息头(假设消息头是0xAA) + start_index = self.accumulated_data.find(0xAA) + if start_index == -1: + # 没有找到消息头,清空数据 + self.accumulated_data.clear() + break + + # 移除消息头之前的数据 + self.accumulated_data = self.accumulated_data[start_index:] + + # 检查是否有足够的数据来获取消息长度 + if len(self.accumulated_data) < 4: # 假设消息头+长度至少4字节 + break + + # 获取消息长度(假设长度字段在消息头后的2个字节) + message_length = (self.accumulated_data[1] << 8) | self.accumulated_data[2] + + # 检查消息长度是否合理 + if message_length > self.max_accumulated_size: + print(f"Warning: Invalid message length {message_length}, skipping") + self.accumulated_data = self.accumulated_data[1:] # 跳过当前字节 + continue + + # 检查是否有完整的消息 + if len(self.accumulated_data) < message_length: + break + + # 提取完整的消息 + message_data = self.accumulated_data[:message_length] + self.accumulated_data = self.accumulated_data[message_length:] + + # 将消息放入消息队列 + self.message_queue.put(message_data) + finally: + self.data_mutex.unlock() + + except queue.Empty: + # 队列超时,继续循环 + continue + + def stop(self): + self.running = False + +class BrComSerial(QObject): + signal_connected = Signal() + signal_disconnected = Signal() + signal_error = Signal(str) + signal_message = Signal(BrComMessage) + + def __init__(self, parent=None): + super().__init__(parent) + self.serial_port = QSerialPort() + self.serial_port.errorOccurred.connect(self.on_error) + + # 创建数据缓冲队列和消息队列,设置最大大小 + self.buffer_queue = queue.Queue(maxsize=1000) # 限制队列大小 + self.message_queue = queue.Queue(maxsize=1000) # 限制队列大小 + + # 创建并启动数据读取线程 + self.reader_thread = SerialReaderThread(self.serial_port, self.buffer_queue) + self.reader_thread.start() + + # 创建并启动消息处理线程 + self.processor_thread = MessageProcessorThread(self.buffer_queue, self.message_queue) + self.processor_thread.start() + + # 启动消息处理定时器 + self.timer = QTimer() + self.timer.timeout.connect(self.process_messages) + self.timer.start(10) # 每10ms检查一次消息队列 + + def process_messages(self): + """处理消息队列中的消息""" + while not self.message_queue.empty(): + try: + message_data = self.message_queue.get_nowait() + # 处理消息(这里可以添加具体的消息处理逻辑) + msg = BrComMessage.from_bytes(message_data) + if msg: + self.signal_message.emit(msg) + except queue.Empty: + break + except Exception as e: + print(f"Error processing message: {e}") + + def on_error(self, error): + """处理串口错误""" + if error == QSerialPort.NoError: + return + + error_str = f"Serial port error: {error}" + print(error_str) + self.signal_error.emit(error_str) + + if self.serial_port.isOpen(): + self.serial_port.close() + self.signal_disconnected.emit() + + def close(self): + """关闭串口和线程""" + self.reader_thread.stop() + self.processor_thread.stop() + self.reader_thread.wait() + self.processor_thread.wait() + if self.serial_port.isOpen(): + self.serial_port.close() \ No newline at end of file diff --git a/br_widget_arr_details.py b/br_widget_arr_details.py index e41cb60..d5e9668 100644 --- a/br_widget_arr_details.py +++ b/br_widget_arr_details.py @@ -8,10 +8,26 @@ from br_widget_basic_plot import BrWidgetBasicPlot class BrWidgetArrDetails(QWidget, Ui_Widget): signal_read: SignalInstance = Signal(list) signal_write: SignalInstance = Signal(list) + signal_stop: SignalInstance = Signal() + def __init__(self, parent=None): super().__init__(parent) self.setupUi(self) self.param_data: ParamDataArr = None + self.is_reading = False + self.is_writing = False + self.read_count = 0 # 读取计数器 + self.total_read_count = 0 # 总读取数量 + self.write_count = 0 # 写入计数器 + self.total_write_count = 0 # 总写入数量 + self.processed_read_addresses = set() # 已处理的读取地址 + self.processed_write_addresses = set() # 已处理的写入地址 + + # 初始化按钮状态 + self.pushButton_read.setEnabled(True) + self.pushButton_write.setEnabled(True) + self.pushButton_read_stop.setEnabled(False) + self.pushButton_write_stop.setEnabled(False) self.updating = False self.tableWidget_val.setColumnCount(2) @@ -21,7 +37,9 @@ class BrWidgetArrDetails(QWidget, Ui_Widget): self.tableWidget_setval.setHorizontalHeaderLabels(['Addr', 'Value']) self.tableWidget_setval.itemChanged.connect(self.on_table_setval_itemChanged) - + + self.pushButton_read_stop.clicked.connect(self.on_pushButton_read_stop_clicked) + self.pushButton_write_stop.clicked.connect(self.on_pushButton_write_stop_clicked) def on_table_setval_itemChanged(self, item: QTableWidgetItem): if self.updating: @@ -81,23 +99,84 @@ class BrWidgetArrDetails(QWidget, Ui_Widget): for i in range(0, len(self.param_data.setval_list)): self.tableWidget_setval.setItem(0, i, QTableWidgetItem(str(self.param_data.setval_list[i]))) + def reset_read_state(self): + """重置读取状态""" + print('Resetting read state') + self.is_reading = False + self.read_count = 0 + self.total_read_count = 0 + self.processed_read_addresses.clear() + self.pushButton_read.setEnabled(True) + self.pushButton_write.setEnabled(True) + self.pushButton_read_stop.setEnabled(False) + self.pushButton_write_stop.setEnabled(False) + + def reset_write_state(self): + """重置写入状态""" + print('Resetting write state') + self.is_writing = False + self.write_count = 0 + self.total_write_count = 0 + self.processed_write_addresses.clear() + self.pushButton_write.setEnabled(True) + self.pushButton_read.setEnabled(True) + self.pushButton_write_stop.setEnabled(False) + self.pushButton_read_stop.setEnabled(False) + @Slot() def on_pushButton_read_clicked(self): print('[read] button clicked!') + print(f'Starting read operation for array size: {self.param_data.size}') + # 确保之前的状态被清理 + self.reset_read_state() + self.reset_write_state() + + # 设置新的读取状态 + self.is_reading = True + self.read_count = 0 + self.total_read_count = self.param_data.size + + # 设置按钮状态 + self.pushButton_read.setEnabled(False) + self.pushButton_write.setEnabled(False) + self.pushButton_read_stop.setEnabled(True) + self.pushButton_write_stop.setEnabled(False) + # 生成一组读取命令 _cmd_list = [] + print(f'Expected addresses to read: ', end='') for i in range(0, self.param_data.size): + addr = self.param_data.addr + i _cmd_list.append( BrComMessage( BrComMessage.OP_READ, - self.param_data.addr + i + addr ) ) + print(f'{addr} ', end='') + print() # 换行 + print(f'Initialized read operation: count={self.read_count}, total={self.total_read_count}') self.signal_read.emit(_cmd_list) @Slot() def on_pushButton_write_clicked(self): print('[write] button clicked!') + # 确保之前的状态被清理 + self.reset_read_state() + self.reset_write_state() + + # 设置新的写入状态 + self.is_writing = True + self.write_count = 0 + self.total_write_count = self.param_data.size + + # 设置按钮状态 + self.pushButton_write.setEnabled(False) + self.pushButton_read.setEnabled(False) + self.pushButton_write_stop.setEnabled(True) + self.pushButton_read_stop.setEnabled(False) + + print(f'Initialized write operation: count={self.write_count}, total={self.total_write_count}') _cmd_list = [] for i in range(0, self.param_data.size): @@ -110,6 +189,25 @@ class BrWidgetArrDetails(QWidget, Ui_Widget): ) self.signal_write.emit(_cmd_list) + @Slot() + def on_pushButton_read_stop_clicked(self): + print('[read stop] button clicked!') + self.reset_read_state() + self.signal_stop.emit() + + @Slot() + def on_pushButton_write_stop_clicked(self): + print('[write stop] button clicked!') + self.reset_write_state() + self.signal_stop.emit() + + def operation_completed(self): + """操作完成时调用此方法""" + if self.is_reading: + self.reset_read_state() + if self.is_writing: + self.reset_write_state() + @Slot() def on_pushButton_valplot_clicked(self): print('[valpolt] button clicked!') @@ -122,6 +220,138 @@ class BrWidgetArrDetails(QWidget, Ui_Widget): def on_pushButton_setvalplot_clicked(self): print('[setvalplot] button clicked!') + def resend_missing_read_commands(self): + """重新发送未处理地址的读取命令""" + if not self.is_reading: + return + + array_start = self.param_data.addr + array_end = self.param_data.addr + self.param_data.size + expected_addresses = set(range(array_start, array_end)) + missing_addresses = expected_addresses - self.processed_read_addresses + + if missing_addresses: + print(f'Resending read commands for missing addresses: {sorted(missing_addresses)}') + _cmd_list = [] + for addr in sorted(missing_addresses): + _cmd_list.append( + BrComMessage( + BrComMessage.OP_READ, + addr + ) + ) + self.signal_read.emit(_cmd_list) + return True + return False + + def check_read_complete(self, addr): + """检查读取是否完成""" + print(f'Checking completion for addr: {addr}') + if not self.is_reading: + print(f'Not in reading state, ignoring addr: {addr}') + return + + # 检查地址是否在当前数组范围内 + array_start = self.param_data.addr + array_end = self.param_data.addr + self.param_data.size + print(f'Array range: {array_start} to {array_end-1}, checking addr: {addr}') + + if array_start <= addr < array_end and addr not in self.processed_read_addresses: + self.processed_read_addresses.add(addr) + self.read_count += 1 + print(f'Valid address {addr}, incrementing counter. Progress: {self.read_count}/{self.total_read_count}') + + # 找出未处理的地址 + expected_addresses = set(range(array_start, array_end)) + missing_addresses = expected_addresses - self.processed_read_addresses + print(f'Missing addresses: {sorted(missing_addresses)}') + + # 如果有未处理的地址且已经读取了一定数量(比如80%)的地址,尝试重发 + if missing_addresses and self.read_count >= self.total_read_count * 0.8: + print(f'Detected missing addresses at {self.read_count}/{self.total_read_count}, attempting retry') + if self.resend_missing_read_commands(): + print('Resent read commands for missing addresses') + return + + # 如果已经读取了预期数量的地址 + if self.read_count >= self.total_read_count: + if not missing_addresses: + print(f'All data read (count={self.read_count}), completing operation') + self.reset_read_state() + self.signal_stop.emit() + print('Read operation fully completed') + else: + if addr in self.processed_read_addresses: + print(f'Address {addr} already processed') + else: + print(f'Address {addr} outside current array range ({array_start}-{array_end-1})') + + def resend_missing_write_commands(self): + """重新发送未处理地址的写入命令""" + if not self.is_writing: + return + + array_start = self.param_data.addr + array_end = self.param_data.addr + self.param_data.size + expected_addresses = set(range(array_start, array_end)) + missing_addresses = expected_addresses - self.processed_write_addresses + + if missing_addresses: + print(f'Resending write commands for missing addresses: {sorted(missing_addresses)}') + _cmd_list = [] + for addr in sorted(missing_addresses): + _cmd_list.append( + BrComMessage( + BrComMessage.OP_WRITE, + addr, + self.param_data.getval_list[addr - array_start] + ) + ) + self.signal_write.emit(_cmd_list) + return True + return False + + def check_write_complete(self, addr): + """检查写入是否完成""" + print(f'Checking completion for addr: {addr}') + if not self.is_writing: + print(f'Not in writing state, ignoring addr: {addr}') + return + + # 检查地址是否在当前数组范围内 + array_start = self.param_data.addr + array_end = self.param_data.addr + self.param_data.size + print(f'Array range: {array_start} to {array_end-1}, checking addr: {addr}') + + if array_start <= addr < array_end and addr not in self.processed_write_addresses: + self.processed_write_addresses.add(addr) + self.write_count += 1 + print(f'Valid address {addr}, incrementing counter. Progress: {self.write_count}/{self.total_write_count}') + + # 找出未处理的地址 + expected_addresses = set(range(array_start, array_end)) + missing_addresses = expected_addresses - self.processed_write_addresses + print(f'Missing addresses: {sorted(missing_addresses)}') + + # 如果有未处理的地址且已经写入了一定数量(比如80%)的地址,尝试重发 + if missing_addresses and self.write_count >= self.total_write_count * 0.8: + print(f'Detected missing addresses at {self.write_count}/{self.total_write_count}, attempting retry') + if self.resend_missing_write_commands(): + print('Resent write commands for missing addresses') + return + + # 如果已经写入了预期数量的地址 + if self.write_count >= self.total_write_count: + if not missing_addresses: + print(f'All data written (count={self.write_count}), completing operation') + self.reset_write_state() + self.signal_stop.emit() + print('Write operation fully completed') + else: + if addr in self.processed_write_addresses: + print(f'Address {addr} already processed') + else: + print(f'Address {addr} outside current array range ({array_start}-{array_end-1})') if __name__ == "__main__": from PySide6.QtWidgets import QApplication diff --git a/main.exe b/main.exe new file mode 100644 index 0000000..c6b17ea Binary files /dev/null and b/main.exe differ diff --git a/main.py b/main.py index a803e44..7f6d37f 100644 --- a/main.py +++ b/main.py @@ -49,6 +49,20 @@ class MainWindow(QMainWindow, Ui_MainWindow): self.setupUi(self) + # 初始化日志区域折叠状态 + self.log_expanded = True + self.textEdit_original_height = self.textEdit.height() + self.pushButton_toggle_log.clicked.connect(self.toggle_log_area) + + # 添加自动折叠定时器 + self.auto_fold_timer = QTimer() + self.auto_fold_timer.setInterval(5000) # 30秒后自动折叠 + self.auto_fold_timer.timeout.connect(self.auto_fold_log) + self.auto_fold_timer.start() + + # 连接textEdit的textChanged信号以重置定时器 + self.textEdit.textChanged.connect(self.reset_auto_fold_timer) + self.param_data_list = [] self.data_manager = BrDataManager() @@ -56,7 +70,7 @@ class MainWindow(QMainWindow, Ui_MainWindow): self.init_tableview() # self.tableview_load_data() - self.setWindowTitle('Brisonus RNC Tunning Tool 2.0.4') + self.setWindowTitle('BrisonusRNCTunningTool_V2.0.6') self.com = QSerialPort() @@ -84,6 +98,7 @@ class MainWindow(QMainWindow, Ui_MainWindow): # 定义widget_page_arr的slot self.widget_page_arr.signal_read.connect(self.on_arr_read) self.widget_page_arr.signal_write.connect(self.on_arr_write) + self.widget_page_arr.signal_stop.connect(self.on_arr_stop) # var arr 两个页面都处于隐藏状态 self.widget_page_arr.hide() @@ -131,6 +146,15 @@ class MainWindow(QMainWindow, Ui_MainWindow): self.cmd_server.cmd_queue.put(_cmd_item) print('arr widget signal/callback on_read has been called!') + def on_arr_stop(self): + """处理数组操作的停止信号""" + # 清空命令队列 + while not self.cmd_server.cmd_queue.empty(): + self.cmd_server.cmd_queue.get() + self.statusbar.showMessage('操作已停止') + # 通知widget操作已完成 + self.widget_page_arr.operation_completed() + def init_comports(self): self.lineEdit_baudrate.setText(str(default_config['baud_rate'])) self.lineEdit_pollingInterval.setText(str(default_config['polling_interval'])) @@ -285,8 +309,12 @@ Polling interval: 100ms # 更新var page的显示值 self.statusbar.showMessage('地址[%d]读取完成!' % _msg.addr) self.update_details_page(self.selected_row) + # 检查数组读取是否完成 + self.widget_page_arr.check_read_complete(_msg.addr) if _msg.operation == BrComMessage.RES_WRITE: self.statusbar.showMessage('地址[%d]写入完成!' % _msg.addr) + # 检查数组写入是否完成 + self.widget_page_arr.check_write_complete(_msg.addr) current_datetime = datetime.now() formatted_datetime = current_datetime.strftime("[%Y-%m-%d %H:%M:%S]") @@ -322,6 +350,30 @@ Polling interval: 100ms else: event.ignore() + def toggle_log_area(self): + """切换日志区域的展开/折叠状态""" + if self.log_expanded: + # 折叠 + self.textEdit.setMaximumHeight(0) + self.pushButton_toggle_log.setText("︿") + else: + # 展开 + self.textEdit.setMaximumHeight(16777215) # 恢复默认最大高度 + self.pushButton_toggle_log.setText("﹀") + # 重置自动折叠定时器 + self.reset_auto_fold_timer() + self.log_expanded = not self.log_expanded + + def auto_fold_log(self): + """自动折叠日志区域""" + if self.log_expanded: + self.toggle_log_area() + + def reset_auto_fold_timer(self): + """重置自动折叠定时器""" + self.auto_fold_timer.stop() + self.auto_fold_timer.start() + # 按装订区域中的绿色按钮以运行脚本。 if __name__ == '__main__': app = QApplication() diff --git a/main.spec b/main.spec new file mode 100644 index 0000000..c55496f --- /dev/null +++ b/main.spec @@ -0,0 +1,38 @@ +# -*- mode: python ; coding: utf-8 -*- + + +a = Analysis( + ['main.py'], + pathex=[], + binaries=[], + datas=[], + hiddenimports=[], + hookspath=[], + hooksconfig={}, + runtime_hooks=[], + excludes=[], + noarchive=False, + optimize=0, +) +pyz = PYZ(a.pure) + +exe = EXE( + pyz, + a.scripts, + a.binaries, + a.datas, + [], + name='main', + debug=False, + bootloader_ignore_signals=False, + strip=False, + upx=True, + upx_exclude=[], + runtime_tmpdir=None, + console=False, + disable_windowed_traceback=False, + argv_emulation=False, + target_arch=None, + codesign_identity=None, + entitlements_file=None, +) diff --git a/rnc_tunning.py b/rnc_tunning.py index 9d6b69e..582cc12 100644 --- a/rnc_tunning.py +++ b/rnc_tunning.py @@ -142,6 +142,24 @@ class Ui_MainWindow(object): self.verticalLayout_2.addWidget(self.widget) + # 添加日志区域的水平布局 + self.horizontalLayout_log = QHBoxLayout() + self.horizontalLayout_log.setObjectName(u"horizontalLayout_log") + + # 添加日志标题和折叠按钮 + self.label_log = QLabel(self.centralwidget) + self.label_log.setObjectName(u"label_log") + self.horizontalLayout_log.addWidget(self.label_log) + + self.pushButton_toggle_log = QPushButton(self.centralwidget) + self.pushButton_toggle_log.setObjectName(u"pushButton_toggle_log") + self.pushButton_toggle_log.setMaximumWidth(20) + self.horizontalLayout_log.addWidget(self.pushButton_toggle_log) + + self.horizontalLayout_log.addStretch() + + self.verticalLayout_2.addLayout(self.horizontalLayout_log) + self.textEdit = QTextEdit(self.centralwidget) self.textEdit.setObjectName(u"textEdit") @@ -185,5 +203,7 @@ class Ui_MainWindow(object): self.pushButton_loadfile.setText(QCoreApplication.translate("MainWindow", u"\u52a0\u8f7d\u6587\u4ef6", None)) self.pushButton_update.setText(QCoreApplication.translate("MainWindow", u"\u5237\u65b0", None)) self.pushButton_copy.setText(QCoreApplication.translate("MainWindow", u"\u590d\u5236\u4fe1\u606f", None)) + self.label_log.setText(QCoreApplication.translate("MainWindow", u"\u65e5\u5fd7\u4fe1\u606f", None)) + self.pushButton_toggle_log.setText(QCoreApplication.translate("MainWindow", u"﹀", None)) # retranslateUi diff --git a/test_br_com_serial.py b/test_br_com_serial.py new file mode 100644 index 0000000..0746d0e --- /dev/null +++ b/test_br_com_serial.py @@ -0,0 +1,111 @@ +import sys +import time +from PySide6.QtCore import QCoreApplication, QTimer +from PySide6.QtSerialPort import QSerialPort, QSerialPortInfo +from br_com_serial import BrComSerial +from br_com_message import BrComMessage + +class TestBrComSerial: + def __init__(self): + self.app = QCoreApplication(sys.argv) + self.com_serial = BrComSerial() + self.test_data = bytearray() + self.received_messages = [] + self.test_complete = False + + # 连接信号 + self.com_serial.signal_message.connect(self.on_message_received) + self.com_serial.signal_error.connect(self.on_error) + + # 创建测试定时器 + self.timer = QTimer() + self.timer.timeout.connect(self.run_test) + self.timer.start(100) # 每100ms执行一次测试 + + def generate_test_message(self, op_type, addr, value=None): + """生成测试消息""" + msg = bytearray([0xAA]) # 消息头 + msg.extend([0x00, 0x08]) # 长度字段(8字节) + msg.append(op_type) # 操作类型 + msg.extend(addr.to_bytes(4, 'big')) # 地址(4字节) + if value is not None: + msg.extend(value.to_bytes(4, 'big')) # 值(4字节) + return msg + + def simulate_serial_data(self): + """模拟串口数据""" + # 生成一些测试消息 + messages = [ + self.generate_test_message(BrComMessage.OP_READ, 100), + self.generate_test_message(BrComMessage.OP_READ, 101), + self.generate_test_message(BrComMessage.OP_WRITE, 102, 123), + self.generate_test_message(BrComMessage.OP_READ, 103), + ] + + # 模拟数据分片发送 + for msg in messages: + # 将消息分成多个部分发送 + parts = [msg[i:i+2] for i in range(0, len(msg), 2)] + for part in parts: + self.com_serial.serial_port.write(part) + time.sleep(0.01) # 模拟传输延迟 + + def on_message_received(self, msg): + """处理接收到的消息""" + print(f"Received message: op={msg.op}, addr={msg.addr}, value={msg.value}") + self.received_messages.append(msg) + + def on_error(self, error_str): + """处理错误""" + print(f"Error: {error_str}") + + def run_test(self): + """运行测试""" + if not self.test_complete: + # 模拟串口数据 + self.simulate_serial_data() + + # 检查接收到的消息 + if len(self.received_messages) >= 4: # 期望收到4条消息 + print("\nTest Results:") + print(f"Total messages received: {len(self.received_messages)}") + + # 验证消息内容 + for i, msg in enumerate(self.received_messages): + print(f"Message {i+1}: op={msg.op}, addr={msg.addr}, value={msg.value}") + + # 验证消息顺序和内容 + expected_messages = [ + (BrComMessage.OP_READ, 100), + (BrComMessage.OP_READ, 101), + (BrComMessage.OP_WRITE, 102), + (BrComMessage.OP_READ, 103) + ] + + for i, (expected_op, expected_addr) in enumerate(expected_messages): + if i < len(self.received_messages): + msg = self.received_messages[i] + if msg.op == expected_op and msg.addr == expected_addr: + print(f"Message {i+1} matches expected values") + else: + print(f"Message {i+1} does not match expected values") + print(f"Expected: op={expected_op}, addr={expected_addr}") + print(f"Received: op={msg.op}, addr={msg.addr}") + + self.test_complete = True + self.com_serial.close() + self.app.quit() + + def run(self): + """运行测试程序""" + # 创建虚拟串口 + port_info = QSerialPortInfo.availablePorts()[0] # 使用第一个可用串口 + self.com_serial.serial_port.setPort(port_info) + self.com_serial.serial_port.setBaudRate(115200) + self.com_serial.serial_port.open() + + return self.app.exec() + +if __name__ == "__main__": + test = TestBrComSerial() + sys.exit(test.run()) \ No newline at end of file diff --git a/test_device_simulator.py b/test_device_simulator.py new file mode 100644 index 0000000..fe01578 --- /dev/null +++ b/test_device_simulator.py @@ -0,0 +1,94 @@ +import sys +import time +import random +from PySide6.QtCore import QCoreApplication, QTimer +from PySide6.QtSerialPort import QSerialPort, QSerialPortInfo +from br_com_message import BrComMessage + +class DeviceSimulator: + def __init__(self): + self.app = QCoreApplication(sys.argv) + self.serial_port = QSerialPort() + self.running = True + + # 创建发送定时器 + self.timer = QTimer() + self.timer.timeout.connect(self.send_data) + self.timer.start(100) # 每100ms发送一次数据 + + # 测试数据配置 + self.test_addresses = [100, 101, 102, 103, 104, 105] + self.test_values = [0, 100, 200, 300, 400, 500] + + def generate_message(self, op_type, addr, value=None): + """生成消息""" + msg = bytearray([0xAA]) # 消息头 + msg.extend([0x00, 0x08]) # 长度字段(8字节) + msg.append(op_type) # 操作类型 + msg.extend(addr.to_bytes(4, 'big')) # 地址(4字节) + if value is not None: + msg.extend(value.to_bytes(4, 'big')) # 值(4字节) + return msg + + def send_data(self): + """发送数据""" + if not self.serial_port.isOpen(): + return + + try: + # 随机选择操作类型和地址 + op_type = random.choice([BrComMessage.OP_READ, BrComMessage.OP_WRITE]) + addr = random.choice(self.test_addresses) + value = random.choice(self.test_values) if op_type == BrComMessage.OP_WRITE else None + + # 生成消息 + msg = self.generate_message(op_type, addr, value) + + # 模拟数据分片发送 + parts = [msg[i:i+2] for i in range(0, len(msg), 2)] + for part in parts: + self.serial_port.write(part) + # 随机延迟,模拟真实设备 + time.sleep(random.uniform(0.001, 0.01)) + + print(f"Sent: op={op_type}, addr={addr}, value={value}") + + except Exception as e: + print(f"Error sending data: {e}") + + def run(self): + """运行模拟器""" + # 查找可用串口 + available_ports = QSerialPortInfo.availablePorts() + if not available_ports: + print("No available serial ports found!") + return 1 + + # 使用第一个可用串口 + port_info = available_ports[0] + print(f"Using port: {port_info.portName()}") + + self.serial_port.setPort(port_info) + self.serial_port.setBaudRate(115200) + + if not self.serial_port.open(): + print(f"Failed to open port {port_info.portName()}") + return 1 + + print("Device simulator started") + return self.app.exec() + + def close(self): + """关闭模拟器""" + self.running = False + if self.serial_port.isOpen(): + self.serial_port.close() + +if __name__ == "__main__": + simulator = DeviceSimulator() + try: + sys.exit(simulator.run()) + except KeyboardInterrupt: + print("\nSimulator stopped by user") + simulator.close() + sys.exit(0) \ No newline at end of file diff --git a/ui_arr_details.py b/ui_arr_details.py index 2398d27..7cc3747 100644 --- a/ui_arr_details.py +++ b/ui_arr_details.py @@ -77,6 +77,11 @@ class Ui_Widget(object): self.verticalLayout_2.addWidget(self.pushButton_read) + self.pushButton_read_stop = QPushButton(Widget) + self.pushButton_read_stop.setObjectName(u"pushButton_read_stop") + self.pushButton_read_stop.setEnabled(False) + self.verticalLayout_2.addWidget(self.pushButton_read_stop) + self.pushButton_valplot = QPushButton(Widget) self.pushButton_valplot.setObjectName(u"pushButton_valplot") @@ -115,6 +120,11 @@ class Ui_Widget(object): self.verticalLayout_3.addWidget(self.pushButton_write) + self.pushButton_write_stop = QPushButton(Widget) + self.pushButton_write_stop.setObjectName(u"pushButton_write_stop") + self.pushButton_write_stop.setEnabled(False) + self.verticalLayout_3.addWidget(self.pushButton_write_stop) + self.pushButton_setvalplot = QPushButton(Widget) self.pushButton_setvalplot.setObjectName(u"pushButton_setvalplot") @@ -163,9 +173,11 @@ class Ui_Widget(object): self.label_2.setText(QCoreApplication.translate("Widget", u"\u53d8\u91cf\u63cf\u8ff0", None)) self.label_5.setText(QCoreApplication.translate("Widget", u"\u5f53\u524d\u503c", None)) self.pushButton_read.setText(QCoreApplication.translate("Widget", u"Read", None)) + self.pushButton_read_stop.setText(QCoreApplication.translate("Widget", u"Stop", None)) self.pushButton_valplot.setText(QCoreApplication.translate("Widget", u"Plot", None)) self.label_6.setText(QCoreApplication.translate("Widget", u"\u8bbe\u5b9a\u503c", None)) self.pushButton_write.setText(QCoreApplication.translate("Widget", u"Write", None)) + self.pushButton_write_stop.setText(QCoreApplication.translate("Widget", u"Stop", None)) self.pushButton_setvalplot.setText(QCoreApplication.translate("Widget", u"Plot", None)) self.label_details.setText(QCoreApplication.translate("Widget", u"\u7a7a", None)) # retranslateUi diff --git a/ui_basic_plot.py b/ui_basic_plot.py index 9d1fb66..72bdbc2 100644 --- a/ui_basic_plot.py +++ b/ui_basic_plot.py @@ -42,7 +42,7 @@ class Ui_Dialog(object): self.verticalLayout.addWidget(self.listView) - self.verticalLayout.setStretch() + self.verticalLayout.setStretch(2, 1) self.horizontalLayout.addLayout(self.verticalLayout)