import time from queue import Queue import queue import modbus_tk.exceptions import serial from PySide6.QtCore import QObject, Signal, QThread, Slot from modbus_tk.modbus_rtu import RtuMaster import modbus_tk.defines as cst from threading import Lock from communication.com_cmd import ComCmd from communication.param_manager import ParamManager class Worker(QObject): _instance = None signal_task_done = Signal() signal_task_done_2 = Signal() signal_task_failed = Signal() # 任务完成进度 signal_task_progress = Signal(dict) signal_start_task = Signal() signal_communication_failed = Signal() def __new__(cls, *args, **kwargs): if cls._instance is None: cls._instance = super(Worker, cls).__new__(cls) return cls._instance def __init__(self): super().__init__() self.serial: serial = None self.signal_start_task.connect(self.do_work) self.com_api = None self.param_manager = ParamManager() def set_com_interface(self, com_data): try: self.serial = serial.Serial(com_data['name'], 115200) self.com_api:RtuMaster = RtuMaster(self.serial) self.com_api.set_timeout(0.3) return True except Exception as e: print(e) return False def disconnect_interface(self): try: self.com_api.close() self.serial.close() return True except Exception as e: return False @Slot() def do_work(self): if self.com_api is None: self.param_manager.task_queue.empty() self.signal_communication_failed.emit() return # 获取queue长度 task_total = self.param_manager.task_queue.qsize() task_finished_num = 0 if task_total == 0: return while True: try: _cmd: ComCmd = self.param_manager.task_queue.get(timeout=1) if _cmd.op == ComCmd.OperationType.READ: _op = cst.READ_HOLDING_REGISTERS res = self.com_api.execute(1, _op, _cmd.addr,_cmd.len) else: _op = cst.WRITE_MULTIPLE_REGISTERS res = self.com_api.execute(1, _op, _cmd.addr,_cmd.len, _cmd.write_vals) self.param_manager.process_response(_cmd, res) task_finished_num += 1 task_progress = int((task_finished_num / task_total)*100) self.signal_task_progress.emit({ "progress": task_progress, "processed": task_finished_num, "total": task_total }) time.sleep(0.05) except queue.Empty as e: self.signal_task_done.emit() self.signal_task_done_2.emit() break except modbus_tk.exceptions.ModbusInvalidResponseError: self.signal_communication_failed.emit() break class ComThread(QThread): _instance = None _lock = Lock() def __new__(cls, *args, **kwargs): with cls._lock: if cls._instance is None: cls._instance = super(ComThread, cls).__new__(cls) return cls._instance def __init__(self): if not hasattr(self, '_initialized'): super().__init__() self._initialized = True self.worker = Worker() self.started.connect(self.start_event_loop) def start_event_loop(self): self.exec() def run_task(self): self.worker.signal_start_task.emit() def cleanup(self): """退出时清理线程""" if self.isRunning(): self.quit() # 通知线程事件循环退出 self.wait() # 等待线程安全结束