brisonus_data_collector/data_collector copy.py
JingweiCui f7a4a23ab8 首次提交
将PC上的文件存档到远程仓库。
2025-01-07 09:07:01 +08:00

216 lines
7.3 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import serial
import time
from collections import deque
import threading
from queue import Queue, Empty
class Frame:
START = b'\xAA\xAA\xAA\xAA'
CMD_POS = (4, 8)
LEN_POS = (8, 12)
ID_POS = (12, 16)
# CHANNEL_DATA_POS = (16, 16 + 4*16)
# CHKSUM_POS = (16 + 4*16, 16 + 4*16 + 1)
def __init__(self, ch_num, frame_data):
self.frame_data = frame_data
self.ch_num = ch_num
self.CHANNEL_DATA_POS = (16, 16 + 4 * ch_num)
self.CHKSUM_POS = (16 + 4 * ch_num, 16 + 4 * ch_num + 1)
self.extract_frame(frame_data)
def get_chksum(self, data):
chksum = 0
for byte in data:
chksum += byte
return chksum & 0xFF
def extract_frame(self, frame_data):
"""提取数据帧"""
self.index = frame_data[self.ID_POS[0]:self.ID_POS[1]]
self.cmd = frame_data[self.CMD_POS[0]:self.CMD_POS[1]]
self.checksum = frame_data[self.CHKSUM_POS[0]:self.CHKSUM_POS[1]]
self.datas = []
for i in range(0, self.ch_num):
channel_data_bytes = frame_data[self.CHANNEL_DATA_POS[0]+i*4:self.CHANNEL_DATA_POS[0] + (i + 1) * 4]
self.datas.append(channel_data_bytes)
def verify(self):
"""验证数据帧"""
self.chksum = self.get_chksum(self.frame_data[0:self.CHANNEL_DATA_POS[1]])
if self.checksum[0] != self.chksum:
return False
else:
return True
class SerialDataLogger:
def __init__(self, port='COM50', baudrate=4000000):
# 初始化参数
self.CHUNK_SIZE = 256
self.BUFFER_SIZE = 1024*50 # 约81KB的缓存区
self.buffer = deque(maxlen=self.BUFFER_SIZE)
self.output_file = "serial_data.dat"
self.process_queue = Queue()
self.is_running = True
self.tag = 0xAA
self.data_remainder = b''
# 初始化串口
self.ser = serial.Serial(
port=port,
baudrate=baudrate,
timeout=1
)
# 创建处理线程
self.process_thread = threading.Thread(target=self.process_thread_func)
self.process_thread.daemon = True
self.process_thread.start()
def read_chunk(self):
"""读取一个数据块"""
try:
chunk = self.ser.read(self.CHUNK_SIZE)
return chunk
except Exception as e:
print(f"读取数据出错: {e}")
return None
def process_thread_func(self):
"""数据处理线程的主函数"""
while self.is_running:
try:
# 从队列中获取数据设置1秒超时
data = self.process_queue.get(timeout=1)
self.process_buffer(data)
except Empty:
continue
# except Exception as e:
# print(f"处理线程发生错误: {e}")
def process_buffer(self, data):
"""处理缓存区数据,识别有效数据帧"""
# 查找数据帧的起始标志假设数据帧以0xAA 0x55开始
frame_start = b'\xAA\xAA\xAA\xAA'
valid_frames = []
data = self.data_remainder + data
print(data)
print(len(data))
# self.save_data(data)
pos = 0
while pos < len(data):
# 查找帧起始标志
start_pos = data.find(frame_start, pos)
if start_pos == -1:
break
# 假设每个数据帧的长度是81字节根据实际协议修改
FRAME_LENGTH = 81
if start_pos + FRAME_LENGTH <= len(data):
frame = data[start_pos:start_pos + FRAME_LENGTH]
# 验证数据帧的有效性(根据实际协议添加校验)
if Frame(16, frame).verify():
valid_frames.append(frame)
pos = start_pos + FRAME_LENGTH
else:
# 将剩余的数据保留
print(start_pos)
self.data_remainder = data[start_pos:]
print(self.data_remainder)
break
# 将有效数据帧写入文件
self.save_frames(valid_frames)
def save_frames(self, frames):
"""保存有效数据帧到文件"""
try:
with open(self.output_file, 'a+') as f:
# 写入数据段标签
# 获取时间戳
# 获取当前时间戳(秒,带微秒)
timestamp = time.time()
# 转换为微秒时间戳
microsecond_timestamp = int(timestamp * 1_000)
_tag_string = f':{self.tag:02X} {microsecond_timestamp:08X}\n'
f.write(_tag_string)
for frame in frames:
_format_string = str(self.format_frame(Frame(16, frame)))
f.write(_format_string)
f.write('\n') # 添加换行符分隔不同帧
except Exception as e:
print(f"保存数据出错: {e}")
def add_tag(self, tag, data):
"""添加标签"""
# 时间戳 TAG标记
pass
def format_frame(self, frame: Frame):
"""格式化数据帧"""
"""_summary_
起始符号 INDEX 通道数量 通道1数据 通道2数据 通道3数据 ...
"""
_format_string = f':{frame.index.hex().upper()} {frame.ch_num:02X}'
for data in frame.datas:
_format_string += f' {data.hex().upper()}'
return _format_string
def save_data(self, data):
"""保存有效数据帧到文件"""
try:
with open(self.output_file, 'ab') as f:
f.write(data)
except Exception as e:
print(f"保存数据出错: {e}")
def run(self):
"""主运行循环"""
print("开始接收数据...")
try_counter = 0
try:
while True:
chunk = self.read_chunk()
if chunk:
# 将数据块添加到缓存区
for byte in chunk:
self.buffer.append(byte)
# 如果缓存区满,将数据发送到处理队列
if len(self.buffer) >= self.BUFFER_SIZE:
print("缓存区已满,加入处理队列...")
# 将当前缓存区的数据转换为字节并加入处理队列
self.process_queue.put(bytes(list(self.buffer)))
self.buffer.clear()
try_counter = 0
else:
try_counter += 1
if try_counter > 2:
if len(self.buffer) > 0:
self.process_queue.put(bytes(list(self.buffer)))
self.buffer.clear()
try_counter = 0
except KeyboardInterrupt:
print("\n程序已停止")
finally:
self.is_running = False # 停止处理线程
self.process_thread.join() # 等待处理线程结束
self.ser.close()
if __name__ == "__main__":
# 创建实例并运行
logger = SerialDataLogger(port='COM50', baudrate=4000000)
logger.run()