2025-02-15 21:20:54 +08:00
|
|
|
import sqlite3
|
|
|
|
from typing import List, Optional, Tuple
|
|
|
|
from pathlib import Path
|
|
|
|
import os
|
|
|
|
from .models import FilterData, ParamData, ConfigData
|
|
|
|
|
|
|
|
class DatabaseManager:
    """Singleton gateway to the SQLite database holding configs, params and filters.

    The first construction fixes ``db_path``, creates the parent directory and
    the schema; every later construction returns that same instance and
    ignores its ``db_path`` argument.

    All methods that touch the database open a short-lived connection, run
    inside a single transaction (commit on success, rollback on error) and
    close the connection before returning.
    """

    _instance = None

    def __new__(cls, db_path: str = None):
        """Create the singleton on first use and return it on every call.

        Args:
            db_path: Path of the SQLite file. When ``None``, defaults to
                ``data/database.db`` under the parent of this file's directory.
                Only honoured by the call that creates the instance.
        """
        if cls._instance is None:
            instance = super().__new__(cls)
            if db_path is None:
                app_dir = Path(os.path.dirname(os.path.abspath(__file__))).parent
                data_dir = app_dir / 'data'
                db_path = str(data_dir / 'database.db')
            print(f"db_path: {db_path}")
            instance.db_path = db_path
            instance._init_database()
            # Publish the singleton only once the schema exists, so a failed
            # initialisation is retried by the next construction instead of
            # leaving a half-initialised instance behind.
            cls._instance = instance
        return cls._instance

    def __init__(self, db_path: str = None):
        """Accept the same arguments as ``__new__``; all setup happens there.

        Python passes the constructor arguments to both ``__new__`` and
        ``__init__``, so this signature must accept ``db_path`` or
        ``DatabaseManager(path)`` raises ``TypeError``. Default data is not
        seeded here; call :meth:`init_database` explicitly for that.
        """

    def _open(self):
        """Return a context manager that yields a connection and closes it on exit.

        Intended use is ``with self._open() as conn, conn:`` -- the second
        item is sqlite3's own connection context manager, which commits on
        success and rolls back on an exception but does *not* close the
        connection; the outer ``closing`` wrapper does the closing.
        """
        # Imported locally so the module's import section stays untouched.
        from contextlib import closing
        return closing(self.get_connection())

    def _init_database(self):
        """Create the database directory and the three tables if they are missing."""
        Path(self.db_path).parent.mkdir(parents=True, exist_ok=True)

        # NOTE(review): configs.channel_id is not UNIQUE on its own, so the
        # "REFERENCES configs(channel_id)" clauses below would fail with
        # "foreign key mismatch" if PRAGMA foreign_keys were ever switched on.
        # They are inert with SQLite's default (enforcement off) and are left
        # unchanged so newly created databases match existing ones.
        with self._open() as conn, conn:
            conn.executescript("""
                CREATE TABLE IF NOT EXISTS configs (
                    id INTEGER PRIMARY KEY,
                    name TEXT NOT NULL,
                    channel_id INTEGER NOT NULL,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    UNIQUE(name, channel_id)
                );

                CREATE TABLE IF NOT EXISTS params (
                    id INTEGER PRIMARY KEY,
                    config_id INTEGER,
                    channel_id INTEGER NOT NULL,
                    delay_data1 FLOAT,
                    ENC_volume_data1 FLOAT,
                    ENT_mx_right_data FLOAT,
                    ENT_mix_left_data FLOAT,
                    FOREIGN KEY (config_id) REFERENCES configs(id),
                    FOREIGN KEY (channel_id) REFERENCES configs(channel_id)
                );

                CREATE TABLE IF NOT EXISTS filters (
                    id INTEGER PRIMARY KEY,
                    config_id INTEGER,
                    channel_id INTEGER NOT NULL,
                    filter_type TEXT NOT NULL,
                    freq FLOAT,
                    q FLOAT,
                    gain FLOAT,
                    slope FLOAT,
                    enabled BOOLEAN DEFAULT TRUE,
                    position INTEGER,
                    FOREIGN KEY (config_id) REFERENCES configs(id),
                    FOREIGN KEY (channel_id) REFERENCES configs(channel_id)
                );
            """)

    def get_connection(self):
        """Open and return a new connection to ``self.db_path``.

        The caller owns the returned connection and is responsible for
        closing it.
        """
        return sqlite3.connect(self.db_path)

    def get_all_configs(self) -> List[ConfigData]:
        """Return every row of ``configs`` as a ``ConfigData``.

        Each object is built positionally from
        ``(id, name, channel_id, created_at)``.
        """
        with self._open() as conn, conn:
            rows = conn.execute(
                "SELECT id, name, channel_id, created_at FROM configs"
            ).fetchall()
        return [ConfigData(*row) for row in rows]

    def load_config(self, config_id: int) -> Tuple[Optional[ParamData], List[FilterData]]:
        """Load the params and the position-ordered filters of one config.

        Args:
            config_id: Primary key of the config to load.

        Returns:
            ``(params, filters)``. When no params row exists for
            ``config_id`` the result is ``(None, [])`` and the filters table
            is not consulted.
        """
        with self._open() as conn, conn:
            # 1. Load the parameter row first.
            param_row = conn.execute(
                """SELECT config_id, channel_id, delay_data1, ENC_volume_data1,
                          ENT_mx_right_data, ENT_mix_left_data
                   FROM params WHERE config_id = ?""",
                (config_id,)
            ).fetchone()
            if param_row is None:
                return None, []

            # 2. Load the filter rows in display order.
            filter_rows = conn.execute(
                """SELECT config_id, channel_id, filter_type, freq, q, gain, slope,
                          enabled, position, id
                   FROM filters WHERE config_id = ? ORDER BY position""",
                (config_id,)
            ).fetchall()

        # 3. Build the model objects once the connection has been released.
        params = ParamData(
            config_id=param_row[0],
            channel_id=param_row[1],
            delay_data1=param_row[2],
            ENC_volume_data1=param_row[3],
            ENT_mx_right_data=param_row[4],
            ENT_mix_left_data=param_row[5]
        )
        filters = [
            FilterData(
                channel_id=row[1],
                filter_type=row[2],
                freq=row[3],
                q=row[4],
                gain=row[5],
                slope=row[6],
                enabled=bool(row[7]),  # stored as 0/1 in SQLite
                position=row[8],
                config_id=row[0],
                id=row[9]
            )
            for row in filter_rows
        ]
        print(f"load_config filters: {filters}")
        return params, filters

    @staticmethod
    def _update_params_row(conn, config_id: int, params: ParamData):
        """Overwrite the four parameter values of ``config_id`` using ``conn``."""
        conn.execute(
            """UPDATE params SET
                   delay_data1 = ?, ENC_volume_data1 = ?,
                   ENT_mx_right_data = ?, ENT_mix_left_data = ?
               WHERE config_id = ?""",
            (params.delay_data1, params.ENC_volume_data1,
             params.ENT_mx_right_data, params.ENT_mix_left_data,
             config_id)
        )

    @staticmethod
    def _insert_filters(conn, config_id: int, filters: List[FilterData]):
        """Insert ``filters`` for ``config_id`` and write the new ids back.

        Each filter object gets ``config_id`` assigned before the insert and
        ``id`` set to the inserted row id afterwards.
        """
        for filter_data in filters:
            filter_data.config_id = config_id
            cursor = conn.execute(
                """INSERT INTO filters
                   (config_id, channel_id, filter_type, freq, q, gain, slope, enabled, position)
                   VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""",
                (filter_data.config_id, filter_data.channel_id, filter_data.filter_type,
                 filter_data.freq, filter_data.q, filter_data.gain, filter_data.slope,
                 filter_data.enabled, filter_data.position)
            )
            filter_data.id = cursor.lastrowid

    def save_config(self, name: str, channel_id: int, params: ParamData, filters: List[FilterData]) -> int:
        """Create the config ``(name, channel_id)`` or overwrite the existing one.

        For an existing config the params row is updated and all of its
        filters are replaced by ``filters``; otherwise a new config, params
        row and filter rows are inserted. Everything runs in one transaction,
        so a failure leaves the stored config untouched.

        Side effects: sets ``params.config_id`` and, on every filter,
        ``config_id`` and ``id``.

        Returns:
            The id of the created or updated config.
        """
        with self._open() as conn, conn:
            # Look for an existing config with the same name and channel.
            existing_config = conn.execute(
                "SELECT id FROM configs WHERE name = ? AND channel_id = ?",
                (name, channel_id)
            ).fetchone()

            if existing_config is not None:
                # Update in place: refresh params, drop the old filters.
                config_id = existing_config[0]
                params.config_id = config_id
                self._update_params_row(conn, config_id, params)
                conn.execute("DELETE FROM filters WHERE config_id = ?", (config_id,))
            else:
                # Brand-new config: insert the config row and its params.
                cursor = conn.execute(
                    "INSERT INTO configs (name, channel_id) VALUES (?, ?)",
                    (name, channel_id)
                )
                config_id = cursor.lastrowid
                params.config_id = config_id
                conn.execute(
                    """INSERT INTO params
                       (config_id, channel_id, delay_data1, ENC_volume_data1, ENT_mx_right_data, ENT_mix_left_data)
                       VALUES (?, ?, ?, ?, ?, ?)""",
                    (params.config_id, params.channel_id, params.delay_data1, params.ENC_volume_data1,
                     params.ENT_mx_right_data, params.ENT_mix_left_data)
                )

            self._insert_filters(conn, config_id, filters)
        return config_id

    def update_filter(self, filter_id: int, **kwargs):
        """Update the given columns of one filter row.

        Args:
            filter_id: Primary key of the filter to update.
            **kwargs: ``column=value`` pairs. With no pairs the call is a
                no-op.

        Raises:
            sqlite3.OperationalError: if a keyword is not a column of the
                ``filters`` table (the same error type SQLite itself reports
                for an unknown column).
        """
        if not kwargs:
            # "UPDATE filters SET  WHERE ..." would be a syntax error.
            return

        with self._open() as conn, conn:
            # Column names are spliced into the SQL text, so accept only names
            # that really are columns of the table (SQLite column names are
            # case-insensitive).
            known_columns = {
                row[1].lower() for row in conn.execute("PRAGMA table_info(filters)")
            }
            for column in kwargs:
                if column.lower() not in known_columns:
                    raise sqlite3.OperationalError(f"no such column: {column}")

            set_clause = ", ".join(f"{column} = ?" for column in kwargs)
            query = f"UPDATE filters SET {set_clause} WHERE id = ?"
            conn.execute(query, (*kwargs.values(), filter_id))

    def update_params(self, config_id: int, params: ParamData):
        """Overwrite the stored parameter values of ``config_id`` with ``params``."""
        with self._open() as conn, conn:
            self._update_params_row(conn, config_id, params)

    def init_database(self):
        """Seed the database with a default config when it holds no configs."""
        configs = self.get_all_configs()
        if not configs:
            # Default parameters.
            default_params = ParamData(
                channel_id=1,
                delay_data1=0.0,
                ENC_volume_data1=0.0,
                ENT_mx_right_data=0.0,
                ENT_mix_left_data=0.0,
                config_id=None  # assigned by save_config
            )

            # A couple of default filters.
            default_filters = [
                FilterData(
                    channel_id=1,
                    filter_type="PEQ",
                    freq=1000.0,
                    q=1.0,
                    gain=0.0,
                    slope=12.0,
                    position=0,
                    config_id=None  # assigned by save_config
                ),
                FilterData(
                    channel_id=1,
                    filter_type="HPF",
                    freq=20.0,
                    q=0.707,
                    gain=0.0,
                    slope=12.0,
                    position=1,
                    config_id=None  # assigned by save_config
                )
            ]

            # Persist the default config.
            self.save_config("Default Config", 1, default_params, default_filters)
|