brisonus_data_collector/frame.py

304 lines
12 KiB
Python
Raw Normal View History

from dataclasses import dataclass
from typing import Optional
import struct
2025-11-19 14:57:37 +08:00
import time
@dataclass
class Frame:
"""帧数据结构"""
command: int # 命令码 (4字节)
length: int # 数据长度 (4字节)
frame_idx: int # 帧序号 (4字节)
channels: list # 17个通道数据 (每个4字节)
checksum: int # 校验和 (1字节)
2025-11-19 14:57:37 +08:00
timestamp: float # 时间戳 (秒)
class FrameFormat:
"""帧格式描述类"""
# 帧头魔数 (4字节)
FRAME_HEADER = bytes([0xAA, 0xAA, 0xAA, 0xAA])
# 各字段长度定义
HEADER_SIZE = 4
COMMAND_SIZE = 4
LENGTH_SIZE = 4
INDEX_SIZE = 4
CHECKSUM_SIZE = 1
CHANNEL_SIZE = 4 # 每个通道的字节数
2025-11-19 14:57:37 +08:00
def __init__(self, num_channels: int = 17):
"""
初始化帧格式
Args:
num_channels: 通道数量默认为8
"""
self.CHANNEL_COUNT = num_channels
# 帧固定部分的长度(不包含可变长度的数据部分)
self.FIXED_SIZE = self.HEADER_SIZE + self.COMMAND_SIZE + self.LENGTH_SIZE + self.INDEX_SIZE + self.CHECKSUM_SIZE
# 数据部分的长度帧IDX + 通道数据)
self.DATA_SIZE = self.INDEX_SIZE + (self.CHANNEL_COUNT - 1) * self.CHANNEL_SIZE # 帧IDX + (通道数-1)个通道数据
2025-11-19 14:57:37 +08:00
def generateFrameData(self, frame_idx: int, command: int = None, channel_values: list = None) -> bytes:
"""
生成测试用的帧数据
Args:
frame_idx: 帧序号
command: 命令码如果为None则使用frame_idx作为命令码
channel_values: 通道数据列表如果为None则生成随机数据
Returns:
bytes: 完整的帧数据
"""
# 使用frame_idx作为默认命令码
if command is None:
command = frame_idx
# 生成随机通道数据
if channel_values is None:
import random
channel_values = []
2025-11-19 14:57:37 +08:00
for _ in range(self.CHANNEL_COUNT): # 使用实例变量确保正确的通道数
# 生成-1000到1000之间的随机浮点数转换为整数存储
value = int(random.uniform(-1000, 1000) * 1000) # 放大1000倍以保持精度
channel_values.append(value)
2025-11-19 14:57:37 +08:00
elif len(channel_values) != self.CHANNEL_COUNT:
raise ValueError(f"通道数据数量必须为 {self.CHANNEL_COUNT},当前为 {len(channel_values)}")
# 构建数据部分帧IDX + 通道数据)
2025-11-19 14:57:37 +08:00
data = struct.pack("<I", frame_idx)
for value in channel_values:
data += struct.pack("<i", value) # 通道数据
# 使用build_frame方法构建完整帧
2025-11-19 14:57:37 +08:00
frame_data = self.build_frame(command, frame_idx, data)
# 验证生成的帧是否有效
2025-11-19 14:57:37 +08:00
if not self.parse_frame(frame_data):
print(f"\n帧验证失败,帧数据:")
2025-11-19 14:57:37 +08:00
print(f"帧头: {frame_data[:self.HEADER_SIZE].hex().upper()}")
print(f"命令码: {struct.unpack('<I', frame_data[self.HEADER_SIZE:self.HEADER_SIZE+self.COMMAND_SIZE])[0]}")
print(f"长度: {struct.unpack('<I', frame_data[self.HEADER_SIZE+self.COMMAND_SIZE:self.HEADER_SIZE+self.COMMAND_SIZE+self.LENGTH_SIZE])[0]}")
print(f"帧序号: {struct.unpack('<I', frame_data[self.HEADER_SIZE+self.COMMAND_SIZE+self.LENGTH_SIZE:self.HEADER_SIZE+self.COMMAND_SIZE+self.LENGTH_SIZE+self.INDEX_SIZE])[0]}")
print(f"校验和: {frame_data[-1]:02X}")
print(f"完整帧数据: {frame_data.hex().upper()}")
raise ValueError("生成的帧验证失败")
return frame_data
2025-11-19 14:57:37 +08:00
@staticmethod
def parse_frame(data: bytes, num_channels: int = 17) -> Optional[Frame]:
"""
解析帧数据
Args:
data: 完整的帧数据
2025-11-19 14:57:37 +08:00
num_channels: 通道数量默认为17
Returns:
Frame: 解析后的帧对象如果解析失败返回None
"""
try:
2025-11-19 14:57:37 +08:00
# 计算帧长度
HEADER_SIZE = 4
COMMAND_SIZE = 4
LENGTH_SIZE = 4
INDEX_SIZE = 4
CHECKSUM_SIZE = 1
CHANNEL_SIZE = 4
FIXED_SIZE = HEADER_SIZE + COMMAND_SIZE + LENGTH_SIZE + INDEX_SIZE + CHECKSUM_SIZE
DATA_SIZE = INDEX_SIZE + (num_channels - 1) * CHANNEL_SIZE
# 检查数据长度
2025-11-19 14:57:37 +08:00
if len(data) < FIXED_SIZE + DATA_SIZE:
print(f"数据长度不足: {len(data)} < {FIXED_SIZE + DATA_SIZE}")
return None
# 检查帧头
2025-11-19 14:57:37 +08:00
FRAME_HEADER = bytes([0xAA, 0xAA, 0xAA, 0xAA])
if data[:HEADER_SIZE] != FRAME_HEADER:
print(f"帧头不匹配: {data[:HEADER_SIZE].hex()} != {FRAME_HEADER.hex()}")
return None
2025-11-19 14:57:37 +08:00
# 解析命令码
command = struct.unpack('<I', data[HEADER_SIZE:HEADER_SIZE+COMMAND_SIZE])[0]
2025-11-19 14:57:37 +08:00
# 解析长度
length = struct.unpack('<I', data[HEADER_SIZE+COMMAND_SIZE:HEADER_SIZE+COMMAND_SIZE+LENGTH_SIZE])[0]
2025-11-19 14:57:37 +08:00
# 解析帧序号
frame_idx = struct.unpack('<I', data[HEADER_SIZE+COMMAND_SIZE+LENGTH_SIZE:HEADER_SIZE+COMMAND_SIZE+LENGTH_SIZE+INDEX_SIZE])[0]
2025-11-19 14:57:37 +08:00
# 解析通道数据
channels = []
2025-11-19 14:57:37 +08:00
data_start = HEADER_SIZE + COMMAND_SIZE + LENGTH_SIZE + INDEX_SIZE
for i in range(num_channels):
channel_data = data[data_start + i*CHANNEL_SIZE:data_start + (i+1)*CHANNEL_SIZE]
value = struct.unpack('<i', channel_data)[0]
channels.append(value)
2025-11-19 14:57:37 +08:00
# 解析校验和
checksum = data[-1]
# 验证校验和
2025-11-19 14:57:37 +08:00
calculated_checksum = sum(data[:-CHECKSUM_SIZE]) & 0xFF
if calculated_checksum != checksum:
print(f"校验和不匹配: {calculated_checksum:02X} != {checksum:02X}")
return None
2025-11-19 14:57:37 +08:00
# 创建帧对象,添加当前时间作为时间戳
return Frame(
command=command,
length=length,
frame_idx=frame_idx,
channels=channels,
2025-11-19 14:57:37 +08:00
checksum=checksum,
timestamp=time.time()
)
except Exception as e:
2025-11-19 14:57:37 +08:00
print(f"解析帧数据时出错: {e}")
return None
2025-11-19 14:57:37 +08:00
def build_frame(self, command: int, index: int, data: bytes) -> bytes:
"""
构建帧数据
Args:
command: 命令码
index: 帧索引
data: 数据内容
Returns:
bytes: 完整的帧数据
"""
# 构建帧数据(不包含校验和)
frame_data = (
2025-11-19 14:57:37 +08:00
self.FRAME_HEADER +
struct.pack("<I", command) +
2025-11-19 14:57:37 +08:00
struct.pack("<I", len(data) + self.CHECKSUM_SIZE) + # 修正:长度应该是数据长度加上校验和长度
data # 修正直接使用传入的数据不需要再次打包帧IDX
)
# 计算并添加校验和
2025-11-19 14:57:37 +08:00
checksum = self.calculate_checksum(frame_data)
return frame_data + bytes([checksum])
@staticmethod
def calculate_checksum(data: bytes) -> int:
"""
计算校验和 - 不包含校验码本身
Args:
data: 需要计算校验和的数据不包含校验码
Returns:
int: 校验和值
"""
# 简单的字节累加校验
return sum(data) & 0xFF
2025-11-19 14:57:37 +08:00
def verify_checksum(self, data: bytes, checksum: int) -> bool:
"""
验证校验和
Args:
data: 数据内容不包含校验码
checksum: 校验和值
Returns:
bool: 校验结果
"""
2025-11-19 14:57:37 +08:00
return self.calculate_checksum(data) == checksum
2025-11-19 14:57:37 +08:00
def get_frame_length(self, data: bytes) -> Optional[int]:
"""
从数据中获取帧总长度
Args:
data: 至少包含帧头和长度字段的数据
Returns:
int: 帧总长度如果数据不足返回None
"""
2025-11-19 14:57:37 +08:00
min_size = self.HEADER_SIZE + self.COMMAND_SIZE + self.LENGTH_SIZE
if len(data) < min_size:
return None
2025-11-19 14:57:37 +08:00
if data[:self.HEADER_SIZE] != self.FRAME_HEADER:
return None
2025-11-19 14:57:37 +08:00
length = struct.unpack("<I", data[self.HEADER_SIZE+self.COMMAND_SIZE:min_size])[0]
return length + self.FIXED_SIZE
if __name__ == "__main__":
# 测试数据(十六进制字符串)
test_data_hex = "AAAAAAAA0200000049000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000F3"
print(len(test_data_hex))
# 将十六进制字符串转换为字节数据
test_data = bytes.fromhex(test_data_hex)
print(f"测试数据长度: {len(test_data)} 字节")
print(f"测试数据: {test_data.hex(' ').upper()}")
# 解析过程调试
print("\n解析过程:")
# 1. 检查数据长度
print(f"1. 检查数据长度: {len(test_data)} >= {FrameFormat.FIXED_SIZE}")
# 2. 检查帧头
frame_header = test_data[:FrameFormat.HEADER_SIZE]
print(f"2. 检查帧头: {frame_header.hex().upper()} == {FrameFormat.FRAME_HEADER.hex().upper()}")
# 3. 解析命令码
offset = FrameFormat.HEADER_SIZE
command_bytes = test_data[offset:offset+FrameFormat.COMMAND_SIZE]
command = struct.unpack("<I", command_bytes)[0]
print(f"3. 命令码: 0x{command:08X} (bytes: {command_bytes.hex().upper()})")
# 4. 解析长度
offset += FrameFormat.COMMAND_SIZE
length_bytes = test_data[offset:offset+FrameFormat.LENGTH_SIZE]
length = struct.unpack("<I", length_bytes)[0]
print(f"4. 长度: {length} (bytes: {length_bytes.hex().upper()})")
# 5. 提取帧IDX
offset += FrameFormat.LENGTH_SIZE # 更新offset到帧IDX位置
frame_idx_bytes = test_data[offset:offset+4]
frame_idx = struct.unpack("<I", frame_idx_bytes)[0]
print(f"5. 帧IDX: {frame_idx}")
offset += 4
# 6. 提取17个通道数据
print("6. 通道数据:")
channels = []
try:
for i in range(17):
channel_bytes = test_data[offset:offset+4]
if len(channel_bytes) != 4:
print(f" 警告:通道 {i+1} 数据不完整期望4字节实际{len(channel_bytes)}字节")
break
channel_value = struct.unpack("<I", channel_bytes)[0]
channels.append(channel_value)
print(f" 通道 {i+1}: 0x{channel_value:08X}")
offset += 4
except Exception as e:
print(f" 错误:解析通道数据失败: {e}")
print(f" 当前偏移量: {offset}")
print(f" 剩余数据: {test_data[offset:].hex().upper()}")
# 7. 提取校验和
if offset < len(test_data):
checksum = test_data[offset]
print(f"7. 校验和: 0x{checksum:02X}")
else:
print("7. 错误:无法读取校验和,数据不足")
# 8. 验证校验和
# 计算校验和时只使用校验码之前的数据
calculated_checksum = FrameFormat.calculate_checksum(test_data[:offset])
print(f"8. 校验和验证: 计算值=0x{calculated_checksum:02X}, 实际值=0x{checksum:02X}")
# 解析帧
frame = FrameFormat.parse_frame(test_data)
if frame:
print("\n解析成功!")
print(f"命令码: 0x{frame.command:08X}")
print(f"长度: {frame.length}")
print(f"帧IDX: {frame.frame_idx}")
print("通道数据:")
for i, value in enumerate(frame.channels):
print(f"通道 {i+1}: 0x{value:08X}")
print(f"校验和: 0x{frame.checksum:02X}")
else:
print("\n解析失败!")