import json import struct import sys import time from dataclasses import dataclass from PySide6.QtCore import QCoreApplication, QByteArray, Slot from PySide6.QtNetwork import QTcpServer, QTcpSocket, QHostAddress from PySide6.QtCore import Signal from param_manager import ParamManager import modbus_tk import modbus_tk.defines as cst import modbus_tk.modbus_rtu as modbus_rtu from typing import Dict import serial @dataclass class Parameter: name: str offset: int data_type: str array_index: int = -1 def __str__(self): return f"{self.name} at offset {self.offset} ({self.data_type})" class ParameterManager: def __init__(self): self.parameters: Dict[str, Parameter] = {} self._load_parameters() def _load_parameters(self): # Load parameters from the structure file with open('struct_members_0222.txt', 'r') as f: for line in f: parts = line.strip().split(':') if len(parts) != 2: continue name = parts[0].strip() offset_type = parts[1].strip().split() offset = int(offset_type[1]) data_type = offset_type[2].strip('()') self.parameters[name] = Parameter(name, offset, data_type) class ModbusController: def __init__(self, modbus_config): self.master = modbus_rtu.RtuMaster( serial.Serial(port=modbus_config["com_port"], baudrate=modbus_config["baud_rate"], bytesize=8, parity='N', stopbits=1) ) self.master.set_timeout(0.1) self.slave_addr = 1 self.cst = cst def _byte_addr_to_register_addr(self, byte_addr: int) -> int: """Convert byte address to Modbus register address""" return byte_addr // 2 def read_parameter(self, param: Parameter): # Calculate register address from byte offset reg_addr = self._byte_addr_to_register_addr(param.offset) print(reg_addr) print('--------------') print(param.data_type) # Read two consecutive 16-bit registers registers = self.master.execute( self.slave_addr, self.cst.READ_HOLDING_REGISTERS, reg_addr, 2 ) swapped_registers = (registers[0], registers[1]) bytes_value = struct.pack(' Dict: result = {} for param_name in params: if param_name in self.param_manager.parameters: param = self.param_manager.parameters[param_name] value = self.modbus.read_parameter(param) result[param_name] = value time.sleep(0.08) return result def handle_set_params(self, params: Dict) -> bool: print('handle set') for param_name, value in params.items(): if param_name in self.param_manager.parameters: param = self.param_manager.parameters[param_name] self.modbus.write_parameter(param, value) time.sleep(0.08) return True if __name__ == "__main__": # app = TcpServer("192.168.5.4", 12345) app = TcpServer(server_config={"host": "192.168.5.7", "port": 12345}, modbus_config={"com_port": "COM21", "baud_rate": 9600}) sys.exit(app.exec())