param_service/socket_client.py
2025-02-21 13:56:29 +08:00

276 lines
8.7 KiB
Python

import json
import random
import string
import time
from dataclasses import dataclass
from queue import Queue
from threading import Thread, Lock, Event
from typing import Any, Optional, Dict, Callable
from datetime import datetime, timedelta
from PySide6.QtCore import QObject, Signal, Slot
from PySide6.QtNetwork import QTcpSocket
@dataclass
class Request:
token: str
widget: QObject
data: Any
callback: Optional[Callable] = None
created_at: datetime = None
timeout: int = 30 # 超时时间(秒)
def __post_init__(self):
self.created_at = datetime.now()
@property
def is_expired(self) -> bool:
return datetime.now() > self.created_at + timedelta(seconds=self.timeout)
class ParamsService(QObject):
signal_request_complete = Signal(object) # 请求完成信号
signal_connection_status = Signal(bool) # 连接状态信号
signal_error = Signal(str) # 错误信号
def __init__(self, host: str, port: int):
super().__init__()
self.host = host
self.port = port
# 初始化状态和同步对象
self._is_running = True
self._connected = False
self._reconnect_interval = 5 # 重连间隔(秒)
self._lock = Lock()
self._connection_event = Event()
# 请求队列和响应映射
self.request_queue = Queue()
self.pending_requests: Dict[str, Request] = {}
# 初始化socket
self.socket = QTcpSocket(self)
self._setup_socket_connections()
# 启动工作线程
self._start_worker_threads()
# 首次连接
self.connect_to_server()
def _setup_socket_connections(self):
"""设置socket信号连接"""
self.socket.connected.connect(self.on_connected)
self.socket.disconnected.connect(self.on_disconnected)
self.socket.readyRead.connect(self.on_ready_read)
self.socket.errorOccurred.connect(self.on_socket_error)
def _start_worker_threads(self):
"""启动工作线程"""
# 请求处理线程
self.request_thread = Thread(target=self._process_request_queue, daemon=True)
self.request_thread.start()
# 超时检查线程
self.timeout_thread = Thread(target=self._check_timeouts, daemon=True)
self.timeout_thread.start()
# 重连线程
self.reconnect_thread = Thread(target=self._reconnection_loop, daemon=True)
self.reconnect_thread.start()
def connect_to_server(self):
"""连接到服务器"""
if not self._connected:
self.socket.connectToHost(self.host, self.port)
@Slot()
def on_connected(self):
"""连接成功处理"""
print(f"Connected to {self.host}:{self.port}")
with self._lock:
self._connected = True
self._connection_event.set()
self.signal_connection_status.emit(True)
@Slot()
def on_disconnected(self):
"""断开连接处理"""
print("Disconnected from server")
with self._lock:
self._connected = False
self._connection_event.clear()
self.signal_connection_status.emit(False)
@Slot()
def on_socket_error(self):
"""Socket错误处理"""
error = self.socket.errorString()
print(f"Socket error: {error}")
self.signal_error.emit(f"Socket error: {error}")
@Slot()
def on_ready_read(self):
"""数据接收处理"""
try:
data = self.socket.readAll()
response = json.loads(data.data().decode())
self._handle_response(response)
except Exception as e:
print(f"Error processing response: {e}")
self.signal_error.emit(f"Response processing error: {str(e)}")
def _handle_response(self, response: dict):
"""处理服务器响应"""
try:
token = response.get("token")
if token in self.pending_requests:
request = self.pending_requests.pop(token)
# 执行回调
if request.callback:
request.callback(response)
# 发送完成信号
self.signal_request_complete.emit(response)
except Exception as e:
print(f"Error handling response: {e}")
self.signal_error.emit(f"Response handling error: {str(e)}")
def _process_request_queue(self):
"""处理请求队列的后台线程"""
while self._is_running:
try:
if not self._connected:
self._connection_event.wait(timeout=1.0)
continue
request = self.request_queue.get(timeout=1.0)
self._send_request(request)
self.request_queue.task_done()
except Queue.Empty:
continue
except Exception as e:
print(f"Error processing request: {e}")
self.signal_error.emit(f"Request processing error: {str(e)}")
def _send_request(self, request: Request):
"""发送请求到服务器"""
try:
# 添加到待处理请求
self.pending_requests[request.token] = request
# 构建请求数据
request_data = {
"cmd": "get_params", # 或其他命令
"token": request.token,
"data": request.data
}
# 发送数据
json_data = json.dumps(request_data)
self.socket.write(json_data.encode('utf-8'))
self.socket.flush()
except Exception as e:
print(f"Error sending request: {e}")
self.signal_error.emit(f"Request sending error: {str(e)}")
self.pending_requests.pop(request.token, None)
def _check_timeouts(self):
"""检查请求超时的后台线程"""
while self._is_running:
try:
current_time = datetime.now()
expired_tokens = []
# 检查超时的请求
for token, request in self.pending_requests.items():
if request.is_expired:
expired_tokens.append(token)
# 处理超时的请求
for token in expired_tokens:
request = self.pending_requests.pop(token)
self.signal_error.emit(f"Request timeout: {token}")
# 执行超时回调
if request.callback:
request.callback({"error": "timeout", "token": token})
time.sleep(1) # 每秒检查一次
except Exception as e:
print(f"Error checking timeouts: {e}")
def _reconnection_loop(self):
"""重连循环的后台线程"""
while self._is_running:
if not self._connected:
print(f"Attempting to reconnect to {self.host}:{self.port}")
self.connect_to_server()
time.sleep(self._reconnect_interval)
@staticmethod
def generate_token() -> str:
"""生成唯一的请求token"""
return ''.join(random.choices(string.ascii_letters + string.digits, k=12))
def get_params(self, widget: QObject, param_names: list, callback: Callable = None):
"""获取参数(外部接口)"""
token = self.generate_token()
request = Request(
token=token,
widget=widget,
data={"param_names": param_names},
callback=callback
)
self.request_queue.put(request)
return token
def set_params(self, widget: QObject, params: dict, callback: Callable = None):
"""设置参数(外部接口)"""
token = self.generate_token()
request = Request(
token=token,
widget=widget,
data={"params": params},
callback=callback
)
self.request_queue.put(request)
return token
def cleanup(self):
"""清理资源"""
self._is_running = False
self.socket.disconnectFromHost()
if self.socket.state() == QTcpSocket.ConnectedState:
self.socket.waitForDisconnected(1000)
if __name__== "__main__":
# # 创建服务实例
# params_service = ParamsService("localhost", 1234)
#
#
# # 回调函数
# def on_response(response):
# print(f"Received response: {response}")
#
#
# # 获取参数
# params_service.get_params(
# widget=some_widget,
# param_names=["param1", "param2"],
# callback=on_response
# )
#
# # 设置参数
# params_service.set_params(
# widget=some_widget,
# params={"param1": "value1"},
# callback=on_response
# )
#
# # 程序退出时清理
# params_service.cleanup()
pass