75 lines
2.8 KiB
Python
75 lines
2.8 KiB
Python
from PySide6.QtCore import QObject, Signal, Slot
|
|
from typing import Dict, Any, Optional
|
|
from dataclasses import dataclass
|
|
from param_struct_test.params_service import ParamsService
|
|
from param_struct_test.service_manager import ServiceManager
|
|
from param_struct_test.message_proxy import SignalProxy
|
|
|
|
@dataclass
class ControllerSignalData:
    """Payload carried by ``ApplicationController.signal_params_updated``.

    Bundles the id of the controller a service response is addressed to
    with the widget and data taken from the originating signal proxy.
    """

    # Id under which the target controller is known to the application controller.
    controller_id: str
    # Widget object taken from the signal proxy that triggered the update.
    widget: QObject
    # Data taken from the signal proxy; passed through unchanged.
    data: Dict[str, Any]
|
|
|
|
class ApplicationController(QObject):
    """Singleton hub that forwards params-service results to controllers.

    On construction it connects to the params service's
    ``signal_request_complete`` signal. Every completed request is
    re-emitted through ``signal_params_updated`` as a
    ``ControllerSignalData`` tagged with the id of the controller that
    owns the originating widget.

    Use ``ApplicationController.instance()`` to obtain the object;
    constructing a second instance directly raises ``RuntimeError``.
    """

    # Singleton instance; set by __init__ on first construction.
    _instance: Optional['ApplicationController'] = None

    # Placeholder id used when no registered controller owns the widget.
    # TODO: remove once the widget -> controller lookup is finalized;
    # it only exists so that signals are still forwarded during testing.
    _FALLBACK_CONTROLLER_ID = "audio_filter"

    # Forwarding signal; carries a ControllerSignalData payload.
    signal_params_updated = Signal(ControllerSignalData)

    @classmethod
    def instance(cls) -> 'ApplicationController':
        """Return the singleton instance, creating it on first use."""
        if cls._instance is None:
            cls._instance = cls()
        return cls._instance

    def __init__(self) -> None:
        """Initialize the singleton and hook it up to the params service.

        Raises:
            RuntimeError: If an instance has already been created.
        """
        if ApplicationController._instance is not None:
            raise RuntimeError("ApplicationController is a singleton!")
        super().__init__()
        ApplicationController._instance = self
        # Registered controllers, keyed by controller id.
        self._controllers: Dict[str, QObject] = {}
        self._setup_service_connections()

    def _setup_service_connections(self) -> None:
        """Connect the params service's completion signal to this object."""
        service = ServiceManager.instance().params_service
        service.signal_request_complete.connect(self._on_service_request_complete)

    def register_controller(self, controller_id: str, controller: QObject) -> None:
        """Register ``controller`` under ``controller_id``.

        An existing registration with the same id is replaced.
        """
        self._controllers[controller_id] = controller

    def unregister_controller(self, controller_id: str) -> None:
        """Remove the controller registered under ``controller_id``.

        Unknown ids are ignored.
        """
        self._controllers.pop(controller_id, None)

    @Slot(SignalProxy)
    def _on_service_request_complete(self, signal_proxy: SignalProxy) -> None:
        """Forward a completed service request via ``signal_params_updated``.

        The controller id is resolved from ``signal_proxy.widget``. When
        no registered controller owns that widget, the placeholder
        ``_FALLBACK_CONTROLLER_ID`` is used instead so the signal is
        still forwarded.
        """
        controller_id = (
            self._get_controller_id_for_widget(signal_proxy.widget)
            or self._FALLBACK_CONTROLLER_ID
        )
        # Guard kept so that clearing the fallback disables forwarding
        # for widgets that no controller owns.
        if controller_id:
            self.signal_params_updated.emit(ControllerSignalData(
                controller_id=controller_id,
                widget=signal_proxy.widget,
                data=signal_proxy.data,
            ))

    def _get_controller_id_for_widget(self, widget: QObject) -> Optional[str]:
        """Return the id of the registered controller that owns ``widget``.

        A controller owns the widget when it has a ``widget`` attribute
        that compares equal to ``widget``. Returns ``None`` when no
        registered controller matches.

        NOTE(review): the original comments mark this as a sample
        implementation; adapt the matching rule to the real
        controller/widget relationship.
        """
        for controller_id, controller in self._controllers.items():
            if hasattr(controller, 'widget') and controller.widget == widget:
                return controller_id
        return None