299 lines
11 KiB
Python
299 lines
11 KiB
Python
from dataclasses import dataclass
|
||
from typing import Optional
|
||
import struct
|
||
|
||
@dataclass
|
||
class Frame:
|
||
"""帧数据结构"""
|
||
command: int # 命令码 (4字节)
|
||
length: int # 数据长度 (4字节)
|
||
frame_idx: int # 帧序号 (4字节)
|
||
channels: list # 17个通道数据 (每个4字节)
|
||
checksum: int # 校验和 (1字节)
|
||
|
||
class FrameFormat:
|
||
"""帧格式描述类"""
|
||
# 帧头魔数 (4字节)
|
||
FRAME_HEADER = bytes([0xAA, 0xAA, 0xAA, 0xAA])
|
||
|
||
# 各字段长度定义
|
||
HEADER_SIZE = 4
|
||
COMMAND_SIZE = 4
|
||
LENGTH_SIZE = 4
|
||
INDEX_SIZE = 4
|
||
CHECKSUM_SIZE = 1
|
||
CHANNEL_SIZE = 4 # 每个通道的字节数
|
||
CHANNEL_COUNT = 17 # 通道数量
|
||
|
||
# 帧固定部分的长度(不包含可变长度的数据部分)
|
||
FIXED_SIZE = HEADER_SIZE + COMMAND_SIZE + LENGTH_SIZE + INDEX_SIZE + CHECKSUM_SIZE
|
||
|
||
# 数据部分的长度(帧IDX + 通道数据)
|
||
DATA_SIZE = INDEX_SIZE + (CHANNEL_COUNT - 1) * CHANNEL_SIZE # 帧IDX + (通道数-1)个通道数据
|
||
|
||
@classmethod
|
||
def generateFrameData(cls, frame_idx: int, command: int = None, channel_values: list = None) -> bytes:
|
||
"""
|
||
生成测试用的帧数据
|
||
Args:
|
||
frame_idx: 帧序号
|
||
command: 命令码,如果为None则使用frame_idx作为命令码
|
||
channel_values: 通道数据列表,如果为None则生成随机数据
|
||
Returns:
|
||
bytes: 完整的帧数据
|
||
"""
|
||
# 使用frame_idx作为默认命令码
|
||
if command is None:
|
||
command = frame_idx
|
||
|
||
# 生成随机通道数据
|
||
if channel_values is None:
|
||
import random
|
||
channel_values = []
|
||
for _ in range(cls.CHANNEL_COUNT): # 使用类常量确保17个通道
|
||
# 生成-1000到1000之间的随机浮点数,转换为整数存储
|
||
value = int(random.uniform(-1000, 1000) * 1000) # 放大1000倍以保持精度
|
||
channel_values.append(value)
|
||
elif len(channel_values) != cls.CHANNEL_COUNT:
|
||
raise ValueError(f"通道数据数量必须为 {cls.CHANNEL_COUNT},当前为 {len(channel_values)}")
|
||
|
||
# 构建数据部分(帧IDX + 通道数据)
|
||
data = struct.pack("<I", frame_idx) # 帧IDX
|
||
for value in channel_values:
|
||
data += struct.pack("<i", value) # 通道数据
|
||
|
||
# 使用build_frame方法构建完整帧
|
||
frame_data = cls.build_frame(command, frame_idx, data)
|
||
|
||
# 验证生成的帧是否有效
|
||
if not cls.parse_frame(frame_data):
|
||
print(f"\n帧验证失败,帧数据:")
|
||
print(f"帧头: {frame_data[:cls.HEADER_SIZE].hex().upper()}")
|
||
print(f"命令码: {struct.unpack('<I', frame_data[cls.HEADER_SIZE:cls.HEADER_SIZE+cls.COMMAND_SIZE])[0]}")
|
||
print(f"长度: {struct.unpack('<I', frame_data[cls.HEADER_SIZE+cls.COMMAND_SIZE:cls.HEADER_SIZE+cls.COMMAND_SIZE+cls.LENGTH_SIZE])[0]}")
|
||
print(f"帧序号: {struct.unpack('<I', frame_data[cls.HEADER_SIZE+cls.COMMAND_SIZE+cls.LENGTH_SIZE:cls.HEADER_SIZE+cls.COMMAND_SIZE+cls.LENGTH_SIZE+cls.INDEX_SIZE])[0]}")
|
||
print(f"校验和: {frame_data[-1]:02X}")
|
||
print(f"完整帧数据: {frame_data.hex().upper()}")
|
||
raise ValueError("生成的帧验证失败")
|
||
|
||
return frame_data
|
||
|
||
@classmethod
|
||
def parse_frame(cls, data: bytes) -> Optional[Frame]:
|
||
"""
|
||
解析帧数据
|
||
Args:
|
||
data: 完整的帧数据
|
||
Returns:
|
||
Frame: 解析后的帧对象,如果解析失败返回None
|
||
"""
|
||
try:
|
||
# 检查数据长度
|
||
if len(data) < cls.FIXED_SIZE + cls.DATA_SIZE:
|
||
print(f"数据长度不足: {len(data)} < {cls.FIXED_SIZE + cls.DATA_SIZE}")
|
||
return None
|
||
|
||
# 检查帧头
|
||
if data[:cls.HEADER_SIZE] != cls.FRAME_HEADER:
|
||
print(f"帧头不匹配: {data[:cls.HEADER_SIZE].hex()} != {cls.FRAME_HEADER.hex()}")
|
||
return None
|
||
|
||
# 解析固定字段
|
||
offset = cls.HEADER_SIZE
|
||
command = struct.unpack("<I", data[offset:offset+cls.COMMAND_SIZE])[0]
|
||
offset += cls.COMMAND_SIZE
|
||
|
||
length = struct.unpack("<I", data[offset:offset+cls.LENGTH_SIZE])[0]
|
||
offset += cls.LENGTH_SIZE
|
||
|
||
# 提取帧IDX(第一个数据项)
|
||
frame_idx = struct.unpack("<I", data[offset:offset+4])[0]
|
||
offset += 4
|
||
|
||
# 提取17个通道数据
|
||
channels = []
|
||
try:
|
||
for _ in range(17):
|
||
if offset + 4 > len(data):
|
||
print("错误:数据长度不足以读取所有通道")
|
||
return None
|
||
channel_value = struct.unpack("<I", data[offset:offset+4])[0]
|
||
channels.append(channel_value)
|
||
offset += 4
|
||
except Exception as e:
|
||
print(f"解析通道数据失败: {e}")
|
||
return None
|
||
|
||
# 确保还有校验和字节
|
||
if offset >= len(data):
|
||
print("错误:数据长度不足以读取校验和")
|
||
return None
|
||
|
||
# 提取校验和
|
||
checksum = data[offset]
|
||
|
||
# 验证校验和
|
||
if not cls.verify_checksum(data[:offset], checksum):
|
||
print("校验和验证失败")
|
||
return None
|
||
|
||
return Frame(
|
||
command=command,
|
||
length=length,
|
||
frame_idx=frame_idx,
|
||
channels=channels,
|
||
checksum=checksum
|
||
)
|
||
|
||
except Exception as e:
|
||
print(f"解析帧时出错: {e}")
|
||
return None
|
||
|
||
@classmethod
|
||
def build_frame(cls, command: int, index: int, data: bytes) -> bytes:
|
||
"""
|
||
构建帧数据
|
||
Args:
|
||
command: 命令码
|
||
index: 帧索引
|
||
data: 数据内容
|
||
Returns:
|
||
bytes: 完整的帧数据
|
||
"""
|
||
# 构建帧数据(不包含校验和)
|
||
frame_data = (
|
||
cls.FRAME_HEADER +
|
||
struct.pack("<I", command) +
|
||
struct.pack("<I", len(data) + cls.CHECKSUM_SIZE) + # 修正:长度应该是数据长度加上校验和长度
|
||
data # 修正:直接使用传入的数据,不需要再次打包帧IDX
|
||
)
|
||
|
||
# 计算并添加校验和
|
||
checksum = cls.calculate_checksum(frame_data)
|
||
return frame_data + bytes([checksum])
|
||
|
||
@staticmethod
|
||
def calculate_checksum(data: bytes) -> int:
|
||
"""
|
||
计算校验和 - 不包含校验码本身
|
||
Args:
|
||
data: 需要计算校验和的数据(不包含校验码)
|
||
Returns:
|
||
int: 校验和值
|
||
"""
|
||
# 简单的字节累加校验
|
||
return sum(data) & 0xFF
|
||
|
||
@classmethod
|
||
def verify_checksum(cls, data: bytes, checksum: int) -> bool:
|
||
"""
|
||
验证校验和
|
||
Args:
|
||
data: 数据内容(不包含校验码)
|
||
checksum: 校验和值
|
||
Returns:
|
||
bool: 校验结果
|
||
"""
|
||
return cls.calculate_checksum(data) == checksum
|
||
|
||
@classmethod
|
||
def get_frame_length(cls, data: bytes) -> Optional[int]:
|
||
"""
|
||
从数据中获取帧总长度
|
||
Args:
|
||
data: 至少包含帧头和长度字段的数据
|
||
Returns:
|
||
int: 帧总长度,如果数据不足返回None
|
||
"""
|
||
min_size = cls.HEADER_SIZE + cls.COMMAND_SIZE + cls.LENGTH_SIZE
|
||
if len(data) < min_size:
|
||
return None
|
||
|
||
if data[:cls.HEADER_SIZE] != cls.FRAME_HEADER:
|
||
return None
|
||
|
||
length = struct.unpack("<I", data[cls.HEADER_SIZE+cls.COMMAND_SIZE:min_size])[0]
|
||
return length + cls.FIXED_SIZE
|
||
|
||
if __name__ == "__main__":
|
||
# 测试数据(十六进制字符串)
|
||
test_data_hex = "AAAAAAAA0200000049000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000F3"
|
||
print(len(test_data_hex))
|
||
# 将十六进制字符串转换为字节数据
|
||
test_data = bytes.fromhex(test_data_hex)
|
||
print(f"测试数据长度: {len(test_data)} 字节")
|
||
print(f"测试数据: {test_data.hex(' ').upper()}")
|
||
|
||
# 解析过程调试
|
||
print("\n解析过程:")
|
||
|
||
# 1. 检查数据长度
|
||
print(f"1. 检查数据长度: {len(test_data)} >= {FrameFormat.FIXED_SIZE}")
|
||
|
||
# 2. 检查帧头
|
||
frame_header = test_data[:FrameFormat.HEADER_SIZE]
|
||
print(f"2. 检查帧头: {frame_header.hex().upper()} == {FrameFormat.FRAME_HEADER.hex().upper()}")
|
||
|
||
# 3. 解析命令码
|
||
offset = FrameFormat.HEADER_SIZE
|
||
command_bytes = test_data[offset:offset+FrameFormat.COMMAND_SIZE]
|
||
command = struct.unpack("<I", command_bytes)[0]
|
||
print(f"3. 命令码: 0x{command:08X} (bytes: {command_bytes.hex().upper()})")
|
||
|
||
# 4. 解析长度
|
||
offset += FrameFormat.COMMAND_SIZE
|
||
length_bytes = test_data[offset:offset+FrameFormat.LENGTH_SIZE]
|
||
length = struct.unpack("<I", length_bytes)[0]
|
||
print(f"4. 长度: {length} (bytes: {length_bytes.hex().upper()})")
|
||
|
||
# 5. 提取帧IDX
|
||
offset += FrameFormat.LENGTH_SIZE # 更新offset到帧IDX位置
|
||
frame_idx_bytes = test_data[offset:offset+4]
|
||
frame_idx = struct.unpack("<I", frame_idx_bytes)[0]
|
||
print(f"5. 帧IDX: {frame_idx}")
|
||
offset += 4
|
||
|
||
# 6. 提取17个通道数据
|
||
print("6. 通道数据:")
|
||
channels = []
|
||
try:
|
||
for i in range(17):
|
||
channel_bytes = test_data[offset:offset+4]
|
||
if len(channel_bytes) != 4:
|
||
print(f" 警告:通道 {i+1} 数据不完整,期望4字节,实际{len(channel_bytes)}字节")
|
||
break
|
||
channel_value = struct.unpack("<I", channel_bytes)[0]
|
||
channels.append(channel_value)
|
||
print(f" 通道 {i+1}: 0x{channel_value:08X}")
|
||
offset += 4
|
||
except Exception as e:
|
||
print(f" 错误:解析通道数据失败: {e}")
|
||
print(f" 当前偏移量: {offset}")
|
||
print(f" 剩余数据: {test_data[offset:].hex().upper()}")
|
||
|
||
# 7. 提取校验和
|
||
if offset < len(test_data):
|
||
checksum = test_data[offset]
|
||
print(f"7. 校验和: 0x{checksum:02X}")
|
||
else:
|
||
print("7. 错误:无法读取校验和,数据不足")
|
||
|
||
# 8. 验证校验和
|
||
# 计算校验和时只使用校验码之前的数据
|
||
calculated_checksum = FrameFormat.calculate_checksum(test_data[:offset])
|
||
print(f"8. 校验和验证: 计算值=0x{calculated_checksum:02X}, 实际值=0x{checksum:02X}")
|
||
|
||
# 解析帧
|
||
frame = FrameFormat.parse_frame(test_data)
|
||
if frame:
|
||
print("\n解析成功!")
|
||
print(f"命令码: 0x{frame.command:08X}")
|
||
print(f"长度: {frame.length}")
|
||
print(f"帧IDX: {frame.frame_idx}")
|
||
print("通道数据:")
|
||
for i, value in enumerate(frame.channels):
|
||
print(f"通道 {i+1}: 0x{value:08X}")
|
||
print(f"校验和: 0x{frame.checksum:02X}")
|
||
else:
|
||
print("\n解析失败!")
|