brisonus_app_eq/persistence/data_store.py

399 lines
17 KiB
Python
Raw Normal View History

2025-02-25 20:37:41 +08:00
import csv
import os
2025-02-25 20:37:41 +08:00
import json
from typing import Dict, List, Any, Optional
from datetime import datetime
from persistence.models import *
from component.widget_log.log_handler import logger
class DataStore:
    """File-backed store for tuning projects and their parameter sets.

    Layout under ``storage_dir``:

    * ``<project>.json`` -- project metadata (a serialized ``ProjectData``).
    * ``params/<project>_<param>.csv`` -- one two-column ``parameter,value``
      table per parameter set.

    Public methods catch exceptions, log them through ``logger`` and report
    failure through their return value (``False``, ``None`` or an empty
    container) instead of raising.

    NOTE(review): annotations that mention project model types are quoted
    because those names reach this module through a star import.
    """

    # Slot limits of the flat CSV layout: per-channel blocks are indexed
    # 0..MAX_CHANNELS-1, and filter slots are laid out as
    # ``channel_id * FILTERS_PER_CHANNEL + filter_idx``.
    MAX_CHANNELS = 6
    FILTERS_PER_CHANNEL = 20
    _PARAM_PREFIX = 'dataset.tuning_parameters'

    def __init__(self, storage_dir: str = "data/projects"):
        """Remember ``storage_dir`` and create the directory layout."""
        self.storage_dir = storage_dir
        self.current_project: Optional[str] = None
        self.current_param: Optional[str] = None
        self._ensure_storage_dir()

    def _params_dir(self) -> str:
        """Return the directory that holds the parameter CSV files."""
        return os.path.join(self.storage_dir, "params")

    def _ensure_storage_dir(self):
        """Create the storage directory and its ``params`` subdirectory."""
        # makedirs also creates storage_dir itself; exist_ok avoids the
        # check-then-create race of a separate os.path.exists() test.
        os.makedirs(self._params_dir(), exist_ok=True)

    def _get_project_path(self, project_name: str) -> str:
        """Return the path of the project's metadata JSON file."""
        return os.path.join(self.storage_dir, f"{project_name}.json")

    def _get_param_path(self, project_name: str, param_name: str) -> str:
        """Return the path of the CSV file for one parameter set."""
        return os.path.join(self._params_dir(), f"{project_name}_{param_name}.csv")

    def save_project(self, project_name: str, description: str = "") -> bool:
        """Create the project, or refresh the metadata of an existing one.

        For an existing project the creation time and the registered
        parameter sets are kept; ``last_modified`` and ``description`` are
        overwritten. On success the project becomes ``current_project``.

        Returns:
            True on success, False if anything failed (the error is logged).
        """
        try:
            now = datetime.now().isoformat()
            # load_project returns None for a missing or unreadable file.
            existing = self.load_project(project_name)
            project_data = ProjectData(
                name=project_name,
                created_at=existing.created_at if existing else now,
                last_modified=now,
                description=description,
                # Keep the parameter index on update; resetting it would
                # orphan the CSV files that already exist on disk.
                params=dict(existing.params) if existing and existing.params else {},
            )
            self._save_project_metadata(project_name, project_data)
            self.current_project = project_name
            logger.info(f"项目 {project_name} 保存成功")
            return True
        except Exception as e:
            logger.error(f"保存项目失败: {e}")
            return False

    def add_param_to_project(self, project_name: str, param_name: str,
                             channel_data: Dict[int, Dict], description: str = "") -> bool:
        """Register a parameter set on a project and persist its values.

        Args:
            project_name: Name of an existing project.
            param_name: Name of the parameter set to add or replace.
            channel_data: Mapping of channel id to that channel's settings
                (see ``_save_param_to_csv`` for the keys that are read).
            description: Free-text description stored in the metadata.

        Returns:
            True on success, False on failure, including when the project
            does not exist (the error is logged).
        """
        try:
            project_data = self.load_project(project_name)
            if not project_data:
                raise ValueError(f"Project {project_name} not found")

            param_config = ParamConfig(
                name=param_name,
                created_at=datetime.now().isoformat(),
                description=description,
                channels=self._convert_to_channel_config(channel_data)
            )

            # Write the values first so the metadata never lists a
            # parameter set whose CSV file failed to be written.
            self._save_param_to_csv(project_name, param_name, channel_data)

            project_data.params[param_name] = param_config
            project_data.last_modified = datetime.now().isoformat()
            self._save_project_metadata(project_name, project_data)

            logger.info(f"参数 {param_name} 添加到项目 {project_name} 成功")
            return True
        except Exception as e:
            logger.error(f"添加参数失败: {e}")
            return False

    def _save_param_to_csv(self, project_name: str, param_name: str, channel_data: Dict[int, Dict]):
        """Write a parameter set as a flat ``parameter,value`` CSV file.

        Channels whose id is outside ``0..MAX_CHANNELS-1`` are skipped, and
        at most ``FILTERS_PER_CHANNEL`` filters are written per channel, so
        one channel can never spill into another channel's filter slots.
        """
        # Build every row before opening the file: bad input then raises
        # without leaving a half-written CSV behind.
        rows = [
            ('dataset.audio_mode', '0'),   # default value
            ('dataset.send_action', '0'),  # default value
        ]
        for channel_id, data in channel_data.items():
            if not 0 <= channel_id < self.MAX_CHANNELS:
                continue
            rows.extend(self._channel_rows(channel_id, data))

        csv_path = self._get_param_path(project_name, param_name)
        with open(csv_path, 'w', newline='', encoding='utf-8') as csvfile:
            writer = csv.writer(csvfile)
            writer.writerow(['parameter', 'value'])
            writer.writerows(rows)

    def _channel_rows(self, channel_id: int, data: Dict) -> List[tuple]:
        """Return the ``(parameter, value)`` rows for one channel."""
        prefix = self._PARAM_PREFIX
        cid = str(channel_id)
        mix = f'{prefix}.mix_parameters[{channel_id}]'
        delay = f'{prefix}.delay_parameters[{channel_id}]'
        volume = f'{prefix}.volume_parameters[{channel_id}]'
        rows = [
            (f'{mix}.ch_n', cid),
            (f'{mix}.mix_left_data', str(data.get('mix_left_data', 0.0))),
            (f'{mix}.mix_right_data', str(data.get('mix_right_data', 0.0))),
            (f'{delay}.ch_n', cid),
            (f'{delay}.delay_data', str(data.get('delay_data', 0.0))),
            (f'{volume}.ch_n', cid),
            (f'{volume}.vol_data', str(data.get('vol_data', 0.0))),
        ]
        for filter_idx, filter_data in enumerate(data.get('filters') or []):
            if filter_idx >= self.FILTERS_PER_CHANNEL:
                # The next slot belongs to the following channel.
                break
            slot = channel_id * self.FILTERS_PER_CHANNEL + filter_idx
            eq = f'{prefix}.eq_parameters[{slot}]'
            rows.extend([
                (f'{eq}.fc', str(filter_data.get('fc', 0.0))),
                (f'{eq}.q', str(filter_data.get('q', 0.0))),
                (f'{eq}.gain', str(filter_data.get('gain', 0.0))),
                (f'{eq}.slope', str(filter_data.get('slope', 0))),
                (f'{eq}.filterType', str(filter_data.get('filterType', 0))),
            ])
        return rows

    def _get_param_structure(self):
        """Placeholder for parsing struct_params.txt; returns an empty dict."""
        return {}

    def _convert_to_channel_config(self, channel_data: Dict[int, Dict]) -> Dict[int, "ChannelConfig"]:
        """Convert raw per-channel dicts into ``ChannelConfig`` objects."""
        return {
            channel_id: ChannelConfig(
                delay_data=data.get('delay_data', 0.0),
                vol_data=data.get('vol_data', 0.0),
                mix_left_data=data.get('mix_left_data', 0.0),
                mix_right_data=data.get('mix_right_data', 0.0),
                filters=[FilterConfig(**f) for f in data.get('filters', [])]
            )
            for channel_id, data in channel_data.items()
        }

    def load_project(self, project_name: str) -> Optional["ProjectData"]:
        """Load project metadata.

        Returns:
            The ``ProjectData`` built from the JSON file, or None when the
            file is missing or cannot be read (the error is logged).
        """
        try:
            file_path = self._get_project_path(project_name)
            if not os.path.exists(file_path):
                return None
            with open(file_path, 'r', encoding='utf-8') as f:
                data = json.load(f)
            return ProjectData(**data)
        except Exception as e:
            logger.error(f"加载项目失败: {e}")
            return None

    def load_param_data(self, project_name: str, param_name: str) -> Dict:
        """Load one parameter set as ``{channel_id: channel_dict}``.

        Returns an empty dict when the CSV file is missing or cannot be
        parsed (the error is logged).
        """
        try:
            csv_path = self._get_param_path(project_name, param_name)
            if not os.path.exists(csv_path):
                return {}
            # utf-8-sig also accepts a file that carries a UTF-8 BOM.
            with open(csv_path, 'r', newline='', encoding='utf-8-sig') as csvfile:
                param_data = {row['parameter']: row['value'] for row in csv.DictReader(csvfile)}
            return self._convert_csv_to_channel_data(param_data)
        except Exception as e:
            logger.error(f"加载参数数据失败: {e}")
            return {}

    @staticmethod
    def _to_int(text: str) -> int:
        """Parse an integer field, accepting float-formatted text like ``"12.0"``.

        The writer stores ``str(value)``; a value that was a float in memory
        is written with a decimal point, which plain ``int()`` rejects.
        """
        try:
            return int(text)
        except ValueError:
            return int(float(text))

    def _convert_csv_to_channel_data(self, param_data: Dict) -> Dict[int, Dict]:
        """Rebuild per-channel dicts from the flat ``parameter -> value`` map."""
        prefix = self._PARAM_PREFIX
        channel_data: Dict[int, Dict] = {}

        # (group name, value fields); each group also carries a ``ch_n``
        # entry naming the channel the slot belongs to.
        scalar_groups = (
            ('mix_parameters', ('mix_left_data', 'mix_right_data')),
            ('delay_parameters', ('delay_data',)),
            ('volume_parameters', ('vol_data',)),
        )
        for group, fields in scalar_groups:
            for slot in range(self.MAX_CHANNELS):
                base = f'{prefix}.{group}[{slot}]'
                ch_key = f'{base}.ch_n'
                if ch_key not in param_data:
                    continue
                channel_id = self._to_int(param_data[ch_key])
                channel = channel_data.setdefault(channel_id, {'filters': []})
                for field in fields:
                    key = f'{base}.{field}'
                    if key in param_data:
                        channel[field] = float(param_data[key])

        optional_fields = (
            ('q', float),
            ('gain', float),
            ('slope', self._to_int),
            ('filterType', self._to_int),
        )
        for slot in range(self.MAX_CHANNELS * self.FILTERS_PER_CHANNEL):
            base = f'{prefix}.eq_parameters[{slot}]'
            fc_key = f'{base}.fc'
            if fc_key not in param_data:
                continue
            # The slot number encodes both the channel and the filter index.
            channel_id, filter_idx = divmod(slot, self.FILTERS_PER_CHANNEL)
            filters = channel_data.setdefault(channel_id, {'filters': []})['filters']
            # Pad with empty dicts so the filter lands at its own index.
            while len(filters) <= filter_idx:
                filters.append({})
            entry = filters[filter_idx]
            entry['fc'] = float(param_data[fc_key])
            for field, cast in optional_fields:
                key = f'{base}.{field}'
                if key in param_data:
                    entry[field] = cast(param_data[key])
        return channel_data

    def list_projects(self) -> List[str]:
        """Return the names of all stored projects, sorted alphabetically."""
        try:
            return sorted(
                file[:-len('.json')]
                for file in os.listdir(self.storage_dir)
                if file.endswith('.json')
            )
        except Exception as e:
            logger.error(f"列出项目失败: {e}")
            return []

    def list_params(self, project_name: str) -> List[str]:
        """Return the parameter-set names registered on a project."""
        try:
            project_data = self.load_project(project_name)
            if project_data:
                return list(project_data.params.keys())
            return []
        except Exception as e:
            logger.error(f"列出参数失败: {e}")
            return []

    def delete_project(self, project_name: str) -> bool:
        """Delete a project's metadata file and its parameter CSV files.

        Parameter files are named ``<project>_<param>.csv``, so the prefix
        ``foo_`` also matches files of a project called ``foo_bar``. A
        matching file is therefore kept when another existing project's
        prefix claims it, unless this project's metadata lists it.

        Returns:
            True on success, False on failure (the error is logged).
        """
        try:
            own_prefix = f"{project_name}_"

            # Both lookups need the metadata, so do them before removing it.
            existing = self.load_project(project_name)
            known_params = existing.params if existing and existing.params else {}
            owned = {
                os.path.basename(self._get_param_path(project_name, name))
                for name in known_params
            }
            foreign_prefixes = tuple(
                f"{other}_"
                for other in self.list_projects()
                if other != project_name and other.startswith(own_prefix)
            )

            file_path = self._get_project_path(project_name)
            if os.path.exists(file_path):
                os.remove(file_path)

            params_dir = self._params_dir()
            for file in os.listdir(params_dir):
                if not (file.startswith(own_prefix) and file.endswith('.csv')):
                    continue
                # startswith(()) is False, so with no longer-named project
                # every prefix match is removed, as before.
                if file not in owned and file.startswith(foreign_prefixes):
                    continue
                os.remove(os.path.join(params_dir, file))

            if self.current_project == project_name:
                self.current_project = None
                self.current_param = None
            logger.info(f"项目 {project_name} 删除成功")
            return True
        except Exception as e:
            logger.error(f"删除项目失败: {e}")
            return False

    def delete_param(self, project_name: str, param_name: str) -> bool:
        """Remove a parameter set from the metadata and delete its CSV file.

        Succeeds (returns True) even when the project or the parameter set
        does not exist; returns False only if an error occurred.
        """
        try:
            project_data = self.load_project(project_name)
            if project_data and param_name in project_data.params:
                del project_data.params[param_name]
                project_data.last_modified = datetime.now().isoformat()
                self._save_project_metadata(project_name, project_data)

            param_path = self._get_param_path(project_name, param_name)
            if os.path.exists(param_path):
                os.remove(param_path)

            if self.current_project == project_name and self.current_param == param_name:
                self.current_param = None
            logger.info(f"参数 {param_name} 删除成功")
            return True
        except Exception as e:
            logger.error(f"删除参数失败: {e}")
            return False

    def _project_exists(self, project_name: str) -> bool:
        """Return True if the project's metadata file exists."""
        return os.path.exists(self._get_project_path(project_name))

    def _get_project_created_time(self, project_name: str) -> str:
        """Return the stored creation time, or the current time as ISO text
        when the project is missing or unreadable."""
        data = self.load_project(project_name)
        return data.created_at if data else datetime.now().isoformat()

    def _save_project_metadata(self, project_name: str, project_data: "ProjectData"):
        """Serialize ``project_data`` to the project's JSON file.

        The JSON is written to a sibling ``.tmp`` file and then moved into
        place, so a failure while serializing or writing leaves the
        previous metadata file untouched.
        """
        file_path = self._get_project_path(project_name)
        tmp_path = f"{file_path}.tmp"
        try:
            with open(tmp_path, 'w', encoding='utf-8') as f:
                json.dump(asdict(project_data), f, indent=2, ensure_ascii=False)
            os.replace(tmp_path, file_path)
        except Exception:
            if os.path.exists(tmp_path):
                os.remove(tmp_path)
            raise