111 lines
4.4 KiB
Python
111 lines
4.4 KiB
Python
|
|
import sys
|
|||
|
|
import time
|
|||
|
|
from PySide6.QtCore import QCoreApplication, QTimer
|
|||
|
|
from PySide6.QtSerialPort import QSerialPort, QSerialPortInfo
|
|||
|
|
from br_com_serial import BrComSerial
|
|||
|
|
from br_com_message import BrComMessage
|
|||
|
|
|
|||
|
|
class TestBrComSerial:
    """Stand-alone loopback check for ``BrComSerial``.

    A fixed set of framed messages is written to the serial port in small
    fragments; the test then waits for ``BrComSerial`` to report them, in
    order, through ``signal_message``.

    NOTE(review): this presumably needs a port that echoes written bytes back
    (hardware loopback or a virtual port pair) -- confirm against the setup.
    """

    POLL_INTERVAL_MS = 100      # period of the polling timer
    MAX_POLLS = 50              # give up after MAX_POLLS timer ticks (~5 s)
    EXPECTED_MESSAGE_COUNT = 4  # number of messages sent by simulate_serial_data

    def __init__(self):
        """Create the Qt application, the object under test and the poll timer."""
        self.app = QCoreApplication(sys.argv)
        self.com_serial = BrComSerial()
        self.test_data = bytearray()
        self.received_messages = []
        self.test_complete = False
        self._data_sent = False  # the test payload must be written only once
        self._polls = 0          # timer ticks elapsed, used for the timeout

        # Connect signals.
        self.com_serial.signal_message.connect(self.on_message_received)
        self.com_serial.signal_error.connect(self.on_error)

        # Polling timer; it only starts firing once the event loop runs.
        self.timer = QTimer()
        self.timer.timeout.connect(self.run_test)
        self.timer.start(self.POLL_INTERVAL_MS)

    def generate_test_message(self, op_type, addr, value=None):
        """Build one raw test frame.

        Layout: header ``0xAA``, two length bytes ``0x00 0x08``, one
        operation byte, the address as 4 big-endian bytes and, when *value*
        is not ``None``, the value as 4 big-endian bytes.

        NOTE(review): the length field stays 8 even when a value is appended
        (12-byte frame) -- confirm this matches the BrComSerial protocol.

        Args:
            op_type: Operation code stored in a single byte (0-255).
            addr: Unsigned address that fits in 4 bytes.
            value: Optional unsigned value that fits in 4 bytes.

        Returns:
            The frame as a ``bytearray``.
        """
        msg = bytearray([0xAA])              # header
        msg.extend([0x00, 0x08])             # length field
        msg.append(op_type)                  # operation type
        msg.extend(addr.to_bytes(4, 'big'))  # address (4 bytes)
        if value is not None:
            msg.extend(value.to_bytes(4, 'big'))  # value (4 bytes)
        return msg

    def simulate_serial_data(self):
        """Write the test frames to the serial port in 2-byte fragments."""
        messages = [
            self.generate_test_message(BrComMessage.OP_READ, 100),
            self.generate_test_message(BrComMessage.OP_READ, 101),
            self.generate_test_message(BrComMessage.OP_WRITE, 102, 123),
            self.generate_test_message(BrComMessage.OP_READ, 103),
        ]

        # Send each frame in pieces to exercise reassembly of fragmented data.
        for msg in messages:
            for start in range(0, len(msg), 2):
                self.com_serial.serial_port.write(msg[start:start + 2])
                time.sleep(0.01)  # simulate transmission delay

    def on_message_received(self, msg):
        """Log and record a message reported by ``BrComSerial``."""
        print(f"Received message: op={msg.op}, addr={msg.addr}, value={msg.value}")
        self.received_messages.append(msg)

    def on_error(self, error_str):
        """Log an error reported by ``BrComSerial``."""
        print(f"Error: {error_str}")

    def run_test(self):
        """Timer slot: send the payload once, then wait for and check the result.

        Ends the event loop with exit code 0 when all expected messages match,
        and with exit code 1 on a mismatch or when ``MAX_POLLS`` ticks elapse
        before enough messages arrive.
        """
        if self.test_complete:
            return

        if not self._data_sent:
            # Send exactly once; re-sending on every tick would duplicate
            # the messages the check below expects.
            self._data_sent = True
            self.simulate_serial_data()

        self._polls += 1
        received = len(self.received_messages)
        if received < self.EXPECTED_MESSAGE_COUNT:
            if self._polls >= self.MAX_POLLS:
                print(
                    f"Error: timed out after receiving {received} of "
                    f"{self.EXPECTED_MESSAGE_COUNT} messages"
                )
                self._finish(False)
            return

        print("\nTest Results:")
        print(f"Total messages received: {received}")
        for index, msg in enumerate(self.received_messages, start=1):
            print(f"Message {index}: op={msg.op}, addr={msg.addr}, value={msg.value}")

        self._finish(self._verify_messages())

    def _verify_messages(self):
        """Compare received messages with the expected (op, addr) sequence."""
        expected_messages = [
            (BrComMessage.OP_READ, 100),
            (BrComMessage.OP_READ, 101),
            (BrComMessage.OP_WRITE, 102),
            (BrComMessage.OP_READ, 103),
        ]

        all_match = True
        pairs = zip(expected_messages, self.received_messages)
        for index, ((expected_op, expected_addr), msg) in enumerate(pairs, start=1):
            if msg.op == expected_op and msg.addr == expected_addr:
                print(f"Message {index} matches expected values")
            else:
                all_match = False
                print(f"Message {index} does not match expected values")
                print(f"Expected: op={expected_op}, addr={expected_addr}")
                print(f"Received: op={msg.op}, addr={msg.addr}")
        return all_match

    def _finish(self, passed):
        """Stop polling, close the port and leave the event loop."""
        self.test_complete = True
        self.timer.stop()
        self.com_serial.close()
        self.app.exit(0 if passed else 1)

    def run(self):
        """Open the first available serial port and run the event loop.

        Returns:
            The process exit code: the event-loop result, or 1 when no port
            is available or the port cannot be opened.
        """
        ports = QSerialPortInfo.availablePorts()
        if not ports:
            print("Error: no serial port available")
            return 1

        port_info = ports[0]  # use the first available serial port
        serial_port = self.com_serial.serial_port
        serial_port.setPort(port_info)
        serial_port.setBaudRate(115200)
        # QIODevice.open() requires an explicit open mode; OpenModeFlag is
        # inherited by QSerialPort from QIODevice.
        if not serial_port.open(QSerialPort.OpenModeFlag.ReadWrite):
            print(
                f"Error: cannot open {port_info.portName()}: "
                f"{serial_port.errorString()}"
            )
            return 1

        return self.app.exec()
|
|||
|
|
|
|||
|
|
if __name__ == "__main__":
    # Script entry point: run the serial test and propagate its exit code.
    runner = TestBrComSerial()
    exit_code = runner.run()
    sys.exit(exit_code)
|