import sys
import os

# Put the directory two levels above this file's directory on sys.path;
# presumably this is what lets the absolute project imports below resolve
# when the file is run directly as a script.
sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(__file__))))

from component.widget_main.widget_main import Widget_Main
from component.widget_channel.widget_channel import Widget_Channel
from component.widget_card.widget_card import Widget_Card
from component.widget_card.widget_card import CardData
from component.widget_filter.audio_filter_componet import AudioFilterWidget
from component.widget_filter.audio_filter_model import AudioFilterModel
from component.widget_filter.audio_filter_controller import AudioFilterController
from component.widget_card.widget_card import ParamData
from component.widget_log.widget_log import Widget_Log
from param_struct_test.service_manager import ServiceManager
from application.application_controller import ApplicationController
from param_struct_test.params_service import Response
from datetime import date
from PySide6.QtWidgets import QMainWindow, QPushButton, QVBoxLayout
from PySide6.QtWidgets import QWidget
from PySide6.QtCore import QObject, QTimer
import yaml  # added for reading the YAML configuration file
import logging
from component.widget_log.log_handler import setup_logger
from component.widget_log.log_handler import logger


class MainWindow(QWidget):
    """Application window object wiring widgets, services and filter controllers.

    On construction it loads the YAML configuration, initialises the services
    through ``ServiceManager``, embeds the channel, card and log widgets into
    ``Widget_Main`` and creates one ``AudioFilterWidget`` /
    ``AudioFilterController`` pair per configured channel.
    """

    # Path of the configuration file, relative to the current working directory.
    CONFIG_PATH = 'config/channel_config.yaml'
    # Channel count used when the configuration does not provide one.
    DEFAULT_CHANNEL_COUNT = 24
    # Number of EQ filter slots per channel; EQ parameters are stored in one
    # flat array, so channel ``c`` owns indices ``[c * 20, (c + 1) * 20)``.
    FILTERS_PER_CHANNEL = 20

    def __init__(self):
        """Build the window, its child widgets and all filter controllers.

        Raises:
            KeyError: if the configuration has no usable ``server.host`` /
                ``server.port`` entries (see ``_server_settings``).
        """
        super().__init__()

        # Initialise the logging system.
        self.log_handler = setup_logger()

        # Load the configuration file (falls back to defaults on failure).
        self.config = self.load_config()

        # Initialise the services with the server settings from the config.
        server_host, server_port = self._server_settings()
        ServiceManager.instance().init_services(server_host, server_port)

        # Initialise the application controller.
        self.app_controller = ApplicationController.instance()

        self.widget_main = Widget_Main()
        self.widget_channel = Widget_Channel()
        self.widget_card = Widget_Card()
        self.widget_log = Widget_Log()

        # Route log records to the log widget.
        self.log_handler.set_widget(self.widget_log)

        # Show only as many channels as configured.
        channel_count = self._channel_count()
        logger.info(f"通道数量: {channel_count}")
        self.widget_channel.set_visible_channels(channel_count)

        self.widget_main.ui.ListWidget_vLayout.addWidget(self.widget_card)
        self.widget_main.ui.Channel_hLayout.addWidget(self.widget_channel)
        self.widget_main.ui.verticalLayout_Log.addWidget(self.widget_log)

        self.widget_filter_list = []
        self.filter_controllers = []  # controller instances, one per channel

        # Test button that triggers a bulk read of every parameter.
        self.test_button = QPushButton("Get_All")
        self.test_button.clicked.connect(self.Get_All)
        self.widget_main.ui.ListWidget_vLayout.addWidget(self.test_button)

        self.create_filter_widget()
        self.setup_connections()

    def load_config(self):
        """Load the channel configuration file.

        Returns:
            dict: the parsed YAML mapping, or ``{"channels": {"count": 24}}``
            when the file cannot be read, cannot be parsed, or does not
            contain a mapping at top level (e.g. an empty file, for which
            ``yaml.safe_load`` returns ``None``).
        """
        try:
            with open(self.CONFIG_PATH, 'r', encoding='utf-8') as f:
                config = yaml.safe_load(f)
            if not isinstance(config, dict):
                # Callers index the result by section name, so anything
                # other than a mapping is treated as a failed load.
                raise ValueError(
                    f"expected a mapping, got {type(config).__name__}"
                )
        except (OSError, yaml.YAMLError, ValueError) as e:
            logging.error(f"加载配置文件失败: {e}")
            return {"channels": {"count": self.DEFAULT_CHANNEL_COUNT}}
        logging.info("配置文件加载成功")
        return config

    def _server_settings(self):
        """Return ``(host, port)`` from the ``server`` section of the config.

        Raises:
            KeyError: with a descriptive message when the section or one of
                its two keys is missing. This is also what happens when the
                fallback configuration is in use, since it carries no server
                settings and no default server is assumed here.
        """
        server = self.config.get('server')
        if not isinstance(server, dict) or 'host' not in server or 'port' not in server:
            raise KeyError(
                f"'server.host' and 'server.port' must be set in {self.CONFIG_PATH}"
            )
        return server['host'], server['port']

    def _channel_count(self):
        """Return ``channels.count`` from the config, or the default count."""
        channels = self.config.get('channels')
        if isinstance(channels, dict) and 'count' in channels:
            return channels['count']
        return self.DEFAULT_CHANNEL_COUNT

    def create_filter_widget(self):
        """Create a filter widget, model and controller for every channel.

        The widgets are appended to ``self.widget_filter_list`` and the
        controllers to ``self.filter_controllers``, both in channel order.
        """
        for i in range(self._channel_count()):
            filter_widget = AudioFilterWidget()

            # Model and controller for this channel.
            model = AudioFilterModel(channel_id=i, channel_name=f"Channel {i+1}")
            controller = AudioFilterController(model)
            controller.set_widget(filter_widget)

            # Report controller signals on stdout.
            controller.error_occurred.connect(lambda msg: print(f"Error: {msg}"))
            controller.state_changed.connect(lambda state: print(f"State changed: {state}"))
            controller.params_synced.connect(lambda: print("Params synced"))

            self.widget_filter_list.append(filter_widget)
            self.filter_controllers.append(controller)

    def setup_connections(self):
        """Connect the channel-button signal to its handler."""
        print("setup_connections")
        self.widget_channel.channel_btn_clicked.connect(self.on_channel_btn_clicked)

    def on_channel_btn_clicked(self, channel_id: int):
        """Show the filter widget that belongs to ``channel_id``.

        Ids outside ``[0, len(self.widget_filter_list))`` are reported and
        ignored instead of raising ``IndexError`` (or, for negative ids,
        silently showing a widget counted from the end of the list).
        """
        print(f"channel_id: {channel_id}")
        if not 0 <= channel_id < len(self.widget_filter_list):
            print(f"Error: no filter widget for channel_id {channel_id}")
            return
        self.widget_filter_list[channel_id].show()

    def test_communication(self):
        """Exercise load/sync on the first controller, if there is one."""
        if self.filter_controllers:
            controller = self.filter_controllers[0]

            # Load data from the server.
            logger.info("load from server...")
            controller.load_from_server()

            # Sync data back to the server.
            logger.info("sync to server...")
            controller.sync_to_server()

    def _build_param_names(self, channel_count: int) -> list:
        """Return every parameter name to request for ``channel_count`` channels.

        For each channel, in order: the seven mix/delay/volume names followed
        by five names (fc, q, gain, slope, filterType) for each of the
        channel's ``FILTERS_PER_CHANNEL`` EQ slots.
        """
        names = []
        for channel_id in range(channel_count):
            # Basic per-channel parameters.
            names.extend([
                f'tuning_parameters.mix_parameters[{channel_id}].ch_n',
                f'tuning_parameters.mix_parameters[{channel_id}].mix_left_data',
                f'tuning_parameters.mix_parameters[{channel_id}].mix_right_data',
                f'tuning_parameters.delay_parameters[{channel_id}].ch_n',
                f'tuning_parameters.delay_parameters[{channel_id}].delay_data',
                f'tuning_parameters.volume_parameters[{channel_id}].ch_n',
                f'tuning_parameters.volume_parameters[{channel_id}].vol_data'
            ])

            # EQ filter parameters, addressed through the flat EQ index.
            base_idx = channel_id * self.FILTERS_PER_CHANNEL
            for idx in range(base_idx, base_idx + self.FILTERS_PER_CHANNEL):
                names.extend([
                    f'tuning_parameters.eq_parameters[{idx}].fc',
                    f'tuning_parameters.eq_parameters[{idx}].q',
                    f'tuning_parameters.eq_parameters[{idx}].gain',
                    f'tuning_parameters.eq_parameters[{idx}].slope',
                    f'tuning_parameters.eq_parameters[{idx}].filterType'
                ])
        return names

    def Get_All(self):
        """Request all parameters of all channels in a single call.

        The response callback splits the combined response per channel and
        hands each slice to the matching controller wrapped in a ``Response``.
        Any exception, here or in the callback, is printed and swallowed.
        """
        try:
            all_params = self._build_param_names(len(self.filter_controllers))

            def handle_all_response(response):
                """Distribute the combined response to every controller."""
                separator = "\n ############################################################################################## \n"
                try:
                    if not hasattr(response, 'data'):
                        raise ValueError("Invalid response format")

                    for controller in self.filter_controllers:
                        channel_id = controller.model.channel_id
                        print(f"response.data: {response.data}")
                        print(separator)
                        channel_data = self._extract_channel_data(response.data, channel_id)
                        print(f"channel_data: {channel_data}")
                        print(separator)
                        controller._on_params_updated_new(Response(
                            token="combined",
                            cmd="get_params",
                            widget=self.widget_main,
                            data=channel_data
                        ))

                    print("Successfully loaded all filter data")

                except Exception as e:
                    print(f"Error handling response: {e}")

            # Request every parameter at once.
            ServiceManager.instance().params_service.get_params(
                self.widget_main,
                all_params,
                handle_all_response
            )

        except Exception as e:
            print(f"Error in Get_All: {e}")

    def _extract_channel_data(self, response_data: dict, channel_id: int) -> dict:
        """Extract the entries of ``response_data`` that belong to one channel.

        Args:
            response_data: mapping of full parameter name to value.
            channel_id: zero-based channel index.

        Returns:
            dict: the mix/delay/volume entries indexed by ``channel_id`` plus
            the EQ entries whose flat index lies in
            ``[channel_id * 20, channel_id * 20 + 20)``, in input order.
        """
        channel_data = {}
        base_eq_index = channel_id * self.FILTERS_PER_CHANNEL
        eq_index_range = range(base_eq_index, base_eq_index + self.FILTERS_PER_CHANNEL)

        # The closing bracket is part of each marker, so "[1]" does not
        # match "[11]".
        basic_markers = (
            f'mix_parameters[{channel_id}]',
            f'delay_parameters[{channel_id}]',
            f'volume_parameters[{channel_id}]',
        )

        for key, value in response_data.items():
            # Basic parameters (mix, delay, volume).
            if any(marker in key for marker in basic_markers):
                channel_data[key] = value

            # EQ filter parameters.
            if 'tuning_parameters.eq_parameters[' in key:
                try:
                    eq_index = int(key.split('[')[1].split(']')[0])
                except (IndexError, ValueError):
                    # Key without a parsable index: skip it.
                    continue
                if eq_index in eq_index_range:
                    channel_data[key] = value

        return channel_data


if __name__ == '__main__':
    import sys
    from PySide6.QtWidgets import QApplication

    app = QApplication(sys.argv)
    main_window = MainWindow()
    logger.info("软件启动")

    # Channel 0: non-zero mix, delay and volume parameters.
    test_data = {
        'tuning_parameters.mix_parameters[0].ch_n': 1,
        'tuning_parameters.mix_parameters[0].mix_left_data': 0.1,
        'tuning_parameters.mix_parameters[0].mix_right_data': 0.7,
        'tuning_parameters.delay_parameters[0].ch_n': 1,
        'tuning_parameters.delay_parameters[0].delay_data': 0.1,
        'tuning_parameters.volume_parameters[0].ch_n': 1,
        'tuning_parameters.volume_parameters[0].vol_data': 0.7,
    }

    # Channel 0: first six EQ filters as (fc, q, gain, slope, filterType).
    channel_0_filters = [
        (3.0, 0.3, 3.0, 3, 2),
        (21.0, 0.2, 3.0, 6, 1),
        (22.0, 0.2, 3.0, 12, 2),
        (23.0, 0.3, 4.0, 18, 3),
        (23.0, 0.4, 5.0, 24, 4),
        (25.0, 0.6, 7.0, 30, 5),
    ]
    for eq, (fc, q, gain, slope, filter_type) in enumerate(channel_0_filters):
        test_data.update({
            f'tuning_parameters.eq_parameters[{eq}].fc': fc,
            f'tuning_parameters.eq_parameters[{eq}].q': q,
            f'tuning_parameters.eq_parameters[{eq}].gain': gain,
            f'tuning_parameters.eq_parameters[{eq}].slope': slope,
            f'tuning_parameters.eq_parameters[{eq}].filterType': filter_type,
        })

    # Channels 1-5: every parameter initialised to zero.
    for channel in range(1, 6):
        test_data.update({
            f'tuning_parameters.mix_parameters[{channel}].ch_n': 0,
            f'tuning_parameters.mix_parameters[{channel}].mix_left_data': 0.0,
            f'tuning_parameters.mix_parameters[{channel}].mix_right_data': 0.0,
            f'tuning_parameters.delay_parameters[{channel}].ch_n': 0,
            f'tuning_parameters.delay_parameters[{channel}].delay_data': 0.0,
            f'tuning_parameters.volume_parameters[{channel}].ch_n': 0,
            f'tuning_parameters.volume_parameters[{channel}].vol_data': 0.0,
        })

        # 20 EQ filters per channel.
        for eq in range(channel * 20, (channel + 1) * 20):
            test_data.update({
                f'tuning_parameters.eq_parameters[{eq}].fc': 0.0,
                f'tuning_parameters.eq_parameters[{eq}].q': 0.0,
                f'tuning_parameters.eq_parameters[{eq}].gain': 0.0,
                f'tuning_parameters.eq_parameters[{eq}].slope': 0,
                f'tuning_parameters.eq_parameters[{eq}].filterType': 0,
            })

    # The printed label is derived from the id that is actually extracted.
    demo_channel_id = 1
    demo_channel_data = main_window._extract_channel_data(test_data, demo_channel_id)
    print(f"Extracted data for channel {demo_channel_id}:")
    print(demo_channel_data)

    # GUI start-up is currently disabled; the script only runs the
    # extraction demo above.
    # main_window.widget_main.show()
    # sys.exit(app.exec())