279 lines
9.2 KiB
Python
279 lines
9.2 KiB
Python
import random
|
|
import string
|
|
import json
|
|
import time
|
|
import queue
|
|
from dataclasses import dataclass
|
|
from datetime import datetime, timedelta
|
|
from enum import Enum
|
|
from typing import Any, Optional, Dict, Callable
|
|
|
|
from PySide6.QtCore import QObject, Signal, Slot, QTimer
|
|
from PySide6.QtNetwork import QTcpSocket
|
|
from PySide6.QtCore import QByteArray
|
|
|
|
|
|
class CMD(Enum):
|
|
GET_PARAMS = 1
|
|
SET_PARAMS = 2
|
|
|
|
|
|
@dataclass
|
|
class Request:
|
|
token: str
|
|
cmd: CMD
|
|
widget: QObject
|
|
data: Any
|
|
callback: Optional[Callable] = None
|
|
created_at: datetime = None
|
|
timeout: int = 30 # 超时时间(秒)
|
|
|
|
def __post_init__(self):
|
|
self.created_at = datetime.now()
|
|
|
|
@property
|
|
def is_expired(self) -> bool:
|
|
return datetime.now() > self.created_at + timedelta(seconds=self.timeout)
|
|
|
|
@dataclass()
|
|
class Response:
|
|
token: str
|
|
cmd: CMD
|
|
widget: QObject
|
|
data: Any
|
|
|
|
|
|
class ParamsService(QObject):
|
|
signal_request_complete = Signal(object) # 请求完成信号
|
|
signal_connection_status = Signal(bool) # 连接状态信号
|
|
signal_error = Signal(str) # 错误信号
|
|
|
|
def __init__(self, host: str, port: int, parent=None):
|
|
super().__init__(parent)
|
|
self.host = host
|
|
self.port = port
|
|
|
|
# 初始化状态
|
|
self._is_running = True
|
|
self._connected = False
|
|
self._current_request = None # 当前正在处理的请求
|
|
|
|
# 请求队列和响应映射
|
|
self.request_queue = queue.Queue()
|
|
self.pending_requests: Dict[str, Request] = {}
|
|
|
|
# 初始化socket
|
|
self.socket = QTcpSocket(self)
|
|
self._setup_socket_connections()
|
|
|
|
# 初始化定时器
|
|
self._request_timer = QTimer(self)
|
|
self._request_timer.timeout.connect(self._process_next_request)
|
|
self._request_timer.setInterval(100) # 100ms间隔
|
|
|
|
# 重连定时器
|
|
self._reconnect_timer = QTimer(self)
|
|
self._reconnect_timer.timeout.connect(self._try_reconnect)
|
|
self._reconnect_timer.setInterval(5000) # 5秒重连间隔
|
|
|
|
# 超时检查定时器
|
|
self._timeout_timer = QTimer(self)
|
|
self._timeout_timer.timeout.connect(self._check_timeouts)
|
|
self._timeout_timer.setInterval(1000) # 1秒检查一次
|
|
|
|
# 启动定时器
|
|
self._request_timer.start()
|
|
self._timeout_timer.start()
|
|
|
|
# 首次连接
|
|
self.connect_to_server()
|
|
|
|
def _setup_socket_connections(self):
|
|
"""设置socket信号连接"""
|
|
self.socket.connected.connect(self._on_connected)
|
|
self.socket.disconnected.connect(self._on_disconnected)
|
|
self.socket.readyRead.connect(self._on_ready_read)
|
|
self.socket.errorOccurred.connect(self._on_socket_error)
|
|
|
|
def connect_to_server(self):
|
|
"""连接到服务器"""
|
|
if not self._connected:
|
|
self.socket.connectToHost(self.host, self.port)
|
|
|
|
@Slot()
|
|
def _on_connected(self):
|
|
"""连接成功处理"""
|
|
print(f"Connected to {self.host}:{self.port}")
|
|
self._connected = True
|
|
self._reconnect_timer.stop()
|
|
self.signal_connection_status.emit(True)
|
|
|
|
@Slot()
|
|
def _on_disconnected(self):
|
|
"""断开连接处理"""
|
|
print("Disconnected from server")
|
|
self._connected = False
|
|
self.signal_connection_status.emit(False)
|
|
self._reconnect_timer.start() # 启动重连定时器
|
|
|
|
@Slot()
|
|
def _on_socket_error(self):
|
|
"""Socket错误处理"""
|
|
error = self.socket.errorString()
|
|
print(f"Socket error: {error}")
|
|
self.signal_error.emit(f"Socket error: {error}")
|
|
|
|
@Slot()
|
|
def _on_ready_read(self):
|
|
"""数据接收处理"""
|
|
try:
|
|
data = self.socket.readAll()
|
|
response = json.loads(bytes(data).decode())
|
|
self._handle_response(response)
|
|
except json.JSONDecodeError as e:
|
|
print(f"JSON decode error: {e}")
|
|
self.signal_error.emit(f"Invalid JSON format: {str(e)}")
|
|
except Exception as e:
|
|
print(f"Error processing response: {e}")
|
|
self.signal_error.emit(f"Response processing error: {str(e)}")
|
|
|
|
@Slot()
|
|
def _process_next_request(self):
|
|
"""处理队列中的下一个请求"""
|
|
# 如果未连接、队列为空,或者当前有正在处理的请求,则返回
|
|
if not self._connected or self.request_queue.empty() or self.pending_requests:
|
|
return
|
|
|
|
try:
|
|
# 获取但不移除请求
|
|
request = self.request_queue.get()
|
|
self._current_request = request
|
|
self._send_request(request)
|
|
time.sleep(0.1)
|
|
except queue.Empty:
|
|
pass
|
|
except Exception as e:
|
|
print(f"Error processing request: {e}")
|
|
self.signal_error.emit(f"Request processing error: {str(e)}")
|
|
# 发生错误时,确保清理当前请求
|
|
if self._current_request:
|
|
self.request_queue.task_done()
|
|
self._current_request = None
|
|
|
|
def _send_request(self, request: Request):
|
|
"""发送请求到服务器"""
|
|
try:
|
|
match request.cmd:
|
|
case CMD.GET_PARAMS:
|
|
self.pending_requests[request.token] = request
|
|
|
|
request_data = {
|
|
"cmd": "get_params",
|
|
"token": request.token,
|
|
"data": request.data
|
|
}
|
|
json_data = json.dumps(request_data)
|
|
self.socket.write(json_data.encode('utf-8')+b'\0')
|
|
self.socket.flush()
|
|
case CMD.SET_PARAMS:
|
|
self.pending_requests[request.token] = request
|
|
request_data = {
|
|
"cmd": "set_params",
|
|
"token": request.token,
|
|
"data": request.data
|
|
}
|
|
json_data = json.dumps(request_data)
|
|
self.socket.write(json_data.encode('utf-8')+b'\0')
|
|
self.socket.flush()
|
|
|
|
except Exception as e:
|
|
print(f"Error sending request: {e}")
|
|
self.signal_error.emit(f"Request sending error: {str(e)}")
|
|
self.pending_requests.pop(request.token, None)
|
|
|
|
def _handle_response(self, response: dict):
|
|
"""处理服务器响应"""
|
|
try:
|
|
token = response.get("token")
|
|
if token in self.pending_requests:
|
|
request = self.pending_requests.pop(token)
|
|
res_data = ''
|
|
if request.callback:
|
|
res_data = response["data"]
|
|
|
|
res = Response(token, CMD.GET_PARAMS, request.widget, res_data)
|
|
request.callback(res)
|
|
|
|
self.signal_request_complete.emit(response)
|
|
|
|
# 完成当前请求的处理
|
|
if self._current_request and self._current_request.token == token:
|
|
self.request_queue.task_done()
|
|
self._current_request = None
|
|
|
|
except Exception as e:
|
|
print(f"Error handling response: {e}")
|
|
self.signal_error.emit(f"Response handling error: {str(e)}")
|
|
|
|
@Slot()
|
|
def _check_timeouts(self):
|
|
"""检查请求超时"""
|
|
current_time = datetime.now()
|
|
expired_tokens = [
|
|
token for token, request in self.pending_requests.items()
|
|
if request.is_expired
|
|
]
|
|
|
|
for token in expired_tokens:
|
|
request = self.pending_requests.pop(token)
|
|
self.signal_error.emit(f"Request timeout: {token}")
|
|
if request.callback:
|
|
request.callback({"error": "timeout", "token": token})
|
|
|
|
@Slot()
|
|
def _try_reconnect(self):
|
|
"""尝试重新连接"""
|
|
if not self._connected:
|
|
print(f"Attempting to reconnect to {self.host}:{self.port}")
|
|
self.connect_to_server()
|
|
|
|
@staticmethod
|
|
def generate_token() -> str:
|
|
"""生成唯一的请求token"""
|
|
return ''.join(random.choices(string.ascii_letters + string.digits, k=12))
|
|
|
|
def get_params(self, widget: QObject, params: list, callback: Callable = None):
|
|
"""获取参数(外部接口)"""
|
|
token = self.generate_token()
|
|
request = Request(
|
|
token=token,
|
|
cmd=CMD.GET_PARAMS,
|
|
widget=widget,
|
|
data={"params": params},
|
|
callback=callback
|
|
)
|
|
self.request_queue.put(request)
|
|
return token
|
|
|
|
def set_params(self, widget: QObject, params: dict, callback: Callable = None):
|
|
"""设置参数(外部接口)"""
|
|
token = self.generate_token()
|
|
request = Request(
|
|
token=token,
|
|
cmd=CMD.SET_PARAMS,
|
|
widget=widget,
|
|
data={"params": params},
|
|
callback=callback
|
|
)
|
|
self.request_queue.put(request)
|
|
return token
|
|
|
|
def cleanup(self):
|
|
"""清理资源"""
|
|
self._is_running = False
|
|
self._request_timer.stop()
|
|
self._timeout_timer.stop()
|
|
self._reconnect_timer.stop()
|
|
self.socket.disconnectFromHost()
|
|
if self.socket.state() == QTcpSocket.ConnectedState:
|
|
self.socket.waitForDisconnected(1000) |