brisonus_data_collector/frame.py

299 lines
11 KiB
Python
Raw Normal View History

from dataclasses import dataclass
from typing import Optional
import struct
@dataclass
class Frame:
"""帧数据结构"""
command: int # 命令码 (4字节)
length: int # 数据长度 (4字节)
frame_idx: int # 帧序号 (4字节)
channels: list # 17个通道数据 (每个4字节)
checksum: int # 校验和 (1字节)
class FrameFormat:
"""帧格式描述类"""
# 帧头魔数 (4字节)
FRAME_HEADER = bytes([0xAA, 0xAA, 0xAA, 0xAA])
# 各字段长度定义
HEADER_SIZE = 4
COMMAND_SIZE = 4
LENGTH_SIZE = 4
INDEX_SIZE = 4
CHECKSUM_SIZE = 1
CHANNEL_SIZE = 4 # 每个通道的字节数
CHANNEL_COUNT = 17 # 通道数量
# 帧固定部分的长度(不包含可变长度的数据部分)
FIXED_SIZE = HEADER_SIZE + COMMAND_SIZE + LENGTH_SIZE + INDEX_SIZE + CHECKSUM_SIZE
# 数据部分的长度帧IDX + 通道数据)
DATA_SIZE = INDEX_SIZE + (CHANNEL_COUNT - 1) * CHANNEL_SIZE # 帧IDX + (通道数-1)个通道数据
@classmethod
def generateFrameData(cls, frame_idx: int, command: int = None, channel_values: list = None) -> bytes:
"""
生成测试用的帧数据
Args:
frame_idx: 帧序号
command: 命令码如果为None则使用frame_idx作为命令码
channel_values: 通道数据列表如果为None则生成随机数据
Returns:
bytes: 完整的帧数据
"""
# 使用frame_idx作为默认命令码
if command is None:
command = frame_idx
# 生成随机通道数据
if channel_values is None:
import random
channel_values = []
for _ in range(cls.CHANNEL_COUNT): # 使用类常量确保17个通道
# 生成-1000到1000之间的随机浮点数转换为整数存储
value = int(random.uniform(-1000, 1000) * 1000) # 放大1000倍以保持精度
channel_values.append(value)
elif len(channel_values) != cls.CHANNEL_COUNT:
raise ValueError(f"通道数据数量必须为 {cls.CHANNEL_COUNT},当前为 {len(channel_values)}")
# 构建数据部分帧IDX + 通道数据)
data = struct.pack("<I", frame_idx) # 帧IDX
for value in channel_values:
data += struct.pack("<i", value) # 通道数据
# 使用build_frame方法构建完整帧
frame_data = cls.build_frame(command, frame_idx, data)
# 验证生成的帧是否有效
if not cls.parse_frame(frame_data):
print(f"\n帧验证失败,帧数据:")
print(f"帧头: {frame_data[:cls.HEADER_SIZE].hex().upper()}")
print(f"命令码: {struct.unpack('<I', frame_data[cls.HEADER_SIZE:cls.HEADER_SIZE+cls.COMMAND_SIZE])[0]}")
print(f"长度: {struct.unpack('<I', frame_data[cls.HEADER_SIZE+cls.COMMAND_SIZE:cls.HEADER_SIZE+cls.COMMAND_SIZE+cls.LENGTH_SIZE])[0]}")
print(f"帧序号: {struct.unpack('<I', frame_data[cls.HEADER_SIZE+cls.COMMAND_SIZE+cls.LENGTH_SIZE:cls.HEADER_SIZE+cls.COMMAND_SIZE+cls.LENGTH_SIZE+cls.INDEX_SIZE])[0]}")
print(f"校验和: {frame_data[-1]:02X}")
print(f"完整帧数据: {frame_data.hex().upper()}")
raise ValueError("生成的帧验证失败")
return frame_data
@classmethod
def parse_frame(cls, data: bytes) -> Optional[Frame]:
"""
解析帧数据
Args:
data: 完整的帧数据
Returns:
Frame: 解析后的帧对象如果解析失败返回None
"""
try:
# 检查数据长度
if len(data) < cls.FIXED_SIZE + cls.DATA_SIZE:
print(f"数据长度不足: {len(data)} < {cls.FIXED_SIZE + cls.DATA_SIZE}")
return None
# 检查帧头
if data[:cls.HEADER_SIZE] != cls.FRAME_HEADER:
print(f"帧头不匹配: {data[:cls.HEADER_SIZE].hex()} != {cls.FRAME_HEADER.hex()}")
return None
# 解析固定字段
offset = cls.HEADER_SIZE
command = struct.unpack("<I", data[offset:offset+cls.COMMAND_SIZE])[0]
offset += cls.COMMAND_SIZE
length = struct.unpack("<I", data[offset:offset+cls.LENGTH_SIZE])[0]
offset += cls.LENGTH_SIZE
# 提取帧IDX第一个数据项
frame_idx = struct.unpack("<I", data[offset:offset+4])[0]
offset += 4
# 提取17个通道数据
channels = []
try:
for _ in range(17):
if offset + 4 > len(data):
print("错误:数据长度不足以读取所有通道")
return None
channel_value = struct.unpack("<I", data[offset:offset+4])[0]
channels.append(channel_value)
offset += 4
except Exception as e:
print(f"解析通道数据失败: {e}")
return None
# 确保还有校验和字节
if offset >= len(data):
print("错误:数据长度不足以读取校验和")
return None
# 提取校验和
checksum = data[offset]
# 验证校验和
if not cls.verify_checksum(data[:offset], checksum):
print("校验和验证失败")
return None
return Frame(
command=command,
length=length,
frame_idx=frame_idx,
channels=channels,
checksum=checksum
)
except Exception as e:
print(f"解析帧时出错: {e}")
return None
@classmethod
def build_frame(cls, command: int, index: int, data: bytes) -> bytes:
"""
构建帧数据
Args:
command: 命令码
index: 帧索引
data: 数据内容
Returns:
bytes: 完整的帧数据
"""
# 构建帧数据(不包含校验和)
frame_data = (
cls.FRAME_HEADER +
struct.pack("<I", command) +
struct.pack("<I", len(data) + cls.CHECKSUM_SIZE) + # 修正:长度应该是数据长度加上校验和长度
data # 修正直接使用传入的数据不需要再次打包帧IDX
)
# 计算并添加校验和
checksum = cls.calculate_checksum(frame_data)
return frame_data + bytes([checksum])
@staticmethod
def calculate_checksum(data: bytes) -> int:
"""
计算校验和 - 不包含校验码本身
Args:
data: 需要计算校验和的数据不包含校验码
Returns:
int: 校验和值
"""
# 简单的字节累加校验
return sum(data) & 0xFF
@classmethod
def verify_checksum(cls, data: bytes, checksum: int) -> bool:
"""
验证校验和
Args:
data: 数据内容不包含校验码
checksum: 校验和值
Returns:
bool: 校验结果
"""
return cls.calculate_checksum(data) == checksum
@classmethod
def get_frame_length(cls, data: bytes) -> Optional[int]:
"""
从数据中获取帧总长度
Args:
data: 至少包含帧头和长度字段的数据
Returns:
int: 帧总长度如果数据不足返回None
"""
min_size = cls.HEADER_SIZE + cls.COMMAND_SIZE + cls.LENGTH_SIZE
if len(data) < min_size:
return None
if data[:cls.HEADER_SIZE] != cls.FRAME_HEADER:
return None
length = struct.unpack("<I", data[cls.HEADER_SIZE+cls.COMMAND_SIZE:min_size])[0]
return length + cls.FIXED_SIZE
if __name__ == "__main__":
# 测试数据(十六进制字符串)
test_data_hex = "AAAAAAAA0200000049000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000F3"
print(len(test_data_hex))
# 将十六进制字符串转换为字节数据
test_data = bytes.fromhex(test_data_hex)
print(f"测试数据长度: {len(test_data)} 字节")
print(f"测试数据: {test_data.hex(' ').upper()}")
# 解析过程调试
print("\n解析过程:")
# 1. 检查数据长度
print(f"1. 检查数据长度: {len(test_data)} >= {FrameFormat.FIXED_SIZE}")
# 2. 检查帧头
frame_header = test_data[:FrameFormat.HEADER_SIZE]
print(f"2. 检查帧头: {frame_header.hex().upper()} == {FrameFormat.FRAME_HEADER.hex().upper()}")
# 3. 解析命令码
offset = FrameFormat.HEADER_SIZE
command_bytes = test_data[offset:offset+FrameFormat.COMMAND_SIZE]
command = struct.unpack("<I", command_bytes)[0]
print(f"3. 命令码: 0x{command:08X} (bytes: {command_bytes.hex().upper()})")
# 4. 解析长度
offset += FrameFormat.COMMAND_SIZE
length_bytes = test_data[offset:offset+FrameFormat.LENGTH_SIZE]
length = struct.unpack("<I", length_bytes)[0]
print(f"4. 长度: {length} (bytes: {length_bytes.hex().upper()})")
# 5. 提取帧IDX
offset += FrameFormat.LENGTH_SIZE # 更新offset到帧IDX位置
frame_idx_bytes = test_data[offset:offset+4]
frame_idx = struct.unpack("<I", frame_idx_bytes)[0]
print(f"5. 帧IDX: {frame_idx}")
offset += 4
# 6. 提取17个通道数据
print("6. 通道数据:")
channels = []
try:
for i in range(17):
channel_bytes = test_data[offset:offset+4]
if len(channel_bytes) != 4:
print(f" 警告:通道 {i+1} 数据不完整期望4字节实际{len(channel_bytes)}字节")
break
channel_value = struct.unpack("<I", channel_bytes)[0]
channels.append(channel_value)
print(f" 通道 {i+1}: 0x{channel_value:08X}")
offset += 4
except Exception as e:
print(f" 错误:解析通道数据失败: {e}")
print(f" 当前偏移量: {offset}")
print(f" 剩余数据: {test_data[offset:].hex().upper()}")
# 7. 提取校验和
if offset < len(test_data):
checksum = test_data[offset]
print(f"7. 校验和: 0x{checksum:02X}")
else:
print("7. 错误:无法读取校验和,数据不足")
# 8. 验证校验和
# 计算校验和时只使用校验码之前的数据
calculated_checksum = FrameFormat.calculate_checksum(test_data[:offset])
print(f"8. 校验和验证: 计算值=0x{calculated_checksum:02X}, 实际值=0x{checksum:02X}")
# 解析帧
frame = FrameFormat.parse_frame(test_data)
if frame:
print("\n解析成功!")
print(f"命令码: 0x{frame.command:08X}")
print(f"长度: {frame.length}")
print(f"帧IDX: {frame.frame_idx}")
print("通道数据:")
for i, value in enumerate(frame.channels):
print(f"通道 {i+1}: 0x{value:08X}")
print(f"校验和: 0x{frame.checksum:02X}")
else:
print("\n解析失败!")