brisonus_app_eq/test/params_service.py

264 lines
8.4 KiB
Python

import random
import string
import json
import time
import queue
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
from typing import Any, Optional, Dict, Callable
from PySide6.QtCore import QObject, Signal, Slot, QTimer
from PySide6.QtNetwork import QTcpSocket
from PySide6.QtCore import QByteArray
class CMD(Enum):
GET_PARAMS = 1
SET_PARAMS = 2
@dataclass
class Request:
token: str
cmd: CMD
widget: QObject
data: Any
callback: Optional[Callable] = None
created_at: datetime = None
timeout: int = 30 # 超时时间(秒)
def __post_init__(self):
self.created_at = datetime.now()
@property
def is_expired(self) -> bool:
return datetime.now() > self.created_at + timedelta(seconds=self.timeout)
@dataclass()
class Response:
token: str
cmd: CMD
widget: QObject
data: Any
class ParamsService(QObject):
signal_request_complete = Signal(object) # 请求完成信号
signal_connection_status = Signal(bool) # 连接状态信号
signal_error = Signal(str) # 错误信号
def __init__(self, host: str, port: int, parent=None):
super().__init__(parent)
self.host = host
self.port = port
# 初始化状态
self._is_running = True
self._connected = False
# 请求队列和响应映射
self.request_queue = queue.Queue()
self.pending_requests: Dict[str, Request] = {}
# 初始化socket
self.socket = QTcpSocket(self)
self._setup_socket_connections()
# 初始化定时器
self._request_timer = QTimer(self)
self._request_timer.timeout.connect(self._process_next_request)
self._request_timer.setInterval(100) # 100ms间隔
# 重连定时器
self._reconnect_timer = QTimer(self)
self._reconnect_timer.timeout.connect(self._try_reconnect)
self._reconnect_timer.setInterval(5000) # 5秒重连间隔
# 超时检查定时器
self._timeout_timer = QTimer(self)
self._timeout_timer.timeout.connect(self._check_timeouts)
self._timeout_timer.setInterval(1000) # 1秒检查一次
# 启动定时器
self._request_timer.start()
self._timeout_timer.start()
# 首次连接
self.connect_to_server()
def _setup_socket_connections(self):
"""设置socket信号连接"""
self.socket.connected.connect(self._on_connected)
self.socket.disconnected.connect(self._on_disconnected)
self.socket.readyRead.connect(self._on_ready_read)
self.socket.errorOccurred.connect(self._on_socket_error)
def connect_to_server(self):
"""连接到服务器"""
if not self._connected:
self.socket.connectToHost(self.host, self.port)
@Slot()
def _on_connected(self):
"""连接成功处理"""
print(f"Connected to {self.host}:{self.port}")
self._connected = True
self._reconnect_timer.stop()
self.signal_connection_status.emit(True)
@Slot()
def _on_disconnected(self):
"""断开连接处理"""
print("Disconnected from server")
self._connected = False
self.signal_connection_status.emit(False)
self._reconnect_timer.start() # 启动重连定时器
@Slot()
def _on_socket_error(self):
"""Socket错误处理"""
error = self.socket.errorString()
print(f"Socket error: {error}")
self.signal_error.emit(f"Socket error: {error}")
@Slot()
def _on_ready_read(self):
"""数据接收处理"""
try:
data = self.socket.readAll()
response = json.loads(bytes(data).decode())
self._handle_response(response)
except json.JSONDecodeError as e:
print(f"JSON decode error: {e}")
self.signal_error.emit(f"Invalid JSON format: {str(e)}")
except Exception as e:
print(f"Error processing response: {e}")
self.signal_error.emit(f"Response processing error: {str(e)}")
@Slot()
def _process_next_request(self):
"""处理队列中的下一个请求"""
if not self._connected or self.request_queue.empty():
return
try:
request = self.request_queue.get_nowait()
self._send_request(request)
self.request_queue.task_done()
except queue.Empty:
pass
except Exception as e:
print(f"Error processing request: {e}")
self.signal_error.emit(f"Request processing error: {str(e)}")
def _send_request(self, request: Request):
"""发送请求到服务器"""
try:
match request.cmd:
case CMD.GET_PARAMS:
self.pending_requests[request.token] = request
request_data = {
"cmd": "get_params",
"token": request.token,
"data": request.data
}
json_data = json.dumps(request_data)
self.socket.write(json_data.encode('utf-8'))
self.socket.flush()
case CMD.SET_PARAMS:
request_data = {
"cmd": "set_params",
"token": request.token,
"data": request.data
}
json_data = json.dumps(request_data)
self.socket.write(json_data.encode('utf-8'))
self.socket.flush()
except Exception as e:
print(f"Error sending request: {e}")
self.signal_error.emit(f"Request sending error: {str(e)}")
self.pending_requests.pop(request.token, None)
def _handle_response(self, response: dict):
"""处理服务器响应"""
try:
token = response.get("token")
if token in self.pending_requests:
request = self.pending_requests.pop(token)
if request.callback:
res_data = response["data"]
res = Response(token, CMD.GET_PARAMS, request.widget, res_data)
request.callback(res)
self.signal_request_complete.emit(response)
except Exception as e:
print(f"Error handling response: {e}")
self.signal_error.emit(f"Response handling error: {str(e)}")
@Slot()
def _check_timeouts(self):
"""检查请求超时"""
current_time = datetime.now()
expired_tokens = [
token for token, request in self.pending_requests.items()
if request.is_expired
]
for token in expired_tokens:
request = self.pending_requests.pop(token)
self.signal_error.emit(f"Request timeout: {token}")
if request.callback:
request.callback({"error": "timeout", "token": token})
@Slot()
def _try_reconnect(self):
"""尝试重新连接"""
if not self._connected:
print(f"Attempting to reconnect to {self.host}:{self.port}")
self.connect_to_server()
@staticmethod
def generate_token() -> str:
"""生成唯一的请求token"""
return ''.join(random.choices(string.ascii_letters + string.digits, k=12))
def get_params(self, widget: QObject, params: list, callback: Callable = None):
"""获取参数(外部接口)"""
token = self.generate_token()
request = Request(
token=token,
cmd=CMD.GET_PARAMS,
widget=widget,
data={"params": params},
callback=callback
)
self.request_queue.put(request)
return token
def set_params(self, widget: QObject, params: dict, callback: Callable = None):
"""设置参数(外部接口)"""
token = self.generate_token()
request = Request(
token=token,
cmd=CMD.SET_PARAMS,
widget=widget,
data={"params": params},
callback=callback
)
self.request_queue.put(request)
return token
def cleanup(self):
"""清理资源"""
self._is_running = False
self._request_timer.stop()
self._timeout_timer.stop()
self._reconnect_timer.stop()
self.socket.disconnectFromHost()
if self.socket.state() == QTcpSocket.ConnectedState:
self.socket.waitForDisconnected(1000)