148 lines
4.4 KiB
Python
148 lines
4.4 KiB
Python
![]() |
import busybox
|
|||
|
from communication.com_cmd import ComCmd
|
|||
|
from math import ceil
|
|||
|
|
|||
|
|
|||
|
class Param:
    """A named parameter backed by a contiguous range of device addresses.

    The parameter holds ``length`` values.  ``list_val`` caches the values
    last read from the device and ``list_set_val`` holds the values queued to
    be written.  Transfers are split into blocks of at most ``BLOCK_SIZE``
    values, each described by one ``ComCmd``.
    """

    # Maximum number of values carried by a single command.
    BLOCK_SIZE = 60
    # Address units occupied by one value.  Command addresses and lengths are
    # expressed in these units (presumably 16-bit Modbus registers — confirm
    # against ComCmd).
    REGS_PER_VALUE = 2

    def __init__(self, param_name, _id, length):
        """Create a parameter.

        :param param_name: name passed on to every generated command.
        :param _id: identifier; ``int(_id)`` is used as the start address.
        :param length: number of values the parameter holds.
        """
        self.name = param_name
        self.id = _id
        self.len = length
        self.list_val = [0] * length
        self.list_set_val = [0] * length

        self._start_addr = int(self.id)
        self._block_num = ceil(self.len / self.BLOCK_SIZE)

    def values(self):
        """Return the list of cached (last read) values."""
        return self.list_val

    def set_all_vals(self, vals):
        """Replace the whole list of values queued for writing.

        The list is stored by reference, not copied.
        """
        self.list_set_val = vals

    def set_one_val(self, idx, val):
        """Queue ``val`` for writing at position ``idx``."""
        self.list_set_val[idx] = val

    def update(self, val):
        """Replace the whole list of cached values (stored by reference)."""
        self.list_val = val

    # Called by param_manager.
    def update_one(self, idx, val):
        """Overwrite the cached value at position ``idx``."""
        self.list_val[idx] = val

    def setval(self):
        """Placeholder; currently does nothing."""
        pass

    def _block_span(self, block_idx):
        """Return ``(address, first_index, value_count)`` for one block.

        Every block carries ``BLOCK_SIZE`` values except the last one, which
        carries whatever remains.
        """
        first = block_idx * self.BLOCK_SIZE
        count = min(self.BLOCK_SIZE, self.len - first)
        addr = self._start_addr + first * self.REGS_PER_VALUE
        return addr, first, count

    def _store_values(self, addr, values):
        """Copy ``values`` into the cache starting at the slot for ``addr``.

        The slice is bounded by ``len(values)`` so that only the slots covered
        by the response are replaced; bounding it by ``self.len`` would cut
        off every cached value after a non-final block.
        """
        start = int((addr - self._start_addr) / self.REGS_PER_VALUE)
        self.list_val[start:start + len(values)] = values

    def process_cmd_response(self, p_cmd: "ComCmd", response_data):
        """Apply the response of a command to the cached values.

        Only READ commands are handled; any other operation is ignored.

        :param p_cmd: the command the response belongs to.
        :param response_data: raw response, decoded by
            ``p_cmd.resolve_read_response``.
        """
        if p_cmd.op == ComCmd.OperationType.READ:
            # Materialise the decoded values so their count is known.
            decoded = list(p_cmd.resolve_read_response(response_data))
            self._store_values(p_cmd.addr, decoded)

    def gen_cmd_read_all(self) -> "list[ComCmd]":
        """Build the READ commands that together cover every value.

        :return: one command per block, in address order.
        """
        cmds = []
        for block_idx in range(self._block_num):
            addr, _, count = self._block_span(block_idx)
            cmds.append(
                ComCmd(
                    ComCmd.OperationType.READ,
                    self.name,
                    addr,
                    count * self.REGS_PER_VALUE,
                )
            )
        return cmds

    def gen_cmd_write_one(self, idx):
        """Build the WRITE command that sends the queued value at ``idx``.

        :param idx: position of the value within the parameter.
        :return: a single ``ComCmd`` covering exactly one value.
        """
        return ComCmd(
            ComCmd.OperationType.WRITE,
            self.name,
            self._start_addr + idx * self.REGS_PER_VALUE,
            self.REGS_PER_VALUE,
            write_vals=[self.list_set_val[idx]],
        )

    def addr(self):
        """Return the start address of the parameter."""
        return self._start_addr

    def gen_cmd_write_all(self):
        """Build the WRITE commands that send every queued value.

        :return: one command per block, in address order.
        """
        cmds = []
        last_block = self._block_num - 1
        for block_idx in range(self._block_num):
            addr, first, count = self._block_span(block_idx)
            # Slice ends at first + count, i.e. never past self.len.
            chunk = self.list_set_val[first:first + count]
            if block_idx != last_block:
                # NOTE(review): only non-final blocks are passed through
                # busybox.expend_vals; the final block (like
                # gen_cmd_write_one) sends the values as they are.  Kept as
                # found — confirm whether this difference is intended.
                chunk = busybox.expend_vals(chunk)
            cmds.append(
                ComCmd(
                    ComCmd.OperationType.WRITE,
                    self.name,
                    addr,
                    count * self.REGS_PER_VALUE,
                    write_vals=chunk,
                )
            )
        return cmds
|
|||
|
|
|||
|
|
|||
|
import modbus_tk.defines as cst
|
|||
|
from modbus_tk.modbus_rtu import RtuMaster
|
|||
|
|
|||
|
from sqlitedict import SqliteDict
|
|||
|
|
|||
|
|
|||
|
# Manual smoke test for the Param class.
if __name__ == '__main__':
    test_param = Param('test_param', 1000, 200)
    test_param.set_all_vals(list(map(float, range(200))))

    # Dump every WRITE command generated for the queued values.
    cmd_list = test_param.gen_cmd_write_all()
    for item_cmd in cmd_list:
        print(f'''
name: {item_cmd.param_name},
操作码: {item_cmd.op},
地址: {item_cmd.addr},
长度: {item_cmd.len},
write_vals数据:{item_cmd.modbus_write_vals()}
''')

    # Canned response: the words 17204..17223, each followed by a zero word.
    res_data = [word for high in range(17204, 17224) for word in (high, 0)]

    # Feed the response through a READ command for the final block
    # and show the resulting cache.
    cmd = ComCmd(ComCmd.OperationType.READ, 'test_param', 1360, 40)
    test_param.process_cmd_response(cmd, res_data)
    print(test_param.list_val)
|