Source code for utilities.data_denoisers

"""Data denoising methods.

Classes
-------
Denoiser
    Data denoiser.
"""
#
#                                                                       Modules
# =============================================================================
# Third-party
import torch
import torch.nn.functional as F
import numpy as np
import scipy
import matplotlib.pyplot as plt
# Local
from ioput.plots import plot_xy_data
#
#                                                          Authorship & Credits
# =============================================================================
__author__ = 'Bernardo Ferreira (bernardo_ferreira@brown.edu)'
__credits__ = ['Bernardo Ferreira', ]
__status__ = 'Stable'
# =============================================================================
#
# =============================================================================
[docs]class Denoiser: """Data denoiser. Methods ------- denoise(self, tensor, denoise_method, denoise_parameters={}) Denoise features tensor. _dn_moving_average(self, tensor, window_size) Denoising method: Moving Average. _dn_savitzky_golay(self, tensor, window_size, poly_order) Denoising method: Savitzky-Golay filter. _dn_frequency_low_pass(self, tensor, cutoff_frequency, \ is_plot_magnitude_spectrum=False) Denoising method: Frequency low-pass filter. _check_tensor(self, tensor) Check features tensor to be denoised. """
[docs] def __init__(self): """Constructor.""" pass
# -------------------------------------------------------------------------
[docs] def denoise(self, tensor, denoise_method, denoise_parameters={}, n_denoise_cycle=1): """Denoise features tensor. Parameters ---------- tensor : torch.Tensor Features PyTorch tensor stored as torch.Tensor with shape (sequence_length, n_features). denoise_method : str Denoising method. denoise_parameters : dict, default={} Denoising method parameters. n_denoise_cycle : int, default=1 Number of times that the denoise method is applied recurrently to denoise the features tensor. Returns ------- dn_tensor : torch.Tensor Denoised features PyTorch tensor stored as torch.Tensor with shape (sequence_length, n_features). """ # Check features tensor self._check_tensor(tensor) # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # Initialize denoising method if denoise_method == 'moving_average': denoiser = self._dn_moving_average elif denoise_method == 'savitzky_golay': denoiser = self._dn_savitzky_golay elif denoise_method == 'frequency_low_pass': denoiser = self._dn_frequency_low_pass else: raise RuntimeError('Unknown denoising method.') # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # Initialize denoised features tensor dn_tensor = tensor # Denoise features tensor for _ in range(n_denoise_cycle): dn_tensor = denoiser(dn_tensor, **denoise_parameters) # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ return dn_tensor
# -------------------------------------------------------------------------
[docs] def _dn_moving_average(self, tensor, window_size): """Denoising method: Moving Average. Convolution is performed independently for each feature. Edge padding is performed to preserve input tensor shape. Parameters ---------- tensor : torch.Tensor Features PyTorch tensor stored as torch.Tensor with shape (sequence_length, n_features). window_size : int Convolution kernel size. Returns ------- dn_tensor : torch.Tensor Denoised features PyTorch tensor stored as torch.Tensor with shape (sequence_length, n_features). """ # Get number of features n_features = tensor.shape[1] # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # Reshape to (batch_size=1, n_features, sequence_length) tensor = tensor.T.unsqueeze(0) # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # Compute required padding to preserve input tensor shape padding = (window_size//2, window_size - 1 - window_size//2) # Perform edge padding (replicate edge values of each feature) tensor = F.pad(tensor, (padding[0], padding[1]), mode='replicate') # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # Set convolution kernel of shape (n_features, 1, window_size) # (set independent kernel for each feature) kernel = torch.ones(n_features, 1, window_size)/window_size # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # Compute moving average (independent convolution of each feature) denoised_tensor = \ F.conv1d(tensor, kernel, padding=0, groups=n_features) # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # Reshape back to (sequence_length, n_features) dn_tensor = denoised_tensor.squeeze(0).T # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ return dn_tensor
# -------------------------------------------------------------------------
[docs] def _dn_savitzky_golay(self, tensor, window_size, poly_order): """Denoising method: Savitzky-Golay filter. Convolution is performed independently for each feature. Edge padding is performed to preserve input tensor shape. General thumb rules: 1. Set an odd number for the window_size (ensures that there is a central point around which the smoothing occurs) 2. The window_size should be around 5% to 15% of the sequence length, depending on the level of noise. Larger window_size smooths more effectively but may remove important details, while a smaller window preserves more detail but may not adequately smooth out noise 3. The poly_order must be less than window_size (ensures that there are enough points to fit the polynomial correctly) 4. The poly_order is usually between 1 and 4. Lower poly_order is effective when data has a simple trend or is relative smooth, while larger poly_order is effective for data that is inherently more complex. Higher polynomial orders may introduce undesired oscillations! Parameters ---------- tensor : torch.Tensor Features PyTorch tensor stored as torch.Tensor with shape (sequence_length, n_features). window_size : int Convolution kernel size (number of Savitzky-Golay coefficients). poly_order : int Order of polynomial used to fit the samples in the window. Must be less that window_size. Returns ------- dn_tensor : torch.Tensor Denoised features PyTorch tensor stored as torch.Tensor with shape (sequence_length, n_features). """ # Get number of features n_features = tensor.shape[1] # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # Reshape to (batch_size=1, n_features, sequence_length) tensor = tensor.T.unsqueeze(0) # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # Compute required padding to preserve input tensor shape padding = (window_size//2, window_size - 1 - window_size//2) # Perform edge padding (replicate edge values of each feature) tensor = F.pad(tensor, (padding[0], padding[1]), mode='replicate') # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # Compute the coefficients for 1D Savitzky-Golay filter coeffs = np.array([scipy.signal.savgol_coeffs(window_size, poly_order) for _ in range(n_features)]) # Set convolution kernel of shape (n_features, 1, window_size) # (set independent kernel for each feature) kernel = torch.tensor(coeffs).view(n_features, 1, window_size) # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # Compute moving average (independent convolution of each feature) denoised_tensor = \ F.conv1d(tensor, kernel, padding=0, groups=n_features) # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # Reshape back to (sequence_length, n_features) dn_tensor = denoised_tensor.squeeze(0).T # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ return dn_tensor
# -------------------------------------------------------------------------
[docs] def _dn_frequency_low_pass(self, tensor, cutoff_frequency, is_plot_magnitude_spectrum=False): """Denoising method: Frequency low-pass filter. Filtering is performed independently for each feature. Parameters ---------- tensor : torch.Tensor Features PyTorch tensor stored as torch.Tensor with shape (sequence_length, n_features). cutoff_frequency : int Cutoff frequency of the low-pass filter. is_plot_magnitude_spectrum : bool, default=False If True, then plot magnitude spectrum and cutoff frequency. Returns ------- dn_tensor : torch.Tensor Denoised features PyTorch tensor stored as torch.Tensor with shape (sequence_length, n_features). """ # Get sequence length n_time = tensor.shape[0] # Get number of features n_features = tensor.shape[1] # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # Perform Discrete Fourier Transform # (independently for each feature) tensor_dft = torch.fft.fft(tensor, dim=0) # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # Set sampling period sampling_period = 1 # Get discrete frequencies discrete_freqs = torch.fft.fftfreq(n_time, sampling_period) # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # Plot magnitude spectrum and cutoff frequency if is_plot_magnitude_spectrum: # Compute magnitude spectrum magnitude = np.abs(tensor_dft) # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # Initialize data array data_xy = torch.zeros((n_time//2, n_time//2)) # Build data array for i in range(n_features): data_xy[:, 2*i] = discrete_freqs[:n_time//2] data_xy[:, 2*i + 1] = magnitude[:n_time//2, i] # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # Plot only the first feature data_xy = data_xy[:, (0, 1)] # Set data labels data_labels = ('Feature 1',) # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # Set axes limits (limited by Nyquist frequency) x_lims = (0, (sampling_period**-1)/2) y_lims = (None, None) # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # Set axes labels x_label = 'Frequency (Hz)' y_label = 'Magnitude' # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # Plot parameter history _, axes = plot_xy_data(data_xy, data_labels=data_labels, x_lims=x_lims, y_lims=y_lims, x_label=x_label, y_label=y_label, marker='o', is_latex=True) # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # Plot cutoff frequency axes.axvline(cutoff_frequency, color='red', linestyle='--', label='Cutoff frequency') # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # Display legend axes.legend() # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # Display figure plt.show() # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # Create low-pass filter low_pass_filter = torch.abs(discrete_freqs) <= cutoff_frequency # Expand low-pass filter for all features low_pass_filter = \ low_pass_filter.view(-1, 1).expand(n_time, n_features) # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # Apply low-pass filter tensor_dft = low_pass_filter*tensor_dft # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # Perform Inverse Discrete Fourier Transform # (independently for each feature) dn_tensor = torch.fft.ifft(tensor_dft, dim=0).real # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ return dn_tensor
# -------------------------------------------------------------------------
[docs] def _check_tensor(self, tensor): """Check features tensor to be denoised. Parameters ---------- tensor : torch.Tensor Features PyTorch tensor stored as torch.Tensor with shape (sequence_length, n_features). """ if not isinstance(tensor, torch.Tensor): raise RuntimeError('Features tensor is not a torch.Tensor.') elif len(tensor.shape) != 2: raise RuntimeError('Features tensor is not a torch.Tensor with ' 'shape (sequence_length, n_features).')