brisonus_app_eq/ERNCTuningTool/communication/param.py
2025-02-18 22:05:52 +08:00

148 lines
4.4 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import busybox
from communication.com_cmd import ComCmd
from math import ceil
class Param:
    """A named, fixed-length array of values addressed from a start address.

    Value ``i`` lives at address ``_start_addr + 2 * i``: every value spans
    two address units (presumably two 16-bit Modbus registers per value --
    confirm against ``ComCmd``).  Transfers of the whole array are split into
    blocks of at most ``BLOCK_SIZE`` values, one ``ComCmd`` per block.

    Attributes:
        name: Parameter name passed to every generated command.
        id: Identifier as supplied by the caller; ``int(id)`` is the start
            address.
        len: Number of values in the parameter.
        list_val: Values last received from the device (see ``update`` and
            ``process_cmd_response``).
        list_set_val: Values staged for writing to the device.
    """

    # Maximum number of values carried by a single command.
    BLOCK_SIZE = 60

    def __init__(self, param_name, _id, length):
        """Create a parameter of ``length`` values, all initialised to 0.

        :param param_name: name attached to generated commands
        :param _id: identifier convertible with ``int()``; used as start address
        :param length: number of values held by the parameter
        """
        self.name = param_name
        self.id = _id
        self.len = length
        self.list_val = [0] * length
        self.list_set_val = [0] * length
        self._start_addr = int(self.id)
        self._block_num = ceil(self.len / self.BLOCK_SIZE)

    def values(self):
        """Return the list of current (read-back) values."""
        return self.list_val

    def set_all_vals(self, vals):
        """Replace the staged write values with ``vals`` (stored by reference)."""
        self.list_set_val = vals

    def set_one_val(self, idx, val):
        """Stage ``val`` for writing at position ``idx``."""
        self.list_set_val[idx] = val

    def update(self, val):
        """Replace the current values with ``val`` (stored by reference)."""
        self.list_val = val

    # Called by param_manager (per the original author's note).
    def update_one(self, idx, val):
        """Overwrite the current value at position ``idx``."""
        self.list_val[idx] = val

    def setval(self):
        """Placeholder; currently does nothing."""
        pass

    def process_cmd_response(self, p_cmd: "ComCmd", response_data):
        """Store the values carried by the response to a READ command.

        Responses to any other operation type are ignored.

        :param p_cmd: the command the response belongs to
        :param response_data: raw response, decoded by
            ``p_cmd.resolve_read_response``
        """
        if p_cmd.op == ComCmd.OperationType.READ:
            # Two address units per value, so halve the address offset.
            start_idx = int((p_cmd.addr - self._start_addr) / 2)
            self._store_values(
                start_idx, p_cmd.resolve_read_response(response_data)
            )

    def _store_values(self, start_idx, vals):
        """Overwrite ``list_val`` from ``start_idx`` with ``vals``.

        Only ``len(vals)`` positions are replaced.  Sizing the slice by
        ``self.len`` instead would delete every value after a partial block,
        shrinking the list.
        """
        vals = list(vals)
        self.list_val[start_idx:start_idx + len(vals)] = vals

    def _iter_blocks(self):
        """Yield ``(block_index, first_value_index, value_count, is_last)``.

        Every block holds ``BLOCK_SIZE`` values except the last one, which
        holds whatever remains.
        """
        last = self._block_num - 1
        for i in range(self._block_num):
            first = i * self.BLOCK_SIZE
            is_last = i == last
            count = self.len - first if is_last else self.BLOCK_SIZE
            yield i, first, count, is_last

    def gen_cmd_read_all(self) -> "list[ComCmd]":
        """Build the READ commands that together cover the whole parameter.

        :return: one READ command per block, in address order
        """
        cmds = []
        for i, first, count, is_last in self._iter_blocks():
            if is_last:
                # Debug trace for the final (possibly short) block.
                print(f'这是最后一个Block, block{i}')
                print(count * 2)
            cmds.append(
                ComCmd(
                    ComCmd.OperationType.READ,
                    self.name,
                    self._start_addr + first * 2,
                    count * 2,
                )
            )
        return cmds

    def gen_cmd_write_one(self, idx):
        """Build the WRITE command that sends the staged value at ``idx``.

        :param idx: position of the value within the parameter
        :return: a single WRITE command of length 2
        """
        return ComCmd(
            ComCmd.OperationType.WRITE,
            self.name,
            self._start_addr + idx * 2,
            2,
            write_vals=[self.list_set_val[idx]],
        )

    def addr(self):
        """Return the start address of the parameter."""
        return self._start_addr

    def gen_cmd_write_all(self) -> "list[ComCmd]":
        """Build the WRITE commands that send every staged value.

        :return: one WRITE command per block, in address order
        """
        cmds = []
        for _, first, count, is_last in self._iter_blocks():
            if is_last:
                # Stop at self.len so the payload matches the declared length
                # even if list_set_val holds extra entries.
                payload = self.list_set_val[first:self.len]
            else:
                # NOTE(review): only full blocks go through
                # busybox.expend_vals; the final block and gen_cmd_write_one
                # pass raw values.  Confirm which form ComCmd expects.
                payload = busybox.expend_vals(
                    self.list_set_val[first:first + self.BLOCK_SIZE]
                )
            cmds.append(
                ComCmd(
                    ComCmd.OperationType.WRITE,
                    self.name,
                    self._start_addr + first * 2,
                    count * 2,
                    write_vals=payload,
                )
            )
        return cmds
import modbus_tk.defines as cst
from modbus_tk.modbus_rtu import RtuMaster
from sqlitedict import SqliteDict
# Exercise the Param class (test with the modbus_tk module).
if __name__ == '__main__':
    demo_param = Param('test_param', 1000, 200)
    demo_param.set_all_vals(list(map(float, range(200))))

    # Dump every WRITE command generated for the whole parameter.
    for item_cmd in demo_param.gen_cmd_write_all():
        print(f'''
name: {item_cmd.param_name},
操作码: {item_cmd.op},
地址: {item_cmd.addr},
长度: {item_cmd.len},
write_vals数据{item_cmd.modbus_write_vals()}
''')

    # Canned 40-entry read response: 17204..17223, each followed by a 0.
    canned_reply = [
        word for high in range(17204, 17224) for word in (high, 0)
    ]
    # Address 1360 is 360 units past the start address 1000, i.e. value index 180.
    read_cmd = ComCmd(ComCmd.OperationType.READ, 'test_param', 1360, 40)
    demo_param.process_cmd_response(read_cmd, canned_reply)
    print(demo_param.list_val)