import serial import time import logging import os import random from typing import Optional class SerialSimulator: """串口数据模拟器""" def __init__(self, port: str, baudrate: int): """ 初始化串口模拟器 Args: port: 串口名称 baudrate: 波特率 """ self.port = port self.baudrate = baudrate self.serial: Optional[serial.Serial] = None self.test_data: Optional[bytes] = None self.bytes_sent = 0 self.start_time = None self.end_time = None # 配置日志 log_dir = "logs" if not os.path.exists(log_dir): os.makedirs(log_dir) logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s' ) self.logger = logging.getLogger(__name__) def load_test_data(self) -> bool: """加载测试数据""" try: filename = os.path.join("test_data", "test_frames.bin") with open(filename, 'rb') as f: self.test_data = f.read() self.logger.info(f"已加载测试数据: {len(self.test_data)} 字节") return True except Exception as e: self.logger.error(f"加载测试数据失败: {e}") return False def connect(self) -> bool: """连接串口""" try: self.serial = serial.Serial( port=self.port, baudrate=self.baudrate, timeout=0.1, write_timeout=10 # 添加写入超时 ) self.logger.info(f"成功连接到串口 {self.port}") return True except Exception as e: self.logger.error(f"连接串口失败: {e}") return False def disconnect(self): """断开串口连接""" if self.serial and self.serial.is_open: self.serial.close() self.logger.info("已断开串口连接") def send_data(self) -> bool: """发送数据""" if not self.test_data: if not self.load_test_data(): return False if not self.serial or not self.serial.is_open: if not self.connect(): return False try: data_length = len(self.test_data) current_pos = 0 self.start_time = time.time() # 记录发送开始信息 self.logger.info(f"开始发送数据: {data_length} 字节") self.logger.info(f"串口参数: {self.port}, {self.baudrate} baud") while current_pos < data_length: # 随机大小的数据包(1-1024字节) packet_size = random.randint(1, 1024) # 确保不超过剩余数据长度 packet_size = min(packet_size, data_length - current_pos) # 发送数据包 packet = self.test_data[current_pos:current_pos + packet_size] self.serial.write(packet) current_pos += packet_size self.bytes_sent += packet_size # 随机延迟(0-10ms) time.sleep(random.uniform(0, 0.01)) # 每发送1MB数据打印一次进度 if self.bytes_sent % (1024*1024) < packet_size: progress = (self.bytes_sent / data_length) * 100 elapsed_time = time.time() - self.start_time speed = self.bytes_sent / elapsed_time / 1024 # KB/s self.logger.info(f"发送进度: {progress:.1f}% ({self.bytes_sent}/{data_length} 字节), " f"速度: {speed:.1f} KB/s") self.end_time = time.time() total_time = self.end_time - self.start_time avg_speed = self.bytes_sent / total_time / 1024 # KB/s # 记录发送完成信息 self.logger.info(f"数据发送完成") self.logger.info(f"总时间: {total_time:.2f} 秒") self.logger.info(f"平均速度: {avg_speed:.1f} KB/s") self.logger.info(f"发送字节数: {self.bytes_sent}") return True except Exception as e: self.logger.error(f"发送数据错误: {e}") return False finally: self.disconnect() if __name__ == "__main__": # 使用示例 simulator = SerialSimulator(port="COM1", baudrate=4000000) simulator.send_data()