"""
The following documents the pyfar filter classes. Examples for working with
filter objects are part of the
:doc:`examples gallery<gallery:gallery/interactive/pyfar_filtering>`.
Available filters are shown in the
:doc:`filter types examples<gallery:gallery/interactive/pyfar_filter_types>`
and documented in :py:mod:`pyfar.dsp.filter`.
"""
import deepdiff
import warnings
import numpy as np
import scipy.signal as spsignal
import pyfar as pf
from copy import deepcopy
def _atleast_3d_first_dim(arr):
arr = np.asarray(arr)
ndim = np.ndim(arr)
if ndim < 2:
arr = np.atleast_2d(arr)
if ndim < 3:
return arr[np.newaxis]
else:
return arr
def _atleast_4d_first_dim(arr):
arr = np.asarray(arr)
ndim = np.ndim(arr)
if ndim < 3:
arr = _atleast_3d_first_dim(arr)
if ndim < 4:
return arr[np.newaxis]
else:
return arr
def _pop_state_from_kwargs(**kwargs):
kwargs.pop('zi', None)
warnings.warn(
"This filter function does not support saving the filter state",
stacklevel=2)
return kwargs
def _extend_sos_coefficients(sos, order):
"""
Extend a set of SOS filter coefficients to match a required filter order
by adding sections with coefficients resulting in an ideal frequency
response.
Parameters
----------
sos : array-like
The second order section filter coefficients.
order : int
The order to which the coefficients are to be extended.
Returns
-------
sos_ext : array-like
The extended second order section coefficients.
"""
sos_order = sos.shape[0]
if sos_order == order:
return sos
pad_len = order - sos_order
sos_ext = np.zeros((pad_len, 6))
sos_ext[:, 3] = 1.
sos_ext[:, 0] = 1.
return np.vstack((sos, sos_ext))
def _repr_string(filter_type, order, n_channels, sampling_rate):
"""Generate repr string for filter objects."""
ch_str = 'channel' if n_channels == 1 else 'channels'
if filter_type == "SOS":
sec_str = 'section' if order == 1 else 'sections'
representation = (f"SOS filter with {order} {sec_str} and "
f"{n_channels} {ch_str} "
f"@ {sampling_rate} Hz sampling rate")
else:
if order % 10 == 1:
order_string = 'st'
elif order % 10 == 2:
order_string = 'nd'
elif order % 10 == 3:
order_string = 'rd'
else:
order_string = 'th'
representation = (f"{order}{order_string} order "
f"{filter_type} filter with "
f"{n_channels} {ch_str} @ {sampling_rate} "
"Hz sampling rate")
return representation
class Filter(object):
"""
Container class for digital filters.
This is an abstract class method, only used for the shared processing
method used for the application of a filter on a signal.
"""
def __init__(
self,
coefficients=None,
sampling_rate=None,
state=None,
comment=""):
"""
Initialize a general Filter object.
Parameters
----------
coefficients : array, double
The filter coefficients as an array.
sampling_rate : number
The sampling rate of the filter in Hz.
state : array, double, optional
The state of the buffer elements.
comment : str
A comment. The default is ``''``, which initializes an empty
string.
Returns
-------
Filter
The filter object.
"""
if coefficients is not None:
self.coefficients = coefficients
else:
self._coefficients = None
self.state = state
self._sampling_rate = sampling_rate
self.comment = comment
def init_state(self, state):
"""
Initialize the buffer elements to pre-defined initial conditions.
This method is overwritten in the child classes and called after
setting the state there.
"""
self._state = state
self._initialized = True
@staticmethod
def _check_state_keyword(state):
"""
Check the value of the state keyword for 'ini_state' class methods.
"""
if state not in ['zeros', 'step']:
raise ValueError(
f"state is '{state}' but must be 'zeros' or 'step'")
@property
def coefficients(self):
"""
Get and set the coefficients of the filter.
Refer to the
:doc:`gallery:gallery/interactive/pyfar_filter_types` for use
cases.
"""
return self._coefficients
@coefficients.setter
def coefficients(self, value):
"""Coefficients of the filter."""
self._coefficients = _atleast_3d_first_dim(value)
@property
def sampling_rate(self):
"""Sampling rate of the filter in Hz. The sampling rate is set upon
initialization and cannot be changed after the object has been created.
"""
return self._sampling_rate
@property
def n_channels(self):
"""The number of channels of the filter."""
return self._coefficients.shape[0]
@property
def state(self):
"""Get or set the filter state."""
return self._state
@state.setter
def state(self, state):
"""Set state of the filter."""
if state is not None:
if self.coefficients is None:
raise ValueError(
"Cannot set a state without filter coefficients")
self._initialized = True
else:
self._initialized = False
self._state = state
@staticmethod
def _process(coefficients, data, zi=None):
raise NotImplementedError("Abstract class method.")
def process(self, signal, reset=False):
"""Apply the filter to a signal.
Parameters
----------
signal : Signal
The data to be filtered as Signal object.
reset : bool, optional
If set to ``True``, the filter state will be reset to zeros before
the filter is applied to the signal. Note that if the filter state
is ``None``, this option will have no effect. Use ``init_state``
to initialize a filter with no previously set state. The default
is ``'False'``.
Returns
-------
filtered : Signal
A filtered copy of the input signal.
"""
if not isinstance(signal, pf.Signal):
raise ValueError("The input needs to be a Signal object.")
if self.sampling_rate != signal.sampling_rate:
raise ValueError(
"The sampling rates of filter and signal do not match")
if reset is True:
self.reset()
# shape of the output signal. if n_channels is 1, it will be squeezed
# below
filtered_signal_data = np.zeros(
(self.n_channels, *signal.time.shape),
dtype=signal.time.dtype)
if self.state is not None:
new_state = np.zeros_like(self._state)
for idx, (coeff, state) in enumerate(
zip(self._coefficients, self._state)):
filtered_signal_data[idx, ...], new_state[idx, ...] = \
self._process(coeff, signal.time, state)
self._state = new_state
else:
for idx, coeff in enumerate(self._coefficients):
filtered_signal_data[idx, ...] = self._process(
coeff, signal.time, zi=None)
# prepare output signal
filtered_signal = deepcopy(signal)
# squeeze first dimension if there is only one filter channel
if self.n_channels == 1:
filtered_signal.time = np.squeeze(filtered_signal_data, axis=0)
else:
filtered_signal.time = filtered_signal_data
return filtered_signal
def impulse_response(self, n_samples):
"""
Compute or approximate the impulse response of the filter.
See `impulse_response` methods in derived classes for more details.
Parameters
----------
n_samples : int, None
Length in samples for which the impulse response is computed. If
this is ``None``the length is estimated using
`minimum_impulse_response_length`.
Returns
-------
impulse_response : Signal
The impulse response of the filter of with a
``cshape = (n_channels, )``, with the channel shape
:py:func:`~pyfar.Signal.cshape` and the number of filter channels
:py:func:`~n_channels`.
"""
# set or check the impulse response length
minimum_impulse_response_length = int(np.max(
self.minimum_impulse_response_length()))
if n_samples is None:
n_samples = minimum_impulse_response_length
elif np.any(minimum_impulse_response_length > n_samples):
warnings.warn(
('n_samples should be at least as long as the filter, '
f'which is {minimum_impulse_response_length}'), stacklevel=2)
# track the state (better than copying the entire filter)
if self.state is not None:
state = self.state.copy()
self._state = None
reset = True
else:
reset = False
# get impulse response
impulse_response = self.process(pf.signals.impulse(
n_samples, sampling_rate=self.sampling_rate), reset=reset)
# reset the state if required
if reset:
self._state = state
return impulse_response
def reset(self):
"""Reset the filter state by filling it with zeros."""
if self._state is not None:
self._state = np.zeros_like(self._state)
else:
self._state = None
@property
def comment(self):
"""Get comment."""
return self._comment
@comment.setter
def comment(self, value):
"""Set comment."""
if not isinstance(value, str):
raise TypeError("comment has to be of type string.")
else:
self._comment = value
def copy(self):
"""Return a copy of the Filter object."""
return deepcopy(self)
def _encode(self):
"""Return dictionary for the encoding."""
return self.copy().__dict__
@classmethod
def _decode(cls, obj_dict):
"""Decode object based on its respective object dictionary."""
# initializing this way satisfies FIR, IIR and SOS initialization
obj = cls(np.zeros((1, 6)), None)
obj.__dict__.update(obj_dict)
return obj
def __eq__(self, other):
"""Check for equality of two objects."""
return not deepdiff.DeepDiff(self, other)
[docs]
class FilterFIR(Filter):
"""
Filter object for FIR filters.
Parameters
----------
coefficients : array, double
The filter coefficients as an array with dimensions
(number of channels, number of filter coefficients)
sampling_rate : number
The sampling rate of the filter in Hz.
state : array, double, optional
The state of the filter from prior information with dimensions
``(n_filter_chan, *cshape, order)``, where ``cshape`` is
the channel shape of the :py:class:`~pyfar.Signal`
to be filtered.
comment : str
A comment. The default is ``''``, which initializes an empty
string.
Returns
-------
FilterFIR
The FIR filter object.
"""
def __init__(self, coefficients, sampling_rate, state=None, comment=""):
if state is not None and np.asarray(state).ndim < 3:
state = _atleast_3d_first_dim(state)
warnings.warn(
"The state should be of shape (n_channels, *cshape, order). "
f"The state has been reshaped to shape=({state.shape}",
stacklevel=2)
super().__init__(coefficients, sampling_rate, state, comment)
@property
def order(self):
"""The order of the filter."""
return self._coefficients.shape[-1] - 1
@property
def coefficients(self):
"""
Get and set the coefficients of the filter.
Refer to the
:doc:`gallery:gallery/interactive/pyfar_filter_types` for use
cases.
"""
# property from Filter is overwritten, because FilterFIR internally
# also stores a-coefficients easier handling of coefficients across
# filter classes. The user should only see the b-coefficients, however.
return self._coefficients[:, 0]
@coefficients.setter
def coefficients(self, value):
"""Coefficients of the filter."""
b = np.atleast_2d(value)
# add a-coefficients for easier handling across filter classes
a = np.zeros_like(b)
a[..., 0] = 1
coeff = np.stack((b, a), axis=-2)
self._coefficients = _atleast_3d_first_dim(coeff)
@property
def state(self):
"""
Get or set the filter state.
The filter state sets the initial condition of the filter and can for
example be used for block-wise filtering.
Note that the state can also be initialized with the
:py:func:`~pyfar.classes.filter.FilterFIR.init_state` method.
Shape of the state must be ``(n_filter_channels, *cshape, order)``,
with ``cshape`` being the channel shape of the
:py:class:`~pyfar.Signal` to be filtered.
"""
return self._state
@state.setter
def state(self, state):
"""
Set initial state of FIR filter.
Parameters
----------
state : array, double
The state of the filter with dimensions
``(n_channels, *cshape, order)``, where ``cshape`` is
the channel shape of the :py:class:`~pyfar.Signal`
to be filtered.
"""
if state is not None:
state = np.asarray(state)
if (state.shape[-1] != self.order
or state.ndim < 3
or state.shape[0] != self.n_channels):
raise ValueError(
"The state does not match the filter structure. Required "
"shape for FilterFIR is (n_channels, *cshape, order).")
# sets filter state in parent class
Filter.state.fset(self, state)
[docs]
def init_state(self, cshape, state='zeros'):
"""Initialize the buffer elements to pre-defined initial conditions.
Parameters
----------
cshape : tuple, int
The channel shape of the :py:class:`~pyfar.Signal`
which is to be filtered.
state : str, optional
The desired state. This can either be ``'zeros'`` which initializes
an empty filter, or ``'step'`` which constructs the initial
conditions for step response steady-state. The default is 'zeros'.
"""
self._check_state_keyword(state)
new_state = np.zeros((self.n_channels, *cshape, self.order))
if state == 'step':
for idx, coeff in enumerate(self._coefficients):
new_state[idx, ...] = spsignal.lfilter_zi(coeff[0], coeff[1])
super().init_state(state=new_state)
[docs]
def minimum_impulse_response_length(self, unit='samples', tolerance=None):
r"""
Get the minimum length of the filter impulse response.
The length is computed from the last non-zero coefficient per channel.
Parameters
----------
tolerance : float, optional
Tolerance for estimating the minimum length of noisy FIR filters.
The length is estimated by finding the last coefficient with an
absolute value greater or equal to the absolute maximum of the
filter coefficients per channel multiplied by `tolerance`. For
example if ``tolerance = 0.001`` trailing values below
:math:`20 \log_{10}(0.001)=-60` dB would be ignored in the length
estimation. The default ``None`` uses the numerical precision
``2 * numpy.finfo(float).resolution`` as a strict threshold.
unit : string, optional
The unit in which the length is returned. Can be ``'samples'`` or
``'s'`` (seconds). The default is ``'samples'``.
Returns
-------
minimum_impulse_response_length : array
An integer array of shape(n_channels, ) containing the length in
samples with the number of filter channels :py:func:`~n_channels`.
"""
# get filter coefficients
b = self.coefficients.astype(float)
if tolerance is None:
thresholds = np.ones(b.shape[0]) * 2 * np.finfo(b.dtype).eps
else:
thresholds = np.max(np.abs(b), -1) * tolerance
# find last entry above tolerance per channel
above_threshold = b > np.repeat(thresholds[..., None], b.shape[-1], -1)
estimated_length = np.where(above_threshold)[1].reshape(b.shape[0], -1)
estimated_length = np.max(estimated_length, -1) + 1
# convert to desired unit
if unit == 's':
estimated_length /= self.sampling_rate
elif unit == 'samples':
estimated_length = estimated_length.astype(int)
else:
raise ValueError(f"unit is '{unit}' but must be 'samples' or 's'")
return estimated_length
[docs]
def impulse_response(self, n_samples=None):
"""
Compute the impulse response of the filter.
Parameters
----------
n_samples : int, optional
Length in samples for which the impulse response is computed. The
default is ``None`` in which case ``n_samples`` is computed as the
maximum value across filter channels returned by
:py:func:`~FilterFIR.minimum_impulse_response_length`.
A warning is returned if ``n_samples`` is shorter than the value
returned by :py:func:`~FilterFIR.minimum_impulse_response_length`.
Returns
-------
impulse_response : Signal
The impulse response of the filter of with a
``cshape = (n_channels, )``, with the channel shape
:py:func:`~pyfar.Signal.cshape` and the number of filter channels
:py:func:`~n_channels`.
"""
return super().impulse_response(n_samples)
@staticmethod
def _process(coefficients, data, zi=None):
"""Process a single filter channel.
This is a hidden static method required for a shared processing
function in the parent class.
Parameters
----------
coefficients : array
Coefficients of the filter channel.
data : array
The data to be filtered of shape ``(cshape, n_samples)``.
zi : array, optional
The initial filter state of shape ``(cshape, order)`` where
``cshape`` is the channel shape of the signal to be filtered.
The default is ``None``.
Returns
-------
out : array
The filtered data.
zf : array
The final filter state. Only returned if ``zi is not None``.
"""
b = coefficients[0]
# broadcast b to match ndim of data for convolution
b = np.broadcast_to(b, (1, ) * (data.ndim - b.ndim) + b.shape)
out_full = spsignal.oaconvolve(data, b, mode='full')
# Ensure that the output has the same shape as the input
out = out_full[..., 0:data.shape[-1]]
if zi is not None:
if zi.shape[0:-1] != data.shape[0:-1]:
raise ValueError(
"The initial state does not match the cshape of "
"the signal. Required shape for `state` in "
"FilterFIR is (n_channels, *cshape, order).")
# add initial filter state to the beginning of the output
out[..., 0:zi.shape[-1]] += zi
# get new filter state
zf = out_full[..., data.shape[-1]:]
return out, zf
else:
return out
def __repr__(self):
"""Representation of the filter object."""
return _repr_string(
"FIR", self.order, self.n_channels, self.sampling_rate)
[docs]
class FilterIIR(Filter):
"""
Filter object for IIR filters.
For IIR filters with high orders, second order section IIR filters using
FilterSOS should be considered.
Parameters
----------
coefficients : array, double
The filter coefficients as an array, with shape
(number of channels, 2, max(number of coefficients in the nominator,
number of coefficients in the denominator))
sampling_rate : number
The sampling rate of the filter in Hz.
state : array, double, optional
The state of the filter from prior information with dimensions
``(n_filter_chan, *cshape, order)``, where ``cshape`` is
the channel shape of the :py:class:`~pyfar.Signal`
to be filtered.
comment : str
A comment. The default is ``''``, which initializes an empty
string.
Returns
-------
FilterIIR
The IIR filter object.
"""
def __init__(self, coefficients, sampling_rate, state=None, comment=""):
if state is not None and np.asarray(state).ndim < 3:
state = _atleast_3d_first_dim(state)
warnings.warn(
"The state should be of shape (n_channels, *cshape, order). "
f"The state has been reshaped to shape=({state.shape}",
stacklevel=2)
super().__init__(coefficients, sampling_rate, state, comment)
@property
def order(self):
"""The order of the filter."""
return np.max(self._coefficients.shape[-2:]) - 1
@property
def state(self):
"""
Get or set the filter state.
The filter state sets the initial condition of the filter and can for
example be used for block-wise filtering.
Note that the state can also be initialized with the
:py:func:`~pyfar.classes.filter.FilterIIR.init_state` method.
Shape of the state must be ``(n_filter_channels, *cshape, order)``,
with ``cshape`` being the channel shape of the
:py:class:`~pyfar.Signal` to be filtered.
"""
return self._state
@state.setter
def state(self, state):
"""
Set initial state of IIR filter.
Parameters
----------
state : array, double
The state of the filter with dimensions
``(n_channels, *cshape, order)``, where ``cshape`` is
the channel shape of the :py:class:`~pyfar.Signal`
to be filtered.
"""
if state is not None:
state = np.asarray(state)
if (state.shape[-1] != self.order
or state.ndim < 3
or state.shape[0] != self.n_channels):
raise ValueError(
"The state does not match the filter structure. Required "
"shape for FilterIIR is (n_channels, *cshape, order).")
# sets filter state in parent class
Filter.state.fset(self, state)
[docs]
def init_state(self, cshape, state='zeros'):
"""Initialize the buffer elements to pre-defined initial conditions.
Parameters
----------
cshape : tuple, int
The channel shape of the :py:class:`~pyfar.Signal`
which is to be filtered.
state : str, optional
The desired state. This can either be ``'zeros'`` which initializes
an empty filter, or ``'step'`` which constructs the initial
conditions for step response steady-state. The default is 'zeros'.
"""
self._check_state_keyword(state)
new_state = np.zeros((self.n_channels, *cshape, self.order))
if state == 'step':
for idx, coeff in enumerate(self._coefficients):
new_state[idx, ...] = spsignal.lfilter_zi(coeff[0], coeff[1])
return super().init_state(state=new_state)
[docs]
def impulse_response(self, n_samples=None):
"""
Compute the impulse response of the filter.
Parameters
----------
n_samples : int, optional
Length in samples for which the impulse response is computed. The
default is ``None`` in which case ``n_samples`` is computed as the
maximum value across filter channels returned by
:py:func:`~FilterIIR.minimum_impulse_response_length`.
A warning is returned if ``n_samples`` is shorter than the value
returned by :py:func:`~FilterIIR.minimum_impulse_response_length`.
Returns
-------
impulse_response : Signal
The impulse response of the filter of with a
``cshape = (n_channels, )``, with the channel shape
:py:func:`~pyfar.Signal.cshape` and the number of filter channels
:py:func:`~n_channels`.
"""
return super().impulse_response(n_samples)
[docs]
def minimum_impulse_response_length(self, unit='samples', tolerance=5e-5):
"""
Estimate the minimum length of the filter impulse response.
The length is estimated from the positions of the poles of the filter.
The closer the poles are to the unit circle, the longer the estimated
length.
Parameters
----------
unit : string, optional
The unit in which the length is returned. Can be ``'samples'`` or
``'s'`` (seconds). The default is ``'samples'``.
tolerance : float, optional
Tolerance for the accuracy. Smaller tolerances will result in
larger impulse response lengths. The default is ``5e-5``.
Returns
-------
minimum_impulse_response_length : array
An integer array of shape(n_channels, ) containing the length in
samples with the number of filter channels :py:func:`~n_channels`.
"""
# This is a direct python port of the parts of Matlab's impzlength that
# refer to IIR filters
channels = self.coefficients.shape[0]
estimated_length = np.zeros(channels)
for channel in range(channels):
b = self.coefficients[channel, 0]
a = self.coefficients[channel, 1]
# this is an FIR filter
if self.order == 0 or np.all(a[1:] == 0):
estimated_length[channel] = np.max(np.nonzero(b)) + 1
continue
# This is an IIR filter. Delay of non-recursive part
estimated_length[channel] = np.min(np.nonzero(b))
# poles of the transfer function
poles = np.roots(a)
if np.any(np.abs(poles) > 1.0001):
# This is an unstable filter. The closer the outmost pole is to
# the unit circle, the longer the impulse response.
estimated_length[channel] += \
6 / np.log10(np.max(np.abs(poles[np.abs(poles) > 1])))
else:
# This is a stable filter. Minimum height is 1e-5 of original
# amplitude
idx = np.abs(poles - 1) < 1e-5
poles[idx] = -poles[idx]
# poles on and close to unit circle
idx_oscillation = np.argwhere(np.abs(np.abs(poles)-1) < 1e-5)
# poles further away from unit circle
idx_damped = np.argwhere(np.abs(np.abs(poles)-1) >= 1e-5)
if len(idx_oscillation) == len(poles):
# pure oscillation
estimated_length[channel] += \
5 * np.max(2 * np.pi / np.abs(np.angle(poles)))
elif len(idx_damped) == len(poles):
# no oscillation
idx = np.argmax(np.abs(poles))
pole_multiplicity = self._pole_multiplicity(poles, idx)
estimated_length[channel] += \
pole_multiplicity * np.log10(tolerance) / \
np.log10(np.abs(poles[idx]))
else:
# mixture of both
periods = 5 * np.max(2 * np.pi / \
np.abs(np.angle(poles[idx_oscillation[0]])))
poles_damped = poles[idx_damped[0]]
idx = np.argmax(np.abs(poles_damped))
multiplicity = self._pole_multiplicity(poles_damped, idx)
# catch runtime warning for poles in the origin
if np.abs(poles_damped[idx]) > 0:
multiplicity_factor = np.log10(tolerance) / \
np.log10(np.abs(poles_damped[idx]))
else:
multiplicity_factor = 0
estimated_length[channel] += np.maximum(
periods,
multiplicity * multiplicity_factor)
estimated_length[channel] = np.maximum(
len(a) + len(b) - 1, estimated_length[channel])
if unit == 's':
estimated_length /= self.sampling_rate
elif unit == 'samples':
estimated_length = estimated_length.astype(int)
else:
raise ValueError(f"unit is '{unit}' but must be 'samples' or 's'")
return estimated_length
@staticmethod
def _pole_multiplicity(poles, index, tolerance=.001):
"""
Find multiplicity of a pole.
Required for minimum_impulse_response_length.
"""
if np.all(poles != 0):
tolerance *= np.abs(poles[index])
return np.sum(np.abs(poles - poles[index]) < tolerance)
@staticmethod
def _process(coefficients, data, zi=None):
"""Process a single filter channel.
This is a hidden static method required for a shared processing
function in the parent class.
"""
if zi is not None and zi.shape[0:-1] != data.shape[0:-1]:
raise ValueError("The initial state does not match the cshape of "
"the signal. Required shape for `state` in "
"FilterIIR is (n_channels, *cshape, order).")
return spsignal.lfilter(coefficients[0], coefficients[1], data, zi=zi)
def __repr__(self):
"""Representation of the filter object."""
return _repr_string(
"IIR", self.order, self.n_channels, self.sampling_rate)
[docs]
class FilterSOS(Filter):
"""
Filter object for IIR filters as second order sections (SOS).
Parameters
----------
coefficients : array, double
The filter coefficients as an array with dimensions
``(n_filter_chan, n_sections, 6)`` The first three values of
a section provide the numerator coefficients, the last three values
the denominator coefficients, e.g,
``[[[ b[0], b[1], b[2], a[0], a[1], a[2] ]]]`` for a single channel
SOS filter with one section.
sampling_rate : number
The sampling rate of the filter in Hz.
state : array, double, optional
The state of the filter from prior information with dimensions
``(n_filter_chan, *cshape, n_sections, 2)``, where ``cshape`` is
the channel shape of the :py:class:`~pyfar.Signal`
to be filtered.
comment : str
A comment. The default is ``''``, which initializes an emptry
string.
Returns
-------
FilterSOS
The SOS filter object.
"""
def __init__(self, coefficients, sampling_rate, state=None, comment=""):
if state is not None and np.asarray(state).ndim < 4:
state = _atleast_4d_first_dim(state)
warnings.warn(
"The state should be of shape (n_channels, *cshape, n_sections"
f", 2). The state has been reshaped to shape=({state.shape}",
stacklevel=2)
super().__init__(coefficients, sampling_rate, state, comment)
@Filter.coefficients.setter
def coefficients(self, value):
"""Coefficients of the filter."""
coeff = _atleast_3d_first_dim(value)
if coeff.shape[-1] != 6:
raise ValueError(
"The coefficients are not in line with a second order",
"section filter structure.")
self._coefficients = coeff
@property
def order(self):
"""The order of the filter.
This is always twice the number of sections.
"""
return 2*self.n_sections
@property
def n_sections(self):
"""The number of sections."""
return self._coefficients.shape[-2]
@property
def state(self):
"""
Get or set the filter state.
The filter state sets the initial condition of the filter and can for
example be used for block-wise filtering.
Note that the state can also be initialized with the
:py:func:`~pyfar.classes.filter.FilterSOS.init_state` method.
Shape of the state must be
``(n_filter_channels, *cshape, n_sections, 2))``,
with ``cshape`` being the channel shape of the
:py:class:`~pyfar.Signal` to be filtered.
"""
return self._state
@state.setter
def state(self, state):
"""
Set initial state of SOS filter.
Parameters
----------
state : array, double
The state of the filter with dimensions
``(n_channels, *cshape, n_sections, 2)``, where ``cshape`` is
the channel shape of the :py:class:`~pyfar.Signal`
to be filtered.
"""
if state is not None:
state = np.asarray(state)
if (state.shape[0] != self.n_channels
or state.shape[-1] != 2
or state.shape[-2] != self.n_sections
or state.ndim < 4):
raise ValueError(
"The state does not match the filter structure. Required "
"shape for FilterSOS is (n_channels, *cshape, "
"n_sections, 2).")
# sets filter state in parent class
Filter.state.fset(self, state)
[docs]
def init_state(self, cshape, state='zeros'):
"""Initialize the buffer elements to pre-defined initial conditions.
Parameters
----------
cshape : tuple, int
The channel shape of the :py:class:`~pyfar.Signal`
which is to be filtered.
state : str, optional
The desired state. This can either be ``'zeros'`` which initializes
an empty filter, or ``'step'`` which constructs the initial
conditions for step response steady-state. The default is 'zeros'.
"""
self._check_state_keyword(state)
new_state = np.zeros((self.n_channels, *cshape, self.n_sections, 2))
if state == 'step':
for idx, coeff in enumerate(self._coefficients):
new_state[idx, ...] = spsignal.sosfilt_zi(coeff)
return super().init_state(state=new_state)
[docs]
def impulse_response(self, n_samples=None):
"""
Approximate the infinite impulse response of the filter by a finite
impulse response.
Note that the number of samples must be sufficiently long for
`impulse_response` to be a good approximation of the theoretically
infinitely long impulse response of the filter.
Parameters
----------
n_samples : int
Length in samples for which the impulse response is computed. The
default is ``None`` in which case ``n_samples`` is computed as the
maximum value across filter channels returned by
:py:func:`~FilterSOS.minimum_impulse_response_length`.
A warning is returned if ``n_samples`` is shorter than the value
returned by :py:func:`~FilterSOS.minimum_impulse_response_length`.
Returns
-------
impulse_response : Signal
The impulse response of the filter of with a
``cshape = (n_channels, )``, with the channel shape
:py:func:`~pyfar.Signal.cshape` and the number of filter channels
:py:func:`~n_channels`.
"""
return super().impulse_response(n_samples)
[docs]
def minimum_impulse_response_length(self, unit='samples', tolerance=5e-5):
"""
Estimate the minimum length of the filter impulse response.
The length is estimated separately per second order section using
:py:func:`~FilterIIR.minimum_impulse_response_length`. The final length
is given by the length of the longest section.
Parameters
----------
unit : string, optional
The unit in which the length is returned. Can be ``'samples'`` or
``'s'`` (seconds). The default is ``'samples'``.
tolerance : float, optional
Tolerance for the accuracy. Smaller tolerances will result in
larger impulse response lengths. The default is ``5e-5``.
Returns
-------
minimum_impulse_response_length : array
An integer array of shape(n_channels, ) containing the length in
samples with the number of filter channels :py:func:`~n_channels`.
"""
# This is a direct python port of the parts of Matlab's impzlength that
# refer to SOS filters
channels = self.coefficients.shape[0]
sections = self.coefficients.shape[1]
estimated_length = np.zeros(channels)
for channel in range(channels):
# initialize length for FIR and IIR sections
# Note: Matlab uses 1 for initialization, which does not make sense
# for FIR sections. Using 0 should be better there and does
# not change anything for IIR sections
length_fir = 0
length_iir = 0
for section in range(sections):
# estimate length of each section and channel
b = self.coefficients[channel, section, :3]
a = self.coefficients[channel, section, 3:]
if np.all(a[1:] == 0):
# FIR sections: accumulate FIR length
length_fir += np.max(np.nonzero(b)) + 1
else:
# IIR section: track maximum IIR length
# (assuming this dominates the impulse response length)
filter_iir = FilterIIR([b, a], self.sampling_rate)
length_iir = np.maximum(
length_iir,
filter_iir.minimum_impulse_response_length(
tolerance=tolerance)[0])
# use maximum of FIR and IIR length for final estimate
estimated_length[channel] = np.maximum(length_fir, length_iir)
if unit == 's':
estimated_length /= self.sampling_rate
elif unit == 'samples':
estimated_length = estimated_length.astype(int)
else:
raise ValueError(f"unit is '{unit}' but must be 'samples' or 's'")
return estimated_length
@staticmethod
def _process(sos, data, zi=None):
"""Process a single filter channel.
This is a hidden static method required for a shared processing
function in the parent class.
"""
if zi is not None and zi.shape[0:-2] != data.shape[0:-1]:
raise ValueError("The initial state does not match the cshape of "
"the signal. Required shape for `state` in "
"FilterSOS is (n_channels, *cshape, n_sections,"
" 2).")
if zi is not None:
zi = zi.transpose(1, 0, 2)
res = spsignal.sosfilt(sos, data, zi=zi, axis=-1)
if zi is not None:
zi = res[1].transpose(1, 0, 2)
return res[0], zi
else:
return res
def __repr__(self):
"""Representation of the filter object."""
return _repr_string(
"SOS", self.n_sections, self.n_channels, self.sampling_rate)