import busybox from communication.com_cmd import ComCmd from math import ceil class Param: BLOCK_SIZE = 60 def __init__(self, param_name, _id, length): self.name = param_name self.id = _id self.len = length self.list_val = [0] * length self.list_set_val = [0] * length self._start_addr = int(self.id) self._block_num = ceil(self.len / self.BLOCK_SIZE) def values(self): return self.list_val def set_all_vals(self, vals): self.list_set_val = vals def set_one_val(self, idx, val): self.list_set_val[idx] = val def update(self, val): self.list_val = val # 由param_manager调用 def update_one(self, idx, val): self.list_val[idx] = val def setval(self): pass def process_cmd_response(self, p_cmd: ComCmd, response_data): # 对Cmd响应进行处理 if p_cmd.op == ComCmd.OperationType.READ: _start_idx = int((p_cmd.addr - self._start_addr) / 2) self.list_val[_start_idx:_start_idx+self.len] = p_cmd.resolve_read_response(response_data) # 生成read_cmd def gen_cmd_read_all(self) -> list[ComCmd]: _cmd_list = [] for i in range(self._block_num): # 如果是最后一个block if i == self._block_num - 1: print(f'这是最后一个Block, block{i}') print((self.len - self.BLOCK_SIZE*i)*2) _cmd = ComCmd( ComCmd.OperationType.READ, self.name, self._start_addr + i*self.BLOCK_SIZE*2, (self.len - self.BLOCK_SIZE*i)*2 ) else: _cmd = ComCmd( ComCmd.OperationType.READ, self.name, self._start_addr + i*self.BLOCK_SIZE*2, self.BLOCK_SIZE*2 ) _cmd_list.append(_cmd) return _cmd_list def gen_cmd_write_one(self, idx): """ 生成指令,向指定idx写入值 :param idx: :return: """ _cmd = ComCmd( ComCmd.OperationType.WRITE, self.name, self._start_addr + idx*2, 2, write_vals=[self.list_set_val[idx]] ) return _cmd def addr(self): return self._start_addr # 生成write_cmd def gen_cmd_write_all(self): _cmd_list = [] for i in range(self._block_num): # 如果是最后一个block if i == self._block_num - 1: _cmd = ComCmd( ComCmd.OperationType.WRITE, self.name, self._start_addr + i * self.BLOCK_SIZE * 2, (self.len - self.BLOCK_SIZE * i) * 2, write_vals=self.list_set_val[ self.BLOCK_SIZE * i:(self.len + 1) ] ) else: _cmd = ComCmd( ComCmd.OperationType.WRITE, self.name, self._start_addr + i * self.BLOCK_SIZE * 2, self.BLOCK_SIZE * 2, write_vals=busybox.expend_vals(self.list_set_val[self.BLOCK_SIZE*i:self.BLOCK_SIZE*(i+1)]) ) _cmd_list.append(_cmd) return _cmd_list import modbus_tk.defines as cst from modbus_tk.modbus_rtu import RtuMaster from sqlitedict import SqliteDict # 用modbus_tk模块测试Param类 if __name__ == '__main__': test_param = Param('test_param', 1000, 200) test_param.set_all_vals([float(i) for i in range(200)]) cmd_list = test_param.gen_cmd_write_all() for item_cmd in cmd_list: print(f''' name: {item_cmd.param_name}, 操作码: {item_cmd.op}, 地址: {item_cmd.addr}, 长度: {item_cmd.len}, write_vals数据:{item_cmd.modbus_write_vals()} ''') # test_param = Param('test_param', 1000, 200) # test_param.set_vals([float(i) for i in range(200)]) # print(test_param.gen_write_cmd(2).modbus_write_vals()) res_data = [17204, 0, 17205, 0, 17206, 0, 17207, 0, 17208, 0, 17209, 0, 17210, 0, 17211, 0, 17212, 0, 17213, 0, 17214, 0, 17215, 0, 17216, 0, 17217, 0, 17218, 0, 17219, 0, 17220, 0, 17221, 0, 17222, 0, 17223, 0] cmd = ComCmd(ComCmd.OperationType.READ, 'test_param', 1360, 40) test_param.process_cmd_response(cmd, res_data) print(test_param.list_val) # print(test_param.list_val)