148 lines
4.4 KiB
Python
148 lines
4.4 KiB
Python
import busybox
|
||
from communication.com_cmd import ComCmd
|
||
from math import ceil
|
||
|
||
|
||
class Param:
    """One named device parameter stored as a contiguous array of values.

    Every value occupies ``_ADDR_PER_VALUE`` consecutive address units,
    starting at the address derived from ``_id``. Two copies of the array are
    kept: ``list_val`` holds the values most recently reported for the
    parameter, ``list_set_val`` holds the values staged for writing.
    Transfers are split into commands that cover at most ``BLOCK_SIZE``
    values each.
    """

    # Maximum number of values carried by a single read/write command.
    BLOCK_SIZE = 60
    # Address units occupied by one value
    # (presumably two 16-bit registers per value -- confirm against ComCmd).
    _ADDR_PER_VALUE = 2

    def __init__(self, param_name, _id, length):
        """Create a parameter.

        :param param_name: name passed on to every generated command
        :param _id: start address of the parameter (anything ``int()`` accepts)
        :param length: number of values in the parameter
        """
        self.name = param_name
        self.id = _id
        self.len = length
        self.list_val = [0] * length
        self.list_set_val = [0] * length

        self._start_addr = int(self.id)
        self._block_num = ceil(self.len / self.BLOCK_SIZE)

    def values(self):
        """Return the list of current values (the live list, not a copy)."""
        return self.list_val

    def set_all_vals(self, vals):
        """Stage ``vals`` as the values to write; the list is stored as-is, not copied."""
        self.list_set_val = vals

    def set_one_val(self, idx, val):
        """Stage a single value to write at position ``idx``."""
        self.list_set_val[idx] = val

    def update(self, val):
        """Replace the whole list of current values with ``val``."""
        self.list_val = val

    # Called by param_manager.
    def update_one(self, idx, val):
        """Overwrite the current value at position ``idx``."""
        self.list_val[idx] = val

    def setval(self):
        """Placeholder; intentionally does nothing."""

    def addr(self):
        """Return the start address of the parameter."""
        return self._start_addr

    def _block_spans(self):
        """Return ``(first_index, value_count)`` for every transfer block."""
        spans = []
        for block in range(self._block_num):
            first = block * self.BLOCK_SIZE
            # Only the last block can be shorter than BLOCK_SIZE.
            spans.append((first, min(self.BLOCK_SIZE, self.len - first)))
        return spans

    def _store_read_values(self, addr, values):
        """Copy ``values`` into ``list_val`` at the position matching ``addr``.

        :param addr: address the first value was read from
        :param values: decoded values, in address order
        """
        values = list(values)
        first = (addr - self._start_addr) // self._ADDR_PER_VALUE
        # Bound the slice by the number of values actually received; a wider
        # slice would delete the entries after the received block and shrink
        # the list on every partial read.
        self.list_val[first:first + len(values)] = values

    def process_cmd_response(self, p_cmd: "ComCmd", response_data):
        """Apply the response of a command to the current values.

        Only READ commands are handled; any other operation is ignored.

        :param p_cmd: the command that was sent
        :param response_data: raw response, decoded via
            ``p_cmd.resolve_read_response``
        """
        if p_cmd.op != ComCmd.OperationType.READ:
            return
        self._store_read_values(
            p_cmd.addr, p_cmd.resolve_read_response(response_data)
        )

    def gen_cmd_read_all(self) -> "list[ComCmd]":
        """Build the READ commands that together cover the whole parameter.

        :return: one command per block, in ascending address order
        """
        step = self._ADDR_PER_VALUE
        return [
            ComCmd(
                ComCmd.OperationType.READ,
                self.name,
                self._start_addr + first * step,
                count * step,
            )
            for first, count in self._block_spans()
        ]

    def gen_cmd_write_one(self, idx):
        """Build the WRITE command that sends the staged value at ``idx``.

        :param idx: position of the value inside the parameter
        :return: a single WRITE command covering exactly one value
        """
        return ComCmd(
            ComCmd.OperationType.WRITE,
            self.name,
            self._start_addr + idx * self._ADDR_PER_VALUE,
            self._ADDR_PER_VALUE,
            write_vals=[self.list_set_val[idx]],
        )

    def gen_cmd_write_all(self):
        """Build the WRITE commands that send every staged value.

        :return: one command per block, in ascending address order
        """
        step = self._ADDR_PER_VALUE
        last_block = self._block_num - 1
        commands = []

        for block, (first, count) in enumerate(self._block_spans()):
            # Never take more values than the declared command length covers,
            # even when the staged list is longer than the parameter.
            chunk = self.list_set_val[first:first + count]
            if block != last_block:
                # NOTE(review): only the non-final blocks go through
                # busybox.expend_vals; the final block is passed raw. Kept as
                # found -- confirm whether this asymmetry is intended.
                chunk = busybox.expend_vals(chunk)
            commands.append(
                ComCmd(
                    ComCmd.OperationType.WRITE,
                    self.name,
                    self._start_addr + first * step,
                    count * step,
                    write_vals=chunk,
                )
            )

        return commands
|
||
|
||
|
||
import modbus_tk.defines as cst
|
||
from modbus_tk.modbus_rtu import RtuMaster
|
||
|
||
from sqlitedict import SqliteDict
|
||
|
||
|
||
# 用modbus_tk模块测试Param类
|
||
if __name__ == '__main__':
    # Smoke test, part 1: stage 0.0 .. 199.0 on a 200-value parameter and
    # dump every generated WRITE command.
    param = Param('test_param', 1000, 200)
    param.set_all_vals(list(map(float, range(200))))

    for write_cmd in param.gen_cmd_write_all():
        # The template text is reproduced exactly as it appears in the source.
        print(f'''
name: {write_cmd.param_name},
操作码: {write_cmd.op},
地址: {write_cmd.addr},
长度: {write_cmd.len},
write_vals数据:{write_cmd.modbus_write_vals()}
''')

    # Smoke test, part 2: feed a canned READ response for address 1360
    # (length 40) back into the parameter and show the resulting values.
    # The response is 20 (word, 0) pairs with words 17204 .. 17223.
    canned_response = [word for high in range(17204, 17224) for word in (high, 0)]
    read_cmd = ComCmd(ComCmd.OperationType.READ, 'test_param', 1360, 40)
    param.process_cmd_response(read_cmd, canned_response)
    print(param.list_val)
|