[update] server端处理通信粘包问题d

socket_server-副本.py包含了最新内容
This commit is contained in:
cuijingwei@brisonus.com 2025-02-22 12:55:40 +08:00
parent 8493db2dae
commit 2803e6a9c6
2 changed files with 316 additions and 29 deletions

281
socket_server - 副本.py Normal file
View File

@ -0,0 +1,281 @@
import json
import struct
import sys
from dataclasses import dataclass
from PySide6.QtCore import QCoreApplication, QByteArray, Slot
from PySide6.QtNetwork import QTcpServer, QTcpSocket, QHostAddress
from PySide6.QtCore import Signal
from param_manager import ParamManager
import modbus_tk
import modbus_tk.defines as cst
import modbus_tk.modbus_rtu as modbus_rtu
from typing import Dict
import serial
@dataclass
class Parameter:
name: str
offset: int
data_type: str
array_index: int = -1
def __str__(self):
return f"{self.name} at offset {self.offset} ({self.data_type})"
class ParameterManager:
def __init__(self):
self.parameters: Dict[str, Parameter] = {}
self._load_parameters()
def _load_parameters(self):
# Load parameters from the structure file
with open('struct_members_2.txt', 'r') as f:
for line in f:
parts = line.strip().split(':')
if len(parts) != 2:
continue
name = parts[0].strip()
offset_type = parts[1].strip().split()
offset = int(offset_type[1])
data_type = offset_type[2].strip('()')
self.parameters[name] = Parameter(name, offset, data_type)
class ModbusController:
def __init__(self, modbus_config):
self.master = modbus_rtu.RtuMaster(
serial.Serial(port=modbus_config["com_port"], baudrate=modbus_config["baud_rate"], bytesize=8, parity='N', stopbits=1)
)
self.master.set_timeout(1.0)
self.slave_addr = 1
self.cst = cst
def _byte_addr_to_register_addr(self, byte_addr: int) -> int:
"""Convert byte address to Modbus register address"""
return byte_addr // 2
def read_parameter(self, param: Parameter):
# Calculate register address from byte offset
reg_addr = self._byte_addr_to_register_addr(param.offset)
print(reg_addr)
print('--------------')
print(param.data_type)
# Read two consecutive 16-bit registers
registers = self.master.execute(
self.slave_addr,
self.cst.READ_HOLDING_REGISTERS,
reg_addr,
2
)
swapped_registers = (registers[0], registers[1])
bytes_value = struct.pack('<HH', *swapped_registers)
# Convert to float32 little endian byte-swap
# For byte-swap, we need to swap the registers and then interpret as little endian
if param.data_type == "float32":
return struct.unpack('<f', bytes_value)[0]
elif param.data_type == "int32_t":
return struct.unpack('<i', bytes_value)[0]
else:
pass
return 0
# throw an error. { param.datatype } dataType not supported.
def write_parameter(self, param: Parameter, value: float):
# Calculate register address from byte offset
reg_addr = self._byte_addr_to_register_addr(param.offset)
if param.data_type == "float32":
# Convert float to two 16-bit registers using little endian byte-swap
bytes_value = struct.pack('<f', float(value))
registers = struct.unpack('<HH', bytes_value)
elif param.data_type == "int32_t":
# Convert float to two 16-bit registers using little endian byte-swap
bytes_value = struct.pack('<i', int(value))
else:
bytes_value = 0
registers = struct.unpack('<HH', bytes_value)
# Swap registers for byte-swap format
swapped_registers = (registers[0], registers[1])
self.master.execute(
self.slave_addr,
self.cst.WRITE_MULTIPLE_REGISTERS,
reg_addr,
output_value=swapped_registers
)
def __del__(self):
try:
self.master.close()
except:
pass
class TcpServer(QCoreApplication):
def __init__(self, server_config, modbus_config):
super().__init__(sys.argv)
# 创建一个 TCP 服务器对象
self.server = QTcpServer(self)
self.host = server_config["host"]
self.port = server_config["port"]
# 连接信号,新的客户端连接时触发
self.server.newConnection.connect(self.on_new_connection)
# self.param_manager = ParamManager()
self.param_manager = ParameterManager()
self.modbus = ModbusController(modbus_config)
# 绑定并开始监听指定的地址和端口
if not self.server.listen(QHostAddress(self.host), self.port):
print(f"Server could not start on {self.host}:{self.port}")
sys.exit(1)
print(f"Server started on {self.host}:{self.port}")
self.buffer_size = 65536 # Increased buffer size
@Slot()
def on_new_connection(self):
# 获取客户端连接的 socket 对象
client_socket = self.server.nextPendingConnection()
# 连接信号
client_socket.readyRead.connect(lambda: self.on_ready_read(client_socket))
client_socket.disconnected.connect(lambda: self.on_disconnected(client_socket))
print(f"New connection from {client_socket.peerAddress().toString()}:{client_socket.peerPort()}")
# 发送欢迎消息给客户端
client_socket.flush() # 确保数据已发送
@Slot()
def on_ready_read(self, client_socket: QTcpSocket):
try:
# Read all available data until we have a complete message
data = QByteArray()
while client_socket.bytesAvailable():
data.append(client_socket.readAll())
# Wait a short time for more data if needed
if client_socket.waitForReadyRead(100): # 100ms timeout
continue
break
if not data:
return
print(f"Received from client: {data.data().decode()}")
try:
data_obj = json.loads(data.data().decode())
res, res_data = self.request_process(data_obj)
token = data_obj["token"]
cmd_type = data_obj["cmd"]
if res:
if cmd_type == "get_params":
res_obj = {
"cmd": cmd_type,
"token": token,
"status": 0,
"data": res_data
}
if cmd_type == "set_params":
res_obj = {
"cmd": cmd_type,
"token": token,
"status": 0,
}
except json.JSONDecodeError as e:
print(f"JSON decode error: {e}")
res_obj = {
"status": -1,
"msg": "JSON decode error."
}
except Exception as e:
print(f"Processing error: {e}")
res_obj = {
"status": -1,
"msg": str(e)
}
# Send response
response = json.dumps(res_obj)
response_data = response.encode()
# Ensure complete send
total_sent = 0
while total_sent < len(response_data):
sent = client_socket.write(response_data[total_sent:])
if sent == 0:
raise RuntimeError("Socket connection broken")
total_sent += sent
client_socket.flush()
except Exception as e:
print(f"Error in on_ready_read: {e}")
@Slot()
def on_disconnected(self, client_socket: QTcpSocket):
print(f"Connection from {client_socket.peerAddress().toString()}:{client_socket.peerPort()} closed.")
client_socket.deleteLater() # 清理套接字资源
def request_process(self, data_obj):
try:
#print(data_obj['param_names'])
cmd_type = data_obj["cmd"]
res_data = None
res = False
match cmd_type:
case "get_params":
params = data_obj["data"]["param_names"]
param_data = self.handle_get_params(params)
res_data = param_data
res = True
case "set_params":
params = data_obj["data"]["params"]
if self.handle_set_params(params): # 如果参数设置成功
res = True
return res, res_data
except Exception as e:
print(e)
def handle_get_params(self, params: Dict) -> Dict:
result = {}
for param_name in params:
if param_name in self.param_manager.parameters:
param = self.param_manager.parameters[param_name]
value = self.modbus.read_parameter(param)
result[param_name] = value
return result
def handle_set_params(self, params: Dict) -> bool:
print('handle set')
for param_name, value in params.items():
if param_name in self.param_manager.parameters:
param = self.param_manager.parameters[param_name]
self.modbus.write_parameter(param, value)
return True
if __name__ == "__main__":
# app = TcpServer("192.168.5.4", 12345)
app = TcpServer(server_config={"host": "192.168.5.4", "port": 12345}, modbus_config={"com_port": "COM21", "baud_rate": 9600})
sys.exit(app.exec())

View File

@ -64,6 +64,7 @@ class ModbusController:
# Calculate register address from byte offset
reg_addr = self._byte_addr_to_register_addr(param.offset)
print(reg_addr)
print(param.data_type)
# Read two consecutive 16-bit registers
registers = self.master.execute(
@ -140,41 +141,46 @@ class TcpServer(QCoreApplication):
@Slot()
def on_ready_read(self, client_socket: QTcpSocket):
# 读取客户端发送的数据
data = client_socket.readAll()
print(f"Received from client: {data.data().decode()}")
data_obj = json.loads(data.data().decode())
res, res_data = self.request_process(data_obj)
token = data_obj["token"]
cmd_type = data_obj["cmd"]
data = client_socket.read(65536)
print(data)
res = "something wrong."
print(f"Received from client: {data.data().decode()}")
try:
data_obj = json.loads(data.data().decode())
res, res_data = self.request_process(data_obj)
token = data_obj["token"]
cmd_type = data_obj["cmd"]
if res:
if res:
if cmd_type == "get_params":
res_obj = {
"cmd": cmd_type,
"token": token,
"status": 0,
"data": res_data
}
if cmd_type == "get_params":
res_obj = {
"cmd": cmd_type,
"token": token,
"status": 0,
"data": res_data
}
if cmd_type == "set_params":
res_obj = {
"cmd": cmd_type,
"token": token,
"status": 0,
}
except Exception as e:
print(e)
res = False
res_data = e
res_obj = {
"status": -1,
"msg": "JSON decode error."
}
response = json.dumps(res_obj)
if cmd_type == "set_params":
res_obj = {
"cmd": cmd_type,
"token": token,
"status": 0,
}
response = json.dumps(res_obj)
print(response)
# # 发送响应给客户端
# response = "Server has received your message."
client_socket.write(response.encode())
client_socket.flush()
# print(f"Sent to client: {response}")
@Slot()
def on_disconnected(self, client_socket: QTcpSocket):