brisonus_app_eq/persistence/data_store.py

439 lines
18 KiB
Python
Raw Permalink Normal View History

2025-02-25 20:37:41 +08:00
import csv
import os
2025-02-25 20:37:41 +08:00
import json
from typing import Dict, List, Any, Optional
from datetime import datetime
from persistence.models import *
from component.widget_log.log_handler import logger
class DataStore:
    """File-backed storage for projects and their tuning parameter sets.

    Layout on disk, rooted at ``storage_dir``:

    * ``<project>.json`` -- project metadata (a serialized ``ProjectData``).
    * ``params/<project>_<param>.csv`` -- one parameter set, stored as flat
      ``parameter,value`` rows.

    Public methods catch every exception, log it, and report failure through
    their return value (``False``, ``None`` or an empty container) instead of
    raising.
    """

    # Fixed sizes of the flat CSV layout: EQ slot ``i`` belongs to channel
    # ``i // FILTERS_PER_CHANNEL`` at position ``i % FILTERS_PER_CHANNEL``.
    NUM_CHANNELS = 6
    FILTERS_PER_CHANNEL = 20

    def __init__(self, storage_dir: str = "data/projects"):
        """Remember ``storage_dir`` and create it (with ``params/``) if missing."""
        self.storage_dir = storage_dir
        self.current_project: Optional[str] = None
        self.current_param: Optional[str] = None
        self.current_paramter_name: Optional[str] = None
        self._ensure_storage_dir()

    def _ensure_storage_dir(self):
        """Make sure the storage directory and its ``params`` sub-directory exist."""
        # Creating the leaf also creates storage_dir; exist_ok avoids the
        # check-then-create race of testing os.path.exists first.
        os.makedirs(os.path.join(self.storage_dir, "params"), exist_ok=True)

    def _get_project_path(self, project_name: str) -> str:
        """Return the path of the project's metadata JSON file."""
        return os.path.join(self.storage_dir, f"{project_name}.json")

    def _get_param_path(self, project_name: str, param_name: str) -> str:
        """Return the path of the CSV file holding one parameter set."""
        params_dir = os.path.join(self.storage_dir, "params")
        return os.path.join(params_dir, f"{project_name}_{param_name}.csv")

    def save_project(self, project_name: str, description: str = "") -> bool:
        """Create the project, or update the metadata of an existing one.

        For an existing project the original creation time and the registered
        parameter sets are kept; ``last_modified`` and ``description`` are
        overwritten. The project becomes ``current_project`` on success.

        Returns:
            True on success, False if anything failed (the error is logged).
        """
        try:
            now = datetime.now().isoformat()
            # None when the project does not exist yet (or cannot be read).
            existing = self.load_project(project_name)
            project_data = ProjectData(
                name=project_name,
                created_at=existing.created_at if existing else now,
                last_modified=now,
                description=description,
                # Keep already registered parameter sets: resetting them here
                # would orphan their CSV files on every project update.
                params=dict(existing.params) if existing and existing.params else {},
            )
            self._save_project_metadata(project_name, project_data)
            self.current_project = project_name
            logger.info(f"项目 {project_name} 保存成功")
            return True
        except Exception as e:
            logger.error(f"保存项目失败: {e}")
            return False

    def add_param_to_project(self, project_name: str, param_name: str,
                             channel_data: Dict[int, Dict], description: str = "") -> bool:
        """Add (or overwrite) a parameter set of an existing project.

        The values in ``channel_data`` go to the parameter CSV file; the
        project metadata only records the name, creation time and description.

        Returns:
            True on success, False if the project does not exist or anything
            else failed (the error is logged).
        """
        try:
            project_data = self.load_project(project_name)
            if not project_data:
                raise ValueError(f"Project {project_name} not found")

            # Metadata entry only carries descriptive information; channel
            # configuration is no longer stored in the JSON file.
            param_config = ParamConfig(
                name=param_name,
                created_at=datetime.now().isoformat(),
                description=description,
                channels={}
            )

            # Write the CSV first so that a failed write cannot leave the
            # metadata pointing at a parameter file that does not exist.
            self._save_param_to_csv(project_name, param_name, channel_data)

            project_data.params[param_name] = param_config
            project_data.last_modified = datetime.now().isoformat()
            self._save_project_metadata(project_name, project_data)

            logger.info(f"参数 {param_name} 添加到项目 {project_name} 成功")
            return True
        except Exception as e:
            logger.error(f"添加参数失败: {e}")
            return False

    def _save_param_to_csv(self, project_name: str, param_name: str, channel_data: Dict[int, Dict]):
        """Write ``channel_data`` to the parameter CSV file in the fixed row order.

        Every slot of the layout is written; values missing from
        ``channel_data`` fall back to zero.
        """
        csv_path = self._get_param_path(project_name, param_name)
        with open(csv_path, 'w', newline='') as csvfile:
            writer = csv.writer(csvfile)
            writer.writerow(['parameter', 'value'])
            writer.writerows(self._build_param_rows(channel_data))

    def _build_param_rows(self, channel_data: Dict[int, Dict]) -> List[tuple]:
        """Flatten ``channel_data`` into ordered ``(parameter, value)`` string pairs."""
        base = 'dataset.tuning_parameters'
        rows = [
            ('dataset.audio_mode', '0'),
            ('dataset.send_action', '0'),
        ]

        # Mix parameters.
        for ch in range(self.NUM_CHANNELS):
            settings = channel_data.get(ch, {})
            prefix = f'{base}.mix_parameters[{ch}]'
            rows.append((f'{prefix}.ch_n', str(ch)))
            rows.append((f'{prefix}.mix_left_data', str(settings.get('mix_left_data', 0.0))))
            rows.append((f'{prefix}.mix_right_data', str(settings.get('mix_right_data', 0.0))))

        # EQ parameters: a fixed number of slots per channel. Filters beyond
        # FILTERS_PER_CHANNEL in a channel have no slot and are not written.
        for ch in range(self.NUM_CHANNELS):
            filters = channel_data.get(ch, {}).get('filters', [])
            for idx in range(self.FILTERS_PER_CHANNEL):
                filter_data = filters[idx] if idx < len(filters) else {}
                prefix = f'{base}.eq_parameters[{ch * self.FILTERS_PER_CHANNEL + idx}]'
                rows.append((f'{prefix}.fc', str(filter_data.get('fc', 0.0))))
                rows.append((f'{prefix}.q', str(filter_data.get('q', 0.0))))
                rows.append((f'{prefix}.gain', str(filter_data.get('gain', 0.0))))
                rows.append((f'{prefix}.slope', str(filter_data.get('slope', 0))))
                rows.append((f'{prefix}.filterType', str(filter_data.get('filterType', 0))))

        # Delay parameters.
        for ch in range(self.NUM_CHANNELS):
            prefix = f'{base}.delay_parameters[{ch}]'
            rows.append((f'{prefix}.ch_n', str(ch)))
            rows.append((f'{prefix}.delay_data', str(channel_data.get(ch, {}).get('delay_data', 0.0))))

        # Volume parameters.
        for ch in range(self.NUM_CHANNELS):
            prefix = f'{base}.volume_parameters[{ch}]'
            rows.append((f'{prefix}.ch_n', str(ch)))
            rows.append((f'{prefix}.vol_data', str(channel_data.get(ch, {}).get('vol_data', 0.0))))

        return rows

    def _get_param_structure(self):
        """Placeholder for parsing struct_params.txt; currently returns ``{}``."""
        # The structure is hard-coded in the CSV read/write code for now.
        return {}

    def _convert_to_channel_config(self, channel_data: Dict[int, Dict]) -> Dict[int, "ChannelConfig"]:
        """Placeholder conversion to ``ChannelConfig``; currently returns ``{}``."""
        # The JSON metadata no longer stores channel configuration.
        return {}

    def load_project(self, project_name: str) -> Optional["ProjectData"]:
        """Load project metadata.

        Returns:
            The ``ProjectData`` built from the JSON file, or None if the file
            does not exist or cannot be read (the error is logged).
        """
        try:
            file_path = self._get_project_path(project_name)
            if not os.path.exists(file_path):
                return None
            with open(file_path, 'r', encoding='utf-8') as f:
                data = json.load(f)
            return ProjectData(**data)
        except Exception as e:
            logger.error(f"加载项目失败: {e}")
            return None

    def load_param_data(self, project_name: str, param_name: str) -> Dict:
        """Load one parameter set and return it as per-channel data.

        Returns:
            ``{channel_id: {...}}`` as produced by
            ``_convert_csv_to_channel_data``, or ``{}`` if the CSV file does
            not exist or cannot be parsed (the error is logged).
        """
        try:
            csv_path = self._get_param_path(project_name, param_name)
            if not os.path.exists(csv_path):
                return {}
            with open(csv_path, 'r', newline='') as csvfile:
                param_data = {row['parameter']: row['value'] for row in csv.DictReader(csvfile)}
            return self._convert_csv_to_channel_data(param_data)
        except Exception as e:
            logger.error(f"加载参数数据失败: {e}")
            return {}

    def _read_channel_section(self, param_data: Dict, channel_data: Dict[int, Dict],
                              section: str, fields: List[str]) -> None:
        """Copy the float ``fields`` of one per-channel CSV section into ``channel_data``."""
        for i in range(self.NUM_CHANNELS):
            prefix = f'dataset.tuning_parameters.{section}[{i}]'
            ch_key = f'{prefix}.ch_n'
            if ch_key not in param_data:
                continue
            # Go through float first so that values such as "2.0" still parse.
            channel_id = int(float(param_data[ch_key]))
            entry = channel_data.setdefault(channel_id, {'filters': []})
            for field in fields:
                key = f'{prefix}.{field}'
                if key in param_data:
                    entry[field] = float(param_data[key])

    def _convert_csv_to_channel_data(self, param_data: Dict) -> Dict[int, Dict]:
        """Convert flat ``{parameter: value}`` CSV data to per-channel dictionaries.

        Each channel entry has a ``'filters'`` list plus whichever of
        ``mix_left_data``, ``mix_right_data``, ``delay_data`` and ``vol_data``
        are present. A filter slot is filled only when its ``fc`` key is
        present; lower slots without one stay as empty dictionaries.
        """
        channel_data: Dict[int, Dict] = {}

        self._read_channel_section(param_data, channel_data, 'mix_parameters',
                                   ['mix_left_data', 'mix_right_data'])
        self._read_channel_section(param_data, channel_data, 'delay_parameters', ['delay_data'])
        self._read_channel_section(param_data, channel_data, 'volume_parameters', ['vol_data'])

        for i in range(self.NUM_CHANNELS * self.FILTERS_PER_CHANNEL):
            prefix = f'dataset.tuning_parameters.eq_parameters[{i}]'
            fc_key = f'{prefix}.fc'
            if fc_key not in param_data:
                continue

            # The slot index determines the owning channel and filter position.
            channel_id, filter_idx = divmod(i, self.FILTERS_PER_CHANNEL)
            filters = channel_data.setdefault(channel_id, {'filters': []})['filters']
            # Pad so that filter_idx is a valid index.
            while len(filters) <= filter_idx:
                filters.append({})

            filter_data = filters[filter_idx]
            filter_data['fc'] = float(param_data[fc_key])
            for field in ('q', 'gain'):
                key = f'{prefix}.{field}'
                if key in param_data:
                    filter_data[field] = float(param_data[key])
            for field in ('slope', 'filterType'):
                key = f'{prefix}.{field}'
                if key in param_data:
                    # Go through float first so that "1.0" parses as 1.
                    filter_data[field] = int(float(param_data[key]))

            # Optional keys: read when present in the file.
            enable_key = f'{prefix}.enable'
            if enable_key in param_data:
                filter_data['enable'] = param_data[enable_key].lower() == 'true'
            name_key = f'{prefix}.name'
            if name_key in param_data:
                filter_data['name'] = param_data[name_key]

        return channel_data

    def list_projects(self) -> List[str]:
        """Return the names of all projects (``*.json`` files in ``storage_dir``)."""
        try:
            return [
                file[:-5]
                for file in os.listdir(self.storage_dir)
                if file.endswith('.json')
            ]
        except Exception as e:
            logger.error(f"列出项目失败: {e}")
            return []

    def list_params(self, project_name: str) -> List[str]:
        """Return the parameter set names registered in the project metadata."""
        try:
            project_data = self.load_project(project_name)
            if project_data:
                return list(project_data.params.keys())
            return []
        except Exception as e:
            logger.error(f"列出参数失败: {e}")
            return []

    def delete_project(self, project_name: str) -> bool:
        """Delete the project metadata file and the project's parameter files.

        Parameter files are matched by the ``<project>_`` file name prefix.
        Files that match that prefix only because another project's name
        starts with ``<project>_`` are left alone, unless they are registered
        in this project's own metadata.

        Returns:
            True on success, False if anything failed (the error is logged).
        """
        try:
            prefix = f"{project_name}_"

            # Collect ownership information before the metadata file is removed.
            metadata = self.load_project(project_name)
            own_files = set()
            if metadata and metadata.params:
                own_files = {
                    os.path.basename(self._get_param_path(project_name, name))
                    for name in metadata.params
                }
            foreign_prefixes = tuple(
                f"{other}_"
                for other in self.list_projects()
                if other != project_name and other.startswith(prefix)
            )

            file_path = self._get_project_path(project_name)
            if os.path.exists(file_path):
                os.remove(file_path)

            params_dir = os.path.join(self.storage_dir, "params")
            if os.path.isdir(params_dir):
                for file in os.listdir(params_dir):
                    if not (file.startswith(prefix) and file.endswith('.csv')):
                        continue
                    # e.g. deleting "foo" must not remove "foo_bar_x.csv" of
                    # project "foo_bar". startswith(()) is False, so nothing
                    # is skipped when no such project exists.
                    if file not in own_files and file.startswith(foreign_prefixes):
                        continue
                    os.remove(os.path.join(params_dir, file))

            if self.current_project == project_name:
                self.current_project = None
                self.current_param = None
            logger.info(f"项目 {project_name} 删除成功")
            return True
        except Exception as e:
            logger.error(f"删除项目失败: {e}")
            return False

    def delete_param(self, project_name: str, param_name: str) -> bool:
        """Remove a parameter set from the project metadata and delete its CSV file.

        Returns:
            True on success (also when there was nothing to delete), False if
            anything failed (the error is logged).
        """
        try:
            project_data = self.load_project(project_name)
            if project_data and param_name in project_data.params:
                del project_data.params[param_name]
                project_data.last_modified = datetime.now().isoformat()
                self._save_project_metadata(project_name, project_data)

            param_path = self._get_param_path(project_name, param_name)
            if os.path.exists(param_path):
                os.remove(param_path)

            if self.current_project == project_name and self.current_param == param_name:
                self.current_param = None
            logger.info(f"参数 {param_name} 删除成功")
            return True
        except Exception as e:
            logger.error(f"删除参数失败: {e}")
            return False

    def _project_exists(self, project_name: str) -> bool:
        """Return True if the project's metadata file exists."""
        return os.path.exists(self._get_project_path(project_name))

    def _get_project_created_time(self, project_name: str) -> str:
        """Return the stored creation time, or the current time if unavailable."""
        if self._project_exists(project_name):
            data = self.load_project(project_name)
            return data.created_at if data else datetime.now().isoformat()
        return datetime.now().isoformat()

    def _save_project_metadata(self, project_name: str, project_data: "ProjectData"):
        """Serialize ``project_data`` to the project's JSON file.

        The data is written to a temporary sibling file and then moved over
        the target, so a failed dump cannot truncate an existing project
        file. Exceptions propagate to the caller.
        """
        file_path = self._get_project_path(project_name)
        # ".json.tmp" does not end with ".json", so list_projects ignores it.
        tmp_path = f"{file_path}.tmp"
        try:
            with open(tmp_path, 'w', encoding='utf-8') as f:
                json.dump(asdict(project_data), f, indent=2, ensure_ascii=False)
            os.replace(tmp_path, file_path)
        except Exception:
            if os.path.exists(tmp_path):
                os.remove(tmp_path)
            raise