2025-03-25 09:17:43 +08:00
|
|
|
|
from dataclasses import dataclass
|
|
|
|
|
|
from typing import Optional
|
|
|
|
|
|
import struct
|
2025-11-19 14:57:37 +08:00
|
|
|
|
import time
|
2025-03-25 09:17:43 +08:00
|
|
|
|
|
|
|
|
|
|
@dataclass
|
|
|
|
|
|
class Frame:
|
|
|
|
|
|
"""帧数据结构"""
|
|
|
|
|
|
command: int # 命令码 (4字节)
|
|
|
|
|
|
length: int # 数据长度 (4字节)
|
|
|
|
|
|
frame_idx: int # 帧序号 (4字节)
|
|
|
|
|
|
channels: list # 17个通道数据 (每个4字节)
|
|
|
|
|
|
checksum: int # 校验和 (1字节)
|
2025-11-19 14:57:37 +08:00
|
|
|
|
timestamp: float # 时间戳 (秒)
|
2025-03-25 09:17:43 +08:00
|
|
|
|
|
|
|
|
|
|
class FrameFormat:
|
|
|
|
|
|
"""帧格式描述类"""
|
|
|
|
|
|
# 帧头魔数 (4字节)
|
|
|
|
|
|
FRAME_HEADER = bytes([0xAA, 0xAA, 0xAA, 0xAA])
|
|
|
|
|
|
|
|
|
|
|
|
# 各字段长度定义
|
|
|
|
|
|
HEADER_SIZE = 4
|
|
|
|
|
|
COMMAND_SIZE = 4
|
|
|
|
|
|
LENGTH_SIZE = 4
|
|
|
|
|
|
INDEX_SIZE = 4
|
|
|
|
|
|
CHECKSUM_SIZE = 1
|
|
|
|
|
|
CHANNEL_SIZE = 4 # 每个通道的字节数
|
|
|
|
|
|
|
2025-11-19 14:57:37 +08:00
|
|
|
|
def __init__(self, num_channels: int = 17):
|
|
|
|
|
|
"""
|
|
|
|
|
|
初始化帧格式
|
|
|
|
|
|
Args:
|
|
|
|
|
|
num_channels: 通道数量,默认为8
|
|
|
|
|
|
"""
|
|
|
|
|
|
self.CHANNEL_COUNT = num_channels
|
|
|
|
|
|
# 帧固定部分的长度(不包含可变长度的数据部分)
|
|
|
|
|
|
self.FIXED_SIZE = self.HEADER_SIZE + self.COMMAND_SIZE + self.LENGTH_SIZE + self.INDEX_SIZE + self.CHECKSUM_SIZE
|
|
|
|
|
|
# 数据部分的长度(帧IDX + 通道数据)
|
|
|
|
|
|
self.DATA_SIZE = self.INDEX_SIZE + (self.CHANNEL_COUNT - 1) * self.CHANNEL_SIZE # 帧IDX + (通道数-1)个通道数据
|
2025-03-25 09:17:43 +08:00
|
|
|
|
|
2025-11-19 14:57:37 +08:00
|
|
|
|
def generateFrameData(self, frame_idx: int, command: int = None, channel_values: list = None) -> bytes:
|
2025-03-25 09:17:43 +08:00
|
|
|
|
"""
|
|
|
|
|
|
生成测试用的帧数据
|
|
|
|
|
|
Args:
|
|
|
|
|
|
frame_idx: 帧序号
|
|
|
|
|
|
command: 命令码,如果为None则使用frame_idx作为命令码
|
|
|
|
|
|
channel_values: 通道数据列表,如果为None则生成随机数据
|
|
|
|
|
|
Returns:
|
|
|
|
|
|
bytes: 完整的帧数据
|
|
|
|
|
|
"""
|
|
|
|
|
|
# 使用frame_idx作为默认命令码
|
|
|
|
|
|
if command is None:
|
|
|
|
|
|
command = frame_idx
|
|
|
|
|
|
|
|
|
|
|
|
# 生成随机通道数据
|
|
|
|
|
|
if channel_values is None:
|
|
|
|
|
|
import random
|
|
|
|
|
|
channel_values = []
|
2025-11-19 14:57:37 +08:00
|
|
|
|
for _ in range(self.CHANNEL_COUNT): # 使用实例变量确保正确的通道数
|
2025-03-25 09:17:43 +08:00
|
|
|
|
# 生成-1000到1000之间的随机浮点数,转换为整数存储
|
|
|
|
|
|
value = int(random.uniform(-1000, 1000) * 1000) # 放大1000倍以保持精度
|
|
|
|
|
|
channel_values.append(value)
|
2025-11-19 14:57:37 +08:00
|
|
|
|
elif len(channel_values) != self.CHANNEL_COUNT:
|
|
|
|
|
|
raise ValueError(f"通道数据数量必须为 {self.CHANNEL_COUNT},当前为 {len(channel_values)}")
|
2025-03-25 09:17:43 +08:00
|
|
|
|
|
|
|
|
|
|
# 构建数据部分(帧IDX + 通道数据)
|
2025-11-19 14:57:37 +08:00
|
|
|
|
data = struct.pack("<I", frame_idx)
|
2025-03-25 09:17:43 +08:00
|
|
|
|
for value in channel_values:
|
|
|
|
|
|
data += struct.pack("<i", value) # 通道数据
|
|
|
|
|
|
|
|
|
|
|
|
# 使用build_frame方法构建完整帧
|
2025-11-19 14:57:37 +08:00
|
|
|
|
frame_data = self.build_frame(command, frame_idx, data)
|
2025-03-25 09:17:43 +08:00
|
|
|
|
|
|
|
|
|
|
# 验证生成的帧是否有效
|
2025-11-19 14:57:37 +08:00
|
|
|
|
if not self.parse_frame(frame_data):
|
2025-03-25 09:17:43 +08:00
|
|
|
|
print(f"\n帧验证失败,帧数据:")
|
2025-11-19 14:57:37 +08:00
|
|
|
|
print(f"帧头: {frame_data[:self.HEADER_SIZE].hex().upper()}")
|
|
|
|
|
|
print(f"命令码: {struct.unpack('<I', frame_data[self.HEADER_SIZE:self.HEADER_SIZE+self.COMMAND_SIZE])[0]}")
|
|
|
|
|
|
print(f"长度: {struct.unpack('<I', frame_data[self.HEADER_SIZE+self.COMMAND_SIZE:self.HEADER_SIZE+self.COMMAND_SIZE+self.LENGTH_SIZE])[0]}")
|
|
|
|
|
|
print(f"帧序号: {struct.unpack('<I', frame_data[self.HEADER_SIZE+self.COMMAND_SIZE+self.LENGTH_SIZE:self.HEADER_SIZE+self.COMMAND_SIZE+self.LENGTH_SIZE+self.INDEX_SIZE])[0]}")
|
2025-03-25 09:17:43 +08:00
|
|
|
|
print(f"校验和: {frame_data[-1]:02X}")
|
|
|
|
|
|
print(f"完整帧数据: {frame_data.hex().upper()}")
|
|
|
|
|
|
raise ValueError("生成的帧验证失败")
|
|
|
|
|
|
|
|
|
|
|
|
return frame_data
|
|
|
|
|
|
|
2025-11-19 14:57:37 +08:00
|
|
|
|
@staticmethod
|
|
|
|
|
|
def parse_frame(data: bytes, num_channels: int = 17) -> Optional[Frame]:
|
2025-03-25 09:17:43 +08:00
|
|
|
|
"""
|
|
|
|
|
|
解析帧数据
|
|
|
|
|
|
Args:
|
|
|
|
|
|
data: 完整的帧数据
|
2025-11-19 14:57:37 +08:00
|
|
|
|
num_channels: 通道数量,默认为17
|
2025-03-25 09:17:43 +08:00
|
|
|
|
Returns:
|
|
|
|
|
|
Frame: 解析后的帧对象,如果解析失败返回None
|
|
|
|
|
|
"""
|
|
|
|
|
|
try:
|
2025-11-19 14:57:37 +08:00
|
|
|
|
# 计算帧长度
|
|
|
|
|
|
HEADER_SIZE = 4
|
|
|
|
|
|
COMMAND_SIZE = 4
|
|
|
|
|
|
LENGTH_SIZE = 4
|
|
|
|
|
|
INDEX_SIZE = 4
|
|
|
|
|
|
CHECKSUM_SIZE = 1
|
|
|
|
|
|
CHANNEL_SIZE = 4
|
|
|
|
|
|
|
|
|
|
|
|
FIXED_SIZE = HEADER_SIZE + COMMAND_SIZE + LENGTH_SIZE + INDEX_SIZE + CHECKSUM_SIZE
|
|
|
|
|
|
DATA_SIZE = INDEX_SIZE + (num_channels - 1) * CHANNEL_SIZE
|
|
|
|
|
|
|
2025-03-25 09:17:43 +08:00
|
|
|
|
# 检查数据长度
|
2025-11-19 14:57:37 +08:00
|
|
|
|
if len(data) < FIXED_SIZE + DATA_SIZE:
|
|
|
|
|
|
print(f"数据长度不足: {len(data)} < {FIXED_SIZE + DATA_SIZE}")
|
2025-03-25 09:17:43 +08:00
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
|
|
# 检查帧头
|
2025-11-19 14:57:37 +08:00
|
|
|
|
FRAME_HEADER = bytes([0xAA, 0xAA, 0xAA, 0xAA])
|
|
|
|
|
|
if data[:HEADER_SIZE] != FRAME_HEADER:
|
|
|
|
|
|
print(f"帧头不匹配: {data[:HEADER_SIZE].hex()} != {FRAME_HEADER.hex()}")
|
2025-03-25 09:17:43 +08:00
|
|
|
|
return None
|
|
|
|
|
|
|
2025-11-19 14:57:37 +08:00
|
|
|
|
# 解析命令码
|
|
|
|
|
|
command = struct.unpack('<I', data[HEADER_SIZE:HEADER_SIZE+COMMAND_SIZE])[0]
|
2025-03-25 09:17:43 +08:00
|
|
|
|
|
2025-11-19 14:57:37 +08:00
|
|
|
|
# 解析长度
|
|
|
|
|
|
length = struct.unpack('<I', data[HEADER_SIZE+COMMAND_SIZE:HEADER_SIZE+COMMAND_SIZE+LENGTH_SIZE])[0]
|
2025-03-25 09:17:43 +08:00
|
|
|
|
|
2025-11-19 14:57:37 +08:00
|
|
|
|
# 解析帧序号
|
|
|
|
|
|
frame_idx = struct.unpack('<I', data[HEADER_SIZE+COMMAND_SIZE+LENGTH_SIZE:HEADER_SIZE+COMMAND_SIZE+LENGTH_SIZE+INDEX_SIZE])[0]
|
2025-03-25 09:17:43 +08:00
|
|
|
|
|
2025-11-19 14:57:37 +08:00
|
|
|
|
# 解析通道数据
|
2025-03-25 09:17:43 +08:00
|
|
|
|
channels = []
|
2025-11-19 14:57:37 +08:00
|
|
|
|
data_start = HEADER_SIZE + COMMAND_SIZE + LENGTH_SIZE + INDEX_SIZE
|
|
|
|
|
|
for i in range(num_channels):
|
|
|
|
|
|
channel_data = data[data_start + i*CHANNEL_SIZE:data_start + (i+1)*CHANNEL_SIZE]
|
|
|
|
|
|
value = struct.unpack('<i', channel_data)[0]
|
|
|
|
|
|
channels.append(value)
|
2025-03-25 09:17:43 +08:00
|
|
|
|
|
2025-11-19 14:57:37 +08:00
|
|
|
|
# 解析校验和
|
|
|
|
|
|
checksum = data[-1]
|
2025-03-25 09:17:43 +08:00
|
|
|
|
|
|
|
|
|
|
# 验证校验和
|
2025-11-19 14:57:37 +08:00
|
|
|
|
calculated_checksum = sum(data[:-CHECKSUM_SIZE]) & 0xFF
|
|
|
|
|
|
if calculated_checksum != checksum:
|
|
|
|
|
|
print(f"校验和不匹配: {calculated_checksum:02X} != {checksum:02X}")
|
2025-03-25 09:17:43 +08:00
|
|
|
|
return None
|
|
|
|
|
|
|
2025-11-19 14:57:37 +08:00
|
|
|
|
# 创建帧对象,添加当前时间作为时间戳
|
2025-03-25 09:17:43 +08:00
|
|
|
|
return Frame(
|
|
|
|
|
|
command=command,
|
|
|
|
|
|
length=length,
|
|
|
|
|
|
frame_idx=frame_idx,
|
|
|
|
|
|
channels=channels,
|
2025-11-19 14:57:37 +08:00
|
|
|
|
checksum=checksum,
|
|
|
|
|
|
timestamp=time.time()
|
2025-03-25 09:17:43 +08:00
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
except Exception as e:
|
2025-11-19 14:57:37 +08:00
|
|
|
|
print(f"解析帧数据时出错: {e}")
|
2025-03-25 09:17:43 +08:00
|
|
|
|
return None
|
|
|
|
|
|
|
2025-11-19 14:57:37 +08:00
|
|
|
|
def build_frame(self, command: int, index: int, data: bytes) -> bytes:
|
2025-03-25 09:17:43 +08:00
|
|
|
|
"""
|
|
|
|
|
|
构建帧数据
|
|
|
|
|
|
Args:
|
|
|
|
|
|
command: 命令码
|
|
|
|
|
|
index: 帧索引
|
|
|
|
|
|
data: 数据内容
|
|
|
|
|
|
Returns:
|
|
|
|
|
|
bytes: 完整的帧数据
|
|
|
|
|
|
"""
|
|
|
|
|
|
# 构建帧数据(不包含校验和)
|
|
|
|
|
|
frame_data = (
|
2025-11-19 14:57:37 +08:00
|
|
|
|
self.FRAME_HEADER +
|
2025-03-25 09:17:43 +08:00
|
|
|
|
struct.pack("<I", command) +
|
2025-11-19 14:57:37 +08:00
|
|
|
|
struct.pack("<I", len(data) + self.CHECKSUM_SIZE) + # 修正:长度应该是数据长度加上校验和长度
|
2025-03-25 09:17:43 +08:00
|
|
|
|
data # 修正:直接使用传入的数据,不需要再次打包帧IDX
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
# 计算并添加校验和
|
2025-11-19 14:57:37 +08:00
|
|
|
|
checksum = self.calculate_checksum(frame_data)
|
2025-03-25 09:17:43 +08:00
|
|
|
|
return frame_data + bytes([checksum])
|
|
|
|
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
|
|
def calculate_checksum(data: bytes) -> int:
|
|
|
|
|
|
"""
|
|
|
|
|
|
计算校验和 - 不包含校验码本身
|
|
|
|
|
|
Args:
|
|
|
|
|
|
data: 需要计算校验和的数据(不包含校验码)
|
|
|
|
|
|
Returns:
|
|
|
|
|
|
int: 校验和值
|
|
|
|
|
|
"""
|
|
|
|
|
|
# 简单的字节累加校验
|
|
|
|
|
|
return sum(data) & 0xFF
|
|
|
|
|
|
|
2025-11-19 14:57:37 +08:00
|
|
|
|
def verify_checksum(self, data: bytes, checksum: int) -> bool:
|
2025-03-25 09:17:43 +08:00
|
|
|
|
"""
|
|
|
|
|
|
验证校验和
|
|
|
|
|
|
Args:
|
|
|
|
|
|
data: 数据内容(不包含校验码)
|
|
|
|
|
|
checksum: 校验和值
|
|
|
|
|
|
Returns:
|
|
|
|
|
|
bool: 校验结果
|
|
|
|
|
|
"""
|
2025-11-19 14:57:37 +08:00
|
|
|
|
return self.calculate_checksum(data) == checksum
|
2025-03-25 09:17:43 +08:00
|
|
|
|
|
2025-11-19 14:57:37 +08:00
|
|
|
|
def get_frame_length(self, data: bytes) -> Optional[int]:
|
2025-03-25 09:17:43 +08:00
|
|
|
|
"""
|
|
|
|
|
|
从数据中获取帧总长度
|
|
|
|
|
|
Args:
|
|
|
|
|
|
data: 至少包含帧头和长度字段的数据
|
|
|
|
|
|
Returns:
|
|
|
|
|
|
int: 帧总长度,如果数据不足返回None
|
|
|
|
|
|
"""
|
2025-11-19 14:57:37 +08:00
|
|
|
|
min_size = self.HEADER_SIZE + self.COMMAND_SIZE + self.LENGTH_SIZE
|
2025-03-25 09:17:43 +08:00
|
|
|
|
if len(data) < min_size:
|
|
|
|
|
|
return None
|
|
|
|
|
|
|
2025-11-19 14:57:37 +08:00
|
|
|
|
if data[:self.HEADER_SIZE] != self.FRAME_HEADER:
|
2025-03-25 09:17:43 +08:00
|
|
|
|
return None
|
|
|
|
|
|
|
2025-11-19 14:57:37 +08:00
|
|
|
|
length = struct.unpack("<I", data[self.HEADER_SIZE+self.COMMAND_SIZE:min_size])[0]
|
|
|
|
|
|
return length + self.FIXED_SIZE
|
2025-03-25 09:17:43 +08:00
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
|
|
|
# 测试数据(十六进制字符串)
|
|
|
|
|
|
test_data_hex = "AAAAAAAA0200000049000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000F3"
|
|
|
|
|
|
print(len(test_data_hex))
|
|
|
|
|
|
# 将十六进制字符串转换为字节数据
|
|
|
|
|
|
test_data = bytes.fromhex(test_data_hex)
|
|
|
|
|
|
print(f"测试数据长度: {len(test_data)} 字节")
|
|
|
|
|
|
print(f"测试数据: {test_data.hex(' ').upper()}")
|
|
|
|
|
|
|
|
|
|
|
|
# 解析过程调试
|
|
|
|
|
|
print("\n解析过程:")
|
|
|
|
|
|
|
|
|
|
|
|
# 1. 检查数据长度
|
|
|
|
|
|
print(f"1. 检查数据长度: {len(test_data)} >= {FrameFormat.FIXED_SIZE}")
|
|
|
|
|
|
|
|
|
|
|
|
# 2. 检查帧头
|
|
|
|
|
|
frame_header = test_data[:FrameFormat.HEADER_SIZE]
|
|
|
|
|
|
print(f"2. 检查帧头: {frame_header.hex().upper()} == {FrameFormat.FRAME_HEADER.hex().upper()}")
|
|
|
|
|
|
|
|
|
|
|
|
# 3. 解析命令码
|
|
|
|
|
|
offset = FrameFormat.HEADER_SIZE
|
|
|
|
|
|
command_bytes = test_data[offset:offset+FrameFormat.COMMAND_SIZE]
|
|
|
|
|
|
command = struct.unpack("<I", command_bytes)[0]
|
|
|
|
|
|
print(f"3. 命令码: 0x{command:08X} (bytes: {command_bytes.hex().upper()})")
|
|
|
|
|
|
|
|
|
|
|
|
# 4. 解析长度
|
|
|
|
|
|
offset += FrameFormat.COMMAND_SIZE
|
|
|
|
|
|
length_bytes = test_data[offset:offset+FrameFormat.LENGTH_SIZE]
|
|
|
|
|
|
length = struct.unpack("<I", length_bytes)[0]
|
|
|
|
|
|
print(f"4. 长度: {length} (bytes: {length_bytes.hex().upper()})")
|
|
|
|
|
|
|
|
|
|
|
|
# 5. 提取帧IDX
|
|
|
|
|
|
offset += FrameFormat.LENGTH_SIZE # 更新offset到帧IDX位置
|
|
|
|
|
|
frame_idx_bytes = test_data[offset:offset+4]
|
|
|
|
|
|
frame_idx = struct.unpack("<I", frame_idx_bytes)[0]
|
|
|
|
|
|
print(f"5. 帧IDX: {frame_idx}")
|
|
|
|
|
|
offset += 4
|
|
|
|
|
|
|
|
|
|
|
|
# 6. 提取17个通道数据
|
|
|
|
|
|
print("6. 通道数据:")
|
|
|
|
|
|
channels = []
|
|
|
|
|
|
try:
|
|
|
|
|
|
for i in range(17):
|
|
|
|
|
|
channel_bytes = test_data[offset:offset+4]
|
|
|
|
|
|
if len(channel_bytes) != 4:
|
|
|
|
|
|
print(f" 警告:通道 {i+1} 数据不完整,期望4字节,实际{len(channel_bytes)}字节")
|
|
|
|
|
|
break
|
|
|
|
|
|
channel_value = struct.unpack("<I", channel_bytes)[0]
|
|
|
|
|
|
channels.append(channel_value)
|
|
|
|
|
|
print(f" 通道 {i+1}: 0x{channel_value:08X}")
|
|
|
|
|
|
offset += 4
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
print(f" 错误:解析通道数据失败: {e}")
|
|
|
|
|
|
print(f" 当前偏移量: {offset}")
|
|
|
|
|
|
print(f" 剩余数据: {test_data[offset:].hex().upper()}")
|
|
|
|
|
|
|
|
|
|
|
|
# 7. 提取校验和
|
|
|
|
|
|
if offset < len(test_data):
|
|
|
|
|
|
checksum = test_data[offset]
|
|
|
|
|
|
print(f"7. 校验和: 0x{checksum:02X}")
|
|
|
|
|
|
else:
|
|
|
|
|
|
print("7. 错误:无法读取校验和,数据不足")
|
|
|
|
|
|
|
|
|
|
|
|
# 8. 验证校验和
|
|
|
|
|
|
# 计算校验和时只使用校验码之前的数据
|
|
|
|
|
|
calculated_checksum = FrameFormat.calculate_checksum(test_data[:offset])
|
|
|
|
|
|
print(f"8. 校验和验证: 计算值=0x{calculated_checksum:02X}, 实际值=0x{checksum:02X}")
|
|
|
|
|
|
|
|
|
|
|
|
# 解析帧
|
|
|
|
|
|
frame = FrameFormat.parse_frame(test_data)
|
|
|
|
|
|
if frame:
|
|
|
|
|
|
print("\n解析成功!")
|
|
|
|
|
|
print(f"命令码: 0x{frame.command:08X}")
|
|
|
|
|
|
print(f"长度: {frame.length}")
|
|
|
|
|
|
print(f"帧IDX: {frame.frame_idx}")
|
|
|
|
|
|
print("通道数据:")
|
|
|
|
|
|
for i, value in enumerate(frame.channels):
|
|
|
|
|
|
print(f"通道 {i+1}: 0x{value:08X}")
|
|
|
|
|
|
print(f"校验和: 0x{frame.checksum:02X}")
|
|
|
|
|
|
else:
|
|
|
|
|
|
print("\n解析失败!")
|