import csv
import json
import os
from datetime import datetime
from typing import Any, Dict, List, Optional

# NOTE(review): ProjectData, ParamConfig, ChannelConfig and asdict are not
# imported explicitly anywhere in this file; presumably they are provided by
# this star import - confirm against persistence.models.
from persistence.models import *
from component.widget_log.log_handler import logger


class DataStore:
    """File-backed store for projects and their tuning-parameter sets.

    Project metadata is stored as ``<storage_dir>/<project>.json``.  Each
    parameter set is stored as ``<storage_dir>/params/<project>_<param>.csv``,
    a two-column (``parameter``, ``value``) table.

    Because the project and parameter names are joined with ``_``, the file
    name alone cannot always tell which project a CSV belongs to (project
    ``a`` / parameter ``b_c`` and project ``a_b`` / parameter ``c`` map to
    the same file).
    """

    # Layout of the flat CSV parameter table.
    NUM_CHANNELS = 6
    FILTERS_PER_CHANNEL = 20
    _PARAM_PREFIX = 'dataset.tuning_parameters'
    _CSV_FIELDS = ['parameter', 'value']

    def __init__(self, storage_dir: str = "data/projects"):
        """Create the store and make sure its directories exist.

        Args:
            storage_dir: Directory holding the project JSON files and the
                ``params`` sub-directory.
        """
        self.storage_dir = storage_dir
        # Set by save_project(); cleared when that project is deleted.
        self.current_project: Optional[str] = None
        # Only ever cleared by this class (on delete).
        self.current_param: Optional[str] = None
        self._ensure_storage_dir()

    # ------------------------------------------------------------------
    # Paths
    # ------------------------------------------------------------------
    def _get_params_dir(self) -> str:
        """Return the directory that holds the parameter CSV files."""
        return os.path.join(self.storage_dir, "params")

    def _ensure_storage_dir(self):
        """Create the storage directory and its ``params`` sub-directory."""
        # exist_ok avoids the check-then-create race; creating the nested
        # directory also creates storage_dir itself.
        os.makedirs(self._get_params_dir(), exist_ok=True)

    def _get_project_path(self, project_name: str) -> str:
        """Return the path of the project's metadata JSON file."""
        return os.path.join(self.storage_dir, f"{project_name}.json")

    def _get_param_path(self, project_name: str, param_name: str) -> str:
        """Return the path of the CSV file of one parameter set."""
        return os.path.join(
            self._get_params_dir(), f"{project_name}_{param_name}.csv"
        )

    # ------------------------------------------------------------------
    # Projects / parameters: write side
    # ------------------------------------------------------------------
    def save_project(self, project_name: str, description: str = "") -> bool:
        """Create the project, or update the metadata of an existing one.

        When the project already exists its creation time and its registered
        parameter sets are kept; only ``last_modified`` and ``description``
        are replaced.

        Returns:
            True on success, False if anything failed (the error is logged).
        """
        try:
            now = datetime.now().isoformat()
            existing = (
                self.load_project(project_name)
                if self._project_exists(project_name)
                else None
            )
            project_data = ProjectData(
                name=project_name,
                created_at=existing.created_at if existing else now,
                last_modified=now,
                description=description,
                # Resetting this to {} on update would drop every registered
                # parameter set and orphan its CSV file.
                params=dict(existing.params)
                if existing and existing.params
                else {},
            )

            self._save_project_metadata(project_name, project_data)
            self.current_project = project_name
            logger.info(f"项目 {project_name} 保存成功")
            return True
        except Exception as e:
            logger.error(f"保存项目失败: {e}")
            return False

    def add_param_to_project(self, project_name: str, param_name: str,
                             channel_data: Dict[int, Dict],
                             description: str = "") -> bool:
        """Register a parameter set on a project and store its data as CSV.

        Args:
            project_name: Existing project to attach the parameter set to.
            param_name: Name of the parameter set.
            channel_data: Per-channel settings keyed by channel index; see
                ``_save_param_to_csv`` for the keys that are read.
            description: Free-text description stored in the metadata.

        Returns:
            True on success, False if the project does not exist or any
            step failed (the error is logged).
        """
        try:
            project_data = self.load_project(project_name)
            if not project_data:
                raise ValueError(f"Project {project_name} not found")

            # The metadata entry is descriptive only; channel settings live
            # in the CSV file, so ``channels`` stays empty.
            param_config = ParamConfig(
                name=param_name,
                created_at=datetime.now().isoformat(),
                description=description,
                channels={},
            )

            # Write the data first so a failed write never leaves the
            # metadata pointing at a parameter set that has no data.
            self._save_param_to_csv(project_name, param_name, channel_data)

            project_data.params[param_name] = param_config
            project_data.last_modified = datetime.now().isoformat()
            self._save_project_metadata(project_name, project_data)

            logger.info(f"参数 {param_name} 添加到项目 {project_name} 成功")
            return True
        except Exception as e:
            logger.error(f"添加参数失败: {e}")
            return False

    @staticmethod
    def _row(parameter: str, value: Any) -> Dict[str, str]:
        """Build one CSV row, stringifying the value."""
        return {'parameter': parameter, 'value': str(value)}

    def _build_param_rows(self,
                          channel_data: Dict[int, Dict]) -> List[Dict[str, str]]:
        """Flatten ``channel_data`` into the ordered list of CSV rows."""
        prefix = self._PARAM_PREFIX
        channels = range(self.NUM_CHANNELS)
        row = self._row

        # Fixed header parameters.
        rows = [
            row('dataset.audio_mode', '0'),
            row('dataset.send_action', '0'),
        ]

        # Mix parameters.
        for ch in channels:
            settings = channel_data.get(ch, {})
            base = f'{prefix}.mix_parameters[{ch}]'
            rows.append(row(f'{base}.ch_n', ch))
            rows.append(row(f'{base}.mix_left_data',
                            settings.get('mix_left_data', 0.0)))
            rows.append(row(f'{base}.mix_right_data',
                            settings.get('mix_right_data', 0.0)))

        # EQ parameters: one flat table in which channel ``ch`` owns the
        # slots [ch * FILTERS_PER_CHANNEL, (ch + 1) * FILTERS_PER_CHANNEL).
        # Missing filters are written as zeros; filters beyond
        # FILTERS_PER_CHANNEL are not written.
        for ch in channels:
            filters = channel_data.get(ch, {}).get('filters') or []
            for slot in range(self.FILTERS_PER_CHANNEL):
                flat_index = ch * self.FILTERS_PER_CHANNEL + slot
                flt = filters[slot] if slot < len(filters) else {}
                base = f'{prefix}.eq_parameters[{flat_index}]'
                rows.append(row(f'{base}.fc', flt.get('fc', 0.0)))
                rows.append(row(f'{base}.q', flt.get('q', 0.0)))
                rows.append(row(f'{base}.gain', flt.get('gain', 0.0)))
                rows.append(row(f'{base}.slope', flt.get('slope', 0)))
                rows.append(row(f'{base}.filterType',
                                flt.get('filterType', 0)))

        # Delay parameters.
        for ch in channels:
            base = f'{prefix}.delay_parameters[{ch}]'
            rows.append(row(f'{base}.ch_n', ch))
            rows.append(row(f'{base}.delay_data',
                            channel_data.get(ch, {}).get('delay_data', 0.0)))

        # Volume parameters.
        for ch in channels:
            base = f'{prefix}.volume_parameters[{ch}]'
            rows.append(row(f'{base}.ch_n', ch))
            rows.append(row(f'{base}.vol_data',
                            channel_data.get(ch, {}).get('vol_data', 0.0)))

        return rows

    def _save_param_to_csv(self, project_name: str, param_name: str,
                           channel_data: Dict[int, Dict]):
        """Write one parameter set to its CSV file in the fixed row order.

        Row order: ``audio_mode``, ``send_action``, mix, EQ, delay, volume.
        Per channel the keys ``mix_left_data``, ``mix_right_data``,
        ``delay_data``, ``vol_data`` and ``filters`` (a list of dicts with
        ``fc``, ``q``, ``gain``, ``slope``, ``filterType``) are read; anything
        missing is written as zero.
        """
        csv_path = self._get_param_path(project_name, param_name)
        rows = self._build_param_rows(channel_data)

        with open(csv_path, 'w', newline='', encoding='utf-8') as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=self._CSV_FIELDS)
            writer.writeheader()
            writer.writerows(rows)

    def _get_param_structure(self):
        """Placeholder for parsing struct_params.txt; returns an empty dict.

        The structure is currently hard-coded in the CSV writer/reader.
        """
        return {}

    def _convert_to_channel_config(
            self, channel_data: Dict[int, Dict]) -> Dict[int, ChannelConfig]:
        """Placeholder kept for compatibility; always returns an empty dict.

        Channel configuration is no longer stored in the JSON metadata.
        """
        return {}

    # ------------------------------------------------------------------
    # Projects / parameters: read side
    # ------------------------------------------------------------------
    def load_project(self, project_name: str) -> Optional[ProjectData]:
        """Load a project's metadata.

        Returns:
            The ``ProjectData`` built from the JSON file, or None when the
            file does not exist or cannot be read (the error is logged).
        """
        try:
            file_path = self._get_project_path(project_name)
            if not os.path.exists(file_path):
                return None

            with open(file_path, 'r', encoding='utf-8') as f:
                data = json.load(f)
            return ProjectData(**data)
        except Exception as e:
            logger.error(f"加载项目失败: {e}")
            return None

    def load_param_data(self, project_name: str, param_name: str) -> Dict:
        """Load one parameter set and return it in channel-data form.

        Returns:
            A dict keyed by channel index, or an empty dict when the CSV
            file is missing or cannot be parsed (the error is logged).
        """
        try:
            csv_path = self._get_param_path(project_name, param_name)
            if not os.path.exists(csv_path):
                return {}

            with open(csv_path, 'r', newline='', encoding='utf-8') as csvfile:
                param_data = {
                    record['parameter']: record['value']
                    for record in csv.DictReader(csvfile)
                }

            return self._convert_csv_to_channel_data(param_data)
        except Exception as e:
            logger.error(f"加载参数数据失败: {e}")
            return {}

    @staticmethod
    def _parse_int(text: str) -> int:
        """Parse an integer that may have been written as a float string.

        The writer stores ``str(value)``, so an integer-valued field held as
        a float in memory ends up as e.g. ``'12.0'``, which ``int()`` rejects.
        """
        try:
            return int(text)
        except ValueError:
            return int(float(text))

    def _read_channel_section(self, param_data: Dict,
                              channel_data: Dict[int, Dict],
                              section: str, value_fields: List[str]):
        """Copy one per-channel section of the CSV table into channel_data.

        For every index whose ``ch_n`` row is present, the channel entry is
        created if needed and each present field in ``value_fields`` is
        stored as a float under the same name.
        """
        for idx in range(self.NUM_CHANNELS):
            base = f'{self._PARAM_PREFIX}.{section}[{idx}]'
            ch_key = f'{base}.ch_n'
            if ch_key not in param_data:
                continue

            channel_id = self._parse_int(param_data[ch_key])
            channel = channel_data.setdefault(channel_id, {'filters': []})
            for field_name in value_fields:
                key = f'{base}.{field_name}'
                if key in param_data:
                    channel[field_name] = float(param_data[key])

    def _convert_csv_to_channel_data(self, param_data: Dict) -> Dict[int, Dict]:
        """Convert the flat ``parameter -> value`` mapping to channel data.

        Returns:
            A dict keyed by channel index.  Every entry has a ``filters``
            list and, when present in the input, ``mix_left_data``,
            ``mix_right_data``, ``delay_data`` and ``vol_data``.

        Raises:
            ValueError: If a value cannot be parsed as a number.
        """
        channel_data: Dict[int, Dict] = {}

        self._read_channel_section(
            param_data, channel_data, 'mix_parameters',
            ['mix_left_data', 'mix_right_data'])
        self._read_channel_section(
            param_data, channel_data, 'delay_parameters', ['delay_data'])
        self._read_channel_section(
            param_data, channel_data, 'volume_parameters', ['vol_data'])

        # EQ filters: the flat index encodes both the channel and the slot.
        float_fields = ('q', 'gain')
        int_fields = ('slope', 'filterType')
        total_filters = self.NUM_CHANNELS * self.FILTERS_PER_CHANNEL
        for flat_index in range(total_filters):
            base = f'{self._PARAM_PREFIX}.eq_parameters[{flat_index}]'
            fc_key = f'{base}.fc'
            # A filter slot is only materialized when its ``fc`` row exists.
            if fc_key not in param_data:
                continue

            channel_id, slot = divmod(flat_index, self.FILTERS_PER_CHANNEL)
            filters = channel_data.setdefault(
                channel_id, {'filters': []})['filters']
            # Pad with empty dicts so that ``slot`` is a valid index.
            while len(filters) <= slot:
                filters.append({})

            flt = filters[slot]
            flt['fc'] = float(param_data[fc_key])
            for field_name in float_fields:
                key = f'{base}.{field_name}'
                if key in param_data:
                    flt[field_name] = float(param_data[key])
            for field_name in int_fields:
                key = f'{base}.{field_name}'
                if key in param_data:
                    flt[field_name] = self._parse_int(param_data[key])

        return channel_data

    def list_projects(self) -> List[str]:
        """Return the names of all projects (JSON files in the storage dir).

        Returns an empty list if the directory cannot be listed.
        """
        try:
            suffix = '.json'
            return [
                entry[:-len(suffix)]
                for entry in os.listdir(self.storage_dir)
                if entry.endswith(suffix)
            ]
        except Exception as e:
            logger.error(f"列出项目失败: {e}")
            return []

    def list_params(self, project_name: str) -> List[str]:
        """Return the names of the parameter sets registered on a project.

        Returns an empty list when the project cannot be loaded.
        """
        try:
            project_data = self.load_project(project_name)
            if project_data:
                return list(project_data.params.keys())
            return []
        except Exception as e:
            logger.error(f"列出参数失败: {e}")
            return []

    # ------------------------------------------------------------------
    # Deletion
    # ------------------------------------------------------------------
    def delete_project(self, project_name: str) -> bool:
        """Delete a project's metadata and its parameter CSV files.

        Files of the parameter sets registered in the metadata are removed
        by exact path.  Remaining ``<project>_*.csv`` files are treated as
        leftovers and removed too, except those that match the prefix of
        another existing project whose name starts with ``<project>_``.

        Returns:
            True on success (also when the project did not exist), False if
            anything failed (the error is logged).
        """
        try:
            own_prefix = f"{project_name}_"

            # Collect everything needed BEFORE the metadata file is removed.
            project_data = self.load_project(project_name)
            registered = list(project_data.params) if project_data else []
            sibling_prefixes = tuple(
                f"{other}_"
                for other in self.list_projects()
                if other != project_name and other.startswith(own_prefix)
            )

            file_path = self._get_project_path(project_name)
            if os.path.exists(file_path):
                os.remove(file_path)

            for param_name in registered:
                param_path = self._get_param_path(project_name, param_name)
                if os.path.exists(param_path):
                    os.remove(param_path)

            # Sweep leftovers that are not listed in the metadata.
            params_dir = self._get_params_dir()
            if os.path.isdir(params_dir):
                for entry in os.listdir(params_dir):
                    if not (entry.startswith(own_prefix)
                            and entry.endswith('.csv')):
                        continue
                    # e.g. deleting "foo" must not remove "foo_bar_x.csv"
                    # while a project "foo_bar" still exists.
                    if sibling_prefixes and entry.startswith(sibling_prefixes):
                        continue
                    os.remove(os.path.join(params_dir, entry))

            if self.current_project == project_name:
                self.current_project = None
                self.current_param = None

            logger.info(f"项目 {project_name} 删除成功")
            return True
        except Exception as e:
            logger.error(f"删除项目失败: {e}")
            return False

    def delete_param(self, project_name: str, param_name: str) -> bool:
        """Unregister a parameter set and delete its CSV file.

        Returns:
            True on success (also when nothing had to be removed), False if
            anything failed (the error is logged).
        """
        try:
            project_data = self.load_project(project_name)
            if project_data and param_name in project_data.params:
                del project_data.params[param_name]
                project_data.last_modified = datetime.now().isoformat()
                self._save_project_metadata(project_name, project_data)

            param_path = self._get_param_path(project_name, param_name)
            if os.path.exists(param_path):
                os.remove(param_path)

            if (self.current_project == project_name
                    and self.current_param == param_name):
                self.current_param = None

            logger.info(f"参数 {param_name} 删除成功")
            return True
        except Exception as e:
            logger.error(f"删除参数失败: {e}")
            return False

    # ------------------------------------------------------------------
    # Metadata helpers
    # ------------------------------------------------------------------
    def _project_exists(self, project_name: str) -> bool:
        """Return True if the project's metadata file exists."""
        return os.path.exists(self._get_project_path(project_name))

    def _get_project_created_time(self, project_name: str) -> str:
        """Return the stored creation time, or the current time as fallback.

        The fallback is used when the project does not exist or its
        metadata cannot be loaded.
        """
        if self._project_exists(project_name):
            data = self.load_project(project_name)
            if data:
                return data.created_at
        return datetime.now().isoformat()

    def _save_project_metadata(self, project_name: str,
                               project_data: ProjectData):
        """Serialize the project metadata to its JSON file.

        Raises:
            Whatever ``asdict`` / ``json.dumps`` / ``open`` raise; callers
            in this class catch and log it.
        """
        # Serialize before opening the file: opening with 'w' truncates it,
        # so a serialization error must not destroy the existing metadata.
        payload = json.dumps(asdict(project_data), indent=2,
                             ensure_ascii=False)
        file_path = self._get_project_path(project_name)
        with open(file_path, 'w', encoding='utf-8') as f:
            f.write(payload)