"""Application entry module: builds the main window and wires the widgets together."""
import sys
import os
import traceback

# Make the project root importable before the project-local imports below run.
sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(__file__))))

from component.widget_main.widget_main import Widget_Main
from component.widget_channel.widget_channel import Widget_Channel
from component.widget_card.widget_card import Widget_Card
from component.widget_card.widget_card import CardData
from component.widget_filter.audio_filter_componet import AudioFilterWidget
from component.widget_filter.audio_filter_model import AudioFilterModel
from component.widget_filter.audio_filter_controller import AudioFilterController
from component.widget_card.widget_card import ParamData
from component.widget_log.widget_log import Widget_Log
from persistence.data_store_manager import DataStoreManager
from persistence.data_store_origin import DataStore
from param_struct_test.service_manager import ServiceManager
from application.application_controller import ApplicationController
from param_struct_test.params_service import Response
from datetime import date
from PySide6.QtWidgets import QMainWindow, QPushButton, QVBoxLayout
from PySide6.QtWidgets import QWidget
from PySide6.QtCore import QObject, QTimer
import yaml  # used by MainWindow.load_config
import logging
from component.widget_log.log_handler import setup_logger
from component.widget_log.log_handler import logger

# Path of the YAML configuration file, relative to the working directory.
CONFIG_PATH = 'config/channel_config.yaml'
# Channel count used when the configuration file cannot be loaded.
DEFAULT_CHANNEL_COUNT = 24
# Number of EQ filter slots per channel in the flat `eq_parameters` array.
FILTERS_PER_CHANNEL = 20
# Directory holding the per-parameter CSV files, relative to the working directory.
PARAMS_DIR = os.path.join("data", "projects", "params")


class MainWindow(QWidget):
    """Top-level object that owns the widgets, controllers and data manager."""

    def __init__(self):
        """Load the configuration, start the services and assemble the UI.

        Raises:
            KeyError / TypeError: if the configuration has no usable
                ``server.host`` / ``server.port`` or ``channels.count`` entry.
        """
        super().__init__()

        # Initialise the logging system.
        self.log_handler = setup_logger()

        # Load the configuration file.
        self.config = self.load_config()

        # Read the server settings from the configuration. The fallback
        # configuration has no 'server' section, so log a clear message before
        # propagating the error instead of failing with a bare KeyError.
        try:
            server_host = self.config['server']['host']
            server_port = self.config['server']['port']
        except (KeyError, TypeError):
            logging.error("配置文件缺少 server.host / server.port 设置")
            raise

        # Initialise the services.
        ServiceManager.instance().init_services(server_host, server_port)
        self.data_manager = DataStoreManager().get_instance()

        # Initialise the application controller.
        self.app_controller = ApplicationController.instance()

        self.widget_main = Widget_Main()
        self.widget_channel = Widget_Channel()
        self.widget_card = Widget_Card()
        self.widget_log = Widget_Log()

        # Point the log handler at the log widget.
        self.log_handler.set_widget(self.widget_log)

        # Set how many channel buttons are visible.
        channel_count = self.config['channels']['count']
        logger.info(f"通道数量: {channel_count}")
        self.widget_channel.set_visible_channels(channel_count)

        self.widget_main.ui.ListWidget_vLayout.addWidget(self.widget_card)
        self.widget_main.ui.Channel_hLayout.addWidget(self.widget_channel)
        self.widget_main.ui.verticalLayout_Log.addWidget(self.widget_log)

        self.widget_filter_list = []
        self.filter_controllers = []  # controller instances, indexed by channel

        # Hidden test button; the visible trigger for Get_All is pushButton_2.
        self.test_button = QPushButton("Get_All")
        self.widget_main.ui.pushButton_2.clicked.connect(self.Get_All)
        # self.test_button.clicked.connect(self.Get_All)
        self.widget_main.ui.ListWidget_vLayout.addWidget(self.test_button)
        self.test_button.setVisible(False)

        self.create_filter_widget()
        self.setup_connections()
        self.load_default_project()
        self.update_channel_from_card()
        self.set_default_filter_widget(0)

    @staticmethod
    def check_param_name(param_data: dict, selected_name: str) -> bool:
        """Return True if any entry of *param_data* has the given name.

        Args:
            param_data: Mapping whose values are dicts that may hold a 'name' key.
            selected_name: The name to look for.

        Returns:
            True if a value with ``value.get('name') == selected_name`` exists,
            otherwise False.
        """
        # Declared static: the function takes no `self`, so without the
        # decorator an instance call would bind the instance to `param_data`.
        return any(
            param_info.get('name') == selected_name
            for param_info in param_data.values()
        )

    def _apply_param_data(self, param_data: dict) -> None:
        """Push each channel's data from *param_data* into its controller.

        Keys are converted with ``int()`` and entries whose index has no
        controller are skipped. A failure on one channel is logged and does not
        stop the remaining channels.
        """
        for channel_id, channel_data in param_data.items():
            try:
                channel_id = int(channel_id)
                if 0 <= channel_id < len(self.filter_controllers):
                    controller = self.filter_controllers[channel_id]
                    controller.update_from_card_data(channel_data)
            except Exception as e:
                logger.error(f"处理通道 {channel_id} 时出错: {str(e)}")
                continue

    def update_channel_from_card(self):
        """Update all channels from the card currently selected in the card widget."""
        try:
            # 1. Fetch the currently selected parameter / project names.
            selected_param_name, selected_project_name = self.widget_card.get_selected_param_name()
            logger.info(f"selected_param_name: {selected_param_name}, selected_project_name: {selected_project_name}")

            # TODO: wrap this behind a proper DataStoreManager interface.
            self.data_manager._store.current_project = selected_project_name
            # NOTE(review): 'paramter' is spelled as in the original attribute
            # name; confirm against the store before renaming.
            self.data_manager._store.current_paramter_name = selected_param_name

            if not selected_param_name or not selected_project_name:
                logger.warning("未选择参数或项目")
                return

            # 2. Fetch the parameter data.
            param_data = self.data_manager.get_param_data(selected_project_name, selected_param_name)

            # Disabled: look up the parameter key by name and store it as current_param.
            # project_names = self.data_manager.get_projects()
            # project_data = self.data_manager.get_project(project_names[0])
            # if project_data and 'params' in project_data:
            #     for param_key, param_info in project_data['params'].items():
            #         if param_info.get('name') == selected_param_name:
            #             # Found the matching key; update current_param
            #             self.data_manager.current_param = param_key
            #             logger.info(f"已更新当前参数键: {param_key}")
            #             break
            self.data_manager.current_param = param_data

            if not param_data:
                logger.error(f"未找到参数数据: {selected_param_name}")
                return

            # 3. Apply the data of every channel.
            self._apply_param_data(param_data)
            logger.info("成功从卡片数据更新所有通道")

        except Exception as e:
            logger.error(f"从卡片更新通道时出错: {str(e)}")

    def load_default_project(self):
        """Load every available project into the card list.

        If the data manager reports no projects, a project named "default" is
        created first. Projects that cannot be loaded or have no 'params'
        section are logged and skipped.
        """
        try:
            # Fetch the project list.
            projects = self.data_manager.get_projects()

            if not projects:
                # No project yet: create the default one.
                logger.info("创建默认项目...")
                self.data_manager.create_project("default", "默认项目配置")
                projects = ["default"]

            # Clear the current card list.
            self.widget_card.list_widget.clear()

            for project_name in projects:
                project_data = self.data_manager.get_project(project_name)

                if not project_data:
                    logger.warning(f"无法加载项目: {project_name}")
                    continue

                logger.info(f"加载项目: {project_name}")

                if 'params' not in project_data:
                    logger.warning(f"项目 '{project_name}' 没有参数数据")
                    continue

                # Build one ParamData per well-formed parameter entry.
                param_objects = [
                    ParamData(name=param_data['name'])
                    for param_data in project_data['params'].values()
                    if isinstance(param_data, dict) and 'name' in param_data
                ]

                # Keep only the date part of an ISO timestamp. Coerce to str so
                # a null 'created_at' does not abort loading the other projects.
                created_at = str(project_data.get('created_at') or '')
                display_name = project_data.get('name', project_name)

                card_data = CardData(
                    name=display_name,
                    date=created_at.split('T')[0],
                    description=project_data.get('description', ''),
                    params=param_objects
                )

                # Add the card to the UI.
                self.widget_card.add_card_item(card_data)
                logger.info(f"已添加项目卡片 '{display_name}' 包含 {len(param_objects)} 个参数")

        except Exception as e:
            logger.error(f"加载默认项目时出错: {str(e)}")
            logger.error(traceback.format_exc())

    def load_config(self):
        """Load the YAML configuration file.

        Returns:
            The parsed mapping, or a fallback holding only the default channel
            count when the file cannot be read, cannot be parsed, or does not
            contain a mapping (``yaml.safe_load`` yields None for an empty file).
        """
        fallback = {"channels": {"count": DEFAULT_CHANNEL_COUNT}}
        try:
            with open(CONFIG_PATH, 'r', encoding='utf-8') as f:
                config = yaml.safe_load(f)
        except Exception as e:
            logging.error(f"加载配置文件失败: {e}")
            return fallback

        if not isinstance(config, dict):
            logging.error(f"加载配置文件失败: {CONFIG_PATH} 内容不是有效的映射")
            return fallback

        logging.info("配置文件加载成功")
        return config

    def create_filter_widget(self):
        """Create one filter widget, model and controller per configured channel."""
        # The channel count comes from the configuration file.
        channel_count = self.config['channels']['count']

        for i in range(channel_count):
            # Create the widget.
            filter_widget = AudioFilterWidget()
            # filter_widget.set_channel_id(i)
            # filter_widget.set_channel_name(f"Channel {i+1}")

            # Create the model and controller.
            model = AudioFilterModel(channel_id = i, channel_name=f"Channel {i+1}")
            controller = AudioFilterController(model)
            controller.set_widget(filter_widget)

            # Connect the controller signals.
            controller.error_occurred.connect(lambda msg: print(f"Error: {msg}"))
            controller.state_changed.connect(lambda state: print(f"State changed: {state}"))
            controller.params_synced.connect(lambda: print("Params synced"))

            # Keep the instances, indexed by channel.
            self.widget_filter_list.append(filter_widget)
            self.filter_controllers.append(controller)

    def setup_connections(self):
        """Connect the channel and card widget signals to their handlers."""
        print("setup_connections")
        self.widget_channel.channel_btn_clicked.connect(self.on_channel_btn_clicked)
        self.widget_card.parameterSelected.connect(self.on_widget_card_parameter_selected)
        # Edit signal.
        self.widget_card.itemEdited.connect(self.on_widget_card_item_edited)
        # Parameter-added signal.
        self.widget_card.parameterAdded.connect(self.on_widget_card_parameter_added)
        # data_store_manager.current_param

    def on_widget_card_parameter_selected(self, param_name: str, project_name: str):
        """Update all channels from the parameter set selected in the card widget."""
        logger.info(f"on_widget_card_parameter_selected: {param_name}, {project_name}")
        try:
            # Fetch the parameter data.
            param_data = self.data_manager.get_param_data(project_name, param_name)
            if not param_data:
                logger.warning(f"未找到参数数据: {param_name}")
                return

            # Update the filter data of every channel.
            self._apply_param_data(param_data)
            logger.info(f"成功从参数 {param_name} 更新所有通道")

        except Exception as e:
            logger.error(f"参数选择更新出错: {str(e)}")
            logger.error(traceback.format_exc())

    def _find_project(self, project_name: str):
        """Return ``(filename, data)`` of the first project whose 'name' matches.

        Returns ``(None, None)`` when no project matches.
        """
        for proj_filename in self.data_manager.get_projects():
            project_data = self.data_manager.get_project(proj_filename)
            if project_data and project_data.get('name') == project_name:
                return proj_filename, project_data
        return None, None

    @staticmethod
    def _rename_param_file(old_path: str, new_path: str) -> None:
        """Rename a parameter file, logging the outcome instead of raising."""
        try:
            os.rename(old_path, new_path)
            logger.info(f"已重命名参数文件: {old_path} -> {new_path}")
        except Exception as e:
            logger.error(f"重命名参数文件失败: {str(e)}")

    def on_widget_card_item_edited(self, edit_type, old_value, new_value, project_name):
        """Handle an edit made in the card widget.

        Only ``edit_type == "parameter_name"`` is handled. The parameter is
        renamed in the first project whose name matches, the project is saved,
        and the matching ``<project>_<param>.csv`` file is renamed.
        """
        if edit_type != "parameter_name":
            return

        try:
            # Find the project file name.
            proj_filename, project_data = self._find_project(project_name)
            if project_data is None or 'params' not in project_data:
                return

            # 1. Update the parameter name in the project JSON.
            for param_info in project_data['params'].values():
                if not isinstance(param_info, dict) or param_info.get('name') != old_value:
                    continue

                param_info['name'] = new_value
                # Save the updated project data.
                self.data_manager.update_project(proj_filename, project_data)
                logger.info(f"已更新参数名: {old_value} -> {new_value}")

                # 2. Rename the parameter file. The name is fully determined,
                #    so check the path directly instead of scanning the directory.
                old_path = os.path.join(PARAMS_DIR, f"{project_name}_{old_value}.csv")
                new_path = os.path.join(PARAMS_DIR, f"{project_name}_{new_value}.csv")
                if os.path.exists(old_path):
                    self._rename_param_file(old_path, new_path)
                else:
                    logger.warning(f"未找到参数文件: {old_path}")
                break

        except Exception as e:
            logger.error(f"处理参数编辑时出错: {str(e)}")
            logger.error(traceback.format_exc())

    def _show_filter_widget(self, channel_id: int, title=None) -> None:
        """Replace the content of the filter layout with one channel's widget.

        Args:
            channel_id: Index into ``self.widget_filter_list``.
            title: If given, set as the title of the widget's ``groupBox_4``.
        """
        # Guard the index so a bad channel (or zero configured channels)
        # leaves the current view intact instead of raising IndexError.
        if not 0 <= channel_id < len(self.widget_filter_list):
            logger.warning(f"无效的通道索引: {channel_id}")
            return

        layout = self.widget_main.ui.verticalLayout_Filter

        # Take every item out of the layout; the widgets are hidden, not deleted,
        # because they are reused when their channel is selected again.
        while layout.count():
            item = layout.takeAt(0)
            if item.widget():
                item.widget().hide()

        filter_widget = self.widget_filter_list[channel_id]
        layout.addWidget(filter_widget)
        if title is not None:
            filter_widget.ui.groupBox_4.setTitle(title)
        filter_widget.show()

    def set_default_filter_widget(self, channel_id: int):
        """Show the filter widget of *channel_id* without changing its title."""
        self._show_filter_widget(channel_id)

    def on_channel_btn_clicked(self, channel_id: int):
        """Show the filter widget of the clicked channel and set its title."""
        print(f"on_channel_btn_clicked: {channel_id}")
        self._show_filter_widget(channel_id, title=f"Channel {channel_id+1}")

    def test_communication(self):
        """Exercise the server round trip using the first controller."""
        if self.filter_controllers:
            controller = self.filter_controllers[0]

            # Load data from the server.
            logger.info("load from server...")
            controller.load_from_server()

            # Sync data to the server.
            logger.info("sync to server...")
            controller.sync_to_server()

    def Get_All(self):
        """Request every parameter of every channel in a single call.

        The response is split per channel and handed to each controller.
        """
        try:
            # Collect the parameter names of all channels.
            all_params = []

            for channel_id in range(len(self.filter_controllers)):
                # Basic parameters.
                all_params.extend([
                    f'tuning_parameters.mix_parameters[{channel_id}].ch_n',
                    f'tuning_parameters.mix_parameters[{channel_id}].mix_left_data',
                    f'tuning_parameters.mix_parameters[{channel_id}].mix_right_data',
                    f'tuning_parameters.delay_parameters[{channel_id}].ch_n',
                    f'tuning_parameters.delay_parameters[{channel_id}].delay_data',
                    f'tuning_parameters.volume_parameters[{channel_id}].ch_n',
                    f'tuning_parameters.volume_parameters[{channel_id}].vol_data'
                ])

                # Filter parameters: each channel owns a contiguous run of slots.
                base_idx = channel_id * FILTERS_PER_CHANNEL
                for idx in range(base_idx, base_idx + FILTERS_PER_CHANNEL):
                    all_params.extend([
                        f'tuning_parameters.eq_parameters[{idx}].fc',
                        f'tuning_parameters.eq_parameters[{idx}].q',
                        f'tuning_parameters.eq_parameters[{idx}].gain',
                        f'tuning_parameters.eq_parameters[{idx}].slope',
                        f'tuning_parameters.eq_parameters[{idx}].filterType'
                    ])

            def handle_all_response(response):
                """Split the combined response and dispatch it to the controllers."""
                try:
                    if not hasattr(response, 'data'):
                        raise ValueError("Invalid response format")

                    # Hand each controller the slice that belongs to its channel.
                    for controller in self.filter_controllers:
                        channel_id = controller.model.channel_id
                        print(f"response.data: {response.data}")
                        print(f"\n ############################################################################################## \n")
                        channel_data = self._extract_channel_data(response.data, channel_id)
                        print(f"channel_data: {channel_data}")
                        print(f"\n ############################################################################################## \n")
                        controller._on_params_updated_new(Response(
                            token="combined",
                            cmd="get_params",
                            widget = self.widget_main,
                            data=channel_data
                        ))

                    print("Successfully loaded all filter data")

                except Exception as e:
                    print(f"Error handling response: {e}")

            # Request all parameters at once.
            ServiceManager.instance().params_service.get_params(
                self.widget_main,
                all_params,
                handle_all_response
            )

        except Exception as e:
            print(f"Error in Get_All: {e}")

    def _extract_channel_data(self, response_data: dict, channel_id: int) -> dict:
        """Return the entries of *response_data* that belong to one channel.

        Args:
            response_data: Mapping of parameter name to value.
            channel_id: Channel whose entries are wanted.

        Returns:
            A new dict with the mix/delay/volume entries indexed by
            *channel_id* and the EQ entries whose index lies in that channel's
            run of ``FILTERS_PER_CHANNEL`` slots.
        """
        channel_data = {}
        # First EQ slot of this channel.
        base_eq_index = channel_id * FILTERS_PER_CHANNEL

        # The closing bracket is part of the marker, so channel 1 does not
        # match channel 10 and so on.
        basic_markers = (
            f'mix_parameters[{channel_id}]',
            f'delay_parameters[{channel_id}]',
            f'volume_parameters[{channel_id}]',
        )

        for key, value in response_data.items():
            # Basic parameters (mix, delay, volume).
            if any(marker in key for marker in basic_markers):
                channel_data[key] = value

            # Filter parameters.
            if 'tuning_parameters.eq_parameters[' in key:
                try:
                    eq_index = int(key.split('[')[1].split(']')[0])
                except (IndexError, ValueError):
                    # Malformed index: ignore this key.
                    continue
                if base_eq_index <= eq_index < base_eq_index + FILTERS_PER_CHANNEL:
                    channel_data[key] = value

        return channel_data

    def on_widget_card_parameter_added(self, param_name: str, project_name: str):
        """Create an empty parameter set for a parameter added in the card widget.

        The parameter is saved under the project's file name and the resulting
        CSV is then renamed so that it is prefixed with the project's 'name'
        field instead.
        """
        logger.info(f"添加新参数: {param_name}, 项目: {project_name}")

        try:
            # Find the file name of the project.
            project_filename, _ = self._find_project(project_name)
            if project_filename is None:
                logger.warning(f"未找到项目: {project_name}")
                return

            # Build an empty data structure with one entry per channel.
            empty_channel_data = {
                channel_index: {
                    'delay_data': 0.0,
                    'vol_data': 0.0,
                    'mix_left_data': 0.0,
                    'mix_right_data': 0.0,
                    'filters': []
                }
                for channel_index in range(len(self.filter_controllers))
            }

            # Save the new parameter using the project file name.
            self.data_manager.save_param(project_filename, param_name, empty_channel_data, "")

            # Rename the file so it uses the project's 'name' field rather
            # than the project file name.
            old_param_path = os.path.join(PARAMS_DIR, f"{project_filename}_{param_name}.csv")
            new_param_path = os.path.join(PARAMS_DIR, f"{project_name}_{param_name}.csv")

            if os.path.exists(old_param_path):
                self._rename_param_file(old_param_path, new_param_path)

            logger.info(f"已创建新参数: {param_name}, 文件名: {project_name}_{param_name}.csv")

        except Exception as e:
            logger.error(f"创建参数时出错: {str(e)}")
            logger.error(traceback.format_exc())


if __name__ == '__main__':
    from PySide6.QtWidgets import QApplication

    app = QApplication(sys.argv)
    main_window = MainWindow()
    logger.info("软件启动")

    # test_data = {
    #     # Channel 0 parameters
    #     # Mix parameters
    #     'tuning_parameters.mix_parameters[0].ch_n': 1,
    #     'tuning_parameters.mix_parameters[0].mix_left_data': 0.1,
    #     'tuning_parameters.mix_parameters[0].mix_right_data': 0.7,
    #     # Delay parameters
    #     'tuning_parameters.delay_parameters[0].ch_n': 1,
    #     'tuning_parameters.delay_parameters[0].delay_data': 0.1,
    #     # Volume parameters
    #     'tuning_parameters.volume_parameters[0].ch_n': 1,
    #     'tuning_parameters.volume_parameters[0].vol_data': 0.7,
    #     # EQ parameters (first 6 active filters)
    #     'tuning_parameters.eq_parameters[0].fc': 3.0,
    #     'tuning_parameters.eq_parameters[0].q': 0.3,
    #     'tuning_parameters.eq_parameters[0].gain': 3.0,
    #     'tuning_parameters.eq_parameters[0].slope': 3,
    #     'tuning_parameters.eq_parameters[0].filterType': 2,
    #     'tuning_parameters.eq_parameters[1].fc': 21.0,
    #     'tuning_parameters.eq_parameters[1].q': 0.2,
    #     'tuning_parameters.eq_parameters[1].gain': 3.0,
    #     'tuning_parameters.eq_parameters[1].slope': 6,
    #     'tuning_parameters.eq_parameters[1].filterType': 1,
    #     'tuning_parameters.eq_parameters[2].fc': 22.0,
    #     'tuning_parameters.eq_parameters[2].q': 0.2,
    #     'tuning_parameters.eq_parameters[2].gain': 3.0,
    #     'tuning_parameters.eq_parameters[2].slope': 12,
    #     'tuning_parameters.eq_parameters[2].filterType': 2,
    #     'tuning_parameters.eq_parameters[3].fc': 23.0,
    #     'tuning_parameters.eq_parameters[3].q': 0.3,
    #     'tuning_parameters.eq_parameters[3].gain': 4.0,
    #     'tuning_parameters.eq_parameters[3].slope': 18,
    #     'tuning_parameters.eq_parameters[3].filterType': 3,
    #     'tuning_parameters.eq_parameters[4].fc': 23.0,
    #     'tuning_parameters.eq_parameters[4].q': 0.4,
    #     'tuning_parameters.eq_parameters[4].gain': 5.0,
    #     'tuning_parameters.eq_parameters[4].slope': 24,
    #     'tuning_parameters.eq_parameters[4].filterType': 4,
    #     'tuning_parameters.eq_parameters[5].fc': 25.0,
    #     'tuning_parameters.eq_parameters[5].q': 0.6,
    #     'tuning_parameters.eq_parameters[5].gain': 7.0,
    #     'tuning_parameters.eq_parameters[5].slope': 30,
    #     'tuning_parameters.eq_parameters[5].filterType': 5,
    # }
    # # Initialize remaining channels and filters with default values (0)
    # for channel in range(1, 6):  # Channels 1-5
    #     # Mix parameters
    #     test_data.update({
    #         f'tuning_parameters.mix_parameters[{channel}].ch_n': 0,
    #         f'tuning_parameters.mix_parameters[{channel}].mix_left_data': 0.0,
    #         f'tuning_parameters.mix_parameters[{channel}].mix_right_data': 0.0,
    #         # Delay parameters
    #         f'tuning_parameters.delay_parameters[{channel}].ch_n': 0,
    #         f'tuning_parameters.delay_parameters[{channel}].delay_data': 0.0,
    #         # Volume parameters
    #         f'tuning_parameters.volume_parameters[{channel}].ch_n': 0,
    #         f'tuning_parameters.volume_parameters[{channel}].vol_data': 0.0,
    #     })
    #     # EQ parameters (20 filters per channel)
    #     for eq in range(channel * 20, (channel + 1) * 20):
    #         test_data.update({
    #             f'tuning_parameters.eq_parameters[{eq}].fc': 0.0,
    #             f'tuning_parameters.eq_parameters[{eq}].q': 0.0,
    #             f'tuning_parameters.eq_parameters[{eq}].gain': 0.0,
    #             f'tuning_parameters.eq_parameters[{eq}].slope': 0,
    #             f'tuning_parameters.eq_parameters[{eq}].filterType': 0,
    #         })
    # channel_0_data = main_window._extract_channel_data(test_data, 1)
    # print("Extracted data for channel 0:")
    # print(channel_0_data)

    main_window.widget_main.show()
    sys.exit(app.exec())