brisonus_app_eq/ERNCTuningTool/communication/param_manager.py

160 lines
4.8 KiB
Python
Raw Permalink Normal View History

2025-02-18 22:05:52 +08:00
from queue import Queue
from sqlitedict import SqliteDict
import busybox
from communication.com_cmd import ComCmd
from communication.param import Param
from env import PARAMS_DB_PATH, TB_NAME, PARAMS_DEFINES
import csv
class ParamManager:
    """Singleton that owns the persisted parameter table and a command queue.

    Parameters are stored as ``Param`` objects in a ``SqliteDict`` (``self.dict``)
    keyed by parameter name.  Methods that talk to the device do not send
    anything themselves: they ask the ``Param`` to generate commands and put
    those commands on ``self.task_queue``.  The consumer of that queue is not
    part of this class.
    """

    _instance = None
    # Becomes True on the instance once __init__ has completed successfully, so
    # that later ``ParamManager()`` calls do not rebuild the queue / database.
    _initialized = False

    def __new__(cls, *args, **kwargs):
        """Return the shared instance, creating it on first use."""
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self):
        """Open the parameter database and seed it from the CSV definitions.

        Python calls ``__init__`` on every ``ParamManager()`` expression, even
        though ``__new__`` hands back the same object.  Without the guard below
        each call would replace ``task_queue`` (dropping queued commands),
        reopen the database and reset every parameter's current values.
        """
        if self._initialized:
            return
        self.data = 10
        self.task_queue = Queue()
        self.dict = SqliteDict(PARAMS_DB_PATH, tablename=TB_NAME, autocommit=True)
        self.init_params_db()
        self._initialized = True

    @staticmethod
    def _iter_param_defines():
        """Yield the data rows of the ``PARAMS_DEFINES`` CSV file in file order.

        The first row is treated as a header and skipped; blank rows are
        skipped as well so they cannot raise ``IndexError`` in callers.
        """
        # newline='' is what the csv module documents for files passed to reader()
        with open(PARAMS_DEFINES, mode='r', encoding='utf-8', newline='') as csv_file:
            reader = csv.reader(csv_file, delimiter=',')
            next(reader, None)  # skip the header row
            for row in reader:
                if row:
                    yield row

    def _enqueue_all(self, cmds):
        """Put every command in *cmds* on the task queue, preserving order."""
        for cmd in cmds:
            self.task_queue.put(cmd)

    def init_params_db(self):
        """Create missing parameters from the CSV and reset current values.

        For every CSV row whose name (column 0) is not yet in the database a
        ``Param`` is created from columns 0, 1 and 2 (the latter two converted
        to ``int``).  Afterwards every stored parameter gets
        ``list_val = list_set_val`` and is written back.
        """
        for row in self._iter_param_defines():
            name = row[0]
            # Only create the parameter if the database does not hold it yet
            if self.dict.get(name) is None:
                self.dict[name] = Param(str(name), int(row[1]), int(row[2]))
                self.dict.commit()
        # Snapshot the items first so the write-backs below do not interleave
        # with the query that is producing them.
        for name, param in list(self.dict.items()):
            # NOTE(review): both attributes now refer to the same object;
            # confirm Param never mutates one of them in place.
            param.list_val = param.list_set_val
            self.dict[name] = param

    def get(self, key) -> "Param":
        """Return the stored parameter named *key*.

        :param key: parameter name used as the database key
        :return: the ``Param`` stored under *key*
        :raises KeyError: if no such parameter is stored
        """
        return self.dict[key]

    def set_param_all_vals(self, key, vals: list):
        """Set all values of parameter *key*, persist it and queue the writes.

        :param key: parameter name
        :param vals: new values, passed to ``Param.set_all_vals``
        :return: None
        """
        tmp_param: Param = self.dict[key]
        tmp_param.set_all_vals(vals)
        # A SqliteDict value is a copy, so the change must be stored explicitly
        self.dict[key] = tmp_param
        self.dict.commit()
        # Queue the commands that write the parameter
        self._enqueue_all(tmp_param.gen_cmd_write_all())

    def gen_read_all_params(self):
        """Queue read commands for every stored parameter."""
        # Only the names are needed; a snapshot keeps the key query separate
        # from the lookups done by get_param_vals.
        for name in list(self.dict.keys()):
            self.get_param_vals(name)

    def gen_write_all_params(self):
        """Queue write commands for every stored parameter."""
        for name in list(self.dict.keys()):
            self.sync_param_to_device(name)

    def dump_all_params(self):
        """Return all parameters as plain dicts, in CSV definition order.

        :return: list of ``{"name", "addr", "len", "val"}`` dicts, one per
            data row of the CSV file
        :raises KeyError: if a CSV row names a parameter that is not stored
        """
        param_data_list = []
        for row in self._iter_param_defines():
            param = self.dict[row[0]]
            param_data_list.append({
                "name": param.name,
                "addr": param.addr(),
                "len": param.len,
                "val": param.list_val,
            })
        return param_data_list

    def import_params_data(self, param_data_list):
        """Load values from dicts shaped like the output of ``dump_all_params``.

        Each item's ``'val'`` is assigned to both ``list_val`` and
        ``list_set_val`` of the parameter named by ``'name'``.  No device
        commands are queued here.

        :param param_data_list: iterable of dicts with ``'name'`` and ``'val'``
        :raises KeyError: if an item names a parameter that is not stored
        """
        for itm in param_data_list:
            name = itm['name']
            param: Param = self.dict[name]
            # NOTE(review): both attributes share the same object afterwards;
            # confirm Param never mutates one of them in place.
            param.list_val = itm['val']
            param.list_set_val = itm['val']
            # Write the modified copy back, otherwise the import has no effect
            self.dict[name] = param
            self.dict.commit()

    def sync_param_to_device(self, key):
        """Queue the commands that write the stored parameter *key*.

        :param key: parameter name
        """
        tmp_param: Param = self.dict[key]
        self._enqueue_all(tmp_param.gen_cmd_write_all())

    def set_param_one_val(self, p_name, idx, val):
        """Set one value of a parameter, persist it, queue the write and a read.

        :param p_name: parameter name
        :param idx: index of the value to change
        :param val: new value
        :return: None
        """
        tmp_param: Param = self.dict[p_name]
        tmp_param.set_one_val(idx, val)
        # Update the database
        self.dict[p_name] = tmp_param
        self.dict.commit()
        self.task_queue.put(tmp_param.gen_cmd_write_one(idx))
        # Queue a read of the parameter right after the write
        self.get_param_vals(p_name)

    def get_param_vals(self, p_name):
        """Queue the commands that read all values of a parameter.

        :param p_name: parameter name; it is first passed through
            ``busybox.extract_param_names_plain`` and element ``[0][0]`` of the
            result is used as the database key (presumably the base name
            without decoration -- confirm against busybox)
        """
        real_name = busybox.extract_param_names_plain(p_name)[0][0]
        tmp_param: Param = self.dict[real_name]
        self._enqueue_all(tmp_param.gen_cmd_read_all())

    def process_response(self, p_cmd: "ComCmd", res_data):
        """Apply a command response to its parameter and persist the result.

        :param p_cmd: the command the response belongs to; its ``param_name``
            selects the stored parameter
        :param res_data: response payload, handed to
            ``Param.process_cmd_response`` unchanged
        """
        name = p_cmd.param_name
        param = self.dict[name]
        param.process_cmd_response(p_cmd, res_data)
        self.dict[name] = param
        self.dict.commit()
if __name__ == '__main__':
    # Manual smoke check, run only when this file is executed directly.
    first_manager = ParamManager()
    second_manager = ParamManager()

    # Write through one handle and read through the other.
    second_manager.data = 10
    print(first_manager.data)

    # Look up one stored parameter and print its values.
    order_one = first_manager.get('orderOne')
    print(order_one.values())
    print(order_one.list_set_val)

    # Store new values for the parameter and queue the write commands.
    first_manager.set_param_all_vals('orderOne', [0])