通信服务设计

This commit is contained in:
cuijingwei@brisonus.com 2025-02-19 14:35:44 +08:00
parent bb72fbac83
commit 1ea9a7031d
17 changed files with 508 additions and 105 deletions

2
.gitignore vendored Normal file
View File

@ -0,0 +1,2 @@
__pycache__
.idea

3
.idea/.gitignore generated vendored
View File

@ -1,3 +0,0 @@
# 默认忽略的文件
/shelf/
/workspace.xml

View File

@ -1,6 +0,0 @@
<component name="InspectionProjectProfileManager">
<settings>
<option name="USE_PROJECT_PROFILE" value="false" />
<version value="1.0" />
</settings>
</component>

7
.idea/misc.xml generated
View File

@ -1,7 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="Black">
<option name="sdkName" value="Python 3.13 (PythonProject3)" />
</component>
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.13 (pythonProject3) (2)" project-jdk-type="Python SDK" />
</project>

8
.idea/modules.xml generated
View File

@ -1,8 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/pythonProject3.iml" filepath="$PROJECT_DIR$/.idea/pythonProject3.iml" />
</modules>
</component>
</project>

View File

@ -1,10 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$">
<excludeFolder url="file://$MODULE_DIR$/.venv" />
</content>
<orderEntry type="jdk" jdkName="Python 3.13 (pythonProject3) (2)" jdkType="Python SDK" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>

56
api_client.py Normal file
View File

@ -0,0 +1,56 @@
import sys
from PySide6.QtCore import QCoreApplication, QByteArray
from PySide6.QtNetwork import QTcpSocket
from PySide6.QtCore import Slot, QObject
class ApiClient(QObject):
def __init__(self, host, port):
super().__init__()
# 创建一个 TCP 客户端对象
self.socket = QTcpSocket(self)
self.host = host
self.port = port
# 连接信号
self.socket.connected.connect(self.on_connected)
self.socket.readyRead.connect(self.on_ready_read)
self.socket.disconnected.connect(self.on_disconnected)
# 连接到服务器
self.socket.connectToHost(self.host, self.port)
@Slot()
def on_connected(self):
print(f"Connected to {self.host}:{self.port}")
# 发送读取请求 (示例: 读取 'key1')
self.socket.write(b"READ key1")
self.socket.flush()
print("Sent: READ key1")
@Slot()
def on_ready_read(self):
data = self.socket.readAll() # 读取服务器返回的数据
print(f"Received from server: {data.data().decode()}")
@Slot()
def on_disconnected(self):
print("Disconnected from server.")
self.socket.close() # 关闭连接
def get_params(self, list_param_name):
#
print(list_param_name)
# 返回一个字典
return
def set_params(self, list_param_name, list_param_data):
print(list_param_name)
if __name__ == "__main__":
app = QCoreApplication()
api = ApiClient("127.0.0.1", 1234)
sys.exit(app.exec())

12
cmd_send_get_params.json Normal file
View File

@ -0,0 +1,12 @@
{
"cmd": "get_params",
"token": "random_string",
"data": {
"param_num": 2,
"param_names": [
"test_param_1",
"test_param_2"
]
}
}

15
cmd_send_set_params.json Normal file
View File

@ -0,0 +1,15 @@
{
"cmd": "set_params",
"token": "random_string",
"data": {
"param_num": 2,
"param_names": [
"test_param_1",
"test_param_2"
],
"param_datas": [
10.1,
20
]
}
}

72
field_addr_resolution.py Normal file
View File

@ -0,0 +1,72 @@
import ctypes
def get_field_address(struct_instance, field_path):
# 分割字段路径
fields = field_path.split('.')
# 获取起始结构体的基地址
current_address = ctypes.addressof(struct_instance)
current_type = type(struct_instance)
# 遍历path的每个field
for field in fields:
# 在 _fields_ 中查找字段
found = False
# 遍历当前结构体的所有field
for f_name, f_type in current_type._fields_:
# 如果结构体中的field和当前查找的field一致
if f_name == field:
found = True
# 计算偏移量
offset = sum(
getattr(current_type, fname).offset for fname, _ in current_type._fields_ if fname == field)
current_address += offset
# 如果还有下一个字段,更新当前类型
if field != fields[-1]:
# 判断当前的field是不是type且是不是结构体类型
if isinstance(f_type, type) and issubclass(f_type, ctypes.Structure):
current_type = f_type
else:
raise ValueError(f"字段 '{field}' 不是结构体类型")
# 跳出当前循环
break
if not found:
raise ValueError(f"在结构体中找不到字段 '{field}'")
return current_address
# 使用示例
def print_all_addresses(struct_instance, prefix=""):
struct_type = type(struct_instance)
for field_name, field_type in struct_type._fields_:
full_path = f"{prefix}{field_name}"
# 如果是结构体类型,递归打印其成员
if isinstance(field_type, type) and issubclass(field_type, ctypes.Structure):
nested_struct = getattr(struct_instance, field_name)
print_all_addresses(nested_struct, f"{full_path}.")
else:
try:
address = get_field_address(struct_instance, field_name)
print(f"{full_path} 的地址: {hex(address)}")
except ValueError as e:
print(e)
# 计算成员变量的offset
def get_filed_offset(struct_instance, field_path):
print(type(struct_instance))
print(issubclass(type(struct_instance), ctypes.Structure))
if isinstance(struct_instance, type) and issubclass(struct_instance, ctypes.Structure):
base_address = ctypes.addressof(struct_instance)
field_address = get_field_address(struct_instance, field_path)
return field_address - base_address
return 0

73
main.py
View File

@ -1,5 +1,6 @@
import ctypes import ctypes
from field_addr_resolution import *
from struct_def import *
# 定义内部结构体 # 定义内部结构体
class InnerStructLevel2(ctypes.Structure): class InnerStructLevel2(ctypes.Structure):
@ -31,76 +32,6 @@ class OuterStruct(ctypes.Structure):
] ]
def get_field_address(struct_instance, field_path):
# 分割字段路径
fields = field_path.split('.')
# 获取起始结构体的基地址
current_address = ctypes.addressof(struct_instance)
current_type = type(struct_instance)
# 遍历path的每个field
for field in fields:
# 在 _fields_ 中查找字段
found = False
# 遍历当前结构体的所有field
for f_name, f_type in current_type._fields_:
# 如果结构体中的field和当前查找的field一致
if f_name == field:
found = True
# 计算偏移量
offset = sum(
getattr(current_type, fname).offset for fname, _ in current_type._fields_ if fname == field)
current_address += offset
# 如果还有下一个字段,更新当前类型
if field != fields[-1]:
# 判断当前的field是不是type且是不是结构体类型
if isinstance(f_type, type) and issubclass(f_type, ctypes.Structure):
current_type = f_type
else:
raise ValueError(f"字段 '{field}' 不是结构体类型")
# 跳出当前循环
break
if not found:
raise ValueError(f"在结构体中找不到字段 '{field}'")
return current_address
# 使用示例
def print_all_addresses(struct_instance, prefix=""):
struct_type = type(struct_instance)
for field_name, field_type in struct_type._fields_:
full_path = f"{prefix}{field_name}"
# 如果是结构体类型,递归打印其成员
if isinstance(field_type, type) and issubclass(field_type, ctypes.Structure):
nested_struct = getattr(struct_instance, field_name)
print_all_addresses(nested_struct, f"{full_path}.")
else:
try:
address = get_field_address(struct_instance, field_name)
print(f"{full_path} 的地址: {hex(address)}")
except ValueError as e:
print(e)
# 计算成员变量的offset
def get_filed_offset(struct_instance, field_path):
print(type(struct_instance))
print(issubclass(type(struct_instance), ctypes.Structure))
if isinstance(struct_instance, type) and issubclass(struct_instance, ctypes.Structure):
base_address = ctypes.addressof(struct_instance)
field_address = get_field_address(struct_instance, field_path)
return field_address - base_address
return 0
# 测试代码 # 测试代码
if __name__ == "__main__": if __name__ == "__main__":
outer = OuterStruct() outer = OuterStruct()

18
message_proxy.py Normal file
View File

@ -0,0 +1,18 @@
from dataclasses import dataclass
from PySide6.QtCore import QObject
from PySide6.QtWidgets import QWidget
@dataclass
class MessageProxy:
token: str
widget: QObject
data: {}
@dataclass
class SignalProxy:
widget: QWidget
data: {}

182
params_service.py Normal file
View File

@ -0,0 +1,182 @@
from importlib.metadata import always_iterable
from queue import Queue
from PySide6.QtCore import QObject, SignalInstance
from PySide6.QtCore import Signal, Slot
from PySide6.QtNetwork import QTcpSocket
from socket_client import SocketClient
import random
import string
from message_proxy import MessageProxy, SignalProxy
class ParamsService(QObject):
# signal_busy: SignalInstance = Signal()
signal_request_complete: SignalInstance = Signal(SignalProxy)
def __init__(self, host, port):
super().__init__()
# 初始化socket client
self.__busy = False
# self.client = SocketClient("127.0.0.1", 1234)
self.queue = Queue
# 创建一个 TCP Socket 对象
self.socket = QTcpSocket(self)
self.host = host
self.port = port
# 连接信号
self.socket.connected.connect(self.on_connected)
self.socket.readyRead.connect(self.on_ready_read)
self.socket.disconnected.connect(self.on_disconnected)
# 连接到服务器
self.socket.connectToHost(self.host, self.port)
@Slot()
def on_connected(self):
print(f"Connected to {self.host}:{self.port}")
# self.socket.write(b"Hello, Server!") # 向服务器发送数据
# print("Message sent to server.")
@Slot()
def on_ready_read(self):
data = self.socket.readAll() # 读取服务器发送的数据
print(f"Received from server: {data.data().decode()}")
@Slot()
def on_disconnected(self):
print("Disconnected from server.")
self.socket.close() # 关闭连接
def send_data(self, data: bytes):
self.socket.write(data)
@staticmethod
def generate_token():
token_str = ''.join(random.choices(string.ascii_letters + string.digits, k=12))
return token_str
def get_params(self, widget_proxy: QObject):
# 生成一个请求
token = self.generate_token()
data = { "hello world!"}
message = MessageProxy(token, widget_proxy, data)
# 发送请求
# 将发送的请求放入一个队列
pass
print('请求数据')
self.signal_request_complete.emit(SignalProxy(widget_proxy, {"str1": "test 1", "str2": 100 }))
def set_params(self):
pass
import sys
from PySide6.QtWidgets import QApplication, QWidget, QLabel, QLineEdit, QPushButton, QVBoxLayout
class MyComponent:
def __init__(self, parent=None):
pass
def set_data(self, data):
pass
class MyWidget(QWidget, MyComponent):
def __init__(self, parent=None):
super().__init__(parent=parent)
layout = QVBoxLayout()
self.label1 = QLabel('Test Info 1', self)
self.label2 = QLabel('Test Info 2', self)
layout.addWidget(self.label1)
layout.addWidget(self.label2)
self.setLayout(layout)
def set_data(self, data):
self.label1.setText(data["str1"])
self.label2.setText(str(data["str2"]))
class MyWindow(QWidget):
def __init__(self):
super().__init__()
# 设置窗口标题和尺寸
self.setWindowTitle("PySide6 Example")
self.setFixedSize(300, 800)
self.test_widget1 = MyWidget(self)
self.test_widget2 = MyWidget(self)
self.test_widget3 = MyWidget(self)
self.test_widget4 = MyWidget(self)
self.test_widget5 = MyWidget(self)
self.test_widget6 = MyWidget(self)
self.test_widget7 = MyWidget(self)
self.test_widget8 = MyWidget(self)
self.test_widget9 = MyWidget(self)
# 创建 QLabel、QLineEdit 和 QPushButton 控件
self.label = QLabel("请输入内容:", self)
self.input_line = QLineEdit(self)
self.button = QPushButton("更新标签", self)
# 设置按钮点击事件
self.button.clicked.connect(self.on_button_clicked)
# 创建垂直布局并添加控件
layout = QVBoxLayout()
layout.addWidget(self.test_widget1)
layout.addWidget(self.test_widget2)
layout.addWidget(self.test_widget3)
layout.addWidget(self.test_widget4)
layout.addWidget(self.test_widget5)
layout.addWidget(self.test_widget6)
layout.addWidget(self.test_widget7)
layout.addWidget(self.test_widget8)
layout.addWidget(self.test_widget9)
layout.addWidget(self.label)
layout.addWidget(self.input_line)
layout.addWidget(self.button)
# 设置窗口的布局
self.setLayout(layout)
self.params_service = ParamsService("127.0.0.1", 1234)
self.params_service.signal_request_complete.connect(self.on_params_service)
def on_params_service(self, data: SignalProxy):
data.widget.set_data(data.data)
def on_button_clicked(self):
self.params_service.get_params(self.test_widget1)
self.params_service.get_params(self.test_widget2)
self.params_service.get_params(self.test_widget3)
self.params_service.get_params(self.test_widget4)
self.params_service.get_params(self.test_widget5)
self.params_service.get_params(self.test_widget6)
self.params_service.get_params(self.test_widget7)
self.params_service.get_params(self.test_widget8)
self.params_service.get_params(self.test_widget9)
# def update_label(self):
# # 获取输入框的文本并更新标签内容
# input_text = self.input_line.text()
# self.label.setText(f"你输入的是:{input_text}")
if __name__ == "__main__":
app = QApplication(sys.argv)
window = MyWindow()
window.show()
sys.exit(app.exec())

13
random_code.py Normal file
View File

@ -0,0 +1,13 @@
import random
import string
# # 生成一个随机整数作为识别码例如1000 到 9999
# random_id = random.randint(1000, 9999)
# 或者生成一个随机的 8 位字母和数字组成的字符串
random_str = ''.join(random.choices(string.ascii_letters + string.digits, k=12))
# 插入命令中
command = f"random str: {random_str}"
print(command)

44
socket_client.py Normal file
View File

@ -0,0 +1,44 @@
import sys
from PySide6.QtCore import QCoreApplication, QByteArray
from PySide6.QtNetwork import QTcpSocket
from PySide6.QtCore import Signal, Slot
class SocketClient(QCoreApplication):
def __init__(self, host, port):
super().__init__(sys.argv)
# 创建一个 TCP Socket 对象
self.socket = QTcpSocket(self)
self.host = host
self.port = port
# 连接信号
self.socket.connected.connect(self.on_connected)
self.socket.readyRead.connect(self.on_ready_read)
self.socket.disconnected.connect(self.on_disconnected)
# 连接到服务器
self.socket.connectToHost(self.host, self.port)
@Slot()
def on_connected(self):
print(f"Connected to {self.host}:{self.port}")
self.socket.write(b"Hello, Server!") # 向服务器发送数据
print("Message sent to server.")
@Slot()
def on_ready_read(self):
data = self.socket.readAll() # 读取服务器发送的数据
print(f"Received from server: {data.data().decode()}")
@Slot()
def on_disconnected(self):
print("Disconnected from server.")
self.socket.close() # 关闭连接
if __name__ == "__main__":
app = SocketClient("127.0.0.1", 1234)
sys.exit(app.exec())

62
socket_server.py Normal file
View File

@ -0,0 +1,62 @@
import sys
from PySide6.QtCore import QCoreApplication, QByteArray, Slot
from PySide6.QtNetwork import QTcpServer, QTcpSocket, QHostAddress
from PySide6.QtCore import Signal
class TcpServer(QCoreApplication):
def __init__(self, host, port):
super().__init__(sys.argv)
# 创建一个 TCP 服务器对象
self.server = QTcpServer(self)
self.host = host
self.port = port
# 连接信号,新的客户端连接时触发
self.server.newConnection.connect(self.on_new_connection)
# 绑定并开始监听指定的地址和端口
if not self.server.listen(QHostAddress(self.host), self.port):
print(f"Server could not start on {self.host}:{self.port}")
sys.exit(1)
print(f"Server started on {self.host}:{self.port}")
@Slot()
def on_new_connection(self):
# 获取客户端连接的 socket 对象
client_socket = self.server.nextPendingConnection()
# 连接信号
client_socket.readyRead.connect(lambda: self.on_ready_read(client_socket))
client_socket.disconnected.connect(lambda: self.on_disconnected(client_socket))
print(f"New connection from {client_socket.peerAddress().toString()}:{client_socket.peerPort()}")
# 发送欢迎消息给客户端
client_socket.write(b"Hello from server!")
client_socket.flush() # 确保数据已发送
print("Welcome message sent to client.")
@Slot()
def on_ready_read(self, client_socket: QTcpSocket):
# 读取客户端发送的数据
data = client_socket.readAll()
print(f"Received from client: {data.data().decode()}")
# 发送响应给客户端
response = "Server has received your message."
client_socket.write(response.encode())
client_socket.flush()
print(f"Sent to client: {response}")
@Slot()
def on_disconnected(self, client_socket: QTcpSocket):
print(f"Connection from {client_socket.peerAddress().toString()}:{client_socket.peerPort()} closed.")
client_socket.deleteLater() # 清理套接字资源
if __name__ == "__main__":
app = TcpServer("127.0.0.1", 1234)
sys.exit(app.exec())

30
struct_def.py Normal file
View File

@ -0,0 +1,30 @@
import ctypes
# 定义内部结构体
class InnerStructLevel2(ctypes.Structure):
_pack_ = 1
_fields_ = [
("inner_field1", ctypes.c_char),
("inner_field2", ctypes.c_double),
("inner_field3", ctypes.c_double),
]
# 定义内部结构体
class InnerStructLevel1(ctypes.Structure):
_pack_ = 1
_fields_ = [
("inner_field1", ctypes.c_int),
("inner_field2", ctypes.c_double),
("inner_level2_field3", InnerStructLevel2)
]
# 定义外部结构体
class OuterStruct(ctypes.Structure):
_pack_ = 1
_fields_ = [
("field1", ctypes.c_int),
("nested", InnerStructLevel1), # 嵌套结构体
("field2", ctypes.c_char * 10),
]