brisonus_app_eq/app.py

596 lines
27 KiB
Python
Raw Normal View History

import sys
import os
sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(__file__))))
2025-02-21 17:49:10 +08:00
2025-02-18 22:05:52 +08:00
from component.widget_main.widget_main import Widget_Main
from component.widget_channel.widget_channel import Widget_Channel
from component.widget_card.widget_card import Widget_Card
from component.widget_card.widget_card import CardData
from component.widget_filter.audio_filter_componet import AudioFilterWidget
from component.widget_filter.audio_filter_model import AudioFilterModel
from component.widget_filter.audio_filter_controller import AudioFilterController
from component.widget_card.widget_card import ParamData
from component.widget_log.widget_log import Widget_Log
from persistence.data_store_manager import DataStoreManager
2025-02-25 20:37:41 +08:00
from persistence.data_store_origin import DataStore
from param_struct_test.service_manager import ServiceManager
from application.application_controller import ApplicationController
from param_struct_test.params_service import Response
2025-02-18 22:05:52 +08:00
from datetime import date
from PySide6.QtWidgets import QMainWindow, QPushButton, QVBoxLayout
2025-02-18 22:05:52 +08:00
from PySide6.QtWidgets import QWidget
2025-02-25 00:16:44 +08:00
from PySide6.QtCore import QObject, QTimer
import yaml # 添加 yaml 导入
import logging
from component.widget_log.log_handler import setup_logger
from component.widget_log.log_handler import logger
2025-02-21 17:49:10 +08:00
2025-02-18 22:05:52 +08:00
class MainWindow(QWidget):
def __init__(self):
super().__init__()
# 初始化日志系统
self.log_handler = setup_logger()
2025-02-25 00:16:44 +08:00
# 加载配置文件
self.config = self.load_config()
# 从配置文件获取服务器设置
server_host = self.config['server']['host']
server_port = self.config['server']['port']
# 初始化服务
2025-02-25 00:16:44 +08:00
ServiceManager.instance().init_services(server_host, server_port)
self.data_manager = DataStoreManager().get_instance()
2025-02-25 00:16:44 +08:00
# 初始化应用控制器
self.app_controller = ApplicationController.instance()
2025-02-18 22:05:52 +08:00
self.widget_main = Widget_Main()
self.widget_channel = Widget_Channel()
self.widget_card = Widget_Card()
self.widget_log = Widget_Log()
# 设置日志处理器的目标窗口
self.log_handler.set_widget(self.widget_log)
2025-02-25 00:16:44 +08:00
# 设置通道显示数量
channel_count = self.config['channels']['count']
logger.info(f"通道数量: {channel_count}")
self.widget_channel.set_visible_channels(channel_count)
2025-02-18 22:05:52 +08:00
2025-02-21 17:49:10 +08:00
self.widget_main.ui.ListWidget_vLayout.addWidget(self.widget_card)
2025-02-18 22:05:52 +08:00
self.widget_main.ui.Channel_hLayout.addWidget(self.widget_channel)
self.widget_main.ui.verticalLayout_Log.addWidget(self.widget_log)
2025-02-18 22:05:52 +08:00
self.widget_filter_list = []
self.filter_controllers = [] # 存储控制器实例
# 添加测试按钮
self.test_button = QPushButton("Get_All")
2025-02-25 19:44:42 +08:00
self.widget_main.ui.pushButton_2.clicked.connect(self.Get_All)
# self.test_button.clicked.connect(self.Get_All)
self.widget_main.ui.ListWidget_vLayout.addWidget(self.test_button)
2025-02-25 19:44:42 +08:00
self.test_button.setVisible(False)
2025-02-18 22:05:52 +08:00
self.create_filter_widget()
self.setup_connections()
self.load_default_project()
self.update_channel_from_card()
2025-02-25 19:44:42 +08:00
self.set_default_filter_widget(0)
def check_param_name(param_data: dict, selected_name: str) -> bool:
"""
检查param_data中是否包含指定的name值
Args:
param_data: 包含参数数据的字典
selected_name: 要匹配的名称
Returns:
bool: 如果找到匹配的name则返回True否则返回False
"""
for param_info in param_data.values():
if param_info.get('name') == selected_name:
return True
return False
def update_channel_from_card(self):
"""根据选中的卡片更新通道数据"""
try:
# 1. 获取当前选中的卡片数据
selected_param_name, selected_project_name = self.widget_card.get_selected_param_name()
logger.info(f"selected_param_name: {selected_param_name}, selected_project_name: {selected_project_name}")
#to do: 后续需要封装接口
self.data_manager._store.current_project = selected_project_name
self.data_manager._store.current_paramter_name = selected_param_name
if not selected_param_name or not selected_project_name:
logger.warning("未选择参数或项目")
return
# 2. 获取参数数据
param_data = self.data_manager.get_param_data(selected_project_name, selected_param_name)
# 新增: 根据参数名称查找对应的参数键并更新current_param
# project_names = self.data_manager.get_projects()
# project_data = self.data_manager.get_project(project_names[0])
# if project_data and 'params' in project_data:
# for param_key, param_info in project_data['params'].items():
# if param_info.get('name') == selected_param_name:
# # 找到匹配的参数键更新current_param
# self.data_manager.current_param = param_key
# logger.info(f"已更新当前参数键: {param_key}")
# break
self.data_manager.current_param = param_data
if not param_data:
logger.error(f"未找到参数数据: {selected_param_name}")
return
# 3. 处理每个通道的数据
for channel_id, channel_data in param_data.items():
try:
channel_id = int(channel_id)
if 0 <= channel_id < len(self.filter_controllers):
controller = self.filter_controllers[channel_id]
controller.update_from_card_data(channel_data)
except Exception as e:
logger.error(f"处理通道 {channel_id} 时出错: {str(e)}")
continue
logger.info("成功从卡片数据更新所有通道")
except Exception as e:
logger.error(f"从卡片更新通道时出错: {str(e)}")
def load_default_project(self):
"""Load the first available project or create a default one if none exists"""
try:
# 获取项目列表
projects = self.data_manager.get_projects()
if not projects:
# 如果没有项目,创建默认项目
logger.info("创建默认项目...")
self.data_manager.create_project("default", "默认项目配置")
projects = ["default"]
# 清空当前卡片列表
self.widget_card.list_widget.clear()
# 遍历所有项目并加载
for project_name in projects:
# 从数据管理器获取项目数据
project_data = self.data_manager.get_project(project_name)
if not project_data:
logger.warning(f"无法加载项目: {project_name}")
continue
logger.info(f"加载项目: {project_name}")
# 检查项目数据是否包含参数
if 'params' in project_data:
# 创建参数对象列表
param_objects = []
for param_id, param_data in project_data['params'].items():
if isinstance(param_data, dict) and 'name' in param_data:
param_objects.append(ParamData(name=param_data['name']))
# 创建卡片数据
card_data = CardData(
name=project_data.get('name', project_name),
date=project_data.get('created_at', '').split('T')[0] if 'created_at' in project_data else '',
description=project_data.get('description', ''),
params=param_objects
)
# 添加卡片到界面
self.widget_card.add_card_item(card_data)
logger.info(f"已添加项目卡片 '{project_data.get('name', project_name)}' 包含 {len(param_objects)} 个参数")
else:
logger.warning(f"项目 '{project_name}' 没有参数数据")
except Exception as e:
logger.error(f"加载默认项目时出错: {str(e)}")
import traceback
logger.error(traceback.format_exc())
def load_config(self):
"""加载通道配置文件"""
try:
with open('config/channel_config.yaml', 'r', encoding='utf-8') as f:
config = yaml.safe_load(f)
logging.info("配置文件加载成功")
return config
except Exception as e:
logging.error(f"加载配置文件失败: {e}")
return {"channels": {"count": 24}}
2025-02-18 22:05:52 +08:00
def create_filter_widget(self):
# 从配置文件获取通道数量
channel_count = self.config['channels']['count']
for i in range(channel_count):
# 创建widget
2025-02-18 22:05:52 +08:00
filter_widget = AudioFilterWidget()
# filter_widget.set_channel_id(i)
# filter_widget.set_channel_name(f"Channel {i+1}")
2025-02-24 17:02:51 +08:00
# 创建model和controller
model = AudioFilterModel(channel_id = i, channel_name=f"Channel {i+1}")
controller = AudioFilterController(model)
controller.set_widget(filter_widget)
# 连接控制器信号
controller.error_occurred.connect(lambda msg: print(f"Error: {msg}"))
controller.state_changed.connect(lambda state: print(f"State changed: {state}"))
controller.params_synced.connect(lambda: print("Params synced"))
# 存储实例
2025-02-18 22:05:52 +08:00
self.widget_filter_list.append(filter_widget)
self.filter_controllers.append(controller)
2025-02-18 22:05:52 +08:00
def setup_connections(self):
print("setup_connections")
self.widget_channel.channel_btn_clicked.connect(self.on_channel_btn_clicked)
self.widget_card.parameterSelected.connect(self.on_widget_card_parameter_selected)
# 连接编辑信号
self.widget_card.itemEdited.connect(self.on_widget_card_item_edited)
# 连接参数添加信号
self.widget_card.parameterAdded.connect(self.on_widget_card_parameter_added)
# data_store_manager.current_param
def on_widget_card_parameter_selected(self, param_name: str, project_name: str):
logger.info(f"on_widget_card_parameter_selected: {param_name}, {project_name}")
try:
# 获取参数数据
param_data = self.data_manager.get_param_data(project_name, param_name)
if not param_data:
logger.warning(f"未找到参数数据: {param_name}")
return
# 更新所有通道的滤波器数据
for channel_id, channel_data in param_data.items():
try:
channel_id = int(channel_id)
if 0 <= channel_id < len(self.filter_controllers):
controller = self.filter_controllers[channel_id]
controller.update_from_card_data(channel_data)
except Exception as e:
logger.error(f"处理通道 {channel_id} 时出错: {str(e)}")
continue
logger.info(f"成功从参数 {param_name} 更新所有通道")
except Exception as e:
logger.error(f"参数选择更新出错: {str(e)}")
import traceback
logger.error(traceback.format_exc())
def on_widget_card_item_edited(self, edit_type, old_value, new_value, project_name):
"""处理卡片项目编辑事件"""
if edit_type == "parameter_name":
try:
# 查找项目文件名
projects = self.data_manager.get_projects()
project_filename = None
for proj_filename in projects:
project_data = self.data_manager.get_project(proj_filename)
if project_data and project_data.get('name') == project_name:
project_filename = proj_filename
# 1. 更新JSON中的参数名
if 'params' in project_data:
for param_id, param_info in project_data['params'].items():
if param_info.get('name') == old_value:
# 更新参数名
param_info['name'] = new_value
# 保存更新后的项目数据
self.data_manager.update_project(proj_filename, project_data)
logger.info(f"已更新参数名: {old_value} -> {new_value}")
# 2. 重命名参数文件
params_dir = os.path.join("data", "projects", "params")
for file in os.listdir(params_dir):
# 查找与旧参数名匹配的文件
if file.startswith(f"{project_name}_") and file.endswith('.csv'):
param_part = file[len(project_name)+1:-4] # 提取参数名部分
if param_part == old_value:
old_path = os.path.join(params_dir, file)
new_path = os.path.join(params_dir, f"{project_name}_{new_value}.csv")
try:
os.rename(old_path, new_path)
logger.info(f"已重命名参数文件: {old_path} -> {new_path}")
except Exception as e:
logger.error(f"重命名参数文件失败: {str(e)}")
break
break
except Exception as e:
logger.error(f"处理参数编辑时出错: {str(e)}")
import traceback
logger.error(traceback.format_exc())
2025-02-25 19:44:42 +08:00
def set_default_filter_widget(self, channel_id: int):
while self.widget_main.ui.verticalLayout_Filter.count():
item = self.widget_main.ui.verticalLayout_Filter.takeAt(0)
if item.widget():
item.widget().hide()
filter_widget = self.widget_filter_list[channel_id]
self.widget_main.ui.verticalLayout_Filter.addWidget(filter_widget)
filter_widget.show()
2025-02-18 22:05:52 +08:00
def on_channel_btn_clicked(self, channel_id: int):
2025-02-25 19:44:42 +08:00
# Clear existing widgets from the layout
while self.widget_main.ui.verticalLayout_Filter.count():
item = self.widget_main.ui.verticalLayout_Filter.takeAt(0)
if item.widget():
item.widget().hide()
print(f"on_channel_btn_clicked: {channel_id}")
2025-02-25 19:44:42 +08:00
# Add the selected channel's filter widget to the layout
filter_widget = self.widget_filter_list[channel_id]
self.widget_main.ui.verticalLayout_Filter.addWidget(filter_widget)
filter_widget.ui.groupBox_4.setTitle(f"Channel {channel_id+1}")
2025-02-25 19:44:42 +08:00
filter_widget.show()
2025-02-18 22:05:52 +08:00
def test_communication(self):
"""测试通信功能"""
# 测试第一个控制器的通信
if self.filter_controllers:
controller = self.filter_controllers[0]
# 测试从服务器加载数据
logger.info("load from server...")
controller.load_from_server()
# 测试同步数据到服务器
logger.info("sync to server...")
2025-02-21 17:49:10 +08:00
controller.sync_to_server()
def Get_All(self):
2025-02-25 00:16:44 +08:00
"""一次性获取所有通道的所有参数"""
2025-02-21 17:49:10 +08:00
try:
2025-02-25 00:16:44 +08:00
# 收集所有通道的所有参数
all_params = []
channel_count = len(self.filter_controllers)
# 收集所有通道的基础参数和滤波器参数
for channel_id in range(channel_count):
# 基础参数
all_params.extend([
f'tuning_parameters.mix_parameters[{channel_id}].ch_n',
f'tuning_parameters.mix_parameters[{channel_id}].mix_left_data',
f'tuning_parameters.mix_parameters[{channel_id}].mix_right_data',
f'tuning_parameters.delay_parameters[{channel_id}].ch_n',
f'tuning_parameters.delay_parameters[{channel_id}].delay_data',
f'tuning_parameters.volume_parameters[{channel_id}].ch_n',
f'tuning_parameters.volume_parameters[{channel_id}].vol_data'
])
# 滤波器参数
base_idx = channel_id * 20
for i in range(20): # 每个通道20个滤波器
idx = base_idx + i
all_params.extend([
f'tuning_parameters.eq_parameters[{idx}].fc',
f'tuning_parameters.eq_parameters[{idx}].q',
f'tuning_parameters.eq_parameters[{idx}].gain',
f'tuning_parameters.eq_parameters[{idx}].slope',
f'tuning_parameters.eq_parameters[{idx}].filterType'
])
def handle_all_response(response):
"""处理所有参数的响应"""
try:
if not hasattr(response, 'data'):
raise ValueError("Invalid response format")
# 将响应数据分发给各个控制器
for controller in self.filter_controllers:
channel_id = controller.model.channel_id
print(f"response.data: {response.data}")
print(f"\n ############################################################################################## \n")
2025-02-25 00:16:44 +08:00
channel_data = self._extract_channel_data(response.data, channel_id)
print(f"channel_data: {channel_data}")
print(f"\n ############################################################################################## \n")
2025-02-25 00:16:44 +08:00
controller._on_params_updated_new(Response(
token="combined",
cmd="get_params",
widget = self.widget_main,
2025-02-25 00:16:44 +08:00
data=channel_data
))
print("Successfully loaded all filter data")
except Exception as e:
print(f"Error handling response: {e}")
# 一次性请求所有参数
ServiceManager.instance().params_service.get_params(
self.widget_main,
all_params,
handle_all_response
)
2025-02-21 17:49:10 +08:00
except Exception as e:
2025-02-25 00:16:44 +08:00
print(f"Error in Get_All: {e}")
def _extract_channel_data(self, response_data: dict, channel_id: int) -> dict:
"""从完整响应中提取指定通道的数据"""
channel_data = {}
base_eq_index = channel_id * 20 # 每个通道20个滤波器的起始索引
2025-02-25 00:16:44 +08:00
for key, value in response_data.items():
# 处理基础参数 (mix, delay, volume)
# 使用更精确的匹配模式,确保只匹配完整的通道索引
if any(param_type in key for param_type in [
f'mix_parameters[{channel_id}]',
f'delay_parameters[{channel_id}]',
f'volume_parameters[{channel_id}]'
]):
2025-02-25 00:16:44 +08:00
channel_data[key] = value
# 处理滤波器参数
if 'tuning_parameters.eq_parameters[' in key:
try:
eq_index = int(key.split('[')[1].split(']')[0])
if base_eq_index <= eq_index < base_eq_index + 20:
channel_data[key] = value
except (IndexError, ValueError):
continue
2025-02-25 00:16:44 +08:00
return channel_data
2025-02-21 17:49:10 +08:00
def on_widget_card_parameter_added(self, param_name: str, project_name: str):
"""处理添加新参数的事件"""
logger.info(f"添加新参数: {param_name}, 项目: {project_name}")
try:
# 获取项目列表
projects = self.data_manager.get_projects()
project_found = False
project_filename = None
# 查找对应的项目文件名
for proj_filename in projects:
project_data = self.data_manager.get_project(proj_filename)
if project_data and project_data.get('name') == project_name:
project_found = True
project_filename = proj_filename
# 创建空的通道数据结构
empty_channel_data = {}
for i in range(len(self.filter_controllers)):
empty_channel_data[i] = {
'delay_data': 0.0,
'vol_data': 0.0,
'mix_left_data': 0.0,
'mix_right_data': 0.0,
'filters': []
}
# 保存新参数 - 使用项目文件名
self.data_manager.save_param(project_filename, param_name, empty_channel_data, "")
# 重命名参数文件使用项目的name字段而不是文件名
old_param_path = os.path.join("data", "projects", "params", f"{project_filename}_{param_name}.csv")
new_param_path = os.path.join("data", "projects", "params", f"{project_name}_{param_name}.csv")
if os.path.exists(old_param_path):
try:
os.rename(old_param_path, new_param_path)
logger.info(f"已重命名参数文件: {old_param_path} -> {new_param_path}")
except Exception as e:
logger.error(f"重命名参数文件失败: {str(e)}")
logger.info(f"已创建新参数: {param_name}, 文件名: {project_name}_{param_name}.csv")
break
if not project_found:
logger.warning(f"未找到项目: {project_name}")
except Exception as e:
logger.error(f"创建参数时出错: {str(e)}")
import traceback
logger.error(traceback.format_exc())
2025-02-18 22:05:52 +08:00
if __name__ == '__main__':
import sys
from PySide6.QtWidgets import QApplication
app = QApplication(sys.argv)
main_window = MainWindow()
logger.info("软件启动")
# test_data = {
# # Channel 0 parameters
# # Mix parameters
# 'tuning_parameters.mix_parameters[0].ch_n': 1,
# 'tuning_parameters.mix_parameters[0].mix_left_data': 0.1,
# 'tuning_parameters.mix_parameters[0].mix_right_data': 0.7,
# # Delay parameters
# 'tuning_parameters.delay_parameters[0].ch_n': 1,
# 'tuning_parameters.delay_parameters[0].delay_data': 0.1,
# # Volume parameters
# 'tuning_parameters.volume_parameters[0].ch_n': 1,
# 'tuning_parameters.volume_parameters[0].vol_data': 0.7,
# # EQ parameters (first 6 active filters)
# 'tuning_parameters.eq_parameters[0].fc': 3.0,
# 'tuning_parameters.eq_parameters[0].q': 0.3,
# 'tuning_parameters.eq_parameters[0].gain': 3.0,
# 'tuning_parameters.eq_parameters[0].slope': 3,
# 'tuning_parameters.eq_parameters[0].filterType': 2,
# 'tuning_parameters.eq_parameters[1].fc': 21.0,
# 'tuning_parameters.eq_parameters[1].q': 0.2,
# 'tuning_parameters.eq_parameters[1].gain': 3.0,
# 'tuning_parameters.eq_parameters[1].slope': 6,
# 'tuning_parameters.eq_parameters[1].filterType': 1,
# 'tuning_parameters.eq_parameters[2].fc': 22.0,
# 'tuning_parameters.eq_parameters[2].q': 0.2,
# 'tuning_parameters.eq_parameters[2].gain': 3.0,
# 'tuning_parameters.eq_parameters[2].slope': 12,
# 'tuning_parameters.eq_parameters[2].filterType': 2,
# 'tuning_parameters.eq_parameters[3].fc': 23.0,
# 'tuning_parameters.eq_parameters[3].q': 0.3,
# 'tuning_parameters.eq_parameters[3].gain': 4.0,
# 'tuning_parameters.eq_parameters[3].slope': 18,
# 'tuning_parameters.eq_parameters[3].filterType': 3,
# 'tuning_parameters.eq_parameters[4].fc': 23.0,
# 'tuning_parameters.eq_parameters[4].q': 0.4,
# 'tuning_parameters.eq_parameters[4].gain': 5.0,
# 'tuning_parameters.eq_parameters[4].slope': 24,
# 'tuning_parameters.eq_parameters[4].filterType': 4,
# 'tuning_parameters.eq_parameters[5].fc': 25.0,
# 'tuning_parameters.eq_parameters[5].q': 0.6,
# 'tuning_parameters.eq_parameters[5].gain': 7.0,
# 'tuning_parameters.eq_parameters[5].slope': 30,
# 'tuning_parameters.eq_parameters[5].filterType': 5,
# }
# # Initialize remaining channels and filters with default values (0)
# for channel in range(1, 6): # Channels 1-5
# # Mix parameters
# test_data.update({
# f'tuning_parameters.mix_parameters[{channel}].ch_n': 0,
# f'tuning_parameters.mix_parameters[{channel}].mix_left_data': 0.0,
# f'tuning_parameters.mix_parameters[{channel}].mix_right_data': 0.0,
# # Delay parameters
# f'tuning_parameters.delay_parameters[{channel}].ch_n': 0,
# f'tuning_parameters.delay_parameters[{channel}].delay_data': 0.0,
# # Volume parameters
# f'tuning_parameters.volume_parameters[{channel}].ch_n': 0,
# f'tuning_parameters.volume_parameters[{channel}].vol_data': 0.0,
# })
# # EQ parameters (20 filters per channel)
# for eq in range(channel * 20, (channel + 1) * 20):
# test_data.update({
# f'tuning_parameters.eq_parameters[{eq}].fc': 0.0,
# f'tuning_parameters.eq_parameters[{eq}].q': 0.0,
# f'tuning_parameters.eq_parameters[{eq}].gain': 0.0,
# f'tuning_parameters.eq_parameters[{eq}].slope': 0,
# f'tuning_parameters.eq_parameters[{eq}].filterType': 0,
# })
# channel_0_data = main_window._extract_channel_data(test_data, 1)
# print("Extracted data for channel 0:")
# print(channel_0_data)
main_window.widget_main.show()
sys.exit(app.exec())