132 lines
4.0 KiB
Python
132 lines
4.0 KiB
Python
from dataclasses import dataclass, field
|
|
from typing import List, Dict, Any, Optional
|
|
from enum import Enum
|
|
from communication.com_cmd import ComCmd
|
|
|
|
class ParamType(Enum):
|
|
FILTER = "filter"
|
|
VOLUME = "volume"
|
|
DELAY = "delay"
|
|
MIX = "mix"
|
|
|
|
@dataclass
|
|
class FilterParam:
|
|
enabled: bool = False
|
|
filter_type: str = "PEAK" # PEAK, LOWPASS, HIGHPASS, ALLPASS
|
|
freq: float = 1.0
|
|
q: float = 1.0
|
|
gain: float = 1.0
|
|
slope: float = 1.0
|
|
|
|
@dataclass
|
|
class Param:
|
|
name: str
|
|
param_type: ParamType
|
|
channel_id: int
|
|
address: int
|
|
length: int
|
|
|
|
# 基础参数值
|
|
delay: float = 0.0
|
|
volume: float = 0.0
|
|
mix_left: float = 0.0
|
|
mix_right: float = 0.0
|
|
|
|
# 滤波器参数列表
|
|
filters: List[FilterParam] = field(default_factory=list)
|
|
|
|
# 缓存实际值
|
|
_raw_values: List[int] = field(default_factory=list)
|
|
|
|
def __post_init__(self):
|
|
"""初始化后处理"""
|
|
if self.param_type == ParamType.FILTER:
|
|
# 初始化8个滤波器 待定
|
|
self.filters = [FilterParam() for _ in range(8)]
|
|
|
|
def addr(self) -> int:
|
|
"""获取参数地址"""
|
|
return self.address
|
|
|
|
@property
|
|
def list_val(self) -> List[float]:
|
|
"""获取参数当前值列表"""
|
|
if self.param_type == ParamType.FILTER:
|
|
return [f.freq for f in self.filters if f.enabled]
|
|
return [self.delay, self.volume, self.mix_left, self.mix_right]
|
|
|
|
def gen_cmd_read_all(self) -> List[ComCmd]:
|
|
"""生成读取所有值的命令列表"""
|
|
cmds = []
|
|
if self.param_type == ParamType.FILTER:
|
|
for i, filter in enumerate(self.filters):
|
|
cmd = ComCmd(
|
|
param_name=self.name,
|
|
addr=self.address + i * 16, # 假设每个滤波器占16字节
|
|
length=16,
|
|
is_read=True
|
|
)
|
|
cmds.append(cmd)
|
|
else:
|
|
cmd = ComCmd(
|
|
param_name=self.name,
|
|
addr=self.address,
|
|
length=self.length,
|
|
is_read=True
|
|
)
|
|
cmds.append(cmd)
|
|
return cmds
|
|
|
|
def gen_cmd_write_all(self) -> List[ComCmd]:
|
|
"""生成写入所有值的命令列表"""
|
|
cmds = []
|
|
if self.param_type == ParamType.FILTER:
|
|
for i, filter in enumerate(self.filters):
|
|
if filter.enabled:
|
|
cmd = ComCmd(
|
|
param_name=self.name,
|
|
addr=self.address + i * 16,
|
|
length=16,
|
|
is_read=False,
|
|
data=self._encode_filter(filter)
|
|
)
|
|
cmds.append(cmd)
|
|
else:
|
|
cmd = ComCmd(
|
|
param_name=self.name,
|
|
addr=self.address,
|
|
length=self.length,
|
|
is_read=False,
|
|
data=self._encode_basic_params()
|
|
)
|
|
cmds.append(cmd)
|
|
return cmds
|
|
|
|
def process_cmd_response(self, cmd: ComCmd, response_data: bytes):
|
|
"""处理命令响应数据"""
|
|
if self.param_type == ParamType.FILTER:
|
|
filter_idx = (cmd.addr - self.address) // 16
|
|
if 0 <= filter_idx < len(self.filters):
|
|
self.filters[filter_idx] = self._decode_filter(response_data)
|
|
else:
|
|
self._decode_basic_params(response_data)
|
|
|
|
def _encode_filter(self, filter: FilterParam) -> bytes:
|
|
"""编码滤波器参数"""
|
|
# 实现滤波器参数编码逻辑
|
|
pass
|
|
|
|
def _decode_filter(self, data: bytes) -> FilterParam:
|
|
"""解码滤波器参数"""
|
|
# 实现滤波器参数解码逻辑
|
|
pass
|
|
|
|
def _encode_basic_params(self) -> bytes:
|
|
"""编码基础参数"""
|
|
# 实现基础参数编码逻辑
|
|
pass
|
|
|
|
def _decode_basic_params(self, data: bytes):
|
|
"""解码基础参数"""
|
|
# 实现基础参数解码逻辑
|
|
pass |