# brisonus_app_eq/app.py
# Snapshot: 2025-02-25 19:44:42 +08:00 (448 lines, 19 KiB, Python)
#
# NOTE: the repository viewer this file was copied from ("Raw / Blame /
# History" view) flagged it as containing ambiguous Unicode characters, i.e.
# characters that might be confused with other characters. If that is
# intentional the warning can safely be ignored; the viewer's Escape button
# reveals the affected characters.

import sys
import os
sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(__file__))))
from component.widget_main.widget_main import Widget_Main
from component.widget_channel.widget_channel import Widget_Channel
from component.widget_card.widget_card import Widget_Card
from component.widget_card.widget_card import CardData
from component.widget_filter.audio_filter_componet import AudioFilterWidget
from component.widget_filter.audio_filter_model import AudioFilterModel
from component.widget_filter.audio_filter_controller import AudioFilterController
from component.widget_card.widget_card import ParamData
from component.widget_log.widget_log import Widget_Log
from persistence.data_store_manager import DataStoreManager
from persistence.data_store import DataStore
from param_struct_test.service_manager import ServiceManager
from application.application_controller import ApplicationController
from param_struct_test.params_service import Response
from datetime import date
from PySide6.QtWidgets import QMainWindow, QPushButton, QVBoxLayout
from PySide6.QtWidgets import QWidget
from PySide6.QtCore import QObject, QTimer
import yaml # 添加 yaml 导入
import logging
from component.widget_log.log_handler import setup_logger
from component.widget_log.log_handler import logger
class MainWindow(QWidget):
    """Application shell that wires the EQ tuning UI to its services.

    On construction it loads the YAML configuration, initialises the
    ``ServiceManager`` with the configured server address, builds the main,
    channel, card and log widgets, creates one filter widget/controller pair
    per configured channel, and loads the first stored project into the card
    list.

    The visible top-level UI is ``self.widget_main``; this object mainly owns
    the wiring between the widgets, the controllers and the data store.
    """

    # Path of the YAML configuration file, relative to the working directory.
    CONFIG_PATH = 'config/channel_config.yaml'
    # Channel count used when the configuration file cannot be loaded.
    DEFAULT_CHANNEL_COUNT = 24
    # Number of EQ filter slots per channel in the flat ``eq_parameters``
    # array: channel ``c`` owns indices ``c * 20`` .. ``c * 20 + 19``.
    FILTERS_PER_CHANNEL = 20

    def __init__(self):
        super().__init__()
        # Set up logging first so later steps can report problems.
        self.log_handler = setup_logger()
        # Load the configuration file.
        self.config = self.load_config()

        # Server settings are mandatory. The fallback configuration returned
        # by load_config() does not contain them, so report the real cause
        # before the original exception propagates.
        try:
            server_host = self.config['server']['host']
            server_port = self.config['server']['port']
        except (KeyError, TypeError):
            # The log widget is not attached yet, so use the root logger here
            # (same as load_config()).
            logging.error(
                "Configuration is missing 'server.host' / 'server.port'; "
                "cannot initialise services"
            )
            raise

        # Initialise the services.
        ServiceManager.instance().init_services(server_host, server_port)
        self.data_manager = DataStoreManager().get_instance()
        # Initialise the application controller.
        self.app_controller = ApplicationController.instance()

        self.widget_main = Widget_Main()
        self.widget_channel = Widget_Channel()
        self.widget_card = Widget_Card()
        self.widget_log = Widget_Log()
        # Route log records to the log widget from now on.
        self.log_handler.set_widget(self.widget_log)

        # Show as many channel buttons as the configuration asks for.
        channel_count = self.config['channels']['count']
        logger.info(f"通道数量: {channel_count}")
        self.widget_channel.set_visible_channels(channel_count)

        self.widget_main.ui.ListWidget_vLayout.addWidget(self.widget_card)
        self.widget_main.ui.Channel_hLayout.addWidget(self.widget_channel)
        self.widget_main.ui.verticalLayout_Log.addWidget(self.widget_log)

        self.widget_filter_list = []
        self.filter_controllers = []  # controller instances, indexed by channel

        # "Get_All" is triggered from pushButton_2 of the main UI. The extra
        # test button is still created and added to the layout but kept
        # hidden and unconnected.
        self.test_button = QPushButton("Get_All")
        self.widget_main.ui.pushButton_2.clicked.connect(self.Get_All)
        self.widget_main.ui.ListWidget_vLayout.addWidget(self.test_button)
        self.test_button.setVisible(False)

        self.create_filter_widget()
        self.setup_connections()
        self.load_default_project()
        self.update_channel_from_card()
        self.set_default_filter_widget(0)

    @staticmethod
    def check_param_name(param_data: dict, selected_name: str) -> bool:
        """Return whether any entry of ``param_data`` has the given name.

        Declared as a static method because it takes no ``self``; without the
        decorator a call through an instance would pass the instance as
        ``param_data`` and fail.

        Args:
            param_data: Mapping whose values are dicts of parameter data.
            selected_name: Name to look for in each value's ``'name'`` key.

        Returns:
            True if a value with a matching ``'name'`` exists, else False.
        """
        return any(
            param_info.get('name') == selected_name
            for param_info in param_data.values()
        )

    def update_channel_from_card(self):
        """Push the channel data of the selected card parameter to the controllers.

        Looks up the parameter selected in the card widget inside the first
        stored project and forwards each channel's data to the matching
        filter controller. All failures are logged; nothing is raised.
        """
        try:
            # 1. Currently selected card entry.
            selected_param_name, selected_project_name = self.widget_card.get_selected_param_name()
            logger.info(f"selected_param_name: {selected_param_name}, selected_project_name: {selected_project_name}")

            # 2. Project data.
            projects = self.data_manager.get_projects()
            if not projects:
                logger.error("Project data not found or invalid")
                return
            # NOTE(review): the first project is always used, even though the
            # card widget also reports a selected project name - confirm that
            # this is intended once several projects can be listed.
            project_name = projects[0]
            project_data = self.data_manager.get_project(project_name)
            if not project_data or 'params' not in project_data:
                logger.error("Project data not found or invalid")
                return

            # 3. Parameter entry whose name matches the selection.
            param_data = next(
                (
                    param_info
                    for param_info in project_data['params'].values()
                    if param_info.get('name') == selected_param_name
                ),
                None,
            )
            if not param_data or 'channels' not in param_data:
                logger.error(f"Parameter data not found for name: {selected_param_name}")
                return

            # 4. Forward every channel's data to its controller.
            failed = 0
            for raw_channel_id, channel_data in param_data['channels'].items():
                try:
                    # Stored channel ids start at 1, controller indices at 0.
                    index = int(raw_channel_id) - 1
                    if 0 <= index < len(self.filter_controllers):
                        self.filter_controllers[index].update_from_card_data(channel_data)
                except Exception as e:
                    # Keep going: one bad channel must not block the others.
                    failed += 1
                    logger.error(f"Error processing channel {raw_channel_id}: {str(e)}")

            if failed:
                logger.error(f"Failed to update {failed} channel(s) from card data")
            else:
                logger.info("Successfully updated all channels from card data")
        except Exception as e:
            logger.error(f"Error updating channel from card: {str(e)}")

    def load_default_project(self):
        """Load the first stored project into the card list.

        Creates a project named ``"default"`` when the data store has none.
        Errors are logged and swallowed.
        """
        try:
            projects = self.data_manager.get_projects()
            if not projects:
                # Nothing stored yet: create a default project.
                logger.info("Creating default project...")
                self.data_manager.create_project("default", "Default project configuration")
                projects = ["default"]

            # Load the first project.
            project_name = projects[0]
            project_data = self.data_manager.get_project(project_name)
            logger.info(f"Loading project: {project_name}")
            if not project_data or 'params' not in project_data:
                return

            self.widget_card.list_widget.clear()
            # The card widget expects ParamData objects, not raw dicts.
            param_objects = [
                ParamData(name=param_data['name'])
                for param_data in project_data['params'].values()
            ]
            card_data = CardData(
                name=project_data['name'],
                # Keep only the date part of the 'T'-separated timestamp.
                date=project_data['created_at'].split('T')[0],
                description=project_data['description'],
                params=param_objects,
            )
            self.widget_card.add_card_item(card_data)
        except Exception as e:
            logger.error(f"Error loading default project: {e}")

    def load_config(self):
        """Load the YAML configuration file.

        Returns:
            The parsed configuration dict. If the file cannot be read or
            parsed, or its top level is not a mapping (for example an empty
            file, for which ``yaml.safe_load`` returns ``None``), a fallback
            dict containing only the default channel count is returned.
        """
        fallback = {"channels": {"count": self.DEFAULT_CHANNEL_COUNT}}
        # This runs before the log widget is attached, so the root logger is
        # used here rather than the application logger.
        try:
            with open(self.CONFIG_PATH, 'r', encoding='utf-8') as f:
                config = yaml.safe_load(f)
        except Exception as e:
            logging.error(f"加载配置文件失败: {e}")
            return fallback

        if not isinstance(config, dict):
            logging.error(
                f"加载配置文件失败: expected a mapping at the top level, "
                f"got {type(config).__name__}"
            )
            return fallback

        logging.info("配置文件加载成功")
        return config

    def create_filter_widget(self):
        """Create one filter widget, model and controller per configured channel."""
        channel_count = self.config['channels']['count']
        for i in range(channel_count):
            filter_widget = AudioFilterWidget()
            model = AudioFilterModel(channel_id=i, channel_name=f"Channel {i+1}")
            controller = AudioFilterController(model)
            controller.set_widget(filter_widget)

            # Report controller events through the application logger.
            controller.error_occurred.connect(lambda msg: logger.error(f"Error: {msg}"))
            controller.state_changed.connect(lambda state: logger.info(f"State changed: {state}"))
            controller.params_synced.connect(lambda: logger.info("Params synced"))

            # Both lists are indexed by channel id.
            self.widget_filter_list.append(filter_widget)
            self.filter_controllers.append(controller)

    def setup_connections(self):
        """Connect the channel and card widget signals to their handlers."""
        logger.info("setup_connections")
        self.widget_channel.channel_btn_clicked.connect(self.on_channel_btn_clicked)
        self.widget_card.parameterSelected.connect(self.on_widget_card_parameter_selected)

    def on_widget_card_parameter_selected(self, param_name: str, project_name: str):
        """Log the parameter/project pair selected in the card widget."""
        # NOTE(review): the selection is only logged here; the channels are
        # refreshed from card data once, during construction - confirm
        # whether a refresh is expected on every selection.
        logger.info(f"on_widget_card_parameter_selected: {param_name}, {project_name}")

    def _show_filter_widget(self, channel_id: int) -> None:
        """Make the filter widget of ``channel_id`` the only one in the filter panel."""
        # Validate first so that an invalid id does not leave the panel empty.
        if not 0 <= channel_id < len(self.widget_filter_list):
            logger.error(
                f"Invalid channel id {channel_id}; "
                f"{len(self.widget_filter_list)} filter widget(s) available"
            )
            return

        layout = self.widget_main.ui.verticalLayout_Filter
        # Detach and hide whatever is currently shown.
        while layout.count():
            item = layout.takeAt(0)
            if item.widget():
                item.widget().hide()

        filter_widget = self.widget_filter_list[channel_id]
        layout.addWidget(filter_widget)
        filter_widget.show()

    def set_default_filter_widget(self, channel_id: int):
        """Show the filter widget of ``channel_id`` as the initial selection."""
        self._show_filter_widget(channel_id)

    def on_channel_btn_clicked(self, channel_id: int):
        """Show the filter widget of the channel whose button was clicked."""
        self._show_filter_widget(channel_id)

    def test_communication(self):
        """Exercise a load/sync round trip using the first controller, if any."""
        if self.filter_controllers:
            controller = self.filter_controllers[0]
            # Load data from the server.
            logger.info("load from server...")
            controller.load_from_server()
            # Sync data back to the server.
            logger.info("sync to server...")
            controller.sync_to_server()

    def _channel_param_names(self, channel_id: int) -> list:
        """Return the parameter names requested for one channel.

        The order is: mix, delay and volume parameters, followed by the five
        fields of each of the channel's EQ slots.
        """
        names = [
            f'tuning_parameters.mix_parameters[{channel_id}].ch_n',
            f'tuning_parameters.mix_parameters[{channel_id}].mix_left_data',
            f'tuning_parameters.mix_parameters[{channel_id}].mix_right_data',
            f'tuning_parameters.delay_parameters[{channel_id}].ch_n',
            f'tuning_parameters.delay_parameters[{channel_id}].delay_data',
            f'tuning_parameters.volume_parameters[{channel_id}].ch_n',
            f'tuning_parameters.volume_parameters[{channel_id}].vol_data',
        ]
        first_eq = channel_id * self.FILTERS_PER_CHANNEL
        for idx in range(first_eq, first_eq + self.FILTERS_PER_CHANNEL):
            names.extend([
                f'tuning_parameters.eq_parameters[{idx}].fc',
                f'tuning_parameters.eq_parameters[{idx}].q',
                f'tuning_parameters.eq_parameters[{idx}].gain',
                f'tuning_parameters.eq_parameters[{idx}].slope',
                f'tuning_parameters.eq_parameters[{idx}].filterType',
            ])
        return names

    def Get_All(self):
        """Request every parameter of every channel in a single call.

        The combined response is split per channel and handed to each
        controller. Errors are logged; nothing is raised.
        """
        try:
            # Collect the parameter names of all channels.
            all_params = []
            for channel_id in range(len(self.filter_controllers)):
                all_params.extend(self._channel_param_names(channel_id))

            def handle_all_response(response):
                """Distribute the combined response to the controllers."""
                try:
                    if not hasattr(response, 'data'):
                        raise ValueError("Invalid response format")
                    for controller in self.filter_controllers:
                        channel_id = controller.model.channel_id
                        channel_data = self._extract_channel_data(response.data, channel_id)
                        controller._on_params_updated_new(Response(
                            token="combined",
                            cmd="get_params",
                            widget=self.widget_main,
                            data=channel_data,
                        ))
                    logger.info("Successfully loaded all filter data")
                except Exception as e:
                    logger.error(f"Error handling response: {e}")

            # One request for all parameters.
            ServiceManager.instance().params_service.get_params(
                self.widget_main,
                all_params,
                handle_all_response,
            )
        except Exception as e:
            logger.error(f"Error in Get_All: {e}")

    def _extract_channel_data(self, response_data: dict, channel_id: int) -> dict:
        """Pick the entries of one channel out of a combined response.

        Args:
            response_data: Flat mapping of parameter name to value.
            channel_id: Zero-based channel index.

        Returns:
            A new dict holding the channel's mix/delay/volume entries and the
            entries of the EQ slots that belong to the channel.
        """
        channel_data = {}
        # The closing bracket is part of each marker, so "[1]" cannot match
        # inside "[11]".
        base_markers = (
            f'mix_parameters[{channel_id}]',
            f'delay_parameters[{channel_id}]',
            f'volume_parameters[{channel_id}]',
        )
        first_eq = channel_id * self.FILTERS_PER_CHANNEL
        last_eq = first_eq + self.FILTERS_PER_CHANNEL

        for key, value in response_data.items():
            # Base parameters (mix, delay, volume).
            if any(marker in key for marker in base_markers):
                channel_data[key] = value
            # EQ parameters: keep those whose slot index is in this channel's range.
            if 'tuning_parameters.eq_parameters[' in key:
                try:
                    eq_index = int(key.split('[')[1].split(']')[0])
                except (IndexError, ValueError):
                    continue
                if first_eq <= eq_index < last_eq:
                    channel_data[key] = value
        return channel_data
if __name__ == '__main__':
    # Script entry point: create the Qt application, build the main window
    # and run the event loop until the UI is closed.
    import sys
    from PySide6.QtWidgets import QApplication

    qt_app = QApplication(sys.argv)
    window = MainWindow()
    logger.info("软件启动")

    # Disabled manual smoke test for MainWindow._extract_channel_data.
    # It builds a flat response dict for six channels - the seven
    # mix/delay/volume keys per channel plus 20 EQ slots with five fields
    # each - and prints what is extracted for one channel:
    #
    #     test_data = {}
    #     for channel in range(6):
    #         test_data.update({
    #             f'tuning_parameters.mix_parameters[{channel}].ch_n': 0,
    #             f'tuning_parameters.mix_parameters[{channel}].mix_left_data': 0.0,
    #             f'tuning_parameters.mix_parameters[{channel}].mix_right_data': 0.0,
    #             f'tuning_parameters.delay_parameters[{channel}].ch_n': 0,
    #             f'tuning_parameters.delay_parameters[{channel}].delay_data': 0.0,
    #             f'tuning_parameters.volume_parameters[{channel}].ch_n': 0,
    #             f'tuning_parameters.volume_parameters[{channel}].vol_data': 0.0,
    #         })
    #         for eq in range(channel * 20, (channel + 1) * 20):
    #             test_data.update({
    #                 f'tuning_parameters.eq_parameters[{eq}].fc': 0.0,
    #                 f'tuning_parameters.eq_parameters[{eq}].q': 0.0,
    #                 f'tuning_parameters.eq_parameters[{eq}].gain': 0.0,
    #                 f'tuning_parameters.eq_parameters[{eq}].slope': 0,
    #                 f'tuning_parameters.eq_parameters[{eq}].filterType': 0,
    #             })
    #     channel_1_data = window._extract_channel_data(test_data, 1)
    #     print("Extracted data for channel 1:")
    #     print(channel_1_data)

    # The visible top-level UI is the main widget owned by the window object.
    window.widget_main.show()
    exit_code = qt_app.exec()
    sys.exit(exit_code)