[feature] 增加数据库对滤波器数据读写操作

This commit is contained in:
Sam 2025-02-16 16:22:42 +08:00
parent 9145e72263
commit bdc2bd9ba7
9 changed files with 126 additions and 82 deletions

Binary file not shown.

View File

@ -14,13 +14,14 @@ class DatabaseManager:
app_dir = Path(os.path.dirname(os.path.abspath(__file__))).parent app_dir = Path(os.path.dirname(os.path.abspath(__file__))).parent
data_dir = app_dir / 'data' data_dir = app_dir / 'data'
db_path = str(data_dir / 'database.db') db_path = str(data_dir / 'database.db')
print(f"db_path: {db_path}")
cls._instance.db_path = db_path cls._instance.db_path = db_path
cls._instance._init_database() cls._instance._init_database()
return cls._instance return cls._instance
def __init__(self): def __init__(self):
self.init_database() # self.init_database()
pass
def _init_database(self): def _init_database(self):
Path(self.db_path).parent.mkdir(parents=True, exist_ok=True) Path(self.db_path).parent.mkdir(parents=True, exist_ok=True)
@ -30,22 +31,27 @@ class DatabaseManager:
CREATE TABLE IF NOT EXISTS configs ( CREATE TABLE IF NOT EXISTS configs (
id INTEGER PRIMARY KEY, id INTEGER PRIMARY KEY,
name TEXT NOT NULL, name TEXT NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP channel_id INTEGER NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
UNIQUE(name, channel_id)
); );
CREATE TABLE IF NOT EXISTS params ( CREATE TABLE IF NOT EXISTS params (
id INTEGER PRIMARY KEY, id INTEGER PRIMARY KEY,
config_id INTEGER, config_id INTEGER,
channel_id INTEGER NOT NULL,
delay_data1 FLOAT, delay_data1 FLOAT,
ENC_volume_data1 FLOAT, ENC_volume_data1 FLOAT,
ENT_mx_right_data FLOAT, ENT_mx_right_data FLOAT,
ENT_mix_left_data FLOAT, ENT_mix_left_data FLOAT,
FOREIGN KEY (config_id) REFERENCES configs(id) FOREIGN KEY (config_id) REFERENCES configs(id),
FOREIGN KEY (channel_id) REFERENCES configs(channel_id)
); );
CREATE TABLE IF NOT EXISTS filters ( CREATE TABLE IF NOT EXISTS filters (
id INTEGER PRIMARY KEY, id INTEGER PRIMARY KEY,
config_id INTEGER, config_id INTEGER,
channel_id INTEGER NOT NULL,
filter_type TEXT NOT NULL, filter_type TEXT NOT NULL,
freq FLOAT, freq FLOAT,
q FLOAT, q FLOAT,
@ -53,7 +59,8 @@ class DatabaseManager:
slope FLOAT, slope FLOAT,
enabled BOOLEAN DEFAULT TRUE, enabled BOOLEAN DEFAULT TRUE,
position INTEGER, position INTEGER,
FOREIGN KEY (config_id) REFERENCES configs(id) FOREIGN KEY (config_id) REFERENCES configs(id),
FOREIGN KEY (channel_id) REFERENCES configs(channel_id)
); );
""") """)
@ -62,51 +69,84 @@ class DatabaseManager:
def get_all_configs(self) -> List[ConfigData]: def get_all_configs(self) -> List[ConfigData]:
with self.get_connection() as conn: with self.get_connection() as conn:
cursor = conn.execute("SELECT id, name, created_at FROM configs") cursor = conn.execute("SELECT id, name, channel_id, created_at FROM configs")
return [ConfigData(*row) for row in cursor.fetchall()] return [ConfigData(*row) for row in cursor.fetchall()]
def load_config(self, config_id: int) -> Tuple[Optional[ParamData], List[FilterData]]: def load_config(self, config_id: int) -> Tuple[Optional[ParamData], List[FilterData]]:
with self.get_connection() as conn: with self.get_connection() as conn:
# 1. 首先加载参数数据
cursor = conn.execute( cursor = conn.execute(
"SELECT * FROM params WHERE config_id = ?", """SELECT config_id, channel_id, delay_data1, ENC_volume_data1,
ENT_mx_right_data, ENT_mix_left_data
FROM params WHERE config_id = ?""",
(config_id,) (config_id,)
) )
param_row = cursor.fetchone() param_row = cursor.fetchone()
params = ParamData(*param_row[2:]) if param_row else None if param_row is None:
return None, []
# 2. 创建 ParamData 对象
params = ParamData(
config_id=param_row[0],
channel_id=param_row[1],
delay_data1=param_row[2],
ENC_volume_data1=param_row[3],
ENT_mx_right_data=param_row[4],
ENT_mix_left_data=param_row[5]
)
# 3. 加载滤波器数据
cursor = conn.execute( cursor = conn.execute(
"SELECT * FROM filters WHERE config_id = ? ORDER BY position", """SELECT config_id, channel_id, filter_type, freq, q, gain, slope,
enabled, position, id
FROM filters WHERE config_id = ? ORDER BY position""",
(config_id,) (config_id,)
) )
filters = [FilterData(*row[2:]) for row in cursor.fetchall()] filters = []
for row in cursor.fetchall():
filters.append(FilterData(
channel_id=row[1],
filter_type=row[2],
freq=row[3],
q=row[4],
gain=row[5],
slope=row[6],
enabled=bool(row[7]),
position=row[8],
config_id=row[0],
id=row[9]
))
print(f"load_config filters: {filters}")
return params, filters return params, filters
def save_config(self, name: str, params: ParamData, filters: List[FilterData]) -> int: def save_config(self, name: str, channel_id: int, params: ParamData, filters: List[FilterData]) -> int:
with self.get_connection() as conn: with self.get_connection() as conn:
cursor = conn.execute( cursor = conn.execute(
"INSERT INTO configs (name) VALUES (?)", "INSERT INTO configs (name, channel_id) VALUES (?, ?)",
(name,) (name, channel_id)
) )
config_id = cursor.lastrowid config_id = cursor.lastrowid
params.config_id = config_id
conn.execute( conn.execute(
"""INSERT INTO params """INSERT INTO params
(config_id, delay_data1, ENC_volume_data1, ENT_mx_right_data, ENT_mix_left_data) (config_id, channel_id, delay_data1, ENC_volume_data1, ENT_mx_right_data, ENT_mix_left_data)
VALUES (?, ?, ?, ?, ?)""", VALUES (?, ?, ?, ?, ?, ?)""",
(config_id, params.delay_data1, params.ENC_volume_data1, (params.config_id, params.channel_id, params.delay_data1, params.ENC_volume_data1,
params.ENT_mx_right_data, params.ENT_mix_left_data) params.ENT_mx_right_data, params.ENT_mix_left_data)
) )
for filter_data in filters: for filter_data in filters:
conn.execute( filter_data.config_id = config_id
cursor = conn.execute(
"""INSERT INTO filters """INSERT INTO filters
(config_id, filter_type, freq, q, gain, slope, enabled, position) (config_id, channel_id, filter_type, freq, q, gain, slope, enabled, position)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)""", VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(config_id, filter_data.filter_type, filter_data.freq, (filter_data.config_id, filter_data.channel_id, filter_data.filter_type,
filter_data.q, filter_data.gain, filter_data.slope, filter_data.freq, filter_data.q, filter_data.gain, filter_data.slope,
filter_data.enabled, filter_data.position) filter_data.enabled, filter_data.position)
) )
filter_data.id = cursor.lastrowid
conn.commit() conn.commit()
return config_id return config_id
@ -137,29 +177,37 @@ class DatabaseManager:
if not configs: if not configs:
# 创建默认配置 # 创建默认配置
default_params = ParamData( default_params = ParamData(
channel_id=1,
delay_data1=0.0, delay_data1=0.0,
ENC_volume_data1=0.0, ENC_volume_data1=0.0,
ENT_mx_right_data=0.0, ENT_mx_right_data=0.0,
ENT_mix_left_data=0.0 ENT_mix_left_data=0.0,
config_id=None # 将在 save_config 中设置
) )
# 创建一些默认滤波器 # 创建一些默认滤波器
default_filters = [ default_filters = [
FilterData( FilterData(
channel_id=1,
filter_type="PEQ", filter_type="PEQ",
freq=1000.0, freq=1000.0,
q=1.0, q=1.0,
gain=0.0, gain=0.0,
slope=12.0 slope=12.0,
position=0,
config_id=None # 将在 save_config 中设置
), ),
FilterData( FilterData(
channel_id=1,
filter_type="HPF", filter_type="HPF",
freq=20.0, freq=20.0,
q=0.707, q=0.707,
gain=0.0, gain=0.0,
slope=12.0 slope=12.0,
position=1,
config_id=None # 将在 save_config 中设置
) )
] ]
# 保存默认配置 # 保存默认配置
self.save_config("Default Config", default_params, default_filters) self.save_config("Default Config", 1, default_params, default_filters)

View File

@ -4,17 +4,21 @@ from datetime import datetime
@dataclass @dataclass
class FilterData: class FilterData:
channel_id: int
filter_type: str filter_type: str
freq: float freq: float
q: float q: float
gain: float gain: float
slope: float slope: float
id: int = None
enabled: bool = True enabled: bool = True
position: int = 0 position: int = 0
config_id: Optional[int] = None
id: Optional[int] = None
@dataclass @dataclass
class ParamData: class ParamData:
config_id: int
channel_id: int
delay_data1: float delay_data1: float
ENC_volume_data1: float ENC_volume_data1: float
ENT_mx_right_data: float ENT_mx_right_data: float
@ -24,4 +28,5 @@ class ParamData:
class ConfigData: class ConfigData:
id: int id: int
name: str name: str
channel_id: int
created_at: str created_at: str

View File

@ -8,8 +8,11 @@ class AudioFilterModel:
def __init__(self, db_manager: DatabaseManager): def __init__(self, db_manager: DatabaseManager):
self.db_manager = db_manager self.db_manager = db_manager
self.current_config_id: Optional[int] = None self.current_config_id: Optional[int] = None
self.current_channel_id: int = 1 # 添加当前通道ID
self.filters: List[FilterData] = [] self.filters: List[FilterData] = []
self.params: ParamData = ParamData( self.params: ParamData = ParamData(
config_id=None,
channel_id=self.current_channel_id, # 设置通道ID
delay_data1=0.0, delay_data1=0.0,
ENC_volume_data1=0.0, ENC_volume_data1=0.0,
ENT_mx_right_data=0.0, ENT_mx_right_data=0.0,
@ -19,15 +22,22 @@ class AudioFilterModel:
def load_config(self, config_id: int) -> bool: def load_config(self, config_id: int) -> bool:
params, filters = self.db_manager.load_config(config_id) params, filters = self.db_manager.load_config(config_id)
print(f"load_config filters: {filters}")
if params is not None: if params is not None:
self.params = params self.params = params
self.filters = filters self.filters = filters
self.current_config_id = config_id self.current_config_id = config_id
self.current_channel_id = params.channel_id # 更新当前通道ID
return True return True
return False return False
def save_config(self, name: str) -> int: def save_config(self, name: str) -> int:
return self.db_manager.save_config(name, self.params, self.filters) # 确保所有对象都有正确的channel_id
self.params.channel_id = self.current_channel_id
for filter_data in self.filters:
print(f"save_config filter_data.channel_id: {filter_data.channel_id}")
filter_data.channel_id = self.current_channel_id
return self.db_manager.save_config(name, self.current_channel_id, self.params, self.filters)
def update_filter(self, index: int, **kwargs): def update_filter(self, index: int, **kwargs):
if 0 <= index < len(self.filters): if 0 <= index < len(self.filters):
@ -162,39 +172,38 @@ class AudioFilterController:
return next_number return next_number
def add_filter(self): def add_filter(self):
# 检查是否达到最大滤波器数量 # 1. 用户界面创建
if len(self.model.filters) >= 20:
print("已达到最大滤波器数量限制20个")
return
# 显示滤波器类型选择对话框
dialog = FilterTypeDialog(self.view) dialog = FilterTypeDialog(self.view)
if dialog.exec(): if dialog.exec():
filter_type = dialog.get_selected_type() filter_type = dialog.get_selected_type()
# 获取下一个可用的序号
next_number = self.get_next_available_number(filter_type) next_number = self.get_next_available_number(filter_type)
# 创建新的滤波器名称
new_filter_type = f"{filter_type}_{next_number}" new_filter_type = f"{filter_type}_{next_number}"
# 创建新的滤波器,使用默认参数 # 2. 内存中创建 FilterData 对象
new_filter = FilterData( new_filter = FilterData(
filter_type=new_filter_type, filter_type=new_filter_type,
freq=1000.0, freq=10.0,
q=1.0, q=1.0,
gain=0.0, gain=1.0,
slope=12.0, slope=1.0,
position=len(self.model.filters) position=len(self.model.filters),
config_id=self.model.current_config_id,
channel_id=self.model.current_channel_id,
enabled=True,
id=None # 初始创建时 id 为 None
) )
# 添加到模型中 # 3. 添加到模型的过滤器列表
self.model.filters.append(new_filter) self.model.filters.append(new_filter)
# 如果有当前配置,保存到数据库 # 4. 保存到数据库
if self.model.current_config_id: if not self.model.current_config_id:
# 创建新配置,使用默认名称
config_name = f"Channel {self.model.current_channel_id} Config"
self.model.save_config(config_name)
else:
# 更新现有配置
self.model.save_config(f"Config {self.model.current_config_id}") self.model.save_config(f"Config {self.model.current_config_id}")
# 更新视图 # 更新视图
self.view.update_table() self.view.update_table()
@ -231,7 +240,7 @@ class AudioFilterController:
pass pass
class AudioFilterWidget(QWidget): class AudioFilterWidget(QWidget):
def __init__(self): def __init__(self, channel_id: int = 1):
super().__init__() super().__init__()
self.ui = Ui_Widget() self.ui = Ui_Widget()
self.ui.setupUi(self) self.ui.setupUi(self)
@ -242,37 +251,15 @@ class AudioFilterWidget(QWidget):
self.db_manager = DatabaseManager() self.db_manager = DatabaseManager()
self.model = AudioFilterModel(self.db_manager) self.model = AudioFilterModel(self.db_manager)
self.model.current_channel_id = channel_id # 设置当前通道ID
# 加载配置 # 加载该通道的默认配置
configs = self.db_manager.get_all_configs() configs = self.db_manager.get_all_configs()
if configs: if configs:
# 有配置则加载最后一个 # 查找当前通道的配置
last_config_id = configs[-1].id channel_config = next((config for config in configs if config.channel_id == channel_id), None)
self.model.load_config(last_config_id) if channel_config:
else: self.model.load_config(channel_config.id)
# 没有配置则创建默认配置
default_params = ParamData(
delay_data1=0.0,
ENC_volume_data1=0.0,
ENT_mx_right_data=0.0,
ENT_mix_left_data=0.0
)
# 创建默认滤波器
self.model.filters = [
FilterData(
filter_type="PEQ",
freq=1000.0,
q=1.0,
gain=0.0,
slope=12.0
)
]
self.model.params = default_params
# 保存到数据库
config_id = self.model.save_config("Default Config")
self.model.current_config_id = config_id
self.controller = AudioFilterController(self.model, self) self.controller = AudioFilterController(self.model, self)
self.update_view() self.update_view()
@ -287,10 +274,12 @@ class AudioFilterWidget(QWidget):
def update_table_row(self, row: int): def update_table_row(self, row: int):
filter_data = self.model.filters[row] filter_data = self.model.filters[row]
print(f"filter_data: {filter_data}")
# 创建带复选框的滤波器名称项 # 创建带复选框的滤波器名称项
filter_item = QTableWidgetItem(filter_data.filter_type) filter_item = QTableWidgetItem()
filter_item.setFlags(filter_item.flags() | Qt.ItemFlag.ItemIsUserCheckable) filter_item.setFlags(filter_item.flags() | Qt.ItemFlag.ItemIsUserCheckable)
filter_item.setCheckState(Qt.CheckState.Checked) # 默认选中 filter_item.setCheckState(Qt.CheckState.Checked if filter_data.enabled else Qt.CheckState.Unchecked)
filter_item.setText(str(filter_data.filter_type)) # 设置文本,这样文本会和复选框一起显示
# 更新各列的值 # 更新各列的值
self.ui.tableWidget.setItem(row, 0, filter_item) self.ui.tableWidget.setItem(row, 0, filter_item)

View File

@ -9,6 +9,8 @@ class ChannelButton(QWidget):
def __init__(self, channel_num): def __init__(self, channel_num):
super().__init__() super().__init__()
self.channel_num = channel_num # 保存通道号
# Create GroupBox for the channel # Create GroupBox for the channel
self.group = QGroupBox(f"CH{channel_num}") self.group = QGroupBox(f"CH{channel_num}")
layout = QVBoxLayout() layout = QVBoxLayout()
@ -98,8 +100,8 @@ class ChannelButton(QWidget):
def show_filter_window(self): def show_filter_window(self):
if not self.filter_window: if not self.filter_window:
self.filter_window = AudioFilterWidget() # 创建滤波器窗口时传入对应的通道号
# Set window title to include channel number self.filter_window = AudioFilterWidget(channel_id=self.channel_num)
self.filter_window.setWindowTitle(f"Channel {self.group.title()} Filter Settings") self.filter_window.setWindowTitle(f"Channel {self.group.title()} Filter Settings")
# Show the window if it's not visible # Show the window if it's not visible