rnc_tuning/test_device_simulator.py
2025-11-03 13:53:12 +08:00

94 lines
3.2 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import sys
import time
import random
from PySide6.QtCore import QCoreApplication, QTimer
from PySide6.QtSerialPort import QSerialPort, QSerialPortInfo
from br_com_message import BrComMessage
class DeviceSimulator:
    """Simulated device that periodically pushes test frames out of a serial port.

    A QTimer fires every ``SEND_INTERVAL_MS`` milliseconds; each tick builds one
    random read or write frame and emits it in small fragments with short
    pauses in between, so the receiving side has to reassemble partial frames.

    Class attributes (override on an instance or subclass to tune the run):
        FRAME_HEADER: first byte of every frame.
        SEND_INTERVAL_MS: timer period in milliseconds.
        FRAGMENT_SIZE: number of bytes handed to the port per write.
        FRAGMENT_DELAY_RANGE: (min, max) pause in seconds after each fragment.
    """

    FRAME_HEADER = 0xAA
    SEND_INTERVAL_MS = 100
    FRAGMENT_SIZE = 2
    FRAGMENT_DELAY_RANGE = (0.001, 0.01)

    def __init__(self):
        """Create the Qt application, the (still closed) port and the send timer."""
        self.app = QCoreApplication(sys.argv)
        self.serial_port = QSerialPort()
        self.running = True

        # Test data configuration: frames pick one address / value at random.
        self.test_addresses = [100, 101, 102, 103, 104, 105]
        self.test_values = [0, 100, 200, 300, 400, 500]

        # Send timer. It starts ticking right away; send_data() is a no-op
        # until run() has opened the port.
        self.timer = QTimer()
        self.timer.timeout.connect(self.send_data)
        self.timer.start(self.SEND_INTERVAL_MS)

    def generate_message(self, op_type, addr, value=None):
        """Build one frame.

        Layout: header byte, the two bytes 0x00 0x08, the operation type byte,
        the address as 4 big-endian bytes and, when ``value`` is given, the
        value as 4 big-endian bytes.

        Args:
            op_type: operation type, must fit in one byte (0..255).
            addr: non-negative address that fits in 4 bytes.
            value: optional non-negative value that fits in 4 bytes.

        Returns:
            The frame as a ``bytearray``.

        Raises:
            OverflowError: if ``addr`` or ``value`` does not fit in 4 unsigned bytes.
            ValueError: if ``op_type`` is outside 0..255.
        """
        msg = bytearray([self.FRAME_HEADER])
        # NOTE(review): the length field stays 0x0008 even when no value bytes
        # follow (read frames) -- confirm against the BrComMessage parser
        # whether this is what the receiver expects.
        msg.extend([0x00, 0x08])
        msg.append(op_type)
        msg.extend(addr.to_bytes(4, 'big'))
        if value is not None:
            msg.extend(value.to_bytes(4, 'big'))
        return msg

    def _write_fragments(self, msg):
        """Send ``msg`` in ``FRAGMENT_SIZE``-byte pieces; return False on a write error."""
        low, high = self.FRAGMENT_DELAY_RANGE
        for start in range(0, len(msg), self.FRAGMENT_SIZE):
            part = bytes(msg[start:start + self.FRAGMENT_SIZE])
            # write() reports failure through a negative return value rather
            # than an exception, so it has to be checked explicitly.
            if self.serial_port.write(part) < 0:
                return False
            # write() only queues the bytes; without flush() every fragment
            # would leave together once control returns to the event loop and
            # the pauses below would not separate them on the wire.
            self.serial_port.flush()
            # Random pause to imitate a real device. This blocks the event
            # loop briefly, which is acceptable for this test tool.
            time.sleep(random.uniform(low, high))
        return True

    def send_data(self):
        """Timer slot: emit one random read/write frame if the port is open."""
        if not self.running or not self.serial_port.isOpen():
            return
        try:
            # Pick a random operation and address; only writes carry a value.
            op_type = random.choice([BrComMessage.OP_READ, BrComMessage.OP_WRITE])
            addr = random.choice(self.test_addresses)
            value = random.choice(self.test_values) if op_type == BrComMessage.OP_WRITE else None

            msg = self.generate_message(op_type, addr, value)
            if not self._write_fragments(msg):
                print(f"Error sending data: {self.serial_port.errorString()}")
                return
            print(f"Sent: op={op_type}, addr={addr}, value={value}")
        except Exception as e:
            # Best effort: a failed tick must not take down the timer slot.
            print(f"Error sending data: {e}")

    def run(self):
        """Open the first available serial port and run the Qt event loop.

        Returns:
            1 if no port is available or the port cannot be opened, otherwise
            the exit code of the event loop.
        """
        # Look for available serial ports.
        available_ports = QSerialPortInfo.availablePorts()
        if not available_ports:
            print("No available serial ports found!")
            return 1

        # Use the first available port.
        port_info = available_ports[0]
        print(f"Using port: {port_info.portName()}")
        self.serial_port.setPort(port_info)
        self.serial_port.setBaudRate(115200)
        if not self.serial_port.open():
            print(f"Failed to open port {port_info.portName()}")
            return 1

        print("Device simulator started")
        return self.app.exec()

    def close(self):
        """Stop sending and close the serial port if it is open."""
        self.running = False
        # Stop the timer so no further ticks are scheduled after shutdown.
        self.timer.stop()
        if self.serial_port.isOpen():
            self.serial_port.close()
if __name__ == "__main__":
    import signal

    simulator = DeviceSimulator()

    def _stop_on_sigint(signum, frame):
        """Leave the Qt event loop when the user presses Ctrl+C."""
        print("\nSimulator stopped by user")
        simulator.app.quit()

    # While app.exec() is running, Python only gets to react to SIGINT when
    # Python code executes, i.e. inside the timer slot. A KeyboardInterrupt
    # raised there does not reliably propagate out of run(), so ask the event
    # loop to quit instead; exec() then returns 0 as a normal exit.
    signal.signal(signal.SIGINT, _stop_on_sigint)

    try:
        exit_code = simulator.run()
    except KeyboardInterrupt:
        # Kept as a fallback in case an interrupt still surfaces here.
        print("\nSimulator stopped by user")
        exit_code = 0
    finally:
        # Release the serial port on every exit path, not only on Ctrl+C.
        simulator.close()
    sys.exit(exit_code)