diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..f007077 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +__pycache__ +.idea diff --git a/.idea/.gitignore b/.idea/.gitignore deleted file mode 100644 index 359bb53..0000000 --- a/.idea/.gitignore +++ /dev/null @@ -1,3 +0,0 @@ -# 默认忽略的文件 -/shelf/ -/workspace.xml diff --git a/.idea/inspectionProfiles/profiles_settings.xml b/.idea/inspectionProfiles/profiles_settings.xml deleted file mode 100644 index 105ce2d..0000000 --- a/.idea/inspectionProfiles/profiles_settings.xml +++ /dev/null @@ -1,6 +0,0 @@ - - - - \ No newline at end of file diff --git a/.idea/misc.xml b/.idea/misc.xml deleted file mode 100644 index d65c45b..0000000 --- a/.idea/misc.xml +++ /dev/null @@ -1,7 +0,0 @@ - - - - - - \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml deleted file mode 100644 index 1fb3c8b..0000000 --- a/.idea/modules.xml +++ /dev/null @@ -1,8 +0,0 @@ - - - - - - - - \ No newline at end of file diff --git a/.idea/pythonProject3.iml b/.idea/pythonProject3.iml deleted file mode 100644 index cd430bd..0000000 --- a/.idea/pythonProject3.iml +++ /dev/null @@ -1,10 +0,0 @@ - - - - - - - - - - \ No newline at end of file diff --git a/api_client.py b/api_client.py new file mode 100644 index 0000000..b1cec5c --- /dev/null +++ b/api_client.py @@ -0,0 +1,56 @@ +import sys +from PySide6.QtCore import QCoreApplication, QByteArray +from PySide6.QtNetwork import QTcpSocket +from PySide6.QtCore import Slot, QObject + + +class ApiClient(QObject): + def __init__(self, host, port): + super().__init__() + + # 创建一个 TCP 客户端对象 + self.socket = QTcpSocket(self) + self.host = host + self.port = port + + # 连接信号 + self.socket.connected.connect(self.on_connected) + self.socket.readyRead.connect(self.on_ready_read) + self.socket.disconnected.connect(self.on_disconnected) + + # 连接到服务器 + self.socket.connectToHost(self.host, self.port) + + @Slot() + def on_connected(self): + print(f"Connected to {self.host}:{self.port}") + + # 发送读取请求 (示例: 读取 'key1') + self.socket.write(b"READ key1") + self.socket.flush() + print("Sent: READ key1") + + @Slot() + def on_ready_read(self): + data = self.socket.readAll() # 读取服务器返回的数据 + print(f"Received from server: {data.data().decode()}") + + @Slot() + def on_disconnected(self): + print("Disconnected from server.") + self.socket.close() # 关闭连接 + + def get_params(self, list_param_name): + # + print(list_param_name) + # 返回一个字典 + return + + def set_params(self, list_param_name, list_param_data): + print(list_param_name) + +if __name__ == "__main__": + app = QCoreApplication() + api = ApiClient("127.0.0.1", 1234) + + sys.exit(app.exec()) diff --git a/cmd_send_get_params.json b/cmd_send_get_params.json new file mode 100644 index 0000000..198bd56 --- /dev/null +++ b/cmd_send_get_params.json @@ -0,0 +1,12 @@ +{ + "cmd": "get_params", + "token": "random_string", + "data": { + "param_num": 2, + "param_names": [ + "test_param_1", + "test_param_2" + ] + } +} + diff --git a/cmd_send_set_params.json b/cmd_send_set_params.json new file mode 100644 index 0000000..a5a5349 --- /dev/null +++ b/cmd_send_set_params.json @@ -0,0 +1,15 @@ +{ + "cmd": "set_params", + "token": "random_string", + "data": { + "param_num": 2, + "param_names": [ + "test_param_1", + "test_param_2" + ], + "param_datas": [ + 10.1, + 20 + ] + } +} \ No newline at end of file diff --git a/field_addr_resolution.py b/field_addr_resolution.py new file mode 100644 index 0000000..b74c859 --- /dev/null +++ b/field_addr_resolution.py @@ -0,0 +1,72 @@ +import ctypes + + +def get_field_address(struct_instance, field_path): + # 分割字段路径 + fields = field_path.split('.') + # 获取起始结构体的基地址 + current_address = ctypes.addressof(struct_instance) + current_type = type(struct_instance) + + # 遍历path的每个field + for field in fields: + # 在 _fields_ 中查找字段 + found = False + # 遍历当前结构体的所有field + for f_name, f_type in current_type._fields_: + + # 如果结构体中的field和当前查找的field一致 + if f_name == field: + found = True + # 计算偏移量 + offset = sum( + getattr(current_type, fname).offset for fname, _ in current_type._fields_ if fname == field) + current_address += offset + + # 如果还有下一个字段,更新当前类型 + if field != fields[-1]: + # 判断当前的field是不是type,且是不是结构体类型 + if isinstance(f_type, type) and issubclass(f_type, ctypes.Structure): + current_type = f_type + else: + raise ValueError(f"字段 '{field}' 不是结构体类型") + # 跳出当前循环 + break + + if not found: + raise ValueError(f"在结构体中找不到字段 '{field}'") + + return current_address + + +# 使用示例 +def print_all_addresses(struct_instance, prefix=""): + struct_type = type(struct_instance) + + for field_name, field_type in struct_type._fields_: + full_path = f"{prefix}{field_name}" + + # 如果是结构体类型,递归打印其成员 + if isinstance(field_type, type) and issubclass(field_type, ctypes.Structure): + nested_struct = getattr(struct_instance, field_name) + print_all_addresses(nested_struct, f"{full_path}.") + else: + try: + address = get_field_address(struct_instance, field_name) + print(f"{full_path} 的地址: {hex(address)}") + except ValueError as e: + print(e) + + +# 计算成员变量的offset +def get_filed_offset(struct_instance, field_path): + print(type(struct_instance)) + print(issubclass(type(struct_instance), ctypes.Structure)) + + if isinstance(struct_instance, type) and issubclass(struct_instance, ctypes.Structure): + base_address = ctypes.addressof(struct_instance) + + field_address = get_field_address(struct_instance, field_path) + + return field_address - base_address + return 0 \ No newline at end of file diff --git a/main.py b/main.py index 871a52a..4069929 100644 --- a/main.py +++ b/main.py @@ -1,5 +1,6 @@ import ctypes - +from field_addr_resolution import * +from struct_def import * # 定义内部结构体 class InnerStructLevel2(ctypes.Structure): @@ -31,76 +32,6 @@ class OuterStruct(ctypes.Structure): ] -def get_field_address(struct_instance, field_path): - # 分割字段路径 - fields = field_path.split('.') - # 获取起始结构体的基地址 - current_address = ctypes.addressof(struct_instance) - current_type = type(struct_instance) - - # 遍历path的每个field - for field in fields: - # 在 _fields_ 中查找字段 - found = False - # 遍历当前结构体的所有field - for f_name, f_type in current_type._fields_: - - # 如果结构体中的field和当前查找的field一致 - if f_name == field: - found = True - # 计算偏移量 - offset = sum( - getattr(current_type, fname).offset for fname, _ in current_type._fields_ if fname == field) - current_address += offset - - # 如果还有下一个字段,更新当前类型 - if field != fields[-1]: - # 判断当前的field是不是type,且是不是结构体类型 - if isinstance(f_type, type) and issubclass(f_type, ctypes.Structure): - current_type = f_type - else: - raise ValueError(f"字段 '{field}' 不是结构体类型") - # 跳出当前循环 - break - - if not found: - raise ValueError(f"在结构体中找不到字段 '{field}'") - - return current_address - - -# 使用示例 -def print_all_addresses(struct_instance, prefix=""): - struct_type = type(struct_instance) - - for field_name, field_type in struct_type._fields_: - full_path = f"{prefix}{field_name}" - - # 如果是结构体类型,递归打印其成员 - if isinstance(field_type, type) and issubclass(field_type, ctypes.Structure): - nested_struct = getattr(struct_instance, field_name) - print_all_addresses(nested_struct, f"{full_path}.") - else: - try: - address = get_field_address(struct_instance, field_name) - print(f"{full_path} 的地址: {hex(address)}") - except ValueError as e: - print(e) - - -# 计算成员变量的offset -def get_filed_offset(struct_instance, field_path): - print(type(struct_instance)) - print(issubclass(type(struct_instance), ctypes.Structure)) - - if isinstance(struct_instance, type) and issubclass(struct_instance, ctypes.Structure): - base_address = ctypes.addressof(struct_instance) - - field_address = get_field_address(struct_instance, field_path) - - return field_address - base_address - return 0 - # 测试代码 if __name__ == "__main__": outer = OuterStruct() diff --git a/message_proxy.py b/message_proxy.py new file mode 100644 index 0000000..5b3fdc5 --- /dev/null +++ b/message_proxy.py @@ -0,0 +1,18 @@ +from dataclasses import dataclass +from PySide6.QtCore import QObject +from PySide6.QtWidgets import QWidget + + + + + +@dataclass +class MessageProxy: + token: str + widget: QObject + data: {} + +@dataclass +class SignalProxy: + widget: QWidget + data: {} \ No newline at end of file diff --git a/params_service.py b/params_service.py new file mode 100644 index 0000000..84bbd51 --- /dev/null +++ b/params_service.py @@ -0,0 +1,182 @@ +from importlib.metadata import always_iterable +from queue import Queue +from PySide6.QtCore import QObject, SignalInstance +from PySide6.QtCore import Signal, Slot +from PySide6.QtNetwork import QTcpSocket + +from socket_client import SocketClient +import random +import string +from message_proxy import MessageProxy, SignalProxy + + + + +class ParamsService(QObject): + # signal_busy: SignalInstance = Signal() + signal_request_complete: SignalInstance = Signal(SignalProxy) + def __init__(self, host, port): + super().__init__() + + # 初始化socket client + self.__busy = False + # self.client = SocketClient("127.0.0.1", 1234) + self.queue = Queue + + # 创建一个 TCP Socket 对象 + self.socket = QTcpSocket(self) + self.host = host + self.port = port + + # 连接信号 + self.socket.connected.connect(self.on_connected) + self.socket.readyRead.connect(self.on_ready_read) + self.socket.disconnected.connect(self.on_disconnected) + + # 连接到服务器 + self.socket.connectToHost(self.host, self.port) + + @Slot() + def on_connected(self): + print(f"Connected to {self.host}:{self.port}") + # self.socket.write(b"Hello, Server!") # 向服务器发送数据 + # print("Message sent to server.") + + @Slot() + def on_ready_read(self): + data = self.socket.readAll() # 读取服务器发送的数据 + print(f"Received from server: {data.data().decode()}") + + @Slot() + def on_disconnected(self): + print("Disconnected from server.") + self.socket.close() # 关闭连接 + + def send_data(self, data: bytes): + self.socket.write(data) + + @staticmethod + def generate_token(): + token_str = ''.join(random.choices(string.ascii_letters + string.digits, k=12)) + return token_str + + def get_params(self, widget_proxy: QObject): + # 生成一个请求 + token = self.generate_token() + data = { "hello world!"} + + message = MessageProxy(token, widget_proxy, data) + # 发送请求 + + # 将发送的请求放入一个队列 + pass + print('请求数据') + self.signal_request_complete.emit(SignalProxy(widget_proxy, {"str1": "test 1", "str2": 100 })) + + + def set_params(self): + pass + + +import sys +from PySide6.QtWidgets import QApplication, QWidget, QLabel, QLineEdit, QPushButton, QVBoxLayout + + +class MyComponent: + def __init__(self, parent=None): + pass + + def set_data(self, data): + pass + + +class MyWidget(QWidget, MyComponent): + def __init__(self, parent=None): + super().__init__(parent=parent) + layout = QVBoxLayout() + self.label1 = QLabel('Test Info 1', self) + self.label2 = QLabel('Test Info 2', self) + layout.addWidget(self.label1) + layout.addWidget(self.label2) + self.setLayout(layout) + + def set_data(self, data): + self.label1.setText(data["str1"]) + self.label2.setText(str(data["str2"])) + +class MyWindow(QWidget): + def __init__(self): + super().__init__() + + # 设置窗口标题和尺寸 + self.setWindowTitle("PySide6 Example") + self.setFixedSize(300, 800) + + self.test_widget1 = MyWidget(self) + self.test_widget2 = MyWidget(self) + self.test_widget3 = MyWidget(self) + self.test_widget4 = MyWidget(self) + self.test_widget5 = MyWidget(self) + self.test_widget6 = MyWidget(self) + self.test_widget7 = MyWidget(self) + self.test_widget8 = MyWidget(self) + self.test_widget9 = MyWidget(self) + + # 创建 QLabel、QLineEdit 和 QPushButton 控件 + self.label = QLabel("请输入内容:", self) + self.input_line = QLineEdit(self) + self.button = QPushButton("更新标签", self) + + # 设置按钮点击事件 + self.button.clicked.connect(self.on_button_clicked) + + # 创建垂直布局并添加控件 + layout = QVBoxLayout() + layout.addWidget(self.test_widget1) + layout.addWidget(self.test_widget2) + layout.addWidget(self.test_widget3) + layout.addWidget(self.test_widget4) + layout.addWidget(self.test_widget5) + layout.addWidget(self.test_widget6) + layout.addWidget(self.test_widget7) + layout.addWidget(self.test_widget8) + layout.addWidget(self.test_widget9) + + layout.addWidget(self.label) + layout.addWidget(self.input_line) + layout.addWidget(self.button) + + # 设置窗口的布局 + self.setLayout(layout) + + self.params_service = ParamsService("127.0.0.1", 1234) + self.params_service.signal_request_complete.connect(self.on_params_service) + + def on_params_service(self, data: SignalProxy): + data.widget.set_data(data.data) + + + def on_button_clicked(self): + self.params_service.get_params(self.test_widget1) + self.params_service.get_params(self.test_widget2) + self.params_service.get_params(self.test_widget3) + self.params_service.get_params(self.test_widget4) + self.params_service.get_params(self.test_widget5) + self.params_service.get_params(self.test_widget6) + self.params_service.get_params(self.test_widget7) + self.params_service.get_params(self.test_widget8) + self.params_service.get_params(self.test_widget9) + + + # def update_label(self): + # # 获取输入框的文本并更新标签内容 + # input_text = self.input_line.text() + # self.label.setText(f"你输入的是:{input_text}") + +if __name__ == "__main__": + app = QApplication(sys.argv) + window = MyWindow() + window.show() + sys.exit(app.exec()) + + diff --git a/random_code.py b/random_code.py new file mode 100644 index 0000000..73ff207 --- /dev/null +++ b/random_code.py @@ -0,0 +1,13 @@ +import random +import string + +# # 生成一个随机整数作为识别码(例如:1000 到 9999) +# random_id = random.randint(1000, 9999) + +# 或者生成一个随机的 8 位字母和数字组成的字符串 +random_str = ''.join(random.choices(string.ascii_letters + string.digits, k=12)) + +# 插入命令中 +command = f"random str: {random_str}" + +print(command) diff --git a/socket_client.py b/socket_client.py new file mode 100644 index 0000000..d6fb964 --- /dev/null +++ b/socket_client.py @@ -0,0 +1,44 @@ +import sys +from PySide6.QtCore import QCoreApplication, QByteArray +from PySide6.QtNetwork import QTcpSocket +from PySide6.QtCore import Signal, Slot + + +class SocketClient(QCoreApplication): + def __init__(self, host, port): + super().__init__(sys.argv) + + # 创建一个 TCP Socket 对象 + self.socket = QTcpSocket(self) + self.host = host + self.port = port + + # 连接信号 + self.socket.connected.connect(self.on_connected) + self.socket.readyRead.connect(self.on_ready_read) + self.socket.disconnected.connect(self.on_disconnected) + + # 连接到服务器 + self.socket.connectToHost(self.host, self.port) + + @Slot() + def on_connected(self): + print(f"Connected to {self.host}:{self.port}") + self.socket.write(b"Hello, Server!") # 向服务器发送数据 + print("Message sent to server.") + + @Slot() + def on_ready_read(self): + data = self.socket.readAll() # 读取服务器发送的数据 + print(f"Received from server: {data.data().decode()}") + + @Slot() + def on_disconnected(self): + print("Disconnected from server.") + self.socket.close() # 关闭连接 + + + +if __name__ == "__main__": + app = SocketClient("127.0.0.1", 1234) + sys.exit(app.exec()) diff --git a/socket_server.py b/socket_server.py new file mode 100644 index 0000000..7a76579 --- /dev/null +++ b/socket_server.py @@ -0,0 +1,62 @@ +import sys +from PySide6.QtCore import QCoreApplication, QByteArray, Slot +from PySide6.QtNetwork import QTcpServer, QTcpSocket, QHostAddress +from PySide6.QtCore import Signal + + +class TcpServer(QCoreApplication): + def __init__(self, host, port): + super().__init__(sys.argv) + + # 创建一个 TCP 服务器对象 + self.server = QTcpServer(self) + self.host = host + self.port = port + + # 连接信号,新的客户端连接时触发 + self.server.newConnection.connect(self.on_new_connection) + + # 绑定并开始监听指定的地址和端口 + if not self.server.listen(QHostAddress(self.host), self.port): + print(f"Server could not start on {self.host}:{self.port}") + sys.exit(1) + + print(f"Server started on {self.host}:{self.port}") + + @Slot() + def on_new_connection(self): + # 获取客户端连接的 socket 对象 + client_socket = self.server.nextPendingConnection() + + # 连接信号 + client_socket.readyRead.connect(lambda: self.on_ready_read(client_socket)) + client_socket.disconnected.connect(lambda: self.on_disconnected(client_socket)) + + print(f"New connection from {client_socket.peerAddress().toString()}:{client_socket.peerPort()}") + + # 发送欢迎消息给客户端 + client_socket.write(b"Hello from server!") + client_socket.flush() # 确保数据已发送 + print("Welcome message sent to client.") + + @Slot() + def on_ready_read(self, client_socket: QTcpSocket): + # 读取客户端发送的数据 + data = client_socket.readAll() + print(f"Received from client: {data.data().decode()}") + + # 发送响应给客户端 + response = "Server has received your message." + client_socket.write(response.encode()) + client_socket.flush() + print(f"Sent to client: {response}") + + @Slot() + def on_disconnected(self, client_socket: QTcpSocket): + print(f"Connection from {client_socket.peerAddress().toString()}:{client_socket.peerPort()} closed.") + client_socket.deleteLater() # 清理套接字资源 + + +if __name__ == "__main__": + app = TcpServer("127.0.0.1", 1234) + sys.exit(app.exec()) diff --git a/struct_def.py b/struct_def.py new file mode 100644 index 0000000..e270626 --- /dev/null +++ b/struct_def.py @@ -0,0 +1,30 @@ +import ctypes + +# 定义内部结构体 +class InnerStructLevel2(ctypes.Structure): + _pack_ = 1 + _fields_ = [ + ("inner_field1", ctypes.c_char), + ("inner_field2", ctypes.c_double), + ("inner_field3", ctypes.c_double), + ] + +# 定义内部结构体 +class InnerStructLevel1(ctypes.Structure): + _pack_ = 1 + _fields_ = [ + ("inner_field1", ctypes.c_int), + ("inner_field2", ctypes.c_double), + ("inner_level2_field3", InnerStructLevel2) + ] + + + +# 定义外部结构体 +class OuterStruct(ctypes.Structure): + _pack_ = 1 + _fields_ = [ + ("field1", ctypes.c_int), + ("nested", InnerStructLevel1), # 嵌套结构体 + ("field2", ctypes.c_char * 10), + ] \ No newline at end of file