brisonus_app_eq/app.py

596 lines
27 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import sys
import os
sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(__file__))))
from component.widget_main.widget_main import Widget_Main
from component.widget_channel.widget_channel import Widget_Channel
from component.widget_card.widget_card import Widget_Card
from component.widget_card.widget_card import CardData
from component.widget_filter.audio_filter_componet import AudioFilterWidget
from component.widget_filter.audio_filter_model import AudioFilterModel
from component.widget_filter.audio_filter_controller import AudioFilterController
from component.widget_card.widget_card import ParamData
from component.widget_log.widget_log import Widget_Log
from persistence.data_store_manager import DataStoreManager
from persistence.data_store_origin import DataStore
from param_struct_test.service_manager import ServiceManager
from application.application_controller import ApplicationController
from param_struct_test.params_service import Response
from datetime import date
from PySide6.QtWidgets import QMainWindow, QPushButton, QVBoxLayout
from PySide6.QtWidgets import QWidget
from PySide6.QtCore import QObject, QTimer
import yaml # 添加 yaml 导入
import logging
from component.widget_log.log_handler import setup_logger
from component.widget_log.log_handler import logger
class MainWindow(QWidget):
def __init__(self):
    """Build the main window: logging, configuration, services, widgets and wiring."""
    super().__init__()

    # Logging is set up first; the handler is attached to the log widget below.
    self.log_handler = setup_logger()

    # Read the configuration and start the services with the server endpoint.
    # NOTE(review): load_config() falls back to a dict without a 'server'
    # section, so a failed config load raises KeyError on the next lines.
    self.config = self.load_config()
    server_cfg = self.config['server']
    ServiceManager.instance().init_services(server_cfg['host'], server_cfg['port'])
    self.data_manager = DataStoreManager().get_instance()

    # Application-level controller.
    self.app_controller = ApplicationController.instance()

    # Top-level widgets.
    self.widget_main = Widget_Main()
    self.widget_channel = Widget_Channel()
    self.widget_card = Widget_Card()
    self.widget_log = Widget_Log()

    # Route log records to the log widget.
    self.log_handler.set_widget(self.widget_log)

    # Only show as many channel buttons as the configuration asks for.
    channel_count = self.config['channels']['count']
    logger.info(f"通道数量: {channel_count}")
    self.widget_channel.set_visible_channels(channel_count)

    # Place the sub-widgets into the layouts of the main form.
    ui = self.widget_main.ui
    ui.ListWidget_vLayout.addWidget(self.widget_card)
    ui.Channel_hLayout.addWidget(self.widget_channel)
    ui.verticalLayout_Log.addWidget(self.widget_log)

    # Filled by create_filter_widget(): one widget and one controller per channel.
    self.widget_filter_list = []
    self.filter_controllers = []

    # Hidden test button; the real trigger for Get_All is pushButton_2.
    self.test_button = QPushButton("Get_All")
    ui.pushButton_2.clicked.connect(self.Get_All)
    ui.ListWidget_vLayout.addWidget(self.test_button)
    self.test_button.setVisible(False)

    # Build the per-channel filter widgets, wire the signals, load the stored
    # projects and show channel 0 with the currently selected card's data.
    self.create_filter_widget()
    self.setup_connections()
    self.load_default_project()
    self.update_channel_from_card()
    self.set_default_filter_widget(0)
def check_param_name(param_data: dict, selected_name: str) -> bool:
"""
检查param_data中是否包含指定的name值
Args:
param_data: 包含参数数据的字典
selected_name: 要匹配的名称
Returns:
bool: 如果找到匹配的name则返回True否则返回False
"""
for param_info in param_data.values():
if param_info.get('name') == selected_name:
return True
return False
def update_channel_from_card(self):
    """Push the parameter set of the currently selected card into the channel controllers."""
    try:
        # Step 1: ask the card widget which parameter / project is selected.
        selected_param_name, selected_project_name = self.widget_card.get_selected_param_name()
        logger.info(f"selected_param_name: {selected_param_name}, selected_project_name: {selected_project_name}")

        # TODO: expose this through a DataStoreManager API instead of
        # reaching into the private store. The selection is recorded even
        # when it is empty, i.e. before the validation below.
        store = self.data_manager._store
        store.current_project = selected_project_name
        # Attribute spelling ("paramter") is kept exactly as written here.
        store.current_paramter_name = selected_param_name

        if not (selected_param_name and selected_project_name):
            logger.warning("未选择参数或项目")
            return

        # Step 2: fetch the stored data and remember it as the current parameter.
        param_data = self.data_manager.get_param_data(selected_project_name, selected_param_name)
        self.data_manager.current_param = param_data
        if not param_data:
            logger.error(f"未找到参数数据: {selected_param_name}")
            return

        # Step 3: hand each channel's data to the matching controller.
        # Keys outside the controller list are skipped; a failure on one
        # channel is logged and does not stop the others.
        for channel_id, channel_data in param_data.items():
            try:
                channel_id = int(channel_id)
                if channel_id < 0 or channel_id >= len(self.filter_controllers):
                    continue
                self.filter_controllers[channel_id].update_from_card_data(channel_data)
            except Exception as e:
                logger.error(f"处理通道 {channel_id} 时出错: {str(e)}")

        logger.info("成功从卡片数据更新所有通道")
    except Exception as e:
        logger.error(f"从卡片更新通道时出错: {str(e)}")
def load_default_project(self):
    """Fill the card list from the stored projects, creating a "default" project if none exist."""
    try:
        projects = self.data_manager.get_projects()
        if not projects:
            # Nothing stored yet: create a default project and use it.
            logger.info("创建默认项目...")
            self.data_manager.create_project("default", "默认项目配置")
            projects = ["default"]

        # Start from an empty card list.
        self.widget_card.list_widget.clear()

        for project_name in projects:
            project_data = self.data_manager.get_project(project_name)
            if not project_data:
                logger.warning(f"无法加载项目: {project_name}")
                continue
            logger.info(f"加载项目: {project_name}")

            if 'params' not in project_data:
                logger.warning(f"项目 '{project_name}' 没有参数数据")
                continue

            # Only entries that are dicts with a 'name' become card parameters.
            param_objects = [
                ParamData(name=param_data['name'])
                for param_data in project_data['params'].values()
                if isinstance(param_data, dict) and 'name' in param_data
            ]

            # Keep only the part of 'created_at' before the first 'T'.
            if 'created_at' in project_data:
                created_on = project_data['created_at'].split('T')[0]
            else:
                created_on = ''

            card_data = CardData(
                name=project_data.get('name', project_name),
                date=created_on,
                description=project_data.get('description', ''),
                params=param_objects,
            )
            self.widget_card.add_card_item(card_data)
            logger.info(f"已添加项目卡片 '{project_data.get('name', project_name)}' 包含 {len(param_objects)} 个参数")
    except Exception as e:
        logger.error(f"加载默认项目时出错: {str(e)}")
        import traceback
        logger.error(traceback.format_exc())
def load_config(self):
    """Load the configuration from ``config/channel_config.yaml``.

    The path is relative to the current working directory.

    Returns:
        dict: The parsed YAML mapping. If the file cannot be read or parsed,
        or the document is not a mapping (for example an empty file, for
        which ``yaml.safe_load`` returns ``None``), the fallback
        ``{"channels": {"count": 24}}`` is returned instead.

    NOTE(review): the fallback has no 'server' section although __init__
    reads ``config['server']``; confirm which defaults are wanted there.
    """
    fallback = {"channels": {"count": 24}}
    try:
        with open('config/channel_config.yaml', 'r', encoding='utf-8') as f:
            config = yaml.safe_load(f)
    except Exception as e:
        # Boundary handler: any read/parse problem degrades to the fallback.
        logging.error(f"加载配置文件失败: {e}")
        return fallback

    # Callers index the result like a dict, so reject anything else.
    if not isinstance(config, dict):
        logging.error(f"加载配置文件失败: 配置内容不是字典 (实际类型: {type(config).__name__})")
        return fallback

    logging.info("配置文件加载成功")
    return config
def create_filter_widget(self):
    """Create one filter widget, model and controller per configured channel."""

    # Console reporters shared by all controllers.
    def report_error(msg):
        print(f"Error: {msg}")

    def report_state(state):
        print(f"State changed: {state}")

    def report_synced():
        print("Params synced")

    # The number of channels comes from the configuration.
    total = self.config['channels']['count']
    for i in range(total):
        filter_widget = AudioFilterWidget()

        # Model and controller for this channel; names are 1-based.
        controller = AudioFilterController(
            AudioFilterModel(channel_id=i, channel_name=f"Channel {i+1}")
        )
        controller.set_widget(filter_widget)

        # Hook the controller signals up to the reporters.
        controller.error_occurred.connect(report_error)
        controller.state_changed.connect(report_state)
        controller.params_synced.connect(report_synced)

        # The list index equals the channel id for both lists.
        self.widget_filter_list.append(filter_widget)
        self.filter_controllers.append(controller)
def setup_connections(self):
    """Connect the channel and card widget signals to this window's handlers."""
    print("setup_connections")
    wiring = (
        # Channel button pressed -> show that channel's filter widget.
        (self.widget_channel.channel_btn_clicked, self.on_channel_btn_clicked),
        # Parameter picked on a card -> load it into the controllers.
        (self.widget_card.parameterSelected, self.on_widget_card_parameter_selected),
        # Card item edited.
        (self.widget_card.itemEdited, self.on_widget_card_item_edited),
        # Parameter added to a card.
        (self.widget_card.parameterAdded, self.on_widget_card_parameter_added),
    )
    for signal, slot in wiring:
        signal.connect(slot)
def on_widget_card_parameter_selected(self, param_name: str, project_name: str):
    """Load the selected parameter set of a project into the channel controllers.

    Connected to the card widget's ``parameterSelected`` signal.
    """
    logger.info(f"on_widget_card_parameter_selected: {param_name}, {project_name}")
    try:
        param_data = self.data_manager.get_param_data(project_name, param_name)
        if not param_data:
            logger.warning(f"未找到参数数据: {param_name}")
            return

        # Feed each channel's data to its controller; keys that do not map
        # to a controller are skipped, and one failing channel is logged
        # without stopping the rest.
        for channel_id, channel_data in param_data.items():
            try:
                channel_id = int(channel_id)
                if channel_id < 0 or channel_id >= len(self.filter_controllers):
                    continue
                self.filter_controllers[channel_id].update_from_card_data(channel_data)
            except Exception as e:
                logger.error(f"处理通道 {channel_id} 时出错: {str(e)}")

        logger.info(f"成功从参数 {param_name} 更新所有通道")
    except Exception as e:
        logger.error(f"参数选择更新出错: {str(e)}")
        import traceback
        logger.error(traceback.format_exc())
def on_widget_card_item_edited(self, edit_type, old_value, new_value, project_name):
    """Handle an edit made on a card item; only parameter renames are acted on.

    For ``edit_type == "parameter_name"`` the first stored project whose
    'name' equals ``project_name`` and that contains a parameter named
    ``old_value`` is updated: the parameter's 'name' is set to ``new_value``,
    the project is saved, and the parameter file
    ``data/projects/params/<project_name>_<old_value>.csv`` is renamed to use
    ``new_value`` if it exists.

    Args:
        edit_type: Kind of edit; anything other than "parameter_name" is ignored.
        old_value: Previous parameter name.
        new_value: New parameter name.
        project_name: Value of the project's 'name' field (not its file name).
    """
    if edit_type != "parameter_name":
        return

    try:
        for proj_filename in self.data_manager.get_projects():
            project_data = self.data_manager.get_project(proj_filename)
            if not project_data or project_data.get('name') != project_name:
                continue

            # 1. Rename the parameter inside the project data. Non-dict
            # entries are skipped, as load_default_project does.
            target = None
            for param_info in (project_data.get('params') or {}).values():
                if isinstance(param_info, dict) and param_info.get('name') == old_value:
                    target = param_info
                    break
            if target is None:
                # Not in this project; another project may share the name.
                continue

            target['name'] = new_value
            self.data_manager.update_project(proj_filename, project_data)
            logger.info(f"已更新参数名: {old_value} -> {new_value}")

            # 2. Rename the parameter file. The file name is fully
            # determined, so test for it directly instead of scanning the
            # directory (which would raise if the directory is missing).
            params_dir = os.path.join("data", "projects", "params")
            old_path = os.path.join(params_dir, f"{project_name}_{old_value}.csv")
            new_path = os.path.join(params_dir, f"{project_name}_{new_value}.csv")
            if os.path.isfile(old_path):
                try:
                    os.rename(old_path, new_path)
                    logger.info(f"已重命名参数文件: {old_path} -> {new_path}")
                except OSError as e:
                    logger.error(f"重命名参数文件失败: {str(e)}")
            return

        # No project/parameter pair matched: make the no-op visible.
        logger.warning(f"未找到要重命名的参数: {old_value}, 项目: {project_name}")
    except Exception as e:
        logger.error(f"处理参数编辑时出错: {str(e)}")
        import traceback
        logger.error(traceback.format_exc())
def set_default_filter_widget(self, channel_id: int):
    """Show the filter widget of ``channel_id`` in the filter layout.

    Every item currently in the layout is taken out and its widget hidden
    before the requested one is added and shown.

    Args:
        channel_id: Index into ``self.widget_filter_list``. An index outside
            ``0 <= channel_id < len(list)`` is logged and ignored, so the
            layout is never emptied without a replacement and negative
            values do not silently wrap to the end of the list.
    """
    if not 0 <= channel_id < len(self.widget_filter_list):
        logger.warning(f"无效的通道索引: {channel_id}")
        return

    layout = self.widget_main.ui.verticalLayout_Filter
    while layout.count():
        item = layout.takeAt(0)
        widget = item.widget()
        if widget:
            widget.hide()

    filter_widget = self.widget_filter_list[channel_id]
    layout.addWidget(filter_widget)
    filter_widget.show()
def on_channel_btn_clicked(self, channel_id: int):
    """Switch the filter area to the widget of the clicked channel.

    Connected to the channel widget's ``channel_btn_clicked`` signal.

    Args:
        channel_id: Index into ``self.widget_filter_list``. An index outside
            ``0 <= channel_id < len(list)`` is logged and ignored, so the
            layout is never emptied without a replacement and negative
            values do not silently wrap to the end of the list.
    """
    if not 0 <= channel_id < len(self.widget_filter_list):
        logger.warning(f"无效的通道索引: {channel_id}")
        return

    # Take every item out of the layout and hide its widget.
    layout = self.widget_main.ui.verticalLayout_Filter
    while layout.count():
        item = layout.takeAt(0)
        widget = item.widget()
        if widget:
            widget.hide()
    print(f"on_channel_btn_clicked: {channel_id}")

    # Add the selected channel's widget, title it with the 1-based number.
    filter_widget = self.widget_filter_list[channel_id]
    layout.addWidget(filter_widget)
    filter_widget.ui.groupBox_4.setTitle(f"Channel {channel_id+1}")
    filter_widget.show()
def test_communication(self):
    """Exercise the server round trip using the first channel controller."""
    # Nothing to test when no controller has been created.
    if not self.filter_controllers:
        return

    first_controller = self.filter_controllers[0]

    # Pull data from the server ...
    logger.info("load from server...")
    first_controller.load_from_server()

    # ... then push the data back to it.
    logger.info("sync to server...")
    first_controller.sync_to_server()
def Get_All(self):
    """Request every parameter of every channel from the server in one call.

    The response is split per channel with ``_extract_channel_data`` and
    handed to each controller through ``_on_params_updated_new``.
    """
    try:
        # Build the full list of parameter names: for each channel first the
        # mix / delay / volume entries, then its 20 EQ filter slots.
        all_params = []
        for channel_id in range(len(self.filter_controllers)):
            all_params += [
                f'tuning_parameters.mix_parameters[{channel_id}].ch_n',
                f'tuning_parameters.mix_parameters[{channel_id}].mix_left_data',
                f'tuning_parameters.mix_parameters[{channel_id}].mix_right_data',
                f'tuning_parameters.delay_parameters[{channel_id}].ch_n',
                f'tuning_parameters.delay_parameters[{channel_id}].delay_data',
                f'tuning_parameters.volume_parameters[{channel_id}].ch_n',
                f'tuning_parameters.volume_parameters[{channel_id}].vol_data'
            ]
            # EQ slots are numbered globally: channel k owns 20*k .. 20*k + 19.
            first_eq = channel_id * 20
            for idx in range(first_eq, first_eq + 20):
                all_params += [
                    f'tuning_parameters.eq_parameters[{idx}].fc',
                    f'tuning_parameters.eq_parameters[{idx}].q',
                    f'tuning_parameters.eq_parameters[{idx}].gain',
                    f'tuning_parameters.eq_parameters[{idx}].slope',
                    f'tuning_parameters.eq_parameters[{idx}].filterType'
                ]

        def handle_all_response(response):
            """Distribute the combined response to the individual controllers."""
            separator = f"\n ############################################################################################## \n"
            try:
                if not hasattr(response, 'data'):
                    raise ValueError("Invalid response format")
                for controller in self.filter_controllers:
                    channel_id = controller.model.channel_id
                    print(f"response.data: {response.data}")
                    print(separator)
                    channel_data = self._extract_channel_data(response.data, channel_id)
                    print(f"channel_data: {channel_data}")
                    print(separator)
                    per_channel = Response(
                        token="combined",
                        cmd="get_params",
                        widget=self.widget_main,
                        data=channel_data
                    )
                    controller._on_params_updated_new(per_channel)
                print("Successfully loaded all filter data")
            except Exception as e:
                print(f"Error handling response: {e}")

        # A single request covering all parameters.
        ServiceManager.instance().params_service.get_params(
            self.widget_main,
            all_params,
            handle_all_response
        )
    except Exception as e:
        print(f"Error in Get_All: {e}")
def _extract_channel_data(self, response_data: dict, channel_id: int) -> dict:
"""从完整响应中提取指定通道的数据"""
channel_data = {}
base_eq_index = channel_id * 20 # 每个通道20个滤波器的起始索引
for key, value in response_data.items():
# 处理基础参数 (mix, delay, volume)
# 使用更精确的匹配模式,确保只匹配完整的通道索引
if any(param_type in key for param_type in [
f'mix_parameters[{channel_id}]',
f'delay_parameters[{channel_id}]',
f'volume_parameters[{channel_id}]'
]):
channel_data[key] = value
# 处理滤波器参数
if 'tuning_parameters.eq_parameters[' in key:
try:
eq_index = int(key.split('[')[1].split(']')[0])
if base_eq_index <= eq_index < base_eq_index + 20:
channel_data[key] = value
except (IndexError, ValueError):
continue
return channel_data
def on_widget_card_parameter_added(self, param_name: str, project_name: str):
    """Create an empty parameter set for a project when one is added on a card.

    Connected to the card widget's ``parameterAdded`` signal. ``project_name``
    is the project's 'name' field; the matching project file name is looked
    up from the stored projects.
    """
    logger.info(f"添加新参数: {param_name}, 项目: {project_name}")
    try:
        params_dir = os.path.join("data", "projects", "params")

        # Find the first stored project whose 'name' matches.
        for project_filename in self.data_manager.get_projects():
            project_data = self.data_manager.get_project(project_filename)
            if not project_data or project_data.get('name') != project_name:
                continue

            # Blank data for every channel that has a controller.
            empty_channel_data = {
                i: {
                    'delay_data': 0.0,
                    'vol_data': 0.0,
                    'mix_left_data': 0.0,
                    'mix_right_data': 0.0,
                    'filters': []
                }
                for i in range(len(self.filter_controllers))
            }

            # Save under the project's file name ...
            self.data_manager.save_param(project_filename, param_name, empty_channel_data, "")

            # ... then rename the file so it carries the project's 'name'
            # field instead of the file name.
            old_param_path = os.path.join(params_dir, f"{project_filename}_{param_name}.csv")
            new_param_path = os.path.join(params_dir, f"{project_name}_{param_name}.csv")
            if os.path.exists(old_param_path):
                try:
                    os.rename(old_param_path, new_param_path)
                    logger.info(f"已重命名参数文件: {old_param_path} -> {new_param_path}")
                except Exception as e:
                    logger.error(f"重命名参数文件失败: {str(e)}")

            logger.info(f"已创建新参数: {param_name}, 文件名: {project_name}_{param_name}.csv")
            break
        else:
            # Loop finished without a match.
            logger.warning(f"未找到项目: {project_name}")
    except Exception as e:
        logger.error(f"创建参数时出错: {str(e)}")
        import traceback
        logger.error(traceback.format_exc())
if __name__ == '__main__':
    # Script entry point: create the application and the main window, show
    # the main form and run the event loop until it exits.
    import sys
    from PySide6.QtWidgets import QApplication

    app = QApplication(sys.argv)
    main_window = MainWindow()
    logger.info("软件启动")

    # Only the inner main form is shown, not the MainWindow widget itself.
    main_window.widget_main.show()
    exit_code = app.exec()
    sys.exit(exit_code)