94 lines
3.2 KiB
Python
94 lines
3.2 KiB
Python
import sys
|
||
import time
|
||
import random
|
||
from PySide6.QtCore import QCoreApplication, QTimer
|
||
from PySide6.QtSerialPort import QSerialPort, QSerialPortInfo
|
||
from br_com_message import BrComMessage
|
||
|
||
class DeviceSimulator:
    """Simulated serial device that periodically emits random test frames.

    A QTimer fires every ``SEND_INTERVAL_MS`` milliseconds; each tick builds
    one random read or write frame and pushes it to the serial port in
    ``FRAGMENT_SIZE``-byte pieces with a short random pause between pieces,
    so that the receiving side has to reassemble partial frames.
    """

    # Period of the send timer, in milliseconds.
    SEND_INTERVAL_MS = 100
    # Number of bytes written to the port per fragment.
    FRAGMENT_SIZE = 2

    def __init__(self):
        """Create the Qt application, the (still closed) port and the send timer."""
        self.app = QCoreApplication(sys.argv)
        self.serial_port = QSerialPort()
        self.running = True

        # Send timer. It only fires once the event loop is running, and
        # send_data() is a no-op until run() has opened the port.
        self.timer = QTimer()
        self.timer.timeout.connect(self.send_data)
        self.timer.start(self.SEND_INTERVAL_MS)

        # Test data pools that send_data() picks from at random.
        self.test_addresses = [100, 101, 102, 103, 104, 105]
        self.test_values = [0, 100, 200, 300, 400, 500]

    def generate_message(self, op_type, addr, value=None):
        """Build one raw frame.

        Layout: header byte 0xAA, two length bytes, one operation-type byte,
        a 4-byte big-endian address and, only when ``value`` is given, a
        4-byte big-endian value.

        Args:
            op_type: Operation code; must fit in a single byte (0..255).
            addr: Non-negative address that fits in 4 bytes.
            value: Optional non-negative value that fits in 4 bytes.

        Returns:
            The frame as a ``bytearray`` (8 bytes without a value, 12 with).

        Raises:
            OverflowError: If ``addr`` or ``value`` is negative or too large.
            ValueError: If ``op_type`` is outside 0..255.
        """
        msg = bytearray([0xAA])  # frame header
        # NOTE(review): the length field is fixed at 8 even when the 4-byte
        # value is appended (total 12 bytes) - confirm against the protocol
        # definition in BrComMessage before changing it.
        msg.extend([0x00, 0x08])
        msg.append(op_type)
        msg.extend(addr.to_bytes(4, 'big'))
        if value is not None:
            msg.extend(value.to_bytes(4, 'big'))
        return msg

    @staticmethod
    def _split_fragments(data, size):
        """Return ``data`` cut into consecutive slices of at most ``size`` bytes."""
        return [data[start:start + size] for start in range(0, len(data), size)]

    def send_data(self):
        """Timer slot: send one random frame, fragmented, if the port is open."""
        if not self.serial_port.isOpen():
            return

        try:
            # Pick a random operation and address; only writes carry a value.
            op_type = random.choice([BrComMessage.OP_READ, BrComMessage.OP_WRITE])
            addr = random.choice(self.test_addresses)
            value = random.choice(self.test_values) if op_type == BrComMessage.OP_WRITE else None

            msg = self.generate_message(op_type, addr, value)

            # Simulate a device that delivers a frame in several small pieces.
            for fragment in self._split_fragments(msg, self.FRAGMENT_SIZE):
                if self.serial_port.write(bytes(fragment)) < 0:
                    raise OSError(self.serial_port.errorString())
                # QSerialPort buffers writes and normally drains the buffer
                # from the event loop, which cannot run while this slot is
                # sleeping. Flush so each fragment really leaves the port
                # before the pause instead of the whole frame going out at once.
                self.serial_port.flush()
                # Random delay between fragments, mimicking a real device.
                time.sleep(random.uniform(0.001, 0.01))

            print(f"Sent: op={op_type}, addr={addr}, value={value}")

        except Exception as e:
            # Best-effort: a failed tick must not kill the timer callback.
            print(f"Error sending data: {e}")

    def run(self):
        """Open the first available serial port and run the Qt event loop.

        Returns:
            1 if no port is available or the port cannot be opened, otherwise
            the exit code of the Qt event loop.
        """
        # Look up the available serial ports.
        available_ports = QSerialPortInfo.availablePorts()
        if not available_ports:
            print("No available serial ports found!")
            return 1

        # Use the first available port.
        port_info = available_ports[0]
        print(f"Using port: {port_info.portName()}")

        self.serial_port.setPort(port_info)
        self.serial_port.setBaudRate(115200)

        if not self.serial_port.open():
            print(f"Failed to open port {port_info.portName()}")
            return 1

        print("Device simulator started")
        return self.app.exec()

    def close(self):
        """Stop sending and close the serial port; safe to call more than once."""
        self.running = False
        # Stop the periodic callback so it no longer fires after shutdown.
        self.timer.stop()
        if self.serial_port.isOpen():
            self.serial_port.close()
|
||
|
||
if __name__ == "__main__":
    # Script entry point: run the simulator and always release the port,
    # whether the event loop ends normally, startup fails, or the user
    # interrupts with Ctrl+C.
    simulator = DeviceSimulator()
    exit_code = 0
    try:
        exit_code = simulator.run()
    except KeyboardInterrupt:
        # A user interrupt is a normal way to stop; exit with status 0.
        print("\nSimulator stopped by user")
    finally:
        simulator.close()
    sys.exit(exit_code)