286 lines
9.6 KiB
Python
286 lines
9.6 KiB
Python
import json
|
|
import struct
|
|
import sys
|
|
from dataclasses import dataclass
|
|
|
|
from PySide6.QtCore import QCoreApplication, QByteArray, Slot
|
|
from PySide6.QtNetwork import QTcpServer, QTcpSocket, QHostAddress
|
|
from PySide6.QtCore import Signal
|
|
|
|
from param_manager import ParamManager
|
|
import modbus_tk
|
|
import modbus_tk.defines as cst
|
|
import modbus_tk.modbus_rtu as modbus_rtu
|
|
from typing import Dict
|
|
import serial
|
|
|
|
|
|
@dataclass
class Parameter:
    """One named entry of the device parameter table.

    Attributes are documented inline below; ``str(parameter)`` yields a
    short human-readable summary.
    """

    # Name under which the parameter is looked up.
    name: str
    # Byte offset of the value; the Modbus layer converts it to a register
    # address.
    offset: int
    # Type tag read from the structure file, e.g. "float" or "int32_t".
    data_type: str
    # Defaults to -1. Not read anywhere in this file -- presumably marks
    # "not an array element"; TODO confirm against the producer of the table.
    array_index: int = -1

    def __str__(self):
        """Return ``"<name> at offset <offset> (<data_type>)"``."""
        return "{} at offset {} ({})".format(
            self.name, self.offset, self.data_type
        )
|
|
|
|
|
|
class ParameterManager:
    """Load the parameter table from a text file and index it by name.

    A usable line has the shape ``<name>: <word> <offset> (<type>)``: the
    text before the single colon is the parameter name, the second
    whitespace-separated token after the colon is the integer offset and
    the third token is the data type wrapped in parentheses.  Lines that do
    not have this shape (blank lines, headers, lines with zero or several
    colons) are ignored.

    Args:
        struct_file: Path of the structure description file.  Defaults to
            ``struct_members_2.txt`` in the current working directory.

    Raises:
        OSError: If the file cannot be opened.
        ValueError: If an entry has a non-integer offset.
    """

    DEFAULT_STRUCT_FILE = 'struct_members_2.txt'

    def __init__(self, struct_file: str = DEFAULT_STRUCT_FILE):
        self.struct_file = struct_file
        self.parameters: Dict[str, Parameter] = {}
        self._load_parameters()

    def _load_parameters(self):
        """Parse ``self.struct_file`` and fill ``self.parameters``."""
        with open(self.struct_file, 'r') as f:
            for line_no, line in enumerate(f, start=1):
                parts = line.strip().split(':')
                if len(parts) != 2:
                    continue

                name = parts[0].strip()
                fields = parts[1].split()
                # A line such as "SomeStruct:" has a colon but no
                # "<word> <offset> (<type>)" payload; treat it like any
                # other non-entry line instead of failing with IndexError.
                if len(fields) < 3:
                    continue

                try:
                    offset = int(fields[1])
                except ValueError as exc:
                    raise ValueError(
                        f"{self.struct_file}:{line_no}: invalid offset "
                        f"{fields[1]!r} for parameter {name!r}"
                    ) from exc
                data_type = fields[2].strip('()')

                self.parameters[name] = Parameter(name, offset, data_type)
|
|
|
|
|
|
class ModbusController:
    """Read and write 32-bit parameters on a Modbus RTU slave.

    Every supported value occupies two consecutive 16-bit holding
    registers.  The first register carries the low 16 bits and the second
    the high 16 bits of the little-endian 32-bit value; the registers are
    used in the order the device returns them (no word swap is applied).

    Args:
        modbus_config: Mapping with the keys ``"com_port"`` and
            ``"baud_rate"``.  The optional keys ``"slave_addr"`` (default
            ``1``) and ``"timeout"`` in seconds (default ``1.0``) override
            the built-in defaults.  The serial line is opened with 8 data
            bits, no parity and 1 stop bit.
    """

    # data_type tag -> (struct format of the 32-bit value, Python cast
    # applied to a value before it is packed for writing).
    _CODECS = {
        "float": ("<f", float),
        "int32_t": ("<i", int),
    }

    def __init__(self, modbus_config):
        self.master = modbus_rtu.RtuMaster(
            serial.Serial(
                port=modbus_config["com_port"],
                baudrate=modbus_config["baud_rate"],
                bytesize=8,
                parity='N',
                stopbits=1,
            )
        )
        self.master.set_timeout(modbus_config.get("timeout", 1.0))
        self.slave_addr = modbus_config.get("slave_addr", 1)
        self.cst = cst

    def _byte_addr_to_register_addr(self, byte_addr: int) -> int:
        """Convert byte address to Modbus register address"""
        # One register holds two bytes.
        return byte_addr // 2

    def _codec_for(self, param: "Parameter"):
        """Return ``(struct_format, cast)`` for ``param.data_type``.

        Raises:
            ValueError: If the data type is not one of ``_CODECS``.
        """
        try:
            return self._CODECS[param.data_type]
        except KeyError:
            raise ValueError(
                f"Unsupported data type {param.data_type!r} "
                f"for parameter {param.name!r}"
            ) from None

    def read_parameter(self, param: "Parameter"):
        """Read one parameter from the slave and return it as a Python value.

        Args:
            param: Parameter whose ``offset`` (in bytes) and ``data_type``
                select the registers and the decoding.

        Returns:
            A ``float`` for ``"float"`` parameters, an ``int`` for
            ``"int32_t"`` parameters.

        Raises:
            ValueError: If ``param.data_type`` is not supported.  The check
                runs before any bus traffic is generated.
        """
        fmt, _ = self._codec_for(param)
        reg_addr = self._byte_addr_to_register_addr(param.offset)

        # Read two consecutive 16-bit registers.
        registers = self.master.execute(
            self.slave_addr,
            self.cst.READ_HOLDING_REGISTERS,
            reg_addr,
            2
        )
        # Re-assemble the four raw bytes (low word first), then decode them.
        raw = struct.pack('<HH', registers[0], registers[1])
        return struct.unpack(fmt, raw)[0]

    def write_parameter(self, param: "Parameter", value):
        """Write ``value`` to the two registers backing ``param``.

        Args:
            param: Parameter whose ``offset`` (in bytes) and ``data_type``
                select the registers and the encoding.
            value: Value to write; converted with ``float()`` or ``int()``
                according to the parameter's data type.

        Raises:
            ValueError: If ``param.data_type`` is not supported.  Nothing is
                sent to the device in that case.
        """
        fmt, cast = self._codec_for(param)
        reg_addr = self._byte_addr_to_register_addr(param.offset)

        # Encode as four little-endian bytes and split them into two 16-bit
        # register values (low word first).
        registers = struct.unpack('<HH', struct.pack(fmt, cast(value)))

        self.master.execute(
            self.slave_addr,
            self.cst.WRITE_MULTIPLE_REGISTERS,
            reg_addr,
            output_value=registers
        )

    def __del__(self):
        # __init__ may have failed before ``master`` was assigned.
        master = getattr(self, "master", None)
        if master is None:
            return
        try:
            master.close()
        except Exception:
            # Best-effort cleanup: nothing useful can be done about a close
            # failure while the object is being finalized.
            pass
|
|
|
|
class TcpServer(QCoreApplication):
|
|
def __init__(self, server_config, modbus_config):
|
|
super().__init__(sys.argv)
|
|
|
|
# 创建一个 TCP 服务器对象
|
|
self.server = QTcpServer(self)
|
|
self.host = server_config["host"]
|
|
self.port = server_config["port"]
|
|
|
|
# 连接信号,新的客户端连接时触发
|
|
self.server.newConnection.connect(self.on_new_connection)
|
|
# self.param_manager = ParamManager()
|
|
self.param_manager = ParameterManager()
|
|
self.modbus = ModbusController(modbus_config)
|
|
# 绑定并开始监听指定的地址和端口
|
|
if not self.server.listen(QHostAddress(self.host), self.port):
|
|
print(f"Server could not start on {self.host}:{self.port}")
|
|
sys.exit(1)
|
|
|
|
print(f"Server started on {self.host}:{self.port}")
|
|
self.buffer_size = 65536 # Increased buffer size
|
|
|
|
@Slot()
|
|
def on_new_connection(self):
|
|
# 获取客户端连接的 socket 对象
|
|
client_socket = self.server.nextPendingConnection()
|
|
|
|
# 连接信号
|
|
client_socket.readyRead.connect(lambda: self.on_ready_read(client_socket))
|
|
client_socket.disconnected.connect(lambda: self.on_disconnected(client_socket))
|
|
|
|
print(f"New connection from {client_socket.peerAddress().toString()}:{client_socket.peerPort()}")
|
|
|
|
# 发送欢迎消息给客户端
|
|
client_socket.flush() # 确保数据已发送
|
|
|
|
@Slot()
|
|
def on_ready_read(self, client_socket: QTcpSocket):
|
|
try:
|
|
# Read all available data until we have a complete message
|
|
data = QByteArray()
|
|
while client_socket.bytesAvailable():
|
|
data.append(client_socket.readAll())
|
|
|
|
# Wait a short time for more data if needed
|
|
if client_socket.waitForReadyRead(100): # 100ms timeout
|
|
continue
|
|
break
|
|
|
|
if not data:
|
|
return
|
|
|
|
print(f"Received from client: {data.data().decode()}")
|
|
|
|
try:
|
|
data_obj = json.loads(data.data().decode())
|
|
res, res_data = self.request_process(data_obj)
|
|
token = data_obj["token"]
|
|
cmd_type = data_obj["cmd"]
|
|
|
|
if res:
|
|
if cmd_type == "get_params":
|
|
res_obj = {
|
|
"cmd": cmd_type,
|
|
"token": token,
|
|
"status": 0,
|
|
"data": res_data
|
|
}
|
|
|
|
if cmd_type == "set_params":
|
|
res_obj = {
|
|
"cmd": cmd_type,
|
|
"token": token,
|
|
"status": 0,
|
|
}
|
|
|
|
except json.JSONDecodeError as e:
|
|
print(f"JSON decode error: {e}")
|
|
res_obj = {
|
|
"status": -1,
|
|
"msg": "JSON decode error."
|
|
}
|
|
except Exception as e:
|
|
print(f"Processing error: {e}")
|
|
res_obj = {
|
|
"status": -1,
|
|
"msg": str(e)
|
|
}
|
|
|
|
# Send response
|
|
response = json.dumps(res_obj)
|
|
response_data = response.encode()
|
|
print(response)
|
|
|
|
# Ensure complete send
|
|
total_sent = 0
|
|
while total_sent < len(response_data):
|
|
sent = client_socket.write(response_data[total_sent:])
|
|
if sent == 0:
|
|
raise RuntimeError("Socket connection broken")
|
|
total_sent += sent
|
|
|
|
client_socket.flush()
|
|
|
|
except Exception as e:
|
|
print(f"Error in on_ready_read: {e}")
|
|
|
|
@Slot()
|
|
def on_disconnected(self, client_socket: QTcpSocket):
|
|
print(f"Connection from {client_socket.peerAddress().toString()}:{client_socket.peerPort()} closed.")
|
|
client_socket.deleteLater() # 清理套接字资源
|
|
|
|
|
|
def request_process(self, data_obj):
|
|
try:
|
|
#print(data_obj['param_names'])
|
|
cmd_type = data_obj["cmd"]
|
|
res_data = None
|
|
res = False
|
|
match cmd_type:
|
|
case "get_params":
|
|
params = data_obj["data"]["params"]
|
|
param_data = self.handle_get_params(params)
|
|
res_data = param_data
|
|
res = True
|
|
case "set_params":
|
|
params = data_obj["data"]["params"]
|
|
if self.handle_set_params(params): # 如果参数设置成功
|
|
res = True
|
|
|
|
return res, res_data
|
|
|
|
except Exception as e:
|
|
print(e)
|
|
|
|
def handle_get_params(self, params: Dict) -> Dict:
|
|
result = {}
|
|
for param_name in params:
|
|
if param_name in self.param_manager.parameters:
|
|
param = self.param_manager.parameters[param_name]
|
|
value = self.modbus.read_parameter(param)
|
|
result[param_name] = value
|
|
|
|
return result
|
|
|
|
def handle_set_params(self, params: Dict) -> bool:
|
|
print('handle set')
|
|
for param_name, value in params.items():
|
|
if param_name in self.param_manager.parameters:
|
|
param = self.param_manager.parameters[param_name]
|
|
self.modbus.write_parameter(param, value)
|
|
|
|
return True
|
|
|
|
|
|
if __name__ == "__main__":
    # Address the TCP server listens on.
    server_settings = {"host": "192.168.5.4", "port": 12345}
    # Serial line the Modbus RTU slave is attached to.
    modbus_settings = {"com_port": "COM21", "baud_rate": 9600}

    app = TcpServer(server_config=server_settings, modbus_config=modbus_settings)
    # Run the Qt event loop and use its return code as the exit status.
    sys.exit(app.exec())
|