app_socket_server/socket_server.py
cuijingwei@brisonus.com 8493db2dae 首次提交
2025-02-21 16:00:12 +08:00

231 lines
7.6 KiB
Python

import json
import struct
import sys
from dataclasses import dataclass
from PySide6.QtCore import QCoreApplication, QByteArray, Slot
from PySide6.QtNetwork import QTcpServer, QTcpSocket, QHostAddress
from PySide6.QtCore import Signal
from param_manager import ParamManager
import modbus_tk
import modbus_tk.defines as cst
import modbus_tk.modbus_rtu as modbus_rtu
from typing import Dict
import serial
@dataclass
class Parameter:
    """Description of one named entry in the device parameter structure."""

    # Identifier the parameter is looked up by.
    name: str
    # Byte offset of the parameter inside the structure.
    offset: int
    # Type label exactly as it appears in the structure description file.
    data_type: str
    # Optional array position; -1 is the default when none is given.
    array_index: int = -1

    def __str__(self) -> str:
        """Return a one-line, human-readable summary of the parameter."""
        name, offset, data_type = self.name, self.offset, self.data_type
        return f"{name} at offset {offset} ({data_type})"
class ParameterManager:
    """Registry of parameters parsed from a structure description text file.

    A usable line has exactly one ``:``.  The text before it is the parameter
    name; the text after it is split on whitespace, its second token is the
    integer byte offset and its third token is the data type (surrounding
    parentheses are stripped).  Every other line is ignored.

    Attributes:
        struct_file: Path of the description file that was loaded.
        parameters: Mapping of parameter name to its ``Parameter`` record.
    """

    def __init__(self, struct_file: str = 'struct_members_2.txt'):
        """Load all parameters from *struct_file*.

        Args:
            struct_file: Path of the structure description file.  Defaults to
                the file name this class has always used.

        Raises:
            OSError: If the file cannot be opened.
        """
        self.struct_file = struct_file
        self.parameters: Dict[str, Parameter] = {}
        self._load_parameters()

    @staticmethod
    def _parse_line(line: str):
        """Parse one line into ``(name, offset, data_type)``.

        Returns ``None`` when the line is not a well-formed parameter line:
        it does not contain exactly one ``:``, it has fewer than three
        whitespace-separated tokens after the colon, or the offset token is
        not an integer.
        """
        parts = line.strip().split(':')
        if len(parts) != 2:
            return None
        tokens = parts[1].split()
        if len(tokens) < 3:
            return None
        try:
            offset = int(tokens[1])
        except ValueError:
            return None
        return parts[0].strip(), offset, tokens[2].strip('()')

    def _load_parameters(self):
        """Fill ``self.parameters`` from ``self.struct_file``."""
        with open(self.struct_file, 'r') as f:
            for line_no, line in enumerate(f, start=1):
                parsed = self._parse_line(line)
                if parsed is None:
                    # Lines without exactly one colon were always skipped
                    # silently; only report lines that looked like a
                    # parameter entry but could not be parsed.
                    if line.count(':') == 1:
                        print(
                            f"Skipping malformed line {line_no} in "
                            f"{self.struct_file}: {line.strip()!r}"
                        )
                    continue
                name, offset, data_type = parsed
                self.parameters[name] = Parameter(name, offset, data_type)
class ModbusController:
    """Read and write 32-bit float parameters over a Modbus RTU serial link.

    Every parameter is transferred as two consecutive 16-bit holding
    registers.  The first register carries the low-order 16 bits of the
    little-endian float32 and the second register the high-order 16 bits;
    the registers are used in the order the device returns them (no swap).

    NOTE(review): ``Parameter.data_type`` is not consulted here, so every
    parameter is treated as float32 -- confirm this matches the device.
    """

    def __init__(self, modbus_config):
        """Open the serial port and create the RTU master.

        Args:
            modbus_config: Mapping with the required keys ``"com_port"`` and
                ``"baud_rate"``.  Optional keys: ``"slave_addr"`` (default 1)
                and ``"timeout"`` in seconds (default 1.0).  The port is
                opened with 8 data bits, no parity and 1 stop bit.
        """
        self.master = modbus_rtu.RtuMaster(
            serial.Serial(
                port=modbus_config["com_port"],
                baudrate=modbus_config["baud_rate"],
                bytesize=8,
                parity='N',
                stopbits=1,
            )
        )
        self.master.set_timeout(modbus_config.get("timeout", 1.0))
        self.slave_addr = modbus_config.get("slave_addr", 1)
        self.cst = cst

    def _byte_addr_to_register_addr(self, byte_addr: int) -> int:
        """Convert a byte address to a Modbus register address.

        Registers are 16 bits wide, so the byte address is floor-divided by 2.
        """
        return byte_addr // 2

    def read_parameter(self, param: "Parameter") -> float:
        """Read the float32 value stored at *param*'s offset.

        Args:
            param: Parameter whose ``offset`` (in bytes) locates the value.

        Returns:
            The value decoded from two consecutive holding registers.
        """
        reg_addr = self._byte_addr_to_register_addr(param.offset)
        # A float32 occupies two consecutive 16-bit registers.
        registers = self.master.execute(
            self.slave_addr,
            self.cst.READ_HOLDING_REGISTERS,
            reg_addr,
            2
        )
        # Re-pack the two words in received order (low word first) and
        # reinterpret the four bytes as a little-endian float32.
        raw = struct.pack('<HH', registers[0], registers[1])
        return struct.unpack('<f', raw)[0]

    def write_parameter(self, param: "Parameter", value: float):
        """Write *value* as a float32 at *param*'s offset.

        Args:
            param: Parameter whose ``offset`` (in bytes) locates the value.
            value: Number to store; it is converted with ``float()`` first.
        """
        reg_addr = self._byte_addr_to_register_addr(param.offset)
        # Split the little-endian float32 into (low word, high word), the
        # same register order read_parameter() expects.
        registers = struct.unpack('<HH', struct.pack('<f', float(value)))
        self.master.execute(
            self.slave_addr,
            self.cst.WRITE_MULTIPLE_REGISTERS,
            reg_addr,
            output_value=registers
        )

    def __del__(self):
        """Best-effort close of the Modbus master when the object is collected."""
        # __init__ may have failed before ``master`` was assigned.
        master = getattr(self, "master", None)
        if master is None:
            return
        try:
            master.close()
        except Exception:
            # Errors cannot be handled meaningfully during finalisation.
            pass
class TcpServer(QCoreApplication):
def __init__(self, server_config, modbus_config):
super().__init__(sys.argv)
# 创建一个 TCP 服务器对象
self.server = QTcpServer(self)
self.host = server_config["host"]
self.port = server_config["port"]
# 连接信号,新的客户端连接时触发
self.server.newConnection.connect(self.on_new_connection)
# self.param_manager = ParamManager()
self.param_manager = ParameterManager()
self.modbus = ModbusController(modbus_config)
# 绑定并开始监听指定的地址和端口
if not self.server.listen(QHostAddress(self.host), self.port):
print(f"Server could not start on {self.host}:{self.port}")
sys.exit(1)
print(f"Server started on {self.host}:{self.port}")
@Slot()
def on_new_connection(self):
# 获取客户端连接的 socket 对象
client_socket = self.server.nextPendingConnection()
# 连接信号
client_socket.readyRead.connect(lambda: self.on_ready_read(client_socket))
client_socket.disconnected.connect(lambda: self.on_disconnected(client_socket))
print(f"New connection from {client_socket.peerAddress().toString()}:{client_socket.peerPort()}")
# 发送欢迎消息给客户端
client_socket.flush() # 确保数据已发送
@Slot()
def on_ready_read(self, client_socket: QTcpSocket):
# 读取客户端发送的数据
data = client_socket.readAll()
print(f"Received from client: {data.data().decode()}")
data_obj = json.loads(data.data().decode())
res, res_data = self.request_process(data_obj)
token = data_obj["token"]
cmd_type = data_obj["cmd"]
res = "something wrong."
if res:
if cmd_type == "get_params":
res_obj = {
"cmd": cmd_type,
"token": token,
"status": 0,
"data": res_data
}
if cmd_type == "set_params":
res_obj = {
"cmd": cmd_type,
"token": token,
"status": 0,
}
response = json.dumps(res_obj)
print(response)
# # 发送响应给客户端
# response = "Server has received your message."
client_socket.write(response.encode())
client_socket.flush()
# print(f"Sent to client: {response}")
@Slot()
def on_disconnected(self, client_socket: QTcpSocket):
print(f"Connection from {client_socket.peerAddress().toString()}:{client_socket.peerPort()} closed.")
client_socket.deleteLater() # 清理套接字资源
def request_process(self, data_obj):
try:
#print(data_obj['param_names'])
cmd_type = data_obj["cmd"]
res_data = None
res = False
match cmd_type:
case "get_params":
params = data_obj["data"]["param_names"]
param_data = self.handle_get_params(params)
res_data = param_data
res = True
case "set_params":
params = data_obj["data"]["params"]
if self.handle_set_params(params): # 如果参数设置成功
res = True
return res, res_data
except Exception as e:
print(e)
def handle_get_params(self, params: Dict) -> Dict:
result = {}
for param_name in params:
if param_name in self.param_manager.parameters:
param = self.param_manager.parameters[param_name]
value = self.modbus.read_parameter(param)
result[param_name] = value
return result
def handle_set_params(self, params: Dict) -> bool:
print('handle set')
for param_name, value in params.items():
if param_name in self.param_manager.parameters:
param = self.param_manager.parameters[param_name]
self.modbus.write_parameter(param, value)
return True
if __name__ == "__main__":
    # Address the TCP server listens on.
    server_settings = {"host": "192.168.5.4", "port": 12345}
    # Serial port settings for the Modbus RTU link.
    modbus_settings = {"com_port": "COM21", "baud_rate": 9600}

    app = TcpServer(server_config=server_settings, modbus_config=modbus_settings)
    # Run the Qt event loop and propagate its exit code to the shell.
    exit_code = app.exec()
    sys.exit(exit_code)