import threading import time import logging import os from collections import deque from queue import Queue from frame_finder import FrameFinder from frame import FrameFormat from buffer_process import BufferProcessor def count_frames_in_data(data: bytes) -> int: """直接从原始数据中计数帧数 Args: data: 原始数据 Returns: int: 帧数 """ count = 0 offset = 0 while offset < len(data): # 查找帧头 header_pos = data.find(FrameFormat.FRAME_HEADER, offset) if header_pos == -1: break # 检查是否有足够的数据来解析帧 if header_pos + FrameFormat.FIXED_SIZE > len(data): break # 尝试解析帧 frame = FrameFormat.parse_frame(data[header_pos:]) if frame: count += 1 offset = header_pos + FrameFormat.FIXED_SIZE + frame.length else: offset = header_pos + 1 return count def count_frames_with_finder(data: bytes) -> int: """使用FrameFinder计数帧数 Args: data: 原始数据 Returns: int: 帧数 """ finder = FrameFinder() count = 0 offset = 0 while offset < len(data): # 查找下一帧 frame_data = finder.find_frame_in_buffer(data[offset:]) if frame_data: count += 1 offset += len(frame_data) else: offset += 1 return count def print_frame_details(frame: FrameFormat) -> None: """打印帧详细信息 Args: frame: 帧对象 """ print(f" 帧序号: {frame.frame_idx}") print(f" 命令码: 0x{frame.command:08X}") print(f" 数据长度: {frame.length}") print(f" 校验和: 0x{frame.checksum:02X}") print(" 通道数据:") for i, value in enumerate(frame.channels): print(f" 通道 {i+1}: {value}") def main(): # 配置日志 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s' ) # 从文件加载测试数据 test_data_dir = "./test_data" test_file = "test_frames.bin" test_file_path = os.path.join(test_data_dir, test_file) print(f"尝试读取测试文件: {test_file_path}") if not os.path.exists(test_file_path): print(f"错误:找不到测试文件: {test_file_path}") print("请先运行 generate_test_data.py 生成测试数据") return try: with open(test_file_path, 'rb') as f: test_data = f.read() except Exception as e: print(f"错误:读取测试文件失败 - {e}") return print(f"读取测试数据: {len(test_data):,} 字节") # 使用两种方法计数帧数 print("\n使用直接计数方法...") direct_count = count_frames_in_data(test_data) print(f"直接计数结果: {direct_count} 帧") print("\n使用FrameFinder计数...") finder_count = count_frames_with_finder(test_data) print(f"FrameFinder计数结果: {finder_count} 帧") # 计算预期帧数 frame_size = FrameFormat.FIXED_SIZE + FrameFormat.DATA_SIZE expected_frames = len(test_data) // (frame_size + 100) # 假设平均帧间隔为100字节 print(f"\n预期帧数: {expected_frames}") print(f"实际帧数: {direct_count}") print(f"差异: {direct_count - expected_frames}") # 创建测试缓冲区 test_buffer = deque() test_buffer.extend(test_data) # 创建帧队列 frame_queue = Queue() # 创建处理器 processor = BufferProcessor( buffer=test_buffer, buffer_lock=threading.Lock(), frame_queue=frame_queue, process_interval=0.0001, # 0.1ms max_errors=10 ) # 收集警告和注意事项 warnings = [] notes = [] try: # 启动处理器 processor.start() # 开始计时 start_time = time.time() # 等待处理完成 while processor.is_running and len(test_buffer) > 0: time.sleep(0.1) # 计算运行时间 runtime = time.time() - start_time # 收集统计信息 stats = processor.get_stats() # 添加警告和注意事项 if stats['bytes_processed'] != len(test_data): warnings.append(f"警告:处理的数据大小 ({stats['bytes_processed']:,} 字节) 与输入数据大小 ({len(test_data):,} 字节) 不匹配") if stats['frames_found'] != direct_count: warnings.append(f"警告:处理器找到的有效帧数 ({stats['frames_found']}) 与直接计数结果 ({direct_count}) 不匹配") if stats['frames_found'] == 0: warnings.append("警告:未找到任何有效帧") if stats.get('error_count', 0) > 0: notes.append(f"注意:处理过程中出现 {stats['error_count']} 个错误") # 添加一个小延迟确保所有输出都完成 time.sleep(0.1) # 打印测试统计 print("\n\n=================== 测试统计 ===================") print(f"测试文件: {os.path.basename(test_file_path)}") print(f"数据大小: {len(test_data):,} 字节") print(f"运行时间: {runtime:.3f} 秒") print("\n处理性能:") print(f"- 平均处理速度: {stats['bytes_per_second']:,.1f} 字节/秒") print(f"- 帧处理速度: {stats['frames_per_second']:,.1f} 帧/秒") print("\n帧统计:") print(f"- 总帧数: {stats['frames_found']}") print(f"- 处理字节数: {stats['bytes_processed']:,} 字节") print(f"- 缓冲区剩余: {stats['buffer_size']:,} 字节") if warnings or notes: print("\n警告和注意事项:") for warning in warnings: print(f"- {warning}") for note in notes: print(f"- {note}") print("===============================================") finally: # 停止处理器 processor.stop() if __name__ == "__main__": main()