160 lines
4.8 KiB
Python
160 lines
4.8 KiB
Python
from queue import Queue
|
||
|
||
from sqlitedict import SqliteDict
|
||
|
||
import busybox
|
||
from communication.com_cmd import ComCmd
|
||
from communication.param import Param
|
||
from env import PARAMS_DB_PATH, TB_NAME, PARAMS_DEFINES
|
||
import csv
|
||
|
||
|
||
class ParamManager:
    """Singleton that stores device parameters in a SqliteDict and queues commands.

    Every ``ParamManager()`` call returns the same object. Parameters are kept
    in ``self.dict`` (a ``SqliteDict`` opened with ``autocommit=True``) and the
    commands generated by the parameters are pushed onto ``self.task_queue``.
    """

    _instance = None
    # Becomes True on the instance once __init__ has completed, so that later
    # ParamManager() calls do not rebuild the queue or reopen the database.
    _initialized = False

    def __new__(cls, *args, **kwargs):
        """Create the shared instance on first use and return it afterwards."""
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self):
        """Set up the queue and the parameter store; runs only once.

        Python calls ``__init__`` after every ``__new__``, including when
        ``__new__`` hands back the already existing singleton. Without the
        guard each ``ParamManager()`` call would replace ``task_queue``
        (dropping commands still waiting in it) and open another SqliteDict.
        """
        if self._initialized:
            return
        self.data = 10
        self.task_queue = Queue()
        self.dict = SqliteDict(PARAMS_DB_PATH, tablename=TB_NAME, autocommit=True)
        self.init_params_db()
        # Set last: if setup raised, the next ParamManager() call retries it.
        self._initialized = True

    @staticmethod
    def _read_define_rows():
        """Return the data rows of the PARAMS_DEFINES CSV file.

        The first row (header) is dropped and blank rows are skipped so that
        callers can index ``row[0]`` safely.
        """
        # newline='' is the form the csv module documents for file objects.
        with open(PARAMS_DEFINES, mode='r', encoding='utf-8', newline='') as csv_file:
            csv_reader = csv.reader(csv_file, delimiter=',')
            next(csv_reader, None)  # skip the header row
            return [row for row in csv_reader if row]

    def _enqueue_all(self, cmds):
        """Put every command from ``cmds`` onto ``task_queue`` in order."""
        for cmd in cmds:
            self.task_queue.put(cmd)

    def init_params_db(self):
        """Create missing parameters from the CSV and reset their current values.

        For each CSV data row whose name (column 0) has no entry in the store,
        a ``Param`` is created from columns 0, 1 and 2 (as str, int, int).
        Afterwards every stored parameter gets ``list_val`` overwritten with
        its ``list_set_val`` and is written back.
        """
        for row in self._read_define_rows():
            name = row[0]
            # Only create the parameter if the store has none under this name.
            if self.dict.get(name) is None:
                self.dict[name] = Param(str(name), int(row[1]), int(row[2]))
                self.dict.commit()

        # Iterate over a snapshot: the loop body writes back into the store.
        for name, param in list(self.dict.items()):
            param.list_val = param.list_set_val
            self.dict[name] = param

    def get(self, key) -> "Param":
        """Return the parameter stored under ``key``.

        :param key: parameter name used as the store key
        :return: the stored parameter object
        :raises KeyError: if the store has no entry for ``key``
        """
        return self.dict[key]

    def set_param_all_vals(self, key, vals: list):
        """Set all values of the parameter named ``key`` and queue the writes.

        :param key: parameter name
        :param vals: values handed to the parameter's ``set_all_vals``
        :return: None
        """
        param = self.dict[key]
        param.set_all_vals(vals)
        # Write the modified object back; the object read from the store is a copy.
        self.dict[key] = param
        self.dict.commit()

        # Generate the commands that set the parameter and queue them.
        self._enqueue_all(param.gen_cmd_write_all())

    def gen_read_all_params(self):
        """Queue read commands for every parameter in the store."""
        # Only the keys are needed, so the stored values are not loaded here.
        for name in list(self.dict.keys()):
            self.get_param_vals(name)

    def gen_write_all_params(self):
        """Queue write commands for every parameter in the store."""
        for name in list(self.dict.keys()):
            self.sync_param_to_device(name)

    def dump_all_params(self):
        """Return all parameters as dicts, in the order of the CSV file.

        :return: list of ``{"name", "addr", "len", "val"}`` dicts, one per
            CSV data row
        """
        param_data_list = []
        for row in self._read_define_rows():
            param = self.dict[row[0]]
            param_data_list.append({
                "name": param.name,
                "addr": param.addr(),
                "len": param.len,
                "val": param.list_val,
            })
        return param_data_list

    def import_params_data(self, param_data_list):
        """Load values from ``param_data_list`` into the stored parameters.

        :param param_data_list: iterable of dicts with ``'name'`` and ``'val'``
            keys (the shape produced by ``dump_all_params``)
        """
        for itm in param_data_list:
            param = self.dict[itm['name']]
            param.list_val = itm['val']
            param.list_set_val = itm['val']
            # The object must be written back, otherwise the import has no
            # effect on the database.
            self.dict[itm['name']] = param
        self.dict.commit()

    def sync_param_to_device(self, key):
        """Queue the write commands of the parameter named ``key``.

        :param key: parameter name
        """
        self._enqueue_all(self.dict[key].gen_cmd_write_all())

    def set_param_one_val(self, p_name, idx, val):
        """Set the value at index ``idx`` of parameter ``p_name`` to ``val``.

        The change is stored, the single-value write command is queued, and
        read commands for the parameter are queued right after it.

        :param p_name: parameter name
        :param idx: index of the value to change
        :param val: new value
        :return: None
        """
        param = self.dict[p_name]
        param.set_one_val(idx, val)
        self.dict[p_name] = param
        # Update the database.
        self.dict.commit()
        self.task_queue.put(param.gen_cmd_write_one(idx))
        # Read the parameter back immediately after writing.
        self.get_param_vals(p_name)

    def get_param_vals(self, p_name):
        """Queue the read commands of the parameter referred to by ``p_name``.

        The store key is taken from
        ``busybox.extract_param_names_plain(p_name)[0][0]``.

        :param p_name: parameter name as accepted by ``extract_param_names_plain``
        """
        real_name = busybox.extract_param_names_plain(p_name)[0][0]
        self._enqueue_all(self.dict[real_name].gen_cmd_read_all())

    def process_response(self, p_cmd: "ComCmd", res_data):
        """Let the parameter named by ``p_cmd`` handle a response and store it.

        :param p_cmd: command whose ``param_name`` selects the parameter
        :param res_data: response data passed to ``process_cmd_response``
        """
        param = self.dict[p_cmd.param_name]
        param.process_cmd_response(p_cmd, res_data)
        self.dict[p_cmd.param_name] = param
        self.dict.commit()
|
||
|
||
|
||
if __name__ == '__main__':
    # Manual smoke run: build the manager twice, then read and write one
    # parameter through it.
    first_manager = ParamManager()
    second_manager = ParamManager()

    # Assign through the second handle, print through the first.
    second_manager.data = 10
    print(first_manager.data)

    # Look up the 'orderOne' parameter and print what it holds.
    order_param = first_manager.get('orderOne')
    print(order_param.values())
    print(order_param.list_set_val)

    # Store a new value list for 'orderOne' and queue its write commands.
    first_manager.set_param_all_vals('orderOne', [0])
|
||
|
||
|
||
|