brisonus_app_eq/ERNCTuningTool/communication/param.py

148 lines
4.4 KiB
Python
Raw Normal View History

2025-02-18 22:05:52 +08:00
import busybox
from communication.com_cmd import ComCmd
from math import ceil
class Param:
BLOCK_SIZE = 60
def __init__(self, param_name, _id, length):
self.name = param_name
self.id = _id
self.len = length
self.list_val = [0] * length
self.list_set_val = [0] * length
self._start_addr = int(self.id)
self._block_num = ceil(self.len / self.BLOCK_SIZE)
def values(self):
return self.list_val
def set_all_vals(self, vals):
self.list_set_val = vals
def set_one_val(self, idx, val):
self.list_set_val[idx] = val
def update(self, val):
self.list_val = val
# 由param_manager调用
def update_one(self, idx, val):
self.list_val[idx] = val
def setval(self):
pass
def process_cmd_response(self, p_cmd: ComCmd, response_data):
# 对Cmd响应进行处理
if p_cmd.op == ComCmd.OperationType.READ:
_start_idx = int((p_cmd.addr - self._start_addr) / 2)
self.list_val[_start_idx:_start_idx+self.len] = p_cmd.resolve_read_response(response_data)
# 生成read_cmd
def gen_cmd_read_all(self) -> list[ComCmd]:
_cmd_list = []
for i in range(self._block_num):
# 如果是最后一个block
if i == self._block_num - 1:
print(f'这是最后一个Block, block{i}')
print((self.len - self.BLOCK_SIZE*i)*2)
_cmd = ComCmd(
ComCmd.OperationType.READ,
self.name,
self._start_addr + i*self.BLOCK_SIZE*2,
(self.len - self.BLOCK_SIZE*i)*2
)
else:
_cmd = ComCmd(
ComCmd.OperationType.READ,
self.name,
self._start_addr + i*self.BLOCK_SIZE*2,
self.BLOCK_SIZE*2
)
_cmd_list.append(_cmd)
return _cmd_list
def gen_cmd_write_one(self, idx):
"""
生成指令向指定idx写入值
:param idx:
:return:
"""
_cmd = ComCmd(
ComCmd.OperationType.WRITE,
self.name,
self._start_addr + idx*2,
2,
write_vals=[self.list_set_val[idx]]
)
return _cmd
def addr(self):
return self._start_addr
# 生成write_cmd
def gen_cmd_write_all(self):
_cmd_list = []
for i in range(self._block_num):
# 如果是最后一个block
if i == self._block_num - 1:
_cmd = ComCmd(
ComCmd.OperationType.WRITE,
self.name,
self._start_addr + i * self.BLOCK_SIZE * 2,
(self.len - self.BLOCK_SIZE * i) * 2,
write_vals=self.list_set_val[
self.BLOCK_SIZE * i:(self.len + 1)
]
)
else:
_cmd = ComCmd(
ComCmd.OperationType.WRITE,
self.name,
self._start_addr + i * self.BLOCK_SIZE * 2,
self.BLOCK_SIZE * 2,
write_vals=busybox.expend_vals(self.list_set_val[self.BLOCK_SIZE*i:self.BLOCK_SIZE*(i+1)])
)
_cmd_list.append(_cmd)
return _cmd_list
import modbus_tk.defines as cst
from modbus_tk.modbus_rtu import RtuMaster
from sqlitedict import SqliteDict
# 用modbus_tk模块测试Param类
if __name__ == '__main__':
test_param = Param('test_param', 1000, 200)
test_param.set_all_vals([float(i) for i in range(200)])
cmd_list = test_param.gen_cmd_write_all()
for item_cmd in cmd_list:
print(f'''
name: {item_cmd.param_name},
操作码: {item_cmd.op},
地址: {item_cmd.addr},
长度: {item_cmd.len},
write_vals数据{item_cmd.modbus_write_vals()}
''')
# test_param = Param('test_param', 1000, 200)
# test_param.set_vals([float(i) for i in range(200)])
# print(test_param.gen_write_cmd(2).modbus_write_vals())
res_data = [17204, 0, 17205, 0, 17206, 0, 17207, 0, 17208, 0, 17209, 0, 17210, 0, 17211, 0, 17212, 0, 17213, 0, 17214, 0, 17215, 0, 17216, 0, 17217, 0, 17218, 0, 17219, 0, 17220, 0, 17221, 0, 17222, 0, 17223, 0]
cmd = ComCmd(ComCmd.OperationType.READ, 'test_param', 1360, 40)
test_param.process_cmd_response(cmd, res_data)
print(test_param.list_val)
# print(test_param.list_val)