[refactor] 删除无关文件

This commit is contained in:
JingweiCui 2025-04-01 10:56:23 +08:00
parent 728eae521f
commit 8892879a24
23 changed files with 75 additions and 757 deletions

1
.env Normal file
View File

@ -0,0 +1 @@
PYTHONPATH=d:\python_dev\param_struct_test\param_service

4
.gitignore vendored
View File

@ -1,2 +1,6 @@
__pycache__
.idea
.venv
.vscode
build
dist

View File

@ -1,56 +0,0 @@
import sys
from PySide6.QtCore import QCoreApplication, QByteArray
from PySide6.QtNetwork import QTcpSocket
from PySide6.QtCore import Slot, QObject
class ApiClient(QObject):
def __init__(self, host, port):
super().__init__()
# 创建一个 TCP 客户端对象
self.socket = QTcpSocket(self)
self.host = host
self.port = port
# 连接信号
self.socket.connected.connect(self.on_connected)
self.socket.readyRead.connect(self.on_ready_read)
self.socket.disconnected.connect(self.on_disconnected)
# 连接到服务器
self.socket.connectToHost(self.host, self.port)
@Slot()
def on_connected(self):
print(f"Connected to {self.host}:{self.port}")
# 发送读取请求 (示例: 读取 'key1')
self.socket.write(b"READ key1")
self.socket.flush()
print("Sent: READ key1")
@Slot()
def on_ready_read(self):
data = self.socket.readAll() # 读取服务器返回的数据
print(f"Received from server: {data.data().decode()}")
@Slot()
def on_disconnected(self):
print("Disconnected from server.")
self.socket.close() # 关闭连接
def get_params(self, list_param_name):
#
print(list_param_name)
# 返回一个字典
return
def set_params(self, list_param_name, list_param_data):
print(list_param_name)
if __name__ == "__main__":
app = QCoreApplication()
api = ApiClient("127.0.0.1", 1234)
sys.exit(app.exec())

View File

@ -1,72 +0,0 @@
import ctypes
def get_field_address(struct_instance, field_path):
# 分割字段路径
fields = field_path.split('.')
# 获取起始结构体的基地址
current_address = ctypes.addressof(struct_instance)
current_type = type(struct_instance)
# 遍历path的每个field
for field in fields:
# 在 _fields_ 中查找字段
found = False
# 遍历当前结构体的所有field
for f_name, f_type in current_type._fields_:
# 如果结构体中的field和当前查找的field一致
if f_name == field:
found = True
# 计算偏移量
offset = sum(
getattr(current_type, fname).offset for fname, _ in current_type._fields_ if fname == field)
current_address += offset
# 如果还有下一个字段,更新当前类型
if field != fields[-1]:
# 判断当前的field是不是type且是不是结构体类型
if isinstance(f_type, type) and issubclass(f_type, ctypes.Structure):
current_type = f_type
else:
raise ValueError(f"字段 '{field}' 不是结构体类型")
# 跳出当前循环
break
if not found:
raise ValueError(f"在结构体中找不到字段 '{field}'")
return current_address
# 使用示例
def print_all_addresses(struct_instance, prefix=""):
struct_type = type(struct_instance)
for field_name, field_type in struct_type._fields_:
full_path = f"{prefix}{field_name}"
# 如果是结构体类型,递归打印其成员
if isinstance(field_type, type) and issubclass(field_type, ctypes.Structure):
nested_struct = getattr(struct_instance, field_name)
print_all_addresses(nested_struct, f"{full_path}.")
else:
try:
address = get_field_address(struct_instance, field_name)
print(f"{full_path} 的地址: {hex(address)}")
except ValueError as e:
print(e)
# 计算成员变量的offset
def get_filed_offset(struct_instance, field_path):
print(type(struct_instance))
print(issubclass(type(struct_instance), ctypes.Structure))
if isinstance(struct_instance, type) and issubclass(struct_instance, ctypes.Structure):
base_address = ctypes.addressof(struct_instance)
field_address = get_field_address(struct_instance, field_path)
return field_address - base_address
return 0

53
main.py
View File

@ -1,53 +0,0 @@
import ctypes
from field_addr_resolution import *
from struct_def import *
# 定义内部结构体
class InnerStructLevel2(ctypes.Structure):
_pack_ = 1
_fields_ = [
("inner_field1", ctypes.c_char),
("inner_field2", ctypes.c_double),
("inner_field3", ctypes.c_double),
]
# 定义内部结构体
class InnerStructLevel1(ctypes.Structure):
_pack_ = 1
_fields_ = [
("inner_field1", ctypes.c_int),
("inner_field2", ctypes.c_double),
("inner_level2_field3", InnerStructLevel2)
]
# 定义外部结构体
class OuterStruct(ctypes.Structure):
_pack_ = 1
_fields_ = [
("field1", ctypes.c_int),
("nested", InnerStructLevel1), # 嵌套结构体
("field2", ctypes.c_char * 10),
]
# 测试代码
if __name__ == "__main__":
outer = OuterStruct()
# # 打印所有字段的地址
# print_all_addresses(outer)
print(get_filed_offset(outer, "nested.inner_level2_field3"))
# # 获取特定嵌套字段的地址
# print(f'0x{ctypes.addressof(outer):08X}')
# nested_field_address = get_field_address(outer, "nested.inner_level2_field3")
#
# print(f"\n直接访问嵌套字段 nested.inner_field2 的地址: {hex(nested_field_address)}")
#
# # 获取特定嵌套字段的地址
# nested_field_address = get_field_address(outer, "nested.inner_level2_field3.inner_field3")
#
# print(f"\n直接访问嵌套字段 nested.inner_level_field3.inner_field1 的地址: {hex(nested_field_address)}")

View File

@ -1,19 +0,0 @@
from dataclasses import dataclass
from PySide6.QtCore import QObject
from PySide6.QtWidgets import QWidget
@dataclass
class MessageProxy:
token: str
widget: QObject
data: {}
@dataclass
class SignalProxy:
res: bool
widget: QWidget
data: {}

View File

@ -1,31 +0,0 @@
class ParamManager:
def __init__(self):
self.sim_data = {
"test_param1": 0,
"test_param2": 1,
"test_param3": 2
}
def get_param_by_name(self, name):
for item_name, item_v in self.sim_data.items():
if name == item_name:
return { "name":name, "val": item_v}
# return name
def get_params(self, name_list):
res_list = []
for item_name in name_list:
if self.get_param_by_name(item_name) is not None:
res_list.append(self.get_param_by_name(item_name))
return res_list
if __name__ == "__main__":
param_manager = ParamManager()
print(
param_manager.get_params(['test_param1', 'test_param2'])
)

View File

@ -0,0 +1,12 @@
Metadata-Version: 2.2
Name: param_service
Version: 0.2
Summary: Write/Read param from server
Author: CuiJingwei
Author-email: cuijingwei@brisonus.com
Requires-Dist: numpy
Requires-Dist: PySide6
Dynamic: author
Dynamic: author-email
Dynamic: requires-dist
Dynamic: summary

View File

@ -0,0 +1,9 @@
setup.py
param_service/__init__.py
param_service/param_manager.py
param_service/params_service.py
param_service.egg-info/PKG-INFO
param_service.egg-info/SOURCES.txt
param_service.egg-info/dependency_links.txt
param_service.egg-info/requires.txt
param_service.egg-info/top_level.txt

View File

@ -0,0 +1 @@

View File

@ -0,0 +1,2 @@
numpy
PySide6

View File

@ -0,0 +1 @@
param_service

View File

@ -0,0 +1,5 @@
name = "param_service"
from .params_service import ParamsService, Request, Response
__all__ = ["ParamsService", "Request", "Response"]

View File

@ -56,6 +56,7 @@ class ParamsService(QObject):
# 初始化状态
self._is_running = True
self._connected = False
self._current_request = None # 当前正在处理的请求
# 请求队列和响应映射
self.request_queue = queue.Queue()
@ -139,18 +140,25 @@ class ParamsService(QObject):
@Slot()
def _process_next_request(self):
"""处理队列中的下一个请求"""
if not self._connected or self.request_queue.empty():
# 如果未连接、队列为空,或者当前有正在处理的请求,则返回
if not self._connected or self.request_queue.empty() or self.pending_requests:
return
try:
request = self.request_queue.get_nowait()
# 获取但不移除请求
request = self.request_queue.get()
self._current_request = request
self._send_request(request)
self.request_queue.task_done()
time.sleep(0.1)
except queue.Empty:
pass
except Exception as e:
print(f"Error processing request: {e}")
self.signal_error.emit(f"Request processing error: {str(e)}")
# 发生错误时,确保清理当前请求
if self._current_request:
self.request_queue.task_done()
self._current_request = None
def _send_request(self, request: Request):
"""发送请求到服务器"""
@ -191,17 +199,17 @@ class ParamsService(QObject):
request = self.pending_requests.pop(token)
res_data = ''
if request.callback:
if response.get('cmd') == "get_params":
res_data = response["data"]
if response.get('cmd') == 'set_params':
res_data = ''
res_data = response["data"]
res = Response(token, CMD.GET_PARAMS, request.widget, res_data)
request.callback(res)
self.signal_request_complete.emit(response)
# 完成当前请求的处理
if self._current_request and self._current_request.token == token:
self.request_queue.task_done()
self._current_request = None
except Exception as e:
print(f"Error handling response: {e}")

View File

@ -35,7 +35,7 @@ class MyWindow(QWidget):
self.setWindowTitle("PySide6 Example")
self.setFixedSize(300, 800)
self.test_widget1 = MyWidget(self, "tuning_parameters.mix_parameters[0].ch_n")
self.test_widget1 = MyWidget(self, "inputPoi[13]")
self.test_widget2 = MyWidget(self, "tuning_parameters.mix_parameters[1].ch_n")
self.test_widget3 = MyWidget(self, "tuning_parameters.mix_parameters[2].ch_n")
self.test_widget4 = MyWidget(self, "tuning_parameters.mix_parameters[3].ch_n")
@ -65,12 +65,14 @@ class MyWindow(QWidget):
# 设置窗口的布局
self.setLayout(layout)
self.params_service = ParamsService("192.168.5.4", 12345)
self.params_service = ParamsService("127.0.0.1", 12345)
# self.params_service.signal_request_complete.connect(self.on_params_service)
def on_params_service(self, res: Response):
widget = res.widget
widget.set_data(res.data)
print(res)
# print(res.data)
# widget.set_data(res.data)
def on_button_clicked(self):
# self.params_service.get_params(
@ -98,49 +100,13 @@ class MyWindow(QWidget):
# param_names=[self.test_widget5.param_name],
# callback=self.on_params_service
# )
self.params_service.set_params(
widget=self.test_widget5,
params={self.test_widget5.param_name: 10,
self.test_widget4.param_name: 20}
# callback=self.on_params_service
)
TEST_PARAM_NAME = "OrderOne_Flag"
self.params_service.get_params(
widget=self.test_widget5,
param_names=[self.test_widget5.param_name],
widget=None,
params=[TEST_PARAM_NAME],
callback=self.on_params_service
)
self.params_service.get_params(
widget=self.test_widget5,
param_names=[self.test_widget5.param_name],
callback=self.on_params_service
)
self.params_service.get_params(
widget=self.test_widget5,
param_names=[self.test_widget5.param_name],
callback=self.on_params_service
)
self.params_service.get_params(
widget=self.test_widget5,
param_names=[self.test_widget5.param_name],
callback=self.on_params_service
)
self.params_service.get_params(
widget=self.test_widget5,
param_names=[self.test_widget5.param_name],
callback=self.on_params_service
)
self.params_service.get_params(
widget=self.test_widget5,
param_names=[self.test_widget5.param_name],
callback=self.on_params_service
)
# def update_label(self):
# # 获取输入框的文本并更新标签内容
# input_text = self.input_line.text()
# self.label.setText(f"你输入的是:{input_text}")
if __name__ == "__main__":
app = QApplication(sys.argv)

View File

@ -1,13 +0,0 @@
import random
import string
# # 生成一个随机整数作为识别码例如1000 到 9999
# random_id = random.randint(1000, 9999)
# 或者生成一个随机的 8 位字母和数字组成的字符串
random_str = ''.join(random.choices(string.ascii_letters + string.digits, k=12))
# 插入命令中
command = f"random str: {random_str}"
print(command)

14
setup.py Normal file
View File

@ -0,0 +1,14 @@
from setuptools import setup
setup(
name='param_service',
version='0.2',
description='Write/Read param from server',
author='CuiJingwei',
author_email='cuijingwei@brisonus.com',
packages=['param_service'],
install_requires=[
'numpy',
'PySide6',
],
)

View File

@ -1,276 +0,0 @@
import json
import random
import string
import time
from dataclasses import dataclass
from queue import Queue
from threading import Thread, Lock, Event
from typing import Any, Optional, Dict, Callable
from datetime import datetime, timedelta
from PySide6.QtCore import QObject, Signal, Slot
from PySide6.QtNetwork import QTcpSocket
@dataclass
class Request:
token: str
widget: QObject
data: Any
callback: Optional[Callable] = None
created_at: datetime = None
timeout: int = 30 # 超时时间(秒)
def __post_init__(self):
self.created_at = datetime.now()
@property
def is_expired(self) -> bool:
return datetime.now() > self.created_at + timedelta(seconds=self.timeout)
class ParamsService(QObject):
signal_request_complete = Signal(object) # 请求完成信号
signal_connection_status = Signal(bool) # 连接状态信号
signal_error = Signal(str) # 错误信号
def __init__(self, host: str, port: int):
super().__init__()
self.host = host
self.port = port
# 初始化状态和同步对象
self._is_running = True
self._connected = False
self._reconnect_interval = 5 # 重连间隔(秒)
self._lock = Lock()
self._connection_event = Event()
# 请求队列和响应映射
self.request_queue = Queue()
self.pending_requests: Dict[str, Request] = {}
# 初始化socket
self.socket = QTcpSocket(self)
self._setup_socket_connections()
# 启动工作线程
self._start_worker_threads()
# 首次连接
self.connect_to_server()
def _setup_socket_connections(self):
"""设置socket信号连接"""
self.socket.connected.connect(self.on_connected)
self.socket.disconnected.connect(self.on_disconnected)
self.socket.readyRead.connect(self.on_ready_read)
self.socket.errorOccurred.connect(self.on_socket_error)
def _start_worker_threads(self):
"""启动工作线程"""
# 请求处理线程
self.request_thread = Thread(target=self._process_request_queue, daemon=True)
self.request_thread.start()
# 超时检查线程
self.timeout_thread = Thread(target=self._check_timeouts, daemon=True)
self.timeout_thread.start()
# 重连线程
self.reconnect_thread = Thread(target=self._reconnection_loop, daemon=True)
self.reconnect_thread.start()
def connect_to_server(self):
"""连接到服务器"""
if not self._connected:
self.socket.connectToHost(self.host, self.port)
@Slot()
def on_connected(self):
"""连接成功处理"""
print(f"Connected to {self.host}:{self.port}")
with self._lock:
self._connected = True
self._connection_event.set()
self.signal_connection_status.emit(True)
@Slot()
def on_disconnected(self):
"""断开连接处理"""
print("Disconnected from server")
with self._lock:
self._connected = False
self._connection_event.clear()
self.signal_connection_status.emit(False)
@Slot()
def on_socket_error(self):
"""Socket错误处理"""
error = self.socket.errorString()
print(f"Socket error: {error}")
self.signal_error.emit(f"Socket error: {error}")
@Slot()
def on_ready_read(self):
"""数据接收处理"""
try:
data = self.socket.readAll()
response = json.loads(data.data().decode())
self._handle_response(response)
except Exception as e:
print(f"Error processing response: {e}")
self.signal_error.emit(f"Response processing error: {str(e)}")
def _handle_response(self, response: dict):
"""处理服务器响应"""
try:
token = response.get("token")
if token in self.pending_requests:
request = self.pending_requests.pop(token)
# 执行回调
if request.callback:
request.callback(response)
# 发送完成信号
self.signal_request_complete.emit(response)
except Exception as e:
print(f"Error handling response: {e}")
self.signal_error.emit(f"Response handling error: {str(e)}")
def _process_request_queue(self):
"""处理请求队列的后台线程"""
while self._is_running:
try:
if not self._connected:
self._connection_event.wait(timeout=1.0)
continue
request = self.request_queue.get(timeout=1.0)
self._send_request(request)
self.request_queue.task_done()
except Queue.Empty:
continue
except Exception as e:
print(f"Error processing request: {e}")
self.signal_error.emit(f"Request processing error: {str(e)}")
def _send_request(self, request: Request):
"""发送请求到服务器"""
try:
# 添加到待处理请求
self.pending_requests[request.token] = request
# 构建请求数据
request_data = {
"cmd": "get_params", # 或其他命令
"token": request.token,
"data": request.data
}
# 发送数据
json_data = json.dumps(request_data)
self.socket.write(json_data.encode('utf-8'))
self.socket.flush()
except Exception as e:
print(f"Error sending request: {e}")
self.signal_error.emit(f"Request sending error: {str(e)}")
self.pending_requests.pop(request.token, None)
def _check_timeouts(self):
"""检查请求超时的后台线程"""
while self._is_running:
try:
current_time = datetime.now()
expired_tokens = []
# 检查超时的请求
for token, request in self.pending_requests.items():
if request.is_expired:
expired_tokens.append(token)
# 处理超时的请求
for token in expired_tokens:
request = self.pending_requests.pop(token)
self.signal_error.emit(f"Request timeout: {token}")
# 执行超时回调
if request.callback:
request.callback({"error": "timeout", "token": token})
time.sleep(1) # 每秒检查一次
except Exception as e:
print(f"Error checking timeouts: {e}")
def _reconnection_loop(self):
"""重连循环的后台线程"""
while self._is_running:
if not self._connected:
print(f"Attempting to reconnect to {self.host}:{self.port}")
self.connect_to_server()
time.sleep(self._reconnect_interval)
@staticmethod
def generate_token() -> str:
"""生成唯一的请求token"""
return ''.join(random.choices(string.ascii_letters + string.digits, k=12))
def get_params(self, widget: QObject, param_names: list, callback: Callable = None):
"""获取参数(外部接口)"""
token = self.generate_token()
request = Request(
token=token,
widget=widget,
data={"param_names": param_names},
callback=callback
)
self.request_queue.put(request)
return token
def set_params(self, widget: QObject, params: dict, callback: Callable = None):
"""设置参数(外部接口)"""
token = self.generate_token()
request = Request(
token=token,
widget=widget,
data={"params": params},
callback=callback
)
self.request_queue.put(request)
return token
def cleanup(self):
"""清理资源"""
self._is_running = False
self.socket.disconnectFromHost()
if self.socket.state() == QTcpSocket.ConnectedState:
self.socket.waitForDisconnected(1000)
if __name__== "__main__":
# # 创建服务实例
# params_service = ParamsService("localhost", 1234)
#
#
# # 回调函数
# def on_response(response):
# print(f"Received response: {response}")
#
#
# # 获取参数
# params_service.get_params(
# widget=some_widget,
# param_names=["param1", "param2"],
# callback=on_response
# )
#
# # 设置参数
# params_service.set_params(
# widget=some_widget,
# params={"param1": "value1"},
# callback=on_response
# )
#
# # 程序退出时清理
# params_service.cleanup()
pass

View File

@ -1,44 +0,0 @@
import sys
from PySide6.QtCore import QCoreApplication, QByteArray
from PySide6.QtNetwork import QTcpSocket
from PySide6.QtCore import Signal, Slot
class SocketClient(QCoreApplication):
def __init__(self, host, port):
super().__init__(sys.argv)
# 创建一个 TCP Socket 对象
self.socket = QTcpSocket(self)
self.host = host
self.port = port
# 连接信号
self.socket.connected.connect(self.on_connected)
self.socket.readyRead.connect(self.on_ready_read)
self.socket.disconnected.connect(self.on_disconnected)
# 连接到服务器
self.socket.connectToHost(self.host, self.port)
@Slot()
def on_connected(self):
print(f"Connected to {self.host}:{self.port}")
self.socket.write(b"Hello, Server!") # 向服务器发送数据
print("Message sent to server.")
@Slot()
def on_ready_read(self):
data = self.socket.readAll() # 读取服务器发送的数据
print(f"Received from server: {data.data().decode()}")
@Slot()
def on_disconnected(self):
print("Disconnected from server.")
self.socket.close() # 关闭连接
if __name__ == "__main__":
app = SocketClient("127.0.0.1", 1234)
sys.exit(app.exec())

View File

@ -1,75 +0,0 @@
import json
import sys
from PySide6.QtCore import QCoreApplication, QByteArray, Slot
from PySide6.QtNetwork import QTcpServer, QTcpSocket, QHostAddress
from PySide6.QtCore import Signal
from param_manager import ParamManager
class TcpServer(QCoreApplication):
def __init__(self, host, port):
super().__init__(sys.argv)
# 创建一个 TCP 服务器对象
self.server = QTcpServer(self)
self.host = host
self.port = port
# 连接信号,新的客户端连接时触发
self.server.newConnection.connect(self.on_new_connection)
self.param_manager = ParamManager()
# 绑定并开始监听指定的地址和端口
if not self.server.listen(QHostAddress(self.host), self.port):
print(f"Server could not start on {self.host}:{self.port}")
sys.exit(1)
print(f"Server started on {self.host}:{self.port}")
@Slot()
def on_new_connection(self):
# 获取客户端连接的 socket 对象
client_socket = self.server.nextPendingConnection()
# 连接信号
client_socket.readyRead.connect(lambda: self.on_ready_read(client_socket))
client_socket.disconnected.connect(lambda: self.on_disconnected(client_socket))
print(f"New connection from {client_socket.peerAddress().toString()}:{client_socket.peerPort()}")
# 发送欢迎消息给客户端
# client_socket.write(b"Hello from server!")
client_socket.flush() # 确保数据已发送
# print("Welcome message sent to client.")
@Slot()
def on_ready_read(self, client_socket: QTcpSocket):
# 读取客户端发送的数据
data = client_socket.readAll()
print(f"Received from client: {data.data().decode()}")
self.request_process(data.data().decode())
# # 发送响应给客户端
# response = "Server has received your message."
# client_socket.write(response.encode())
# client_socket.flush()
# print(f"Sent to client: {response}")
@Slot()
def on_disconnected(self, client_socket: QTcpSocket):
print(f"Connection from {client_socket.peerAddress().toString()}:{client_socket.peerPort()} closed.")
client_socket.deleteLater() # 清理套接字资源
def request_process(self, data_string):
# try:
# data_obj = json.loads(data_string)
print(data_obj["payload"])
# except Exception as e:
# print(e)
if __name__ == "__main__":
app = TcpServer("127.0.0.1", 1234)
sys.exit(app.exec())

View File

@ -1,35 +0,0 @@
import sys
from params_service import ParamsService
from PySide6.QtWidgets import QWidget, QApplication
app = QApplication()
params_service = ParamsService("192.168.5.4", 12345)
def print_res(res_data):
print(res_data)
# params_service.get_params(
# widget=QWidget(),
# param_names=['tuning_parameters.mix_parameters[1].ch_n', 'tuning_parameters.mix_parameters[1].mix_left_data', 'tuning_parameters.mix_parameters[1].mix_right_data', 'tuning_parameters.delay_parameters[1].ch_n', 'tuning_parameters.delay_parameters[1].delay_data', 'tuning_parameters.volume_parameters[1].ch_n', 'tuning_parameters.volume_parameters[1].vol_data', 'tuning_parameters.eq_parameters[20].fc', 'tuning_parameters.eq_parameters[20].q', 'tuning_parameters.eq_parameters[20].gain', 'tuning_parameters.eq_parameters[20].slope', 'tuning_parameters.eq_parameters[20].filterType', 'tuning_parameters.eq_parameters[21].fc', 'tuning_parameters.eq_parameters[21].q', 'tuning_parameters.eq_parameters[21].gain', 'tuning_parameters.eq_parameters[21].slope', 'tuning_parameters.eq_parameters[21].filterType', 'tuning_parameters.eq_parameters[22].fc', 'tuning_parameters.eq_parameters[22].q', 'tuning_parameters.eq_parameters[22].gain', 'tuning_parameters.eq_parameters[22].slope', 'tuning_parameters.eq_parameters[22].filterType', 'tuning_parameters.eq_parameters[23].fc', 'tuning_parameters.eq_parameters[23].q', 'tuning_parameters.eq_parameters[23].gain', 'tuning_parameters.eq_parameters[23].slope', 'tuning_parameters.eq_parameters[23].filterType', 'tuning_parameters.eq_parameters[24].fc', 'tuning_parameters.eq_parameters[24].q', 'tuning_parameters.eq_parameters[24].gain', 'tuning_parameters.eq_parameters[24].slope', 'tuning_parameters.eq_parameters[24].filterType', 'tuning_parameters.eq_parameters[25].fc', 'tuning_parameters.eq_parameters[25].q', 'tuning_parameters.eq_parameters[25].gain', 'tuning_parameters.eq_parameters[25].slope', 'tuning_parameters.eq_parameters[25].filterType', 'tuning_parameters.eq_parameters[26].fc', 'tuning_parameters.eq_parameters[26].q', 'tuning_parameters.eq_parameters[26].gain', 'tuning_parameters.eq_parameters[26].slope', 'tuning_parameters.eq_parameters[26].filterType', 'tuning_parameters.eq_parameters[27].fc', 'tuning_parameters.eq_parameters[27].q', 'tuning_parameters.eq_parameters[27].gain', 'tuning_parameters.eq_parameters[27].slope', 'tuning_parameters.eq_parameters[27].filterType', 'tuning_parameters.eq_parameters[28].fc', 'tuning_parameters.eq_parameters[28].q', 'tuning_parameters.eq_parameters[28].gain', 'tuning_parameters.eq_parameters[28].slope', 'tuning_parameters.eq_parameters[28].filterType', 'tuning_parameters.eq_parameters[29].fc', 'tuning_parameters.eq_parameters[29].q', 'tuning_parameters.eq_parameters[29].gain', 'tuning_parameters.eq_parameters[29].slope', 'tuning_parameters.eq_parameters[29].filterType', 'tuning_parameters.eq_parameters[30].fc', 'tuning_parameters.eq_parameters[30].q', 'tuning_parameters.eq_parameters[30].gain', 'tuning_parameters.eq_parameters[30].slope', 'tuning_parameters.eq_parameters[30].filterType', 'tuning_parameters.eq_parameters[31].fc', 'tuning_parameters.eq_parameters[31].q', 'tuning_parameters.eq_parameters[31].gain', 'tuning_parameters.eq_parameters[31].slope', 'tuning_parameters.eq_parameters[31].filterType', 'tuning_parameters.eq_parameters[32].fc', 'tuning_parameters.eq_parameters[32].q', 'tuning_parameters.eq_parameters[32].gain', 'tuning_parameters.eq_parameters[32].slope', 'tuning_parameters.eq_parameters[32].filterType', 'tuning_parameters.eq_parameters[33].fc', 'tuning_parameters.eq_parameters[33].q', 'tuning_parameters.eq_parameters[33].gain', 'tuning_parameters.eq_parameters[33].slope', 'tuning_parameters.eq_parameters[33].filterType', 'tuning_parameters.eq_parameters[34].fc', 'tuning_parameters.eq_parameters[34].q', 'tuning_parameters.eq_parameters[34].gain', 'tuning_parameters.eq_parameters[34].slope', 'tuning_parameters.eq_parameters[34].filterType', 'tuning_parameters.eq_parameters[35].fc', 'tuning_parameters.eq_parameters[35].q', 'tuning_parameters.eq_parameters[35].gain', 'tuning_parameters.eq_parameters[35].slope', 'tuning_parameters.eq_parameters[35].filterType', 'tuning_parameters.eq_parameters[36].fc', 'tuning_parameters.eq_parameters[36].q', 'tuning_parameters.eq_parameters[36].gain', 'tuning_parameters.eq_parameters[36].slope', 'tuning_parameters.eq_parameters[36].filterType', 'tuning_parameters.eq_parameters[37].fc', 'tuning_parameters.eq_parameters[37].q', 'tuning_parameters.eq_parameters[37].gain', 'tuning_parameters.eq_parameters[37].slope', 'tuning_parameters.eq_parameters[37].filterType', 'tuning_parameters.eq_parameters[38].fc', 'tuning_parameters.eq_parameters[38].q', 'tuning_parameters.eq_parameters[38].gain', 'tuning_parameters.eq_parameters[38].slope', 'tuning_parameters.eq_parameters[38].filterType', 'tuning_parameters.eq_parameters[39].fc', 'tuning_parameters.eq_parameters[39].q', 'tuning_parameters.eq_parameters[39].gain', 'tuning_parameters.eq_parameters[39].slope', 'tuning_parameters.eq_parameters[39].filterType'],
# callback=print_res
# )
# params_service.get_params(
# widget=QWidget(),
# param_names=['tuning_parameters.mix_parameters[1].ch_n'],
# callback=print_res
# )
params_service.get_params(
widget=QWidget(),
params=["tuning_parameters.eq_parameters[1].fc"]
)
# params_service.set_params(
# widget=QWidget(),
# params={"tuning_parameters.eq_parameters[1].fc": 10}
# )
sys.exit(app.exec())

View File

@ -1,30 +0,0 @@
import ctypes
# 定义内部结构体
class InnerStructLevel2(ctypes.Structure):
_pack_ = 1
_fields_ = [
("inner_field1", ctypes.c_char),
("inner_field2", ctypes.c_double),
("inner_field3", ctypes.c_double),
]
# 定义内部结构体
class InnerStructLevel1(ctypes.Structure):
_pack_ = 1
_fields_ = [
("inner_field1", ctypes.c_int),
("inner_field2", ctypes.c_double),
("inner_level2_field3", InnerStructLevel2)
]
# 定义外部结构体
class OuterStruct(ctypes.Structure):
_pack_ = 1
_fields_ = [
("field1", ctypes.c_int),
("nested", InnerStructLevel1), # 嵌套结构体
("field2", ctypes.c_char * 10),
]

View File

@ -1 +0,0 @@
print(isinstance(int, type))