brisonus_data_collector/frame.py
JingweiCui 04612ce16d [update] 实现了frame_finder功能
- 从数据buffer中查找frames
2025-03-25 09:17:43 +08:00

299 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from dataclasses import dataclass
from typing import Optional
import struct
@dataclass
class Frame:
"""帧数据结构"""
command: int # 命令码 (4字节)
length: int # 数据长度 (4字节)
frame_idx: int # 帧序号 (4字节)
channels: list # 17个通道数据 (每个4字节)
checksum: int # 校验和 (1字节)
class FrameFormat:
"""帧格式描述类"""
# 帧头魔数 (4字节)
FRAME_HEADER = bytes([0xAA, 0xAA, 0xAA, 0xAA])
# 各字段长度定义
HEADER_SIZE = 4
COMMAND_SIZE = 4
LENGTH_SIZE = 4
INDEX_SIZE = 4
CHECKSUM_SIZE = 1
CHANNEL_SIZE = 4 # 每个通道的字节数
CHANNEL_COUNT = 17 # 通道数量
# 帧固定部分的长度(不包含可变长度的数据部分)
FIXED_SIZE = HEADER_SIZE + COMMAND_SIZE + LENGTH_SIZE + INDEX_SIZE + CHECKSUM_SIZE
# 数据部分的长度帧IDX + 通道数据)
DATA_SIZE = INDEX_SIZE + (CHANNEL_COUNT - 1) * CHANNEL_SIZE # 帧IDX + (通道数-1)个通道数据
@classmethod
def generateFrameData(cls, frame_idx: int, command: int = None, channel_values: list = None) -> bytes:
"""
生成测试用的帧数据
Args:
frame_idx: 帧序号
command: 命令码如果为None则使用frame_idx作为命令码
channel_values: 通道数据列表如果为None则生成随机数据
Returns:
bytes: 完整的帧数据
"""
# 使用frame_idx作为默认命令码
if command is None:
command = frame_idx
# 生成随机通道数据
if channel_values is None:
import random
channel_values = []
for _ in range(cls.CHANNEL_COUNT): # 使用类常量确保17个通道
# 生成-1000到1000之间的随机浮点数转换为整数存储
value = int(random.uniform(-1000, 1000) * 1000) # 放大1000倍以保持精度
channel_values.append(value)
elif len(channel_values) != cls.CHANNEL_COUNT:
raise ValueError(f"通道数据数量必须为 {cls.CHANNEL_COUNT},当前为 {len(channel_values)}")
# 构建数据部分帧IDX + 通道数据)
data = struct.pack("<I", frame_idx) # 帧IDX
for value in channel_values:
data += struct.pack("<i", value) # 通道数据
# 使用build_frame方法构建完整帧
frame_data = cls.build_frame(command, frame_idx, data)
# 验证生成的帧是否有效
if not cls.parse_frame(frame_data):
print(f"\n帧验证失败,帧数据:")
print(f"帧头: {frame_data[:cls.HEADER_SIZE].hex().upper()}")
print(f"命令码: {struct.unpack('<I', frame_data[cls.HEADER_SIZE:cls.HEADER_SIZE+cls.COMMAND_SIZE])[0]}")
print(f"长度: {struct.unpack('<I', frame_data[cls.HEADER_SIZE+cls.COMMAND_SIZE:cls.HEADER_SIZE+cls.COMMAND_SIZE+cls.LENGTH_SIZE])[0]}")
print(f"帧序号: {struct.unpack('<I', frame_data[cls.HEADER_SIZE+cls.COMMAND_SIZE+cls.LENGTH_SIZE:cls.HEADER_SIZE+cls.COMMAND_SIZE+cls.LENGTH_SIZE+cls.INDEX_SIZE])[0]}")
print(f"校验和: {frame_data[-1]:02X}")
print(f"完整帧数据: {frame_data.hex().upper()}")
raise ValueError("生成的帧验证失败")
return frame_data
@classmethod
def parse_frame(cls, data: bytes) -> Optional[Frame]:
"""
解析帧数据
Args:
data: 完整的帧数据
Returns:
Frame: 解析后的帧对象如果解析失败返回None
"""
try:
# 检查数据长度
if len(data) < cls.FIXED_SIZE + cls.DATA_SIZE:
print(f"数据长度不足: {len(data)} < {cls.FIXED_SIZE + cls.DATA_SIZE}")
return None
# 检查帧头
if data[:cls.HEADER_SIZE] != cls.FRAME_HEADER:
print(f"帧头不匹配: {data[:cls.HEADER_SIZE].hex()} != {cls.FRAME_HEADER.hex()}")
return None
# 解析固定字段
offset = cls.HEADER_SIZE
command = struct.unpack("<I", data[offset:offset+cls.COMMAND_SIZE])[0]
offset += cls.COMMAND_SIZE
length = struct.unpack("<I", data[offset:offset+cls.LENGTH_SIZE])[0]
offset += cls.LENGTH_SIZE
# 提取帧IDX第一个数据项
frame_idx = struct.unpack("<I", data[offset:offset+4])[0]
offset += 4
# 提取17个通道数据
channels = []
try:
for _ in range(17):
if offset + 4 > len(data):
print("错误:数据长度不足以读取所有通道")
return None
channel_value = struct.unpack("<I", data[offset:offset+4])[0]
channels.append(channel_value)
offset += 4
except Exception as e:
print(f"解析通道数据失败: {e}")
return None
# 确保还有校验和字节
if offset >= len(data):
print("错误:数据长度不足以读取校验和")
return None
# 提取校验和
checksum = data[offset]
# 验证校验和
if not cls.verify_checksum(data[:offset], checksum):
print("校验和验证失败")
return None
return Frame(
command=command,
length=length,
frame_idx=frame_idx,
channels=channels,
checksum=checksum
)
except Exception as e:
print(f"解析帧时出错: {e}")
return None
@classmethod
def build_frame(cls, command: int, index: int, data: bytes) -> bytes:
"""
构建帧数据
Args:
command: 命令码
index: 帧索引
data: 数据内容
Returns:
bytes: 完整的帧数据
"""
# 构建帧数据(不包含校验和)
frame_data = (
cls.FRAME_HEADER +
struct.pack("<I", command) +
struct.pack("<I", len(data) + cls.CHECKSUM_SIZE) + # 修正:长度应该是数据长度加上校验和长度
data # 修正直接使用传入的数据不需要再次打包帧IDX
)
# 计算并添加校验和
checksum = cls.calculate_checksum(frame_data)
return frame_data + bytes([checksum])
@staticmethod
def calculate_checksum(data: bytes) -> int:
"""
计算校验和 - 不包含校验码本身
Args:
data: 需要计算校验和的数据(不包含校验码)
Returns:
int: 校验和值
"""
# 简单的字节累加校验
return sum(data) & 0xFF
@classmethod
def verify_checksum(cls, data: bytes, checksum: int) -> bool:
"""
验证校验和
Args:
data: 数据内容(不包含校验码)
checksum: 校验和值
Returns:
bool: 校验结果
"""
return cls.calculate_checksum(data) == checksum
@classmethod
def get_frame_length(cls, data: bytes) -> Optional[int]:
"""
从数据中获取帧总长度
Args:
data: 至少包含帧头和长度字段的数据
Returns:
int: 帧总长度如果数据不足返回None
"""
min_size = cls.HEADER_SIZE + cls.COMMAND_SIZE + cls.LENGTH_SIZE
if len(data) < min_size:
return None
if data[:cls.HEADER_SIZE] != cls.FRAME_HEADER:
return None
length = struct.unpack("<I", data[cls.HEADER_SIZE+cls.COMMAND_SIZE:min_size])[0]
return length + cls.FIXED_SIZE
if __name__ == "__main__":
# 测试数据(十六进制字符串)
test_data_hex = "AAAAAAAA0200000049000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000F3"
print(len(test_data_hex))
# 将十六进制字符串转换为字节数据
test_data = bytes.fromhex(test_data_hex)
print(f"测试数据长度: {len(test_data)} 字节")
print(f"测试数据: {test_data.hex(' ').upper()}")
# 解析过程调试
print("\n解析过程:")
# 1. 检查数据长度
print(f"1. 检查数据长度: {len(test_data)} >= {FrameFormat.FIXED_SIZE}")
# 2. 检查帧头
frame_header = test_data[:FrameFormat.HEADER_SIZE]
print(f"2. 检查帧头: {frame_header.hex().upper()} == {FrameFormat.FRAME_HEADER.hex().upper()}")
# 3. 解析命令码
offset = FrameFormat.HEADER_SIZE
command_bytes = test_data[offset:offset+FrameFormat.COMMAND_SIZE]
command = struct.unpack("<I", command_bytes)[0]
print(f"3. 命令码: 0x{command:08X} (bytes: {command_bytes.hex().upper()})")
# 4. 解析长度
offset += FrameFormat.COMMAND_SIZE
length_bytes = test_data[offset:offset+FrameFormat.LENGTH_SIZE]
length = struct.unpack("<I", length_bytes)[0]
print(f"4. 长度: {length} (bytes: {length_bytes.hex().upper()})")
# 5. 提取帧IDX
offset += FrameFormat.LENGTH_SIZE # 更新offset到帧IDX位置
frame_idx_bytes = test_data[offset:offset+4]
frame_idx = struct.unpack("<I", frame_idx_bytes)[0]
print(f"5. 帧IDX: {frame_idx}")
offset += 4
# 6. 提取17个通道数据
print("6. 通道数据:")
channels = []
try:
for i in range(17):
channel_bytes = test_data[offset:offset+4]
if len(channel_bytes) != 4:
print(f" 警告:通道 {i+1} 数据不完整期望4字节实际{len(channel_bytes)}字节")
break
channel_value = struct.unpack("<I", channel_bytes)[0]
channels.append(channel_value)
print(f" 通道 {i+1}: 0x{channel_value:08X}")
offset += 4
except Exception as e:
print(f" 错误:解析通道数据失败: {e}")
print(f" 当前偏移量: {offset}")
print(f" 剩余数据: {test_data[offset:].hex().upper()}")
# 7. 提取校验和
if offset < len(test_data):
checksum = test_data[offset]
print(f"7. 校验和: 0x{checksum:02X}")
else:
print("7. 错误:无法读取校验和,数据不足")
# 8. 验证校验和
# 计算校验和时只使用校验码之前的数据
calculated_checksum = FrameFormat.calculate_checksum(test_data[:offset])
print(f"8. 校验和验证: 计算值=0x{calculated_checksum:02X}, 实际值=0x{checksum:02X}")
# 解析帧
frame = FrameFormat.parse_frame(test_data)
if frame:
print("\n解析成功!")
print(f"命令码: 0x{frame.command:08X}")
print(f"长度: {frame.length}")
print(f"帧IDX: {frame.frame_idx}")
print("通道数据:")
for i, value in enumerate(frame.channels):
print(f"通道 {i+1}: 0x{value:08X}")
print(f"校验和: 0x{frame.checksum:02X}")
else:
print("\n解析失败!")