brisonus_app_eq/ERNCTuningTool/communication/param_manager.py
2025-02-18 22:05:52 +08:00

161 lines
4.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from queue import Queue
from sqlitedict import SqliteDict
import busybox
from communication.com_cmd import ComCmd
from communication.param import Param
from env import PARAMS_DB_PATH, TB_NAME, PARAMS_DEFINES
import csv
class ParamManager:
_instance = None
def __new__(cls, *args, **kwargs):
if not cls._instance:
cls._instance = super().__new__(cls)
return cls._instance
def __init__(self):
self.data = 10
self.task_queue = Queue()
#
self.dict = SqliteDict(PARAMS_DB_PATH, tablename=TB_NAME, autocommit=True)
self.init_params_db()
def init_params_db(self):
with open(PARAMS_DEFINES, mode='r', encoding='utf-8') as csv_file:
csv_reader = csv.reader(csv_file, delimiter=',')
for idx, row_data in enumerate(csv_reader):
if idx == 0:
continue
# 判断db中是否存在当前名称的参数
if self.dict.get(row_data[0]) is None:
self.dict[row_data[0]] = Param(
str(row_data[0]),
int(row_data[1]),
int(row_data[2]))
self.dict.commit()
#
for itm_param in self.dict.items():
_param: Param = itm_param[1]
_param.list_val = _param.list_set_val
self.dict[itm_param[0]] = _param
def get(self, key)->Param:
"""
通过key值获取参数变量
:param key:
:return:
"""
return self.dict[key]
def set_param_all_vals(self, key, vals: list):
"""
设置名为key的变量的所有值
:param key:
:param vals:
:return:
"""
# 生成设置参数的命令
tmp_param: Param = self.dict[key]
tmp_param.set_all_vals(vals)
self.dict[key] = tmp_param
self.dict.commit()
_cmds = tmp_param.gen_cmd_write_all()
for cmd in _cmds:
self.task_queue.put(cmd)
def gen_read_all_params(self):
for itm_param in self.dict.items():
self.get_param_vals(itm_param[0])
def gen_write_all_params(self):
for itm_param in self.dict.items():
self.sync_param_to_device(itm_param[0])
def dump_all_params(self):
# 按csv文件的顺序输出参数
param_data_list = []
with open(PARAMS_DEFINES, mode='r', encoding='utf-8') as csv_file:
csv_reader = csv.reader(csv_file, delimiter=',')
for idx, row_data in enumerate(csv_reader):
# 跳过第一行
if idx == 0:
continue
_param = self.dict[row_data[0]]
param_data_list.append({
"name": _param.name,
"addr": _param.addr(),
"len": _param.len,
"val": _param.list_val
})
return param_data_list
def import_params_data(self, param_data_list):
for itm in param_data_list:
_param: Param = self.dict[itm['name']]
_param.list_val = itm['val']
_param.list_set_val = itm['val']
# 修复BUG导入数据无效没有把_param写回数据库
self.dict[itm['name']] = _param
self.dict.commit()
def sync_param_to_device(self, key):
tmp_param: Param = self.dict[key]
_cmds = tmp_param.gen_cmd_write_all()
for cmd in _cmds:
self.task_queue.put(cmd)
def set_param_one_val(self, p_name, idx, val):
"""
设置名为key的变量的index为idx的值为val
:param p_name:
:param idx:
:param val:
:return:
"""
tmp_param: Param = self.dict[p_name]
tmp_param.set_one_val(idx, val)
self.dict[p_name] = tmp_param
# 更新数据库
self.dict.commit()
_cmd = tmp_param.gen_cmd_write_one(idx)
self.task_queue.put(_cmd)
# 写入后立即获取
self.get_param_vals(p_name)
def get_param_vals(self, p_name):
_param_real_name = busybox.extract_param_names_plain(p_name)[0][0]
tmp_param: Param = self.dict[_param_real_name]
_cmds = tmp_param.gen_cmd_read_all()
for cmd in _cmds:
self.task_queue.put(cmd)
def process_response(self, p_cmd:ComCmd, res_data):
param = self.dict[p_cmd.param_name]
param.process_cmd_response(p_cmd, res_data)
self.dict[p_cmd.param_name] = param
self.dict.commit()
if __name__ == '__main__':
param_manager1 = ParamManager()
param_manager2 = ParamManager()
param_manager2.data = 10
print(param_manager1.data)
test_param = param_manager1.get('orderOne')
print(test_param.values())
print(test_param.list_set_val)
param_manager1.set_param_all_vals('orderOne', [0])