# rnc_tuning/test_br_com_serial.py
#
# 111 lines
# 4.4 KiB
# Python
# Raw Permalink Normal View History
#
# 2025-11-03 13:53:12 +08:00
import sys
import time
from PySide6.QtCore import QCoreApplication, QTimer
from PySide6.QtSerialPort import QSerialPort, QSerialPortInfo
from br_com_serial import BrComSerial
from br_com_message import BrComMessage
class TestBrComSerial:
    """Manual test harness that drives ``BrComSerial`` through a real serial port.

    The harness writes four framed test messages to the port in 2-byte
    fragments and waits for ``BrComSerial`` to report them back through
    ``signal_message``.

    NOTE(review): the test can only succeed if the bytes written to the port
    come back on the same port (loopback adapter / virtual port pair) --
    confirm the intended hardware setup.
    """

    # Number of messages written by simulate_serial_data() and expected back.
    EXPECTED_MESSAGE_COUNT = 4
    # Period of the polling timer, in milliseconds.
    POLL_INTERVAL_MS = 100
    # Give up after this many polls without enough messages (50 * 100 ms = 5 s).
    MAX_POLLS = 50

    def __init__(self):
        """Create the Qt application, the object under test and the poll timer."""
        self.app = QCoreApplication(sys.argv)
        self.com_serial = BrComSerial()
        self.test_data = bytearray()
        self.received_messages = []
        self.test_complete = False
        # Internal state: the test data is written once, then we only poll.
        self._data_sent = False
        self._polls = 0

        # Connect the signals of the object under test.
        self.com_serial.signal_message.connect(self.on_message_received)
        self.com_serial.signal_error.connect(self.on_error)

        # Create the timer that drives run_test(); it only starts firing once
        # the event loop is running (see run()).
        self.timer = QTimer()
        self.timer.timeout.connect(self.run_test)
        self.timer.start(self.POLL_INTERVAL_MS)

    def generate_test_message(self, op_type, addr, value=None):
        """Build one raw test frame.

        Layout: header byte ``0xAA``, two length bytes ``0x00 0x08``, one
        operation byte, a 4-byte big-endian address and, when ``value`` is not
        ``None``, a 4-byte big-endian value.

        NOTE(review): the length field is always ``0x0008`` whether or not a
        value is appended -- verify against the protocol definition.

        Args:
            op_type: Operation code stored as a single byte.
            addr: Address, encoded as 4 big-endian bytes.
            value: Optional value, encoded as 4 big-endian bytes.

        Returns:
            The frame as a ``bytearray``.

        Raises:
            OverflowError: If ``addr`` or ``value`` does not fit in 4 bytes.
        """
        msg = bytearray([0xAA])  # frame header
        msg.extend([0x00, 0x08])  # length field
        msg.append(op_type)  # operation type
        msg.extend(addr.to_bytes(4, 'big'))  # 4-byte address
        if value is not None:
            msg.extend(value.to_bytes(4, 'big'))  # 4-byte value
        return msg

    def simulate_serial_data(self):
        """Write the four test frames to the serial port in 2-byte fragments."""
        messages = [
            self.generate_test_message(BrComMessage.OP_READ, 100),
            self.generate_test_message(BrComMessage.OP_READ, 101),
            self.generate_test_message(BrComMessage.OP_WRITE, 102, 123),
            self.generate_test_message(BrComMessage.OP_READ, 103),
        ]
        # Split every frame into small pieces so the receiver has to
        # reassemble fragmented input.
        for msg in messages:
            for start in range(0, len(msg), 2):
                self.com_serial.serial_port.write(msg[start:start + 2])
                # Pause between fragments to imitate transmission delay.
                # NOTE(review): this blocks the Qt event loop while sleeping.
                time.sleep(0.01)

    def on_message_received(self, msg):
        """Slot for ``signal_message``: log and record a received message."""
        print(f"Received message: op={msg.op}, addr={msg.addr}, value={msg.value}")
        self.received_messages.append(msg)

    def on_error(self, error_str):
        """Slot for ``signal_error``: log the reported error text."""
        print(f"Error: {error_str}")

    def run_test(self):
        """Timer slot: send the test data once, then poll for the results.

        On the first tick the frames are written. On later ticks the method
        waits until ``EXPECTED_MESSAGE_COUNT`` messages were received, checks
        them against the expected (op, addr) pairs and stops the application
        with exit code 0 on success or 1 on mismatch/timeout.
        """
        if self.test_complete:
            return

        if not self._data_sent:
            # Send only once: re-sending on every tick would duplicate the
            # messages and keep blocking the event loop.
            self._data_sent = True
            self.simulate_serial_data()
            return

        received_count = len(self.received_messages)
        if received_count < self.EXPECTED_MESSAGE_COUNT:
            self._polls += 1
            if self._polls >= self.MAX_POLLS:
                # Without a deadline the script would hang forever when
                # nothing is echoed back.
                print(
                    f"Error: timed out after receiving {received_count} "
                    f"of {self.EXPECTED_MESSAGE_COUNT} messages"
                )
                self._finish(1)
            return

        print("\nTest Results:")
        print(f"Total messages received: {received_count}")
        for i, msg in enumerate(self.received_messages):
            print(f"Message {i+1}: op={msg.op}, addr={msg.addr}, value={msg.value}")

        # Verify order and content of the first messages.
        expected_messages = [
            (BrComMessage.OP_READ, 100),
            (BrComMessage.OP_READ, 101),
            (BrComMessage.OP_WRITE, 102),
            (BrComMessage.OP_READ, 103),
        ]
        all_ok = True
        pairs = zip(expected_messages, self.received_messages)
        for number, ((expected_op, expected_addr), msg) in enumerate(pairs, start=1):
            if msg.op == expected_op and msg.addr == expected_addr:
                print(f"Message {number} matches expected values")
            else:
                all_ok = False
                print(f"Message {number} does not match expected values")
                print(f"Expected: op={expected_op}, addr={expected_addr}")
                print(f"Received: op={msg.op}, addr={msg.addr}")

        self._finish(0 if all_ok else 1)

    def _finish(self, exit_code):
        """Stop polling, close the device and leave the event loop."""
        self.test_complete = True
        self.timer.stop()
        self.com_serial.close()
        self.app.exit(exit_code)

    def run(self):
        """Open the first available serial port and run the Qt event loop.

        Returns:
            The event loop's exit code, or 1 when no port is available or the
            port cannot be opened.
        """
        ports = QSerialPortInfo.availablePorts()
        if not ports:
            print("Error: no serial ports available")
            return 1

        port_info = ports[0]  # use the first available serial port
        serial_port = self.com_serial.serial_port
        serial_port.setPort(port_info)
        serial_port.setBaudRate(115200)
        # An open mode is mandatory, and the result must be checked: entering
        # the event loop with a closed port would only wait for the timeout.
        if not serial_port.open(QSerialPort.OpenModeFlag.ReadWrite):
            print(
                f"Error: failed to open {port_info.portName()}: "
                f"{serial_port.errorString()}"
            )
            return 1
        return self.app.exec()
if __name__ == "__main__":
    # Script entry point: build the harness, run it, and hand the value
    # returned by run() to the interpreter as the process exit status.
    harness = TestBrComSerial()
    exit_status = harness.run()
    sys.exit(exit_status)