[bugfix] 1.修复ui修改时触发槽函数问题 2.set/get接口修改为批量多次下发

This commit is contained in:
Sam 2025-02-23 23:27:30 +08:00
parent f3465fc623
commit f4c197f7f2
5 changed files with 328 additions and 103 deletions

View File

@ -370,7 +370,7 @@ class AudioFilterWidget(QWidget):
vol_key = f'vol_data{self.channel_id}'
mix_right_key = f'mix_right_data{self.channel_id}'
mix_left_key = f'mix_left_data{self.channel_id}'
print(f"params:{params}")
# 更新基础参数
self._update_basic_params(params, delay_key, vol_key, mix_right_key, mix_left_key)
@ -944,6 +944,28 @@ class AudioFilterWidget(QWidget):
self.ui.verticalSlider_3.setValue(0)
self.ui.verticalSlider_4.setValue(0)
def disconnect_param_signals(self):
"""断开参数信号连接"""
self.ui.lineEdit_11.textChanged.disconnect()
self.ui.lineEdit_10.textChanged.disconnect()
self.ui.lineEdit_13.textChanged.disconnect()
self.ui.lineEdit_12.textChanged.disconnect()
def connect_param_signals(self):
"""重新连接参数信号"""
self.ui.lineEdit_11.textChanged.connect(
lambda v: self._on_param_changed('delay', v)
)
self.ui.lineEdit_10.textChanged.connect(
lambda v: self._on_param_changed('volume', v)
)
self.ui.lineEdit_13.textChanged.connect(
lambda v: self._on_param_changed('mix_right', v)
)
self.ui.lineEdit_12.textChanged.connect(
lambda v: self._on_param_changed('mix_left', v)
)
if __name__ == "__main__":
import sys
from PySide6.QtWidgets import QApplication

View File

@ -14,12 +14,12 @@ from param_struct_test.params_service import ParamsService
from component.widget_filter.audio_filter_model import AudioFilterModel
from component.widget_filter.audio_filter_componet import AudioFilterWidget
from component.widget_filter.audio_filter_model import FilterParams, FilterType, ChannelParams
from param_struct_test.params_service import Response
from param_struct_test.params_service import Response, CMD
# from audio_filter_model import AudioFilterModel
# from audio_filter_componet import AudioFilterWidget
# from audio_filter_model import FilterParams, FilterType, ChannelParams
#
# from application_controller import ApplicationController, ControllerSignalData
# from application.application_controller import ApplicationController, ControllerSignalData
# from param_struct_test.service_manager import ServiceManager
@ -187,31 +187,53 @@ class AudioFilterController(QObject):
try:
self.state = AudioControllerState.UPDATING
struct_data = res.data
print("struct_data:", struct_data)
print("\n")
# 将结构体数据转换为模型数据
model_data = self._convert_struct_data_to_model(struct_data)
# 更新通道参数
self.model.set_channel_params(model_data['channel_params'])
# 清除现有滤波器
self.model.filters.clear()
# 暂时断开widget的信号连接
if self.widget:
self.widget.disconnect_param_signals()
if model_data['channel_id'] == self.model.channel_id:
# 只添加有效的滤波器所有参数都不为0
valid_filters = [
filter_param for filter_param in model_data['filters']
if any([
filter_param.frequency != 0,
filter_param.q_value != 0,
filter_param.gain != 0,
filter_param.slope != 0
])
]
try:
# 清除UI数据
self.widget.clear_ui_data()
# 更新model的filters列表
self.model.filters = valid_filters
print("有效的滤波器数量:", len(self.model.filters))
self.widget.updateUI() # 手动更新ui
# 更新通道参数
self.model.set_channel_params(model_data['channel_params'])
# 清除现有滤波器
self.model.filters.clear()
if model_data['channel_id'] == self.model.channel_id:
# 只添加有效的滤波器所有参数都不为0
valid_filters = [
filter_param for filter_param in model_data['filters']
if any([
filter_param.frequency != 0,
filter_param.q_value != 0,
filter_param.gain != 0,
filter_param.slope != 0
])
]
# 更新model的filters列表
self.model.filters = valid_filters
print("有效的滤波器数量:", len(self.model.filters))
print(f"bbbbb, self.model.channel_params:{self.model.channel_params}")
# 从模型更新数据
print(f"aaaaa to_widget_params:{self.model.to_widget_params()}")
self.widget.set_all_params(self.model.to_widget_params())
finally:
# 恢复widget的信号连接
if self.widget:
self.widget.connect_param_signals()
except Exception as e:
self.state = AudioControllerState.ERROR
self.error_occurred.emit(f"更新参数时发生错误: {str(e)}")
@ -249,55 +271,127 @@ class AudioFilterController(QObject):
return result
def sync_to_server(self):
"""同步数据到服务器"""
"""同步数据到服务器,采用批量发送策略"""
try:
self.state = AudioControllerState.UPDATING
params = self.model.get_all_data()
# 转换数据格式
struct_params = self._convert_model_data_to_struct(params)
print("sync_to_server:", struct_params)
ServiceManager.instance().params_service.set_params(self.widget, struct_params)
# 基础参数批次
channel_id = self.model.channel_id - 1 # 调整为0基索引
base_params = {
f'tuning_parameters.mix_parameters[{channel_id}].ch_n': channel_id,
f'tuning_parameters.mix_parameters[{channel_id}].mix_left_data': struct_params[f'tuning_parameters.mix_parameters[{channel_id}].mix_left_data'],
f'tuning_parameters.mix_parameters[{channel_id}].mix_right_data': struct_params[f'tuning_parameters.mix_parameters[{channel_id}].mix_right_data'],
f'tuning_parameters.delay_parameters[{channel_id}].ch_n': channel_id,
f'tuning_parameters.delay_parameters[{channel_id}].delay_data': struct_params[f'tuning_parameters.delay_parameters[{channel_id}].delay_data'],
f'tuning_parameters.volume_parameters[{channel_id}].ch_n': channel_id,
f'tuning_parameters.volume_parameters[{channel_id}].vol_data': struct_params[f'tuning_parameters.volume_parameters[{channel_id}].vol_data']
}
# 发送基础参数
ServiceManager.instance().params_service.set_params(self.widget, base_params)
# 分批处理滤波器参数
BATCH_SIZE = 5 # 每批5个滤波器
base_idx = channel_id * 20 # 计算滤波器起始索引
# 收集所有滤波器参数
filter_params = {}
for key, value in struct_params.items():
if 'eq_parameters' in key:
filter_params[key] = value
# 按批次发送滤波器参数
for i in range(0, len(filter_params), BATCH_SIZE * 5): # 每个滤波器有5个参数
batch_params = {}
batch_keys = list(filter_params.keys())[i:i + BATCH_SIZE * 5]
for key in batch_keys:
batch_params[key] = filter_params[key]
if batch_params: # 只有当有参数时才发送
ServiceManager.instance().params_service.set_params(
self.widget,
batch_params
)
self.state = AudioControllerState.IDLE
except Exception as e:
self.state = AudioControllerState.ERROR
self.error_occurred.emit(f"Error syncing to server: {str(e)}")
def load_from_server(self):
"""从服务器加载数据"""
"""从服务器分批加载数据"""
try:
self.state = AudioControllerState.UPDATING
channel_id = self.model.channel_id
channel_id = self.model.channel_id # 使用实际通道ID不需要减1
# 构建需要获取的参数键列表
param_keys = []
# 基础参数请求 - 使用实际通道ID
base_params = [
f'tuning_parameters.mix_parameters[{channel_id-1}].ch_n',
f'tuning_parameters.mix_parameters[{channel_id-1}].mix_left_data',
f'tuning_parameters.mix_parameters[{channel_id-1}].mix_right_data',
f'tuning_parameters.delay_parameters[{channel_id-1}].ch_n',
f'tuning_parameters.delay_parameters[{channel_id-1}].delay_data',
f'tuning_parameters.volume_parameters[{channel_id-1}].ch_n',
f'tuning_parameters.volume_parameters[{channel_id-1}].vol_data'
]
# 添加通道基本参数的键
param_keys.extend([
f'tuning_parameters.mix_parameters[{channel_id}].ch_n',
f'tuning_parameters.mix_parameters[{channel_id}].mix_left_data',
f'tuning_parameters.mix_parameters[{channel_id}].mix_right_data',
f'tuning_parameters.delay_parameters[{channel_id}].ch_n',
f'tuning_parameters.delay_parameters[{channel_id}].delay_data',
f'tuning_parameters.volume_parameters[{channel_id}].ch_n',
f'tuning_parameters.volume_parameters[{channel_id}].vol_data'
])
# 分批请求滤波器参数
BATCH_SIZE = 5 # 每批请求5个滤波器的参数
base_idx = (channel_id - 1) * 20 # 修正滤波器起始索引计算通道1从0开始通道2从20开始
# 添加当前通道的20个滤波器的所有参数键
base_idx = (channel_id) * 20
for i in range(20): # 固定生成20个滤波器的参数键
idx = base_idx + i
param_keys.extend([
f'tuning_parameters.eq_parameters[{idx}].fc',
f'tuning_parameters.eq_parameters[{idx}].q',
f'tuning_parameters.eq_parameters[{idx}].gain',
f'tuning_parameters.eq_parameters[{idx}].slope',
f'tuning_parameters.eq_parameters[{idx}].filterType'
])
print(f"aa param_keys:",param_keys)
def request_batch(start_idx, end_idx):
batch_params = []
for i in range(start_idx, end_idx):
idx = base_idx + i
batch_params.extend([
f'tuning_parameters.eq_parameters[{idx}].fc',
f'tuning_parameters.eq_parameters[{idx}].q',
f'tuning_parameters.eq_parameters[{idx}].gain',
f'tuning_parameters.eq_parameters[{idx}].slope',
f'tuning_parameters.eq_parameters[{idx}].filterType'
])
return batch_params
# 创建批次请求的回调处理
all_responses = {}
remaining_batches = (20 + BATCH_SIZE - 1) // BATCH_SIZE # 向上取整
def batch_callback(response):
nonlocal remaining_batches
all_responses.update(response.data)
print("all_responses:", all_responses)
print("\n")
remaining_batches -= 1
# 当所有批次都完成时,处理完整数据
if remaining_batches == 0:
self._on_params_updated_new(Response(
token="combined",
cmd=CMD.GET_PARAMS,
widget=self.widget,
data=all_responses
))
# 发送基础参数请求
ServiceManager.instance().params_service.get_params(
self.widget,
param_keys,
callback=self._on_params_updated_new
self.widget,
base_params,
callback=lambda r: all_responses.update(r.data)
)
# 发送分批的滤波器参数请求
for batch in range(0, 20, BATCH_SIZE):
end_idx = min(batch + BATCH_SIZE, 20)
batch_params = request_batch(batch, end_idx)
ServiceManager.instance().params_service.get_params(
self.widget,
batch_params,
callback=batch_callback
)
except Exception as e:
self.state = AudioControllerState.ERROR
self.error_occurred.emit(f"从服务器加载数据时发生错误: {str(e)}")
@ -410,73 +504,182 @@ class AudioFilterController(QObject):
# except Exception as e:
# self.error_occurred.emit(f"Error loading preset: {str(e)}")
# def main():
# """测试音频滤波器控制器的参数更新功能"""
# from PySide6.QtWidgets import QApplication
# import sys
# from dataclasses import dataclass
# # 创建测试用的Response类
# @dataclass
# class TestResponse:
# data: dict
# # 创建应用实例
# app = QApplication(sys.argv)
# # 创建测试数据
# test_data = {
# 'tuning_parameters.delay_parameters[0].delay_data': 0.5,
# 'tuning_parameters.volume_parameters[0].vol_data': -6.0,
# 'tuning_parameters.mix_parameters[0].mix_right_data': 0.7,
# 'tuning_parameters.mix_parameters[0].mix_left_data': 0.3,
# # 添加两个测试滤波器
# 'tuning_parameters.eq_parameters[0].fc': 1000.0,
# 'tuning_parameters.eq_parameters[0].q': 1.4,
# 'tuning_parameters.eq_parameters[0].gain': 3.0,
# 'tuning_parameters.eq_parameters[0].slope': 12.0,
# 'tuning_parameters.eq_parameters[0].filterType': 1,
# 'tuning_parameters.eq_parameters[1].fc': 2000.0,
# 'tuning_parameters.eq_parameters[1].q': 2.0,
# 'tuning_parameters.eq_parameters[1].gain': -6.0,
# 'tuning_parameters.eq_parameters[1].slope': 12.0,
# 'tuning_parameters.eq_parameters[1].filterType': 2,
# }
# # 创建测试响应对象
# test_response = TestResponse(data=test_data)
# # 创建控制器和相关组件
# model = AudioFilterModel(channel_id=1, channel_name="测试通道")
# controller = AudioFilterController(model)
# widget = AudioFilterWidget()
# controller.set_widget(widget)
# # 调用参数更新函数
# controller._on_params_updated_new(test_response)
# # 显示widget
# widget.show()
# # 打印更新后的数据,用于验证
# print("\n=== 更新后的参数 ===")
# print(f"通道延迟: {model.channel_params.delay}")
# print(f"通道音量: {model.channel_params.volume}")
# print(f"混音左: {model.channel_params.mix_left}")
# print(f"混音右: {model.channel_params.mix_right}")
# print(f"滤波器数量: {len(model.filters)}")
# for i, filter_param in enumerate(model.filters):
# print(f"\n滤波器 {i+1}:")
# print(f"类型: {filter_param.filter_type}")
# print(f"频率: {filter_param.frequency}")
# print(f"Q值: {filter_param.q_value}")
# print(f"增益: {filter_param.gain}")
# print(f"斜率: {filter_param.slope}")
# # 运行应用
# return app.exec()
def main():
"""测试音频滤波器控制器的参数更新功能"""
from PySide6.QtWidgets import QApplication
"""测试音频滤波器控制器的分批参数加载功能"""
from PySide6.QtWidgets import QApplication, QWidget
import sys
from dataclasses import dataclass
from enum import Enum, auto
import time
# 模拟CMD枚举
class CMD(Enum):
GET_PARAMS = auto()
# 创建测试用的Response类
@dataclass
class TestResponse:
token: str
cmd: CMD
widget: QWidget
data: dict
# 模拟ParamsService
class MockParamsService:
def __init__(self):
self.server_data = {
# 基础参数 - 使用0基索引
'tuning_parameters.mix_parameters[0].ch_n': 1,
'tuning_parameters.mix_parameters[0].mix_left_data': 0.7,
'tuning_parameters.mix_parameters[0].mix_right_data': 0.3,
'tuning_parameters.delay_parameters[0].delay_data': 0.5,
'tuning_parameters.volume_parameters[0].vol_data': -6.0,
# 滤波器参数 - 使用正确的起始索引
'tuning_parameters.eq_parameters[0].fc': 100.0,
'tuning_parameters.eq_parameters[0].q': 1.4,
'tuning_parameters.eq_parameters[0].gain': 3.0,
'tuning_parameters.eq_parameters[0].slope': 12.0,
'tuning_parameters.eq_parameters[0].filterType': 1,
}
def get_params(self, widget, param_keys, callback):
print("get_params:", param_keys)
print("\n")
# 模拟异步响应
response_data = {k: self.server_data.get(k, 0) for k in param_keys}
response = TestResponse(
token="test",
cmd=CMD.GET_PARAMS,
widget=widget,
data=response_data
)
# 模拟网络延迟
time.sleep(0.1)
callback(response)
# 模拟ServiceManager
class MockServiceManager:
_instance = None
@classmethod
def instance(cls):
if cls._instance is None:
cls._instance = cls()
return cls._instance
def __init__(self):
self.params_service = MockParamsService()
# 替换真实的ServiceManager
global ServiceManager
ServiceManager = MockServiceManager
# 创建应用实例
app = QApplication(sys.argv)
# 创建测试数据
test_data = {
'tuning_parameters.delay_parameters[0].delay_data': 0.5,
'tuning_parameters.volume_parameters[0].vol_data': -6.0,
'tuning_parameters.mix_parameters[0].mix_right_data': 0.7,
'tuning_parameters.mix_parameters[0].mix_left_data': 0.3,
# 添加两个测试滤波器
'tuning_parameters.eq_parameters[0].fc': 1000.0,
'tuning_parameters.eq_parameters[0].q': 1.4,
'tuning_parameters.eq_parameters[0].gain': 3.0,
'tuning_parameters.eq_parameters[0].slope': 12.0,
'tuning_parameters.eq_parameters[0].filterType': 1,
'tuning_parameters.eq_parameters[1].fc': 2000.0,
'tuning_parameters.eq_parameters[1].q': 2.0,
'tuning_parameters.eq_parameters[1].gain': -6.0,
'tuning_parameters.eq_parameters[1].slope': 12.0,
'tuning_parameters.eq_parameters[1].filterType': 2,
}
# 创建测试响应对象
test_response = TestResponse(data=test_data)
# 创建控制器和相关组件
model = AudioFilterModel(channel_id=1, channel_name="测试通道")
controller = AudioFilterController(model)
widget = AudioFilterWidget()
controller.set_widget(widget)
# 调用参数更新函数
controller._on_params_updated_new(test_response)
# 显示widget
# 添加数据加载完成的处理
def on_params_synced():
print("\n=== 加载完成,显示更新后的参数 ===")
print(f"通道延迟: {model.channel_params.delay}")
print(f"通道音量: {model.channel_params.volume}")
print(f"混音左: {model.channel_params.mix_left}")
print(f"混音右: {model.channel_params.mix_right}")
print(f"滤波器数量: {len(model.filters)}")
for i, filter_param in enumerate(model.filters):
print(f"\n滤波器 {i+1}:")
print(f"类型: {filter_param.filter_type}")
print(f"频率: {filter_param.frequency}")
print(f"Q值: {filter_param.q_value}")
print(f"增益: {filter_param.gain}")
print(f"斜率: {filter_param.slope}")
# 连接信号
controller.params_synced.connect(on_params_synced)
# 显示widget并开始加载数据
widget.show()
# 打印更新后的数据,用于验证
print("\n=== 更新后的参数 ===")
print(f"通道延迟: {model.channel_params.delay}")
print(f"通道音量: {model.channel_params.volume}")
print(f"混音左: {model.channel_params.mix_left}")
print(f"混音右: {model.channel_params.mix_right}")
print(f"滤波器数量: {len(model.filters)}")
for i, filter_param in enumerate(model.filters):
print(f"\n滤波器 {i+1}:")
print(f"类型: {filter_param.filter_type}")
print(f"频率: {filter_param.frequency}")
print(f"Q值: {filter_param.q_value}")
print(f"增益: {filter_param.gain}")
print(f"斜率: {filter_param.slope}")
print("开始加载数据...")
controller.load_from_server()
# 运行应用
return app.exec()
if __name__ == "__main__":
main()
main()

View File

@ -113,8 +113,8 @@ class AudioFilterModel(QObject):
def to_widget_params(self) -> Dict[str, Any]:
"""转换为AudioFilterWidget兼容的参数格式"""
print(f"self.channel_params:{self.channel_params}")
params = self.channel_params.to_dict(self.channel_id)
for i, filter_param in enumerate(self.filters, 1):
params.update(filter_param.to_dict(self.channel_id, i))