import sys
import time
import random

from PySide6.QtCore import QCoreApplication, QIODevice, QTimer
from PySide6.QtSerialPort import QSerialPort, QSerialPortInfo

from br_com_message import BrComMessage


class DeviceSimulator:
    """Simulate a device that periodically emits read/write frames on a serial port.

    A QTimer fires every 100 ms; each tick builds one random frame and writes
    it to the serial port in 2-byte fragments with small random pauses.
    """

    def __init__(self):
        self.app = QCoreApplication(sys.argv)
        self.serial_port = QSerialPort()
        self.running = True

        # Periodic send timer. It is started here, before the port is opened;
        # send_data() is a no-op until the port is open.
        self.timer = QTimer()
        self.timer.timeout.connect(self.send_data)
        self.timer.start(100)  # send one frame every 100 ms

        # Test data pools that send_data() picks from at random.
        self.test_addresses = [100, 101, 102, 103, 104, 105]
        self.test_values = [0, 100, 200, 300, 400, 500]

    def generate_message(self, op_type, addr, value=None) -> bytearray:
        """Build one frame.

        Layout, in order: header byte 0xAA, two bytes 0x00 0x08, the
        operation-type byte, the address as 4 big-endian bytes and, only when
        ``value`` is not None, the value as 4 big-endian bytes.

        Args:
            op_type: Operation type; must fit in one byte.
            addr: Address; must fit in 4 unsigned bytes.
            value: Optional value; must fit in 4 unsigned bytes.

        Returns:
            The assembled frame.

        Raises:
            OverflowError: If ``addr`` or ``value`` does not fit in 4 bytes.
            ValueError: If ``op_type`` is outside 0..255.
        """
        msg = bytearray([0xAA])  # frame header
        # NOTE(review): the length field is always 0x0008, whether or not a
        # value is appended -- confirm against the BrComMessage definition.
        msg.extend([0x00, 0x08])
        msg.append(op_type)
        msg.extend(addr.to_bytes(4, 'big'))
        if value is not None:
            msg.extend(value.to_bytes(4, 'big'))
        return msg

    def send_data(self) -> None:
        """Send one randomly generated frame in 2-byte fragments.

        Does nothing while the serial port is closed. Any error raised while
        building or writing the frame is reported on stdout so that the
        periodic timer keeps running.
        """
        if not self.serial_port.isOpen():
            return

        try:
            # Pick a random operation and address; only writes carry a value.
            op_type = random.choice([BrComMessage.OP_READ, BrComMessage.OP_WRITE])
            addr = random.choice(self.test_addresses)
            value = (
                random.choice(self.test_values)
                if op_type == BrComMessage.OP_WRITE
                else None
            )

            msg = self.generate_message(op_type, addr, value)

            # Emit the frame in 2-byte fragments.
            for start in range(0, len(msg), 2):
                part = bytes(msg[start:start + 2])
                if self.serial_port.write(part) < 0:
                    raise IOError(self.serial_port.errorString())
                # write() only queues the bytes. Without a flush nothing is
                # pushed out until control returns to the event loop, so the
                # whole frame would leave in one burst after the sleeps.
                self.serial_port.flush()
                # Random pause between fragments to mimic a real device.
                time.sleep(random.uniform(0.001, 0.01))

            print(f"Sent: op={op_type}, addr={addr}, value={value}")

        except Exception as e:
            print(f"Error sending data: {e}")

    def run(self) -> int:
        """Open the first available serial port and run the event loop.

        Returns:
            1 if no port is available or the port cannot be opened; otherwise
            the exit code of the Qt event loop.
        """
        available_ports = QSerialPortInfo.availablePorts()
        if not available_ports:
            print("No available serial ports found!")
            return 1

        # Use the first port reported by the system.
        port_info = available_ports[0]
        print(f"Using port: {port_info.portName()}")

        self.serial_port.setPort(port_info)
        self.serial_port.setBaudRate(115200)

        # QSerialPort.open() requires an explicit open mode.
        if not self.serial_port.open(QIODevice.OpenModeFlag.ReadWrite):
            print(f"Failed to open port {port_info.portName()}")
            return 1

        print("Device simulator started")
        return self.app.exec()

    def close(self) -> None:
        """Mark the simulator as stopped and close the serial port if open."""
        self.running = False
        if self.serial_port.isOpen():
            self.serial_port.close()


if __name__ == "__main__":
    simulator = DeviceSimulator()
    try:
        exit_code = simulator.run()
    except KeyboardInterrupt:
        print("\nSimulator stopped by user")
        exit_code = 0
    finally:
        # Release the port on every exit path, not only on Ctrl+C.
        simulator.close()
    sys.exit(exit_code)