brisonus_app_eq/communication/param.py
2025-02-19 17:07:55 +08:00

132 lines
4.0 KiB
Python

from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional
from enum import Enum
from communication.com_cmd import ComCmd
class ParamType(Enum):
    """Kinds of parameter; each member's value is its lowercase string name."""

    FILTER = "filter"
    VOLUME = "volume"
    DELAY = "delay"
    MIX = "mix"
@dataclass
class FilterParam:
    """Settings of a single filter slot.

    Every numeric field defaults to 1.0 and a new filter starts disabled.
    Units are not established in this file (presumably Hz for ``freq`` and
    dB for ``gain`` -- TODO confirm against the device protocol).
    """

    # Whether this filter slot is active.
    enabled: bool = False
    # One of: PEAK, LOWPASS, HIGHPASS, ALLPASS.
    filter_type: str = "PEAK"
    # Filter frequency.
    freq: float = 1.0
    # Quality factor.
    q: float = 1.0
    # Filter gain.
    gain: float = 1.0
    # Filter slope.
    slope: float = 1.0
@dataclass
class Param:
    """One addressable device parameter plus the commands to read/write it.

    A parameter of type ``ParamType.FILTER`` keeps its state in ``filters``;
    every other type keeps it in ``delay``, ``volume``, ``mix_left`` and
    ``mix_right``.
    """

    # Unannotated class constants (therefore not dataclass fields).
    # Number of filter slots created by default; the original note marks this
    # count as still to be decided.
    _FILTER_COUNT = 8
    # Address stride and transfer length of one filter slot. The original
    # comment assumes each filter occupies 16 bytes.
    _FILTER_SIZE = 16

    name: str
    param_type: ParamType
    channel_id: int
    address: int
    length: int
    # Basic parameter values
    delay: float = 0.0
    volume: float = 0.0
    mix_left: float = 0.0
    mix_right: float = 0.0
    # Filter parameter list
    filters: List[FilterParam] = field(default_factory=list)
    # Cache of the raw values
    _raw_values: List[int] = field(default_factory=list)

    def __post_init__(self):
        """Create the default filter slots for a filter parameter.

        Filters supplied by the caller are kept as-is; the default slots are
        only created when none were given. Overwriting unconditionally would
        silently discard constructor input (and the filter state carried
        through ``dataclasses.replace``).
        """
        if self.param_type == ParamType.FILTER and not self.filters:
            self.filters = [FilterParam() for _ in range(self._FILTER_COUNT)]

    def addr(self) -> int:
        """Return the parameter's base address."""
        return self.address

    @property
    def list_val(self) -> List[float]:
        """Return the parameter's current values as a list.

        For a filter parameter this is the ``freq`` of every enabled filter;
        otherwise it is ``[delay, volume, mix_left, mix_right]``.
        """
        if self.param_type == ParamType.FILTER:
            return [flt.freq for flt in self.filters if flt.enabled]
        return [self.delay, self.volume, self.mix_left, self.mix_right]

    def _filter_addr(self, index: int) -> int:
        """Return the address of the filter slot at ``index``."""
        return self.address + index * self._FILTER_SIZE

    def gen_cmd_read_all(self) -> List[ComCmd]:
        """Build the commands that read every value of this parameter.

        Returns:
            One read command per filter slot for a filter parameter (enabled
            or not), otherwise a single read command covering ``length`` at
            ``address``.
        """
        if self.param_type != ParamType.FILTER:
            return [
                ComCmd(
                    param_name=self.name,
                    addr=self.address,
                    length=self.length,
                    is_read=True,
                )
            ]
        return [
            ComCmd(
                param_name=self.name,
                addr=self._filter_addr(index),
                length=self._FILTER_SIZE,
                is_read=True,
            )
            for index in range(len(self.filters))
        ]

    def gen_cmd_write_all(self) -> List[ComCmd]:
        """Build the commands that write every value of this parameter.

        Returns:
            One write command per *enabled* filter for a filter parameter,
            otherwise a single write command carrying the encoded basic
            parameters.
        """
        # NOTE(review): the encoders below are still stubs that return None,
        # so ``data`` is None until they are implemented.
        if self.param_type != ParamType.FILTER:
            return [
                ComCmd(
                    param_name=self.name,
                    addr=self.address,
                    length=self.length,
                    is_read=False,
                    data=self._encode_basic_params(),
                )
            ]
        return [
            ComCmd(
                param_name=self.name,
                addr=self._filter_addr(index),
                length=self._FILTER_SIZE,
                is_read=False,
                data=self._encode_filter(flt),
            )
            for index, flt in enumerate(self.filters)
            if flt.enabled
        ]

    def process_cmd_response(self, cmd: ComCmd, response_data: bytes):
        """Update this parameter from the response to ``cmd``.

        Args:
            cmd: The command that was answered; its ``addr`` selects the
                filter slot for a filter parameter.
            response_data: Raw payload of the response.
        """
        if self.param_type != ParamType.FILTER:
            self._decode_basic_params(response_data)
            return

        filter_idx = (cmd.addr - self.address) // self._FILTER_SIZE
        if not 0 <= filter_idx < len(self.filters):
            # Address outside this parameter's filter slots: ignore.
            return

        decoded = self._decode_filter(response_data)
        # Never store None in the list (the decoder is still a stub):
        # list_val and gen_cmd_write_all access ``.enabled`` on every entry.
        if decoded is not None:
            self.filters[filter_idx] = decoded

    def _encode_filter(self, filter: FilterParam) -> bytes:
        """Encode one filter's parameters.

        Not implemented yet: currently returns None.
        """
        # TODO: implement the filter parameter encoding logic
        pass

    def _decode_filter(self, data: bytes) -> FilterParam:
        """Decode one filter's parameters.

        Not implemented yet: currently returns None.
        """
        # TODO: implement the filter parameter decoding logic
        pass

    def _encode_basic_params(self) -> bytes:
        """Encode the basic parameters.

        Not implemented yet: currently returns None.
        """
        # TODO: implement the basic parameter encoding logic
        pass

    def _decode_basic_params(self, data: bytes):
        """Decode the basic parameters.

        Not implemented yet: currently does nothing.
        """
        # TODO: implement the basic parameter decoding logic
        pass