# File-viewer metadata carried over from the listing: Python, 111 lines, 4.4 KiB
import sys
import time

from PySide6.QtCore import QCoreApplication, QIODevice, QTimer
from PySide6.QtSerialPort import QSerialPort, QSerialPortInfo

from br_com_message import BrComMessage
from br_com_serial import BrComSerial
class TestBrComSerial:
    """Manual test harness that drives ``BrComSerial`` over a serial port.

    The harness writes four framed test messages to the port in 2-byte
    fragments and checks that ``BrComSerial`` reports them, in order,
    through ``signal_message``.

    NOTE(review): the harness writes to and reads from the same port, so it
    presumably needs a loopback/virtual port that echoes the data back --
    confirm the expected hardware setup.
    """

    # Number of 100 ms timer ticks to wait for all messages before giving up.
    MAX_WAIT_TICKS = 50

    def __init__(self):
        """Create the Qt application, the object under test and the poll timer."""
        self.app = QCoreApplication(sys.argv)
        self.com_serial = BrComSerial()
        self.test_data = bytearray()
        self.received_messages = []
        self.test_complete = False
        # Set once the test traffic has been written, so later timer ticks
        # only check for results instead of resending everything.
        self.data_sent = False
        # Ticks spent waiting for the expected messages (timeout bookkeeping).
        self.ticks_waited = 0

        # Connect the signals of the object under test.
        self.com_serial.signal_message.connect(self.on_message_received)
        self.com_serial.signal_error.connect(self.on_error)

        # Poll timer: fires run_test every 100 ms once the event loop runs.
        self.timer = QTimer()
        self.timer.timeout.connect(self.run_test)
        self.timer.start(100)

    def generate_test_message(self, op_type, addr, value=None):
        """Build one raw test frame.

        The frame is assembled as: header byte ``0xAA``, two length bytes
        (always ``0x00 0x08``), one operation byte, a 4-byte big-endian
        address and, only when ``value`` is given, a 4-byte big-endian value.

        NOTE(review): the length field is fixed at 8 whether or not a value
        is appended -- confirm this matches the protocol definition.

        Args:
            op_type: Operation code stored in a single byte.
            addr: Address, must fit in 4 unsigned bytes.
            value: Optional value, must fit in 4 unsigned bytes.

        Returns:
            The frame as a ``bytearray``.

        Raises:
            OverflowError: If ``addr`` or ``value`` is negative or does not
                fit in 4 bytes.
        """
        msg = bytearray([0xAA])  # frame header
        msg.extend([0x00, 0x08])  # length field
        msg.append(op_type)  # operation type
        msg.extend(addr.to_bytes(4, 'big'))  # address (4 bytes)
        if value is not None:
            msg.extend(value.to_bytes(4, 'big'))  # value (4 bytes)
        return msg

    @staticmethod
    def _split_into_parts(data, part_size=2):
        """Return ``data`` cut into consecutive slices of ``part_size`` bytes."""
        return [data[i:i + part_size] for i in range(0, len(data), part_size)]

    def simulate_serial_data(self):
        """Write the four test messages to the serial port in 2-byte parts."""
        messages = [
            self.generate_test_message(BrComMessage.OP_READ, 100),
            self.generate_test_message(BrComMessage.OP_READ, 101),
            self.generate_test_message(BrComMessage.OP_WRITE, 102, 123),
            self.generate_test_message(BrComMessage.OP_READ, 103),
        ]

        # Send every message fragmented, to exercise frame reassembly.
        for msg in messages:
            for part in self._split_into_parts(msg):
                self.com_serial.serial_port.write(part)
                time.sleep(0.01)  # crude stand-in for transmission delay

    def on_message_received(self, msg):
        """Record and print a message reported by ``signal_message``."""
        print(f"Received message: op={msg.op}, addr={msg.addr}, value={msg.value}")
        self.received_messages.append(msg)

    def on_error(self, error_str):
        """Print an error reported by ``signal_error``."""
        print(f"Error: {error_str}")

    def _finish(self, exit_code):
        """Stop polling, close the port and leave the event loop with ``exit_code``."""
        self.test_complete = True
        self.timer.stop()
        self.com_serial.close()
        self.app.exit(exit_code)

    def run_test(self):
        """Timer slot: send the test traffic once, then verify the results.

        The traffic is written on the first tick only. Later ticks just check
        whether all expected messages have arrived; after ``MAX_WAIT_TICKS``
        ticks without a full set the run is abandoned with exit code 1.
        """
        if self.test_complete:
            return

        if not self.data_sent:
            # Mark first so a re-entrant tick can never resend the traffic.
            self.data_sent = True
            self.simulate_serial_data()

        # Expected (op, addr) pairs, in the order they were sent.
        expected_messages = [
            (BrComMessage.OP_READ, 100),
            (BrComMessage.OP_READ, 101),
            (BrComMessage.OP_WRITE, 102),
            (BrComMessage.OP_READ, 103),
        ]

        if len(self.received_messages) < len(expected_messages):
            self.ticks_waited += 1
            if self.ticks_waited >= self.MAX_WAIT_TICKS:
                print(
                    f"Error: timed out with {len(self.received_messages)} of "
                    f"{len(expected_messages)} messages received"
                )
                self._finish(1)
            return

        print("\nTest Results:")
        print(f"Total messages received: {len(self.received_messages)}")

        # Dump everything that was received.
        for i, msg in enumerate(self.received_messages):
            print(f"Message {i+1}: op={msg.op}, addr={msg.addr}, value={msg.value}")

        # Verify message order and content.
        all_match = True
        pairs = zip(expected_messages, self.received_messages)
        for i, ((expected_op, expected_addr), msg) in enumerate(pairs):
            if msg.op == expected_op and msg.addr == expected_addr:
                print(f"Message {i+1} matches expected values")
            else:
                all_match = False
                print(f"Message {i+1} does not match expected values")
                print(f"Expected: op={expected_op}, addr={expected_addr}")
                print(f"Received: op={msg.op}, addr={msg.addr}")

        self._finish(0 if all_match else 1)

    def run(self):
        """Open the first available serial port and run the event loop.

        Returns:
            The process exit code: 0 when every message matched, 1 when a
            message mismatched, the wait timed out, no serial port exists or
            the port could not be opened.
        """
        ports = QSerialPortInfo.availablePorts()
        if not ports:
            print("Error: no serial port available")
            return 1

        # Use the first available serial port.
        port_info = ports[0]
        serial_port = self.com_serial.serial_port
        serial_port.setPort(port_info)
        serial_port.setBaudRate(115200)
        # QSerialPort.open() requires an explicit open mode.
        if not serial_port.open(QIODevice.OpenModeFlag.ReadWrite):
            print(
                f"Error: cannot open {port_info.portName()}: "
                f"{serial_port.errorString()}"
            )
            return 1

        return self.app.exec()
if __name__ == "__main__":
    # Build the harness and hand the value returned by run() to the shell
    # as the process exit status.
    test = TestBrComSerial()
    sys.exit(test.run())