[update] 通信组件测试

This commit is contained in:
cuijingwei@brisonus.com 2025-02-21 13:56:29 +08:00
parent 1ea9a7031d
commit f478c09d75
7 changed files with 692 additions and 189 deletions

View File

@ -14,5 +14,6 @@ class MessageProxy:
@dataclass
class SignalProxy:
res: bool
widget: QWidget
data: {}

31
param_manager.py Normal file
View File

@ -0,0 +1,31 @@
class ParamManager:
def __init__(self):
self.sim_data = {
"test_param1": 0,
"test_param2": 1,
"test_param3": 2
}
def get_param_by_name(self, name):
for item_name, item_v in self.sim_data.items():
if name == item_name:
return { "name":name, "val": item_v}
# return name
def get_params(self, name_list):
res_list = []
for item_name in name_list:
if self.get_param_by_name(item_name) is not None:
res_list.append(self.get_param_by_name(item_name))
return res_list
if __name__ == "__main__":
param_manager = ParamManager()
print(
param_manager.get_params(['test_param1', 'test_param2'])
)

111
param_service_test.py Normal file
View File

@ -0,0 +1,111 @@
import sys
from PySide6.QtWidgets import QApplication, QWidget, QLabel, QLineEdit, QPushButton, QVBoxLayout
from params_service import ParamsService, Response
class MyComponent:
def __init__(self, parent=None):
pass
def set_data(self, data):
pass
class MyWidget(QWidget, MyComponent):
def __init__(self, parent=None, param_name=None):
super().__init__(parent=parent)
layout = QVBoxLayout()
self.param_name = param_name
self.label1 = QLabel('param_value', self)
self.label2 = QLabel(f'param_name: {self.param_name}', self)
layout.addWidget(self.label1)
layout.addWidget(self.label2)
self.setLayout(layout)
def set_data(self, data):
self.label1.setText(str(data[self.param_name]))
class MyWindow(QWidget):
def __init__(self):
super().__init__()
# 设置窗口标题和尺寸
self.setWindowTitle("PySide6 Example")
self.setFixedSize(300, 800)
self.test_widget1 = MyWidget(self, "tuning_parameters.mix_parameters[0].ch_n")
self.test_widget2 = MyWidget(self, "tuning_parameters.mix_parameters[1].ch_n")
self.test_widget3 = MyWidget(self, "tuning_parameters.mix_parameters[2].ch_n")
self.test_widget4 = MyWidget(self, "tuning_parameters.mix_parameters[3].ch_n")
self.test_widget5 = MyWidget(self, "tuning_parameters.mix_parameters[4].ch_n")
# 创建 QLabel、QLineEdit 和 QPushButton 控件
self.label = QLabel("请输入内容:", self)
self.input_line = QLineEdit(self)
self.button = QPushButton("更新标签", self)
# 设置按钮点击事件
self.button.clicked.connect(self.on_button_clicked)
# 创建垂直布局并添加控件
layout = QVBoxLayout()
layout.addWidget(self.test_widget1)
layout.addWidget(self.test_widget2)
layout.addWidget(self.test_widget3)
layout.addWidget(self.test_widget4)
layout.addWidget(self.test_widget5)
layout.addWidget(self.label)
layout.addWidget(self.input_line)
layout.addWidget(self.button)
# 设置窗口的布局
self.setLayout(layout)
self.params_service = ParamsService("127.0.0.1", 12345)
# self.params_service.signal_request_complete.connect(self.on_params_service)
def on_params_service(self, res: Response):
widget = res.widget
widget.set_data(res.data)
def on_button_clicked(self):
self.params_service.get_params(
widget=self.test_widget1,
param_names=[self.test_widget1.param_name],
callback=self.on_params_service
)
self.params_service.get_params(
widget=self.test_widget2,
param_names=[self.test_widget2.param_name],
callback=self.on_params_service
)
self.params_service.get_params(
widget=self.test_widget3,
param_names=[self.test_widget3.param_name],
callback=self.on_params_service
)
self.params_service.get_params(
widget=self.test_widget4,
param_names=[self.test_widget4.param_name],
callback=self.on_params_service
)
self.params_service.get_params(
widget=self.test_widget5,
param_names=[self.test_widget5.param_name],
callback=self.on_params_service
)
# def update_label(self):
# # 获取输入框的文本并更新标签内容
# input_text = self.input_line.text()
# self.label.setText(f"你输入的是:{input_text}")
if __name__ == "__main__":
app = QApplication(sys.argv)
window = MyWindow()
window.show()
sys.exit(app.exec())

View File

@ -1,182 +1,253 @@
from importlib.metadata import always_iterable
from queue import Queue
from PySide6.QtCore import QObject, SignalInstance
from PySide6.QtCore import Signal, Slot
from PySide6.QtNetwork import QTcpSocket
from socket_client import SocketClient
import random
import string
from message_proxy import MessageProxy, SignalProxy
import json
import time
import queue
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
from typing import Any, Optional, Dict, Callable
from PySide6.QtCore import QObject, Signal, Slot, QTimer
from PySide6.QtNetwork import QTcpSocket
from PySide6.QtCore import QByteArray
class CMD(Enum):
GET_PARAMS = 1
SET_PARAMS = 2
@dataclass
class Request:
token: str
cmd: CMD
widget: QObject
data: Any
callback: Optional[Callable] = None
created_at: datetime = None
timeout: int = 30 # 超时时间(秒)
def __post_init__(self):
self.created_at = datetime.now()
@property
def is_expired(self) -> bool:
return datetime.now() > self.created_at + timedelta(seconds=self.timeout)
@dataclass()
class Response:
token: str
cmd: CMD
widget: QObject
data: Any
class ParamsService(QObject):
# signal_busy: SignalInstance = Signal()
signal_request_complete: SignalInstance = Signal(SignalProxy)
def __init__(self, host, port):
super().__init__()
signal_request_complete = Signal(object) # 请求完成信号
signal_connection_status = Signal(bool) # 连接状态信号
signal_error = Signal(str) # 错误信号
# 初始化socket client
self.__busy = False
# self.client = SocketClient("127.0.0.1", 1234)
self.queue = Queue
# 创建一个 TCP Socket 对象
self.socket = QTcpSocket(self)
def __init__(self, host: str, port: int, parent=None):
super().__init__(parent)
self.host = host
self.port = port
# 连接信号
self.socket.connected.connect(self.on_connected)
self.socket.readyRead.connect(self.on_ready_read)
self.socket.disconnected.connect(self.on_disconnected)
# 初始化状态
self._is_running = True
self._connected = False
# 连接到服务器
self.socket.connectToHost(self.host, self.port)
# 请求队列和响应映射
self.request_queue = queue.Queue()
self.pending_requests: Dict[str, Request] = {}
# 初始化socket
self.socket = QTcpSocket(self)
self._setup_socket_connections()
# 初始化定时器
self._request_timer = QTimer(self)
self._request_timer.timeout.connect(self._process_next_request)
self._request_timer.setInterval(100) # 100ms间隔
# 重连定时器
self._reconnect_timer = QTimer(self)
self._reconnect_timer.timeout.connect(self._try_reconnect)
self._reconnect_timer.setInterval(5000) # 5秒重连间隔
# 超时检查定时器
self._timeout_timer = QTimer(self)
self._timeout_timer.timeout.connect(self._check_timeouts)
self._timeout_timer.setInterval(1000) # 1秒检查一次
# 启动定时器
self._request_timer.start()
self._timeout_timer.start()
# 首次连接
self.connect_to_server()
def _setup_socket_connections(self):
"""设置socket信号连接"""
self.socket.connected.connect(self._on_connected)
self.socket.disconnected.connect(self._on_disconnected)
self.socket.readyRead.connect(self._on_ready_read)
self.socket.errorOccurred.connect(self._on_socket_error)
def connect_to_server(self):
"""连接到服务器"""
if not self._connected:
self.socket.connectToHost(self.host, self.port)
@Slot()
def on_connected(self):
def _on_connected(self):
"""连接成功处理"""
print(f"Connected to {self.host}:{self.port}")
# self.socket.write(b"Hello, Server!") # 向服务器发送数据
# print("Message sent to server.")
self._connected = True
self._reconnect_timer.stop()
self.signal_connection_status.emit(True)
@Slot()
def on_ready_read(self):
data = self.socket.readAll() # 读取服务器发送的数据
print(f"Received from server: {data.data().decode()}")
def _on_disconnected(self):
"""断开连接处理"""
print("Disconnected from server")
self._connected = False
self.signal_connection_status.emit(False)
self._reconnect_timer.start() # 启动重连定时器
@Slot()
def on_disconnected(self):
print("Disconnected from server.")
self.socket.close() # 关闭连接
def _on_socket_error(self):
"""Socket错误处理"""
error = self.socket.errorString()
print(f"Socket error: {error}")
self.signal_error.emit(f"Socket error: {error}")
def send_data(self, data: bytes):
self.socket.write(data)
@Slot()
def _on_ready_read(self):
"""数据接收处理"""
try:
data = self.socket.readAll()
response = json.loads(bytes(data).decode())
self._handle_response(response)
except json.JSONDecodeError as e:
print(f"JSON decode error: {e}")
self.signal_error.emit(f"Invalid JSON format: {str(e)}")
except Exception as e:
print(f"Error processing response: {e}")
self.signal_error.emit(f"Response processing error: {str(e)}")
@Slot()
def _process_next_request(self):
"""处理队列中的下一个请求"""
if not self._connected or self.request_queue.empty():
return
try:
request = self.request_queue.get_nowait()
self._send_request(request)
self.request_queue.task_done()
except queue.Empty:
pass
except Exception as e:
print(f"Error processing request: {e}")
self.signal_error.emit(f"Request processing error: {str(e)}")
def _send_request(self, request: Request):
"""发送请求到服务器"""
try:
self.pending_requests[request.token] = request
request_data = {
"cmd": "get_params",
"token": request.token,
"data": request.data
}
json_data = json.dumps(request_data)
self.socket.write(json_data.encode('utf-8'))
self.socket.flush()
except Exception as e:
print(f"Error sending request: {e}")
self.signal_error.emit(f"Request sending error: {str(e)}")
self.pending_requests.pop(request.token, None)
def _handle_response(self, response: dict):
"""处理服务器响应"""
try:
token = response.get("token")
if token in self.pending_requests:
request = self.pending_requests.pop(token)
if request.callback:
res_data = response["data"]
res = Response(token, CMD.GET_PARAMS, request.widget, res_data)
request.callback(res)
self.signal_request_complete.emit(response)
except Exception as e:
print(f"Error handling response: {e}")
self.signal_error.emit(f"Response handling error: {str(e)}")
@Slot()
def _check_timeouts(self):
"""检查请求超时"""
current_time = datetime.now()
expired_tokens = [
token for token, request in self.pending_requests.items()
if request.is_expired
]
for token in expired_tokens:
request = self.pending_requests.pop(token)
self.signal_error.emit(f"Request timeout: {token}")
if request.callback:
request.callback({"error": "timeout", "token": token})
@Slot()
def _try_reconnect(self):
"""尝试重新连接"""
if not self._connected:
print(f"Attempting to reconnect to {self.host}:{self.port}")
self.connect_to_server()
@staticmethod
def generate_token():
token_str = ''.join(random.choices(string.ascii_letters + string.digits, k=12))
return token_str
def generate_token() -> str:
"""生成唯一的请求token"""
return ''.join(random.choices(string.ascii_letters + string.digits, k=12))
def get_params(self, widget_proxy: QObject):
# 生成一个请求
def get_params(self, widget: QObject, param_names: list, callback: Callable = None):
"""获取参数(外部接口)"""
token = self.generate_token()
data = { "hello world!"}
message = MessageProxy(token, widget_proxy, data)
# 发送请求
# 将发送的请求放入一个队列
pass
print('请求数据')
self.signal_request_complete.emit(SignalProxy(widget_proxy, {"str1": "test 1", "str2": 100 }))
def set_params(self):
pass
import sys
from PySide6.QtWidgets import QApplication, QWidget, QLabel, QLineEdit, QPushButton, QVBoxLayout
class MyComponent:
def __init__(self, parent=None):
pass
def set_data(self, data):
pass
class MyWidget(QWidget, MyComponent):
def __init__(self, parent=None):
super().__init__(parent=parent)
layout = QVBoxLayout()
self.label1 = QLabel('Test Info 1', self)
self.label2 = QLabel('Test Info 2', self)
layout.addWidget(self.label1)
layout.addWidget(self.label2)
self.setLayout(layout)
def set_data(self, data):
self.label1.setText(data["str1"])
self.label2.setText(str(data["str2"]))
class MyWindow(QWidget):
def __init__(self):
super().__init__()
# 设置窗口标题和尺寸
self.setWindowTitle("PySide6 Example")
self.setFixedSize(300, 800)
self.test_widget1 = MyWidget(self)
self.test_widget2 = MyWidget(self)
self.test_widget3 = MyWidget(self)
self.test_widget4 = MyWidget(self)
self.test_widget5 = MyWidget(self)
self.test_widget6 = MyWidget(self)
self.test_widget7 = MyWidget(self)
self.test_widget8 = MyWidget(self)
self.test_widget9 = MyWidget(self)
# 创建 QLabel、QLineEdit 和 QPushButton 控件
self.label = QLabel("请输入内容:", self)
self.input_line = QLineEdit(self)
self.button = QPushButton("更新标签", self)
# 设置按钮点击事件
self.button.clicked.connect(self.on_button_clicked)
# 创建垂直布局并添加控件
layout = QVBoxLayout()
layout.addWidget(self.test_widget1)
layout.addWidget(self.test_widget2)
layout.addWidget(self.test_widget3)
layout.addWidget(self.test_widget4)
layout.addWidget(self.test_widget5)
layout.addWidget(self.test_widget6)
layout.addWidget(self.test_widget7)
layout.addWidget(self.test_widget8)
layout.addWidget(self.test_widget9)
layout.addWidget(self.label)
layout.addWidget(self.input_line)
layout.addWidget(self.button)
# 设置窗口的布局
self.setLayout(layout)
self.params_service = ParamsService("127.0.0.1", 1234)
self.params_service.signal_request_complete.connect(self.on_params_service)
def on_params_service(self, data: SignalProxy):
data.widget.set_data(data.data)
def on_button_clicked(self):
self.params_service.get_params(self.test_widget1)
self.params_service.get_params(self.test_widget2)
self.params_service.get_params(self.test_widget3)
self.params_service.get_params(self.test_widget4)
self.params_service.get_params(self.test_widget5)
self.params_service.get_params(self.test_widget6)
self.params_service.get_params(self.test_widget7)
self.params_service.get_params(self.test_widget8)
self.params_service.get_params(self.test_widget9)
# def update_label(self):
# # 获取输入框的文本并更新标签内容
# input_text = self.input_line.text()
# self.label.setText(f"你输入的是:{input_text}")
if __name__ == "__main__":
app = QApplication(sys.argv)
window = MyWindow()
window.show()
sys.exit(app.exec())
request = Request(
token=token,
cmd=CMD.GET_PARAMS,
widget=widget,
data={"param_names": param_names},
callback=callback
)
self.request_queue.put(request)
return token
def set_params(self, widget: QObject, params: dict, callback: Callable = None):
"""设置参数(外部接口)"""
token = self.generate_token()
request = Request(
token=token,
widget=widget,
data={"params": params},
callback=callback
)
self.request_queue.put(request)
return token
def cleanup(self):
"""清理资源"""
self._is_running = False
self._request_timer.stop()
self._timeout_timer.stop()
self._reconnect_timer.stop()
self.socket.disconnectFromHost()
if self.socket.state() == QTcpSocket.ConnectedState:
self.socket.waitForDisconnected(1000)

View File

@ -1,44 +1,276 @@
import sys
from PySide6.QtCore import QCoreApplication, QByteArray
import json
import random
import string
import time
from dataclasses import dataclass
from queue import Queue
from threading import Thread, Lock, Event
from typing import Any, Optional, Dict, Callable
from datetime import datetime, timedelta
from PySide6.QtCore import QObject, Signal, Slot
from PySide6.QtNetwork import QTcpSocket
from PySide6.QtCore import Signal, Slot
class SocketClient(QCoreApplication):
def __init__(self, host, port):
super().__init__(sys.argv)
@dataclass
class Request:
token: str
widget: QObject
data: Any
callback: Optional[Callable] = None
created_at: datetime = None
timeout: int = 30 # 超时时间(秒)
# 创建一个 TCP Socket 对象
self.socket = QTcpSocket(self)
def __post_init__(self):
self.created_at = datetime.now()
@property
def is_expired(self) -> bool:
return datetime.now() > self.created_at + timedelta(seconds=self.timeout)
class ParamsService(QObject):
signal_request_complete = Signal(object) # 请求完成信号
signal_connection_status = Signal(bool) # 连接状态信号
signal_error = Signal(str) # 错误信号
def __init__(self, host: str, port: int):
super().__init__()
self.host = host
self.port = port
# 连接信号
self.socket.connected.connect(self.on_connected)
self.socket.readyRead.connect(self.on_ready_read)
self.socket.disconnected.connect(self.on_disconnected)
# 初始化状态和同步对象
self._is_running = True
self._connected = False
self._reconnect_interval = 5 # 重连间隔(秒)
self._lock = Lock()
self._connection_event = Event()
# 连接到服务器
self.socket.connectToHost(self.host, self.port)
# 请求队列和响应映射
self.request_queue = Queue()
self.pending_requests: Dict[str, Request] = {}
# 初始化socket
self.socket = QTcpSocket(self)
self._setup_socket_connections()
# 启动工作线程
self._start_worker_threads()
# 首次连接
self.connect_to_server()
def _setup_socket_connections(self):
"""设置socket信号连接"""
self.socket.connected.connect(self.on_connected)
self.socket.disconnected.connect(self.on_disconnected)
self.socket.readyRead.connect(self.on_ready_read)
self.socket.errorOccurred.connect(self.on_socket_error)
def _start_worker_threads(self):
"""启动工作线程"""
# 请求处理线程
self.request_thread = Thread(target=self._process_request_queue, daemon=True)
self.request_thread.start()
# 超时检查线程
self.timeout_thread = Thread(target=self._check_timeouts, daemon=True)
self.timeout_thread.start()
# 重连线程
self.reconnect_thread = Thread(target=self._reconnection_loop, daemon=True)
self.reconnect_thread.start()
def connect_to_server(self):
"""连接到服务器"""
if not self._connected:
self.socket.connectToHost(self.host, self.port)
@Slot()
def on_connected(self):
"""连接成功处理"""
print(f"Connected to {self.host}:{self.port}")
self.socket.write(b"Hello, Server!") # 向服务器发送数据
print("Message sent to server.")
@Slot()
def on_ready_read(self):
data = self.socket.readAll() # 读取服务器发送的数据
print(f"Received from server: {data.data().decode()}")
with self._lock:
self._connected = True
self._connection_event.set()
self.signal_connection_status.emit(True)
@Slot()
def on_disconnected(self):
print("Disconnected from server.")
self.socket.close() # 关闭连接
"""断开连接处理"""
print("Disconnected from server")
with self._lock:
self._connected = False
self._connection_event.clear()
self.signal_connection_status.emit(False)
@Slot()
def on_socket_error(self):
"""Socket错误处理"""
error = self.socket.errorString()
print(f"Socket error: {error}")
self.signal_error.emit(f"Socket error: {error}")
@Slot()
def on_ready_read(self):
"""数据接收处理"""
try:
data = self.socket.readAll()
response = json.loads(data.data().decode())
self._handle_response(response)
except Exception as e:
print(f"Error processing response: {e}")
self.signal_error.emit(f"Response processing error: {str(e)}")
if __name__ == "__main__":
app = SocketClient("127.0.0.1", 1234)
sys.exit(app.exec())
def _handle_response(self, response: dict):
"""处理服务器响应"""
try:
token = response.get("token")
if token in self.pending_requests:
request = self.pending_requests.pop(token)
# 执行回调
if request.callback:
request.callback(response)
# 发送完成信号
self.signal_request_complete.emit(response)
except Exception as e:
print(f"Error handling response: {e}")
self.signal_error.emit(f"Response handling error: {str(e)}")
def _process_request_queue(self):
"""处理请求队列的后台线程"""
while self._is_running:
try:
if not self._connected:
self._connection_event.wait(timeout=1.0)
continue
request = self.request_queue.get(timeout=1.0)
self._send_request(request)
self.request_queue.task_done()
except Queue.Empty:
continue
except Exception as e:
print(f"Error processing request: {e}")
self.signal_error.emit(f"Request processing error: {str(e)}")
def _send_request(self, request: Request):
"""发送请求到服务器"""
try:
# 添加到待处理请求
self.pending_requests[request.token] = request
# 构建请求数据
request_data = {
"cmd": "get_params", # 或其他命令
"token": request.token,
"data": request.data
}
# 发送数据
json_data = json.dumps(request_data)
self.socket.write(json_data.encode('utf-8'))
self.socket.flush()
except Exception as e:
print(f"Error sending request: {e}")
self.signal_error.emit(f"Request sending error: {str(e)}")
self.pending_requests.pop(request.token, None)
def _check_timeouts(self):
"""检查请求超时的后台线程"""
while self._is_running:
try:
current_time = datetime.now()
expired_tokens = []
# 检查超时的请求
for token, request in self.pending_requests.items():
if request.is_expired:
expired_tokens.append(token)
# 处理超时的请求
for token in expired_tokens:
request = self.pending_requests.pop(token)
self.signal_error.emit(f"Request timeout: {token}")
# 执行超时回调
if request.callback:
request.callback({"error": "timeout", "token": token})
time.sleep(1) # 每秒检查一次
except Exception as e:
print(f"Error checking timeouts: {e}")
def _reconnection_loop(self):
"""重连循环的后台线程"""
while self._is_running:
if not self._connected:
print(f"Attempting to reconnect to {self.host}:{self.port}")
self.connect_to_server()
time.sleep(self._reconnect_interval)
@staticmethod
def generate_token() -> str:
"""生成唯一的请求token"""
return ''.join(random.choices(string.ascii_letters + string.digits, k=12))
def get_params(self, widget: QObject, param_names: list, callback: Callable = None):
"""获取参数(外部接口)"""
token = self.generate_token()
request = Request(
token=token,
widget=widget,
data={"param_names": param_names},
callback=callback
)
self.request_queue.put(request)
return token
def set_params(self, widget: QObject, params: dict, callback: Callable = None):
"""设置参数(外部接口)"""
token = self.generate_token()
request = Request(
token=token,
widget=widget,
data={"params": params},
callback=callback
)
self.request_queue.put(request)
return token
def cleanup(self):
"""清理资源"""
self._is_running = False
self.socket.disconnectFromHost()
if self.socket.state() == QTcpSocket.ConnectedState:
self.socket.waitForDisconnected(1000)
if __name__== "__main__":
# # 创建服务实例
# params_service = ParamsService("localhost", 1234)
#
#
# # 回调函数
# def on_response(response):
# print(f"Received response: {response}")
#
#
# # 获取参数
# params_service.get_params(
# widget=some_widget,
# param_names=["param1", "param2"],
# callback=on_response
# )
#
# # 设置参数
# params_service.set_params(
# widget=some_widget,
# params={"param1": "value1"},
# callback=on_response
# )
#
# # 程序退出时清理
# params_service.cleanup()
pass

44
socket_client_back.py Normal file
View File

@ -0,0 +1,44 @@
import sys
from PySide6.QtCore import QCoreApplication, QByteArray
from PySide6.QtNetwork import QTcpSocket
from PySide6.QtCore import Signal, Slot
class SocketClient(QCoreApplication):
def __init__(self, host, port):
super().__init__(sys.argv)
# 创建一个 TCP Socket 对象
self.socket = QTcpSocket(self)
self.host = host
self.port = port
# 连接信号
self.socket.connected.connect(self.on_connected)
self.socket.readyRead.connect(self.on_ready_read)
self.socket.disconnected.connect(self.on_disconnected)
# 连接到服务器
self.socket.connectToHost(self.host, self.port)
@Slot()
def on_connected(self):
print(f"Connected to {self.host}:{self.port}")
self.socket.write(b"Hello, Server!") # 向服务器发送数据
print("Message sent to server.")
@Slot()
def on_ready_read(self):
data = self.socket.readAll() # 读取服务器发送的数据
print(f"Received from server: {data.data().decode()}")
@Slot()
def on_disconnected(self):
print("Disconnected from server.")
self.socket.close() # 关闭连接
if __name__ == "__main__":
app = SocketClient("127.0.0.1", 1234)
sys.exit(app.exec())

View File

@ -1,8 +1,11 @@
import json
import sys
from PySide6.QtCore import QCoreApplication, QByteArray, Slot
from PySide6.QtNetwork import QTcpServer, QTcpSocket, QHostAddress
from PySide6.QtCore import Signal
from param_manager import ParamManager
class TcpServer(QCoreApplication):
def __init__(self, host, port):
@ -15,6 +18,7 @@ class TcpServer(QCoreApplication):
# 连接信号,新的客户端连接时触发
self.server.newConnection.connect(self.on_new_connection)
self.param_manager = ParamManager()
# 绑定并开始监听指定的地址和端口
if not self.server.listen(QHostAddress(self.host), self.port):
@ -35,9 +39,9 @@ class TcpServer(QCoreApplication):
print(f"New connection from {client_socket.peerAddress().toString()}:{client_socket.peerPort()}")
# 发送欢迎消息给客户端
client_socket.write(b"Hello from server!")
# client_socket.write(b"Hello from server!")
client_socket.flush() # 确保数据已发送
print("Welcome message sent to client.")
# print("Welcome message sent to client.")
@Slot()
def on_ready_read(self, client_socket: QTcpSocket):
@ -45,11 +49,12 @@ class TcpServer(QCoreApplication):
data = client_socket.readAll()
print(f"Received from client: {data.data().decode()}")
# 发送响应给客户端
response = "Server has received your message."
client_socket.write(response.encode())
client_socket.flush()
print(f"Sent to client: {response}")
self.request_process(data.data().decode())
# # 发送响应给客户端
# response = "Server has received your message."
# client_socket.write(response.encode())
# client_socket.flush()
# print(f"Sent to client: {response}")
@Slot()
def on_disconnected(self, client_socket: QTcpSocket):
@ -57,6 +62,14 @@ class TcpServer(QCoreApplication):
client_socket.deleteLater() # 清理套接字资源
def request_process(self, data_string):
# try:
# data_obj = json.loads(data_string)
print(data_obj["payload"])
# except Exception as e:
# print(e)
if __name__ == "__main__":
app = TcpServer("127.0.0.1", 1234)
sys.exit(app.exec())