brisonus_data_collector/generate_test_data.py
JingweiCui 04612ce16d [update] 实现了frame_finder功能
- 从数据buffer中查找frames
2025-03-25 09:17:43 +08:00

157 lines
6.9 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import struct
import random
from frame import FrameFormat
def generate_invalid_data(size: int) -> bytes:
"""生成无效数据,避免生成可能被误认为帧头的序列
Args:
size: 数据大小
Returns:
bytes: 无效数据
"""
invalid_data = bytearray()
while len(invalid_data) < size:
# 生成随机字节,但避免生成可能形成帧头的序列
byte = random.randint(0, 255)
if len(invalid_data) >= 3:
# 如果前三个字节都是0xAA确保当前字节不是0xAA
if all(b == 0xAA for b in invalid_data[-3:]) and byte == 0xAA:
byte = (byte + 1) & 0xFF
invalid_data.append(byte)
return bytes(invalid_data)
def generate_corrupt_frame(frame_idx: int) -> bytes:
"""生成损坏的帧数据
Args:
frame_idx: 帧序号
Returns:
bytes: 损坏的帧数据
"""
# 生成一个完整的帧
frame_data = FrameFormat.generateFrameData(frame_idx)
# 修改命令码(这是一个更可靠的破坏方式)
corrupt_frame = bytearray(frame_data)
command_pos = FrameFormat.HEADER_SIZE
# 修改命令码的所有字节
for i in range(FrameFormat.COMMAND_SIZE):
corrupt_frame[command_pos + i] = (corrupt_frame[command_pos + i] + 1) & 0xFF
# 重新计算校验和(不包括校验和字节本身)
checksum = sum(corrupt_frame[:-1]) & 0xFF
corrupt_frame[-1] = checksum
return bytes(corrupt_frame)
def main():
# 创建测试数据目录
test_data_dir = "test_data"
if not os.path.exists(test_data_dir):
os.makedirs(test_data_dir)
# 配置参数
total_size = 1024 * 1024 # 256KB数据
frame_interval = 100 # 帧之间的平均字节数
# 计算单个帧大小
frame_size = FrameFormat.FIXED_SIZE + FrameFormat.DATA_SIZE
print(f"单个帧大小: {frame_size} 字节")
# 计算预期帧数
expected_frames = total_size // (frame_size + frame_interval)
print(f"预期帧数: {expected_frames}")
# 生成测试数据
test_data = bytearray()
frame_idx = 0
valid_frames = 0
corrupt_frames = 0
invalid_frames = 0
while len(test_data) < total_size:
# 检查添加新帧是否会超出目标大小
if len(test_data) + frame_size + frame_interval > total_size:
break
# 添加一些无效数据
invalid_size = random.randint(0, frame_interval)
# 确保添加无效数据后不会超出目标大小
if len(test_data) + invalid_size + frame_size > total_size:
break
invalid_data = generate_invalid_data(invalid_size)
test_data.extend(invalid_data)
# 有90%的概率添加有效帧10%的概率添加损坏帧
if random.random() < 0.9:
# 添加有效帧
frame_data = FrameFormat.generateFrameData(frame_idx)
# 验证帧是否有效
if FrameFormat.parse_frame(frame_data):
test_data.extend(frame_data)
valid_frames += 1
# 调试信息打印前10个帧的详细信息
if valid_frames <= 10:
print(f"\n有效帧 #{valid_frames}:")
print(f" 帧大小: {len(frame_data)} 字节")
print(f" 帧头: {frame_data[:FrameFormat.HEADER_SIZE].hex().upper()}")
print(f" 命令码: {struct.unpack('<I', frame_data[FrameFormat.HEADER_SIZE:FrameFormat.HEADER_SIZE+FrameFormat.COMMAND_SIZE])[0]}")
print(f" 数据长度: {struct.unpack('<I', frame_data[FrameFormat.HEADER_SIZE+FrameFormat.COMMAND_SIZE:FrameFormat.HEADER_SIZE+FrameFormat.COMMAND_SIZE+FrameFormat.LENGTH_SIZE])[0]}")
print(f" 帧序号: {struct.unpack('<I', frame_data[FrameFormat.HEADER_SIZE+FrameFormat.COMMAND_SIZE+FrameFormat.LENGTH_SIZE:FrameFormat.HEADER_SIZE+FrameFormat.COMMAND_SIZE+FrameFormat.LENGTH_SIZE+FrameFormat.INDEX_SIZE])[0]}")
print(f" 校验和: {frame_data[-1]:02X}")
print(f" 完整帧数据: {frame_data.hex().upper()}")
# 解析并显示通道数据
frame = FrameFormat.parse_frame(frame_data)
if frame:
print(" 通道数据:")
for i, value in enumerate(frame.channels):
print(f" 通道 {i+1}: {value}")
else:
invalid_frames += 1
print(f"\n警告:生成的帧验证失败,已跳过")
else:
# 添加损坏帧
corrupt_frame = generate_corrupt_frame(frame_idx)
# 验证损坏帧是否真的无效
if not FrameFormat.parse_frame(corrupt_frame):
test_data.extend(corrupt_frame)
corrupt_frames += 1
# 调试信息打印前10个损坏帧的详细信息
if corrupt_frames <= 10:
print(f"\n损坏帧 #{corrupt_frames}:")
print(f" 帧大小: {len(corrupt_frame)} 字节")
print(f" 帧头: {corrupt_frame[:FrameFormat.HEADER_SIZE].hex().upper()}")
print(f" 命令码: {struct.unpack('<I', corrupt_frame[FrameFormat.HEADER_SIZE:FrameFormat.HEADER_SIZE+FrameFormat.COMMAND_SIZE])[0]}")
print(f" 数据长度: {struct.unpack('<I', corrupt_frame[FrameFormat.HEADER_SIZE+FrameFormat.COMMAND_SIZE:FrameFormat.HEADER_SIZE+FrameFormat.COMMAND_SIZE+FrameFormat.LENGTH_SIZE])[0]}")
print(f" 帧序号: {struct.unpack('<I', corrupt_frame[FrameFormat.HEADER_SIZE+FrameFormat.COMMAND_SIZE+FrameFormat.LENGTH_SIZE:FrameFormat.HEADER_SIZE+FrameFormat.COMMAND_SIZE+FrameFormat.LENGTH_SIZE+FrameFormat.INDEX_SIZE])[0]}")
print(f" 校验和: {corrupt_frame[-1]:02X}")
print(f" 完整帧数据: {corrupt_frame.hex().upper()}")
else:
invalid_frames += 1
print(f"\n警告:生成的损坏帧验证通过,已跳过")
frame_idx += 1
# 保存测试数据
filename = os.path.join(test_data_dir, "test_frames.bin")
with open(filename, 'wb') as f:
f.write(test_data)
print(f"\n测试数据已生成: {filename}")
print(f"总大小: {len(test_data):,} 字节")
print(f"预期帧数: {expected_frames}")
print(f"实际帧数: {valid_frames + corrupt_frames}")
print(f"有效帧数: {valid_frames}")
print(f"损坏帧数: {corrupt_frames}")
print(f"无效帧数: {invalid_frames}")
print(f"平均每帧大小(包含无效数据): {len(test_data) / (valid_frames + corrupt_frames):.1f} 字节")
# 分析数据中的帧头数量
header_count = test_data.count(FrameFormat.FRAME_HEADER)
print(f"帧头数量: {header_count}")
if __name__ == "__main__":
main()