import os import struct import random from frame import FrameFormat def generate_invalid_data(size: int) -> bytes: """生成无效数据,避免生成可能被误认为帧头的序列 Args: size: 数据大小 Returns: bytes: 无效数据 """ invalid_data = bytearray() while len(invalid_data) < size: # 生成随机字节,但避免生成可能形成帧头的序列 byte = random.randint(0, 255) if len(invalid_data) >= 3: # 如果前三个字节都是0xAA,确保当前字节不是0xAA if all(b == 0xAA for b in invalid_data[-3:]) and byte == 0xAA: byte = (byte + 1) & 0xFF invalid_data.append(byte) return bytes(invalid_data) def generate_corrupt_frame(frame_idx: int) -> bytes: """生成损坏的帧数据 Args: frame_idx: 帧序号 Returns: bytes: 损坏的帧数据 """ # 生成一个完整的帧 frame_data = FrameFormat.generateFrameData(frame_idx) # 修改命令码(这是一个更可靠的破坏方式) corrupt_frame = bytearray(frame_data) command_pos = FrameFormat.HEADER_SIZE # 修改命令码的所有字节 for i in range(FrameFormat.COMMAND_SIZE): corrupt_frame[command_pos + i] = (corrupt_frame[command_pos + i] + 1) & 0xFF # 重新计算校验和(不包括校验和字节本身) checksum = sum(corrupt_frame[:-1]) & 0xFF corrupt_frame[-1] = checksum return bytes(corrupt_frame) def main(): # 创建测试数据目录 test_data_dir = "test_data" if not os.path.exists(test_data_dir): os.makedirs(test_data_dir) # 配置参数 total_size = 1024 * 1024 # 256KB数据 frame_interval = 100 # 帧之间的平均字节数 # 计算单个帧大小 frame_size = FrameFormat.FIXED_SIZE + FrameFormat.DATA_SIZE print(f"单个帧大小: {frame_size} 字节") # 计算预期帧数 expected_frames = total_size // (frame_size + frame_interval) print(f"预期帧数: {expected_frames}") # 生成测试数据 test_data = bytearray() frame_idx = 0 valid_frames = 0 corrupt_frames = 0 invalid_frames = 0 while len(test_data) < total_size: # 检查添加新帧是否会超出目标大小 if len(test_data) + frame_size + frame_interval > total_size: break # 添加一些无效数据 invalid_size = random.randint(0, frame_interval) # 确保添加无效数据后不会超出目标大小 if len(test_data) + invalid_size + frame_size > total_size: break invalid_data = generate_invalid_data(invalid_size) test_data.extend(invalid_data) # 有90%的概率添加有效帧,10%的概率添加损坏帧 if random.random() < 0.9: # 添加有效帧 frame_data = FrameFormat.generateFrameData(frame_idx) # 验证帧是否有效 if FrameFormat.parse_frame(frame_data): test_data.extend(frame_data) valid_frames += 1 # 调试信息:打印前10个帧的详细信息 if valid_frames <= 10: print(f"\n有效帧 #{valid_frames}:") print(f" 帧大小: {len(frame_data)} 字节") print(f" 帧头: {frame_data[:FrameFormat.HEADER_SIZE].hex().upper()}") print(f" 命令码: {struct.unpack('