94 lines
3.2 KiB
Python
94 lines
3.2 KiB
Python
|
|
import sys
|
|||
|
|
import time
|
|||
|
|
import random
|
|||
|
|
from PySide6.QtCore import QCoreApplication, QTimer
|
|||
|
|
from PySide6.QtSerialPort import QSerialPort, QSerialPortInfo
|
|||
|
|
from br_com_message import BrComMessage
|
|||
|
|
|
|||
|
|
class DeviceSimulator:
|
|||
|
|
def __init__(self):
|
|||
|
|
self.app = QCoreApplication(sys.argv)
|
|||
|
|
self.serial_port = QSerialPort()
|
|||
|
|
self.running = True
|
|||
|
|
|
|||
|
|
# 创建发送定时器
|
|||
|
|
self.timer = QTimer()
|
|||
|
|
self.timer.timeout.connect(self.send_data)
|
|||
|
|
self.timer.start(100) # 每100ms发送一次数据
|
|||
|
|
|
|||
|
|
# 测试数据配置
|
|||
|
|
self.test_addresses = [100, 101, 102, 103, 104, 105]
|
|||
|
|
self.test_values = [0, 100, 200, 300, 400, 500]
|
|||
|
|
|
|||
|
|
def generate_message(self, op_type, addr, value=None):
|
|||
|
|
"""生成消息"""
|
|||
|
|
msg = bytearray([0xAA]) # 消息头
|
|||
|
|
msg.extend([0x00, 0x08]) # 长度字段(8字节)
|
|||
|
|
msg.append(op_type) # 操作类型
|
|||
|
|
msg.extend(addr.to_bytes(4, 'big')) # 地址(4字节)
|
|||
|
|
if value is not None:
|
|||
|
|
msg.extend(value.to_bytes(4, 'big')) # 值(4字节)
|
|||
|
|
return msg
|
|||
|
|
|
|||
|
|
def send_data(self):
|
|||
|
|
"""发送数据"""
|
|||
|
|
if not self.serial_port.isOpen():
|
|||
|
|
return
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
# 随机选择操作类型和地址
|
|||
|
|
op_type = random.choice([BrComMessage.OP_READ, BrComMessage.OP_WRITE])
|
|||
|
|
addr = random.choice(self.test_addresses)
|
|||
|
|
value = random.choice(self.test_values) if op_type == BrComMessage.OP_WRITE else None
|
|||
|
|
|
|||
|
|
# 生成消息
|
|||
|
|
msg = self.generate_message(op_type, addr, value)
|
|||
|
|
|
|||
|
|
# 模拟数据分片发送
|
|||
|
|
parts = [msg[i:i+2] for i in range(0, len(msg), 2)]
|
|||
|
|
for part in parts:
|
|||
|
|
self.serial_port.write(part)
|
|||
|
|
# 随机延迟,模拟真实设备
|
|||
|
|
time.sleep(random.uniform(0.001, 0.01))
|
|||
|
|
|
|||
|
|
print(f"Sent: op={op_type}, addr={addr}, value={value}")
|
|||
|
|
|
|||
|
|
except Exception as e:
|
|||
|
|
print(f"Error sending data: {e}")
|
|||
|
|
|
|||
|
|
def run(self):
|
|||
|
|
"""运行模拟器"""
|
|||
|
|
# 查找可用串口
|
|||
|
|
available_ports = QSerialPortInfo.availablePorts()
|
|||
|
|
if not available_ports:
|
|||
|
|
print("No available serial ports found!")
|
|||
|
|
return 1
|
|||
|
|
|
|||
|
|
# 使用第一个可用串口
|
|||
|
|
port_info = available_ports[0]
|
|||
|
|
print(f"Using port: {port_info.portName()}")
|
|||
|
|
|
|||
|
|
self.serial_port.setPort(port_info)
|
|||
|
|
self.serial_port.setBaudRate(115200)
|
|||
|
|
|
|||
|
|
if not self.serial_port.open():
|
|||
|
|
print(f"Failed to open port {port_info.portName()}")
|
|||
|
|
return 1
|
|||
|
|
|
|||
|
|
print("Device simulator started")
|
|||
|
|
return self.app.exec()
|
|||
|
|
|
|||
|
|
def close(self):
|
|||
|
|
"""关闭模拟器"""
|
|||
|
|
self.running = False
|
|||
|
|
if self.serial_port.isOpen():
|
|||
|
|
self.serial_port.close()
|
|||
|
|
|
|||
|
|
if __name__ == "__main__":
|
|||
|
|
simulator = DeviceSimulator()
|
|||
|
|
try:
|
|||
|
|
sys.exit(simulator.run())
|
|||
|
|
except KeyboardInterrupt:
|
|||
|
|
print("\nSimulator stopped by user")
|
|||
|
|
simulator.close()
|
|||
|
|
sys.exit(0)
|