Source code for cratepy.clustering.adaptivity.adaptivity_criterion

"""Clustering adaptivity criterion.

This module includes the interface to implement any clustering adaptivity
criterion and some clustering adaptivity criteria.

The concept of clustering adaptivity criterion arises in the context of
adaptive clustering-based reduced order modeling proposed by
Ferreira et. al (2022) [#]_.

.. [#] Ferreira, B.P., Andrade Pires, F.M. and Bessa, M.A. (2022).
       *Adaptivity for clustering-based reduced-order modeling of
       localized history-dependent phenomena.* Comp Methods Appl M, 393
       (see `here <https://www.sciencedirect.com/science/article/pii/
       S0045782522000895?via%3Dihub>`_)

Classes
-------
AdaptivityCriterion(ABC)
    Clustering adaptivity criterion interface.
AdaptiveClusterGrouping(AdaptivityCriterion)
    Clustering adaptivity cluster grouping criterion.
SpatialDiscontinuities(AdaptivityCriterion)
    Clustering adaptivity spatial discontinuities criterion.
"""
#
#                                                                       Modules
# =============================================================================
# Standard
from abc import ABC, abstractmethod
import copy
import random
# Third-party
import numpy as np
# Local
import ioput.ioutilities as ioutil
#
#                                                          Authorship & Credits
# =============================================================================
__author__ = 'Bernardo Ferreira (bernardo_ferreira@brown.edu)'
__credits__ = ['Bernardo Ferreira', ]
__status__ = 'Stable'
# =============================================================================
#
#                                    Interface: Clustering adaptivity criterion
# =============================================================================
[docs]class AdaptivityCriterion(ABC): """Clustering adaptivity criterion interface. Methods ------- get_parameters() *abstract*: Get clustering adaptivity criterion parameters. get_target_clusters(self) *abstract*: Get clustering adaptivity target clusters ans associated data. """
[docs] @abstractmethod def __init__(self): """Constructor.""" pass
# -------------------------------------------------------------------------
[docs] @staticmethod @abstractmethod def get_parameters(): """Get clustering adaptivity criterion parameters. Besides returning the mandatory and optional adaptivity criterion parameters, this method establishes the default values for the optional parameters. Returns ------- mandatory_parameters : dict Mandatory adaptivity type parameters (str) and associated type (item, type). optional_parameters : dict Optional adaptivity type parameters (key, str) and associated default value (item). """ pass
# -------------------------------------------------------------------------
[docs] @abstractmethod def get_target_clusters(self): """Get clustering adaptivity target clusters ans associated data. Returns ------- target_clusters : list[int] List containing the labels (int) of clusters to be adapted. target_clusters_data : dict For each target cluster (key, str), store dictionary (item, dict) containing cluster associated parameters required for the adaptive procedures. """ pass
# # Adaptivity criteria # =============================================================================
[docs]class AdaptiveClusterGrouping(AdaptivityCriterion): """Clustering adaptivity cluster grouping criterion. This adaptivity clustering criterion is not formally documented. Attributes ---------- _adapt_groups : dict Store the cluster labels (item, list[int]) associated with each adaptive cluster group (key, int). _groups_adapt_level : dict Store adaptive level (item, int) of each adaptive cluster group (key, int). _target_groups_ids : list Target adaptive cluster groups (item, list[int]) whose clusters are to be adapted. Methods ------- get_parameters() Get clustering adaptivity criterion parameters. get_target_clusters(self, adapt_data_matrix, voxels_clusters) Get clustering adaptivity target clusters. update_group_clusters(self, adaptive_clustering_map) Update adaptive cluster groups after adaptive procedures. get_adapt_groups(self) Get adaptive cluster groups. get_groups_adapt_level(self) Get adaptive cluster groups adaptive level. get_min_adapt_feature_val(self) Get minimum significant value of clustering adaptivity feature. _adaptivity_trigger_condition(self, adapt_data_matrix) Evaluate adaptive cluster group adaptivity trigger condition. _adaptivity_selection_criterion(self, adapt_data_matrix) Select target clusters from adaptive cluster group. """
[docs] def __init__(self, adapt_mat_phase, phase_clusters, adapt_trigger_ratio=None, adapt_split_threshold=None, adapt_max_level=None, adapt_min_voxels=None, is_merge_adapt_groups=None, min_adapt_feature_val=None): """Constructor. Parameters ---------- adapt_mat_phase : str Adaptive material phase label. phase_clusters : dict Clusters labels (item, list[int]) associated with each material phase (key, str). adapt_trigger_ratio : float, default=None Threshold associated with the adaptivity trigger condition, defining the value of the relative ratio (max - avg)/avg above which the adaptive cluster group is set to be adapted, where `max` and `avg` are the maximum and average adaptivity feature values in the adaptive cluster group, respectively. adapt_split_threshold : float, default=None Threshold associated with the adaptivity selection criterion, defining the split boundary of each adaptive cluster group according to the associated position in terms of the adaptivity value range within the group. For instance, a `adapt_split_threshold` of 0.8 means that the split boundary divides the clusters whose adaptivity feature value is above min + 0.8*(max - min) (top 20% of the value range) from the remaining clusters. adapt_max_level : int, default=None Maximum adaptive cluster group adaptive level. adapt_min_voxels : int, default=None Minimum number of voxels that cluster must contain to be targeted for adaptivity. is_merge_adapt_groups : bool, default=None True if the adaptive cluster groups of the same adaptive level are to be merged, False if each adaptive cluster group follows an independent hierarchy. min_adapt_feature_val : float, default=None Minimum significant value of clustering adaptivity control feature. """ # Set initial adaptive clusters group (labeled as group '0') and # initialize associated adaptive level self._adapt_groups = {} self._groups_adapt_level = {} self._adapt_groups['0'] = \ copy.deepcopy(phase_clusters[adapt_mat_phase]) self._groups_adapt_level['0'] = 0 # Get optional parameters optional_parameters = type(self).get_parameters() # Get adaptivity trigger ratio if adapt_trigger_ratio is None: self._adapt_trigger_ratio = \ optional_parameters['adapt_trigger_ratio'] else: self._adapt_trigger_ratio = adapt_trigger_ratio # Get adaptivity split threshold if adapt_split_threshold is None: self._adapt_split_threshold = \ optional_parameters['adapt_split_threshold'] else: self._adapt_split_threshold = adapt_split_threshold # Get maximum adaptive cluster group adaptive level if adapt_max_level is None: self._adapt_max_level = optional_parameters['adapt_max_level'] else: self._adapt_max_level = adapt_max_level # Get minimum number of voxels that cluster must contain to be targeted # for adaptivity if adapt_min_voxels is None: self._adapt_min_voxels = optional_parameters['adapt_min_voxels'] else: self._adapt_min_voxels = adapt_min_voxels # Set merge adaptive cluster groups flag if is_merge_adapt_groups is None: self._is_merge_adapt_groups = \ bool(optional_parameters['is_merge_adapt_groups']) else: self._is_merge_adapt_groups = bool(is_merge_adapt_groups) # Set minimum significant value of clustering adaptivity control # feature if min_adapt_feature_val is None: self._min_adapt_feature_val = \ optional_parameters['min_adapt_feature_val'] else: self._min_adapt_feature_val = min_adapt_feature_val
# -------------------------------------------------------------------------
[docs] @staticmethod def get_parameters(): """Get clustering adaptivity criterion parameters. Besides returning the mandatory and optional adaptivity criterion parameters, this method establishes the default values for the optional parameters. Returns ------- mandatory_parameters : dict Mandatory adaptivity type parameters (str) and associated type (item, type). optional_parameters : dict Optional adaptivity type parameters (key, str) and associated default value (item). """ # Set mandatory adaptivity criterion parameters mandatory_parameters = {} # Set optional adaptivity criterion parameters and associated default # values optional_parameters = {'adapt_trigger_ratio': 0.1, 'adapt_split_threshold': 0.5, 'adapt_max_level': 15, 'adapt_min_voxels': 1, 'is_merge_adapt_groups': 1, 'min_adapt_feature_val': 0.0} # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # Return return mandatory_parameters, optional_parameters
# -------------------------------------------------------------------------
[docs] def get_target_clusters(self, adapt_data_matrix, voxels_clusters): """Get clustering adaptivity target clusters. Parameters ---------- adapt_data_matrix : numpy.ndarray (2d) Adaptivity feature data matrix (numpy.ndarray of shape (adapt_phase_n_clusters, 2)) that, for the i-th cluster of the adaptive material phase, contains the cluster label in adapt_data_matrix[i, 0] and the associated adaptive feature value in adapt_data_matrix[i, 1]. voxels_clusters : numpy.ndarray (2d or 3d) Regular grid of voxels (spatial discretization of the RVE), where each entry contains the cluster label (int) assigned to the corresponding voxel. Returns ------- target_clusters : list[int] List containing the labels (int) of clusters to be adapted. target_clusters_data : dict For each target cluster (key, str), store dictionary (item, dict) containing cluster associated parameters required for the adaptive procedures. """ # Initialize target clusters list target_clusters = [] # Initialize target clusters data target_clusters_data = {} # Initialize target cluster groups self._target_groups_ids = [] # Get adaptive material phase cluster groups adapt_groups_old = copy.deepcopy(self._adapt_groups) adapt_groups_ids_old = list(adapt_groups_old.keys()) groups_adapt_level_old = copy.deepcopy(self._groups_adapt_level) # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # Loop over adaptive cluster groups for group_id in adapt_groups_ids_old: # Check if adaptive cluster group can be further adapted and, # if not, skip to the next one if groups_adapt_level_old[group_id] >= self._adapt_max_level: continue # Get adaptive cluster group clusters and current adaptive level adapt_group = adapt_groups_old[group_id] base_adapt_level = groups_adapt_level_old[group_id] # Get adaptivity data matrix row indexes associated with the # adaptive cluster group clusters adapt_group_idxs = np.where(np.in1d(adapt_data_matrix[:, 0], adapt_group))[0] # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # Check adaptive cluster group adaptivity trigger condition is_trigger = self._adaptivity_trigger_condition( adapt_data_matrix[adapt_group_idxs, :]) # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # Get adaptive cluster group target clusters and assemble them to # the global target clusters list if is_trigger: # Get target clusters according to adaptivity selection # criterion group_target_clusters = self._adaptivity_selection_criterion( adapt_data_matrix[adapt_group_idxs, :]) # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ if self._is_merge_adapt_groups: # Add target clusters to adaptive cluster group according # to the current adaptive level target_group_id = base_adapt_level + 1 if target_group_id in self._adapt_groups.keys(): self._adapt_groups[target_group_id] += \ group_target_clusters else: self._adapt_groups[target_group_id] = \ group_target_clusters self._groups_adapt_level[target_group_id] = \ target_group_id # Update target adaptive cluster groups if target_group_id not in self._target_groups_ids: self._target_groups_ids.append(target_group_id) # Remove target clusters from old adaptive cluster group for cluster in group_target_clusters: self._adapt_groups[group_id].remove(cluster) else: # Get maximum adaptive cluster group id max_group_id = np.max([int(x) for x in self._adapt_groups.keys()]) # Set child adaptive cluster group to be adapted (set id, # set clusters and increment adaptive level relative to # parent cluster group) child_group_1_id = str(max_group_id + 1) self._adapt_groups[child_group_1_id] = \ group_target_clusters self._groups_adapt_level[child_group_1_id] = \ base_adapt_level + 1 self._target_groups_ids.append(child_group_1_id) # Set child adaptive cluster group to be left unchanged # (set id, set clusters and set adaptive level equal to the # parent cluster group) child_group_2_id = str(max_group_id + 2) self._adapt_groups[child_group_2_id] = list( set(adapt_group) - set(group_target_clusters)) self._groups_adapt_level[child_group_2_id] = \ base_adapt_level # Remove parent adaptive cluster group self._adapt_groups.pop(group_id) self._groups_adapt_level.pop(group_id) # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # Assemble target clusters target_clusters += group_target_clusters # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # Loop over target clusters target_clusters_copy = copy.deepcopy(target_clusters) for cluster in target_clusters_copy: # Evaluate target conditions if self._adapt_min_voxels > 1: # Evaluate target cluster number of voxels n_cluster_voxels = np.count_nonzero(voxels_clusters == cluster) is_cond = n_cluster_voxels > self._adapt_min_voxels # Untarget cluster if number of voxels is lower or equal than # minimum, then proceed to the next target cluster if not is_cond: target_clusters.remove(cluster) continue # Build target cluster data dictionary target_clusters_data[str(cluster)] = \ {'is_dynamic_split_factor': False} # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ return target_clusters, target_clusters_data
# -------------------------------------------------------------------------
[docs] def update_group_clusters(self, adaptive_clustering_map): """Update adaptive cluster groups after adaptive procedures. Parameters ---------- adaptive_clustering_map : dict List of new cluster labels (item, list[int]) resulting from the adaptive procedures over each target cluster (key, str). """ # Loop over adaptive cluster groups for group_id in self._target_groups_ids: # Get parent adaptive cluster group clusters old_clusters = self._adapt_groups[group_id].copy() # Loop over adaptive cluster group clusters for old_cluster in old_clusters: # If cluster has been adapted, then update adaptive cluster # group if str(old_cluster) in adaptive_clustering_map.keys(): # Remove parent cluster from group self._adapt_groups[group_id].remove(old_cluster) # Add child clusters to group new_clusters = adaptive_clustering_map[str(old_cluster)] self._adapt_groups[group_id] += new_clusters
# -------------------------------------------------------------------------
[docs] def get_adapt_groups(self): """Get adaptive cluster groups. Returns ------- adapt_groups : dict Store the cluster labels (item, list[int]) associated with each adaptive cluster group (key, int). """ return copy.deepcopy(self._adapt_groups)
# -------------------------------------------------------------------------
[docs] def get_groups_adapt_level(self): """Get adaptive cluster groups adaptive level. Returns ------- groups_adapt_level : dict Store adaptive level (item, int) of each adaptive cluster group (key, int). """ return copy.deepcopy(self._groups_adapt_level)
# -------------------------------------------------------------------------
[docs] def get_min_adapt_feature_val(self): """Get minimum significant value of clustering adaptivity feature. Returns ------- min_adapt_feature_val : float Minimum significant value of clustering adaptivity control feature. """ return self._min_adapt_feature_val
# -------------------------------------------------------------------------
[docs] def _adaptivity_trigger_condition(self, adapt_data_matrix): """Evaluate adaptive cluster group adaptivity trigger condition. Parameters ---------- adapt_data_matrix : numpy.ndarray (2d) Adaptivity feature data matrix (numpy.ndarray of shape (adapt_phase_n_clusters, 2)) that, for the i-th cluster of the adaptive material phase, contains the cluster label in adapt_data_matrix[i, 0] and the associated adaptive feature value in adapt_data_matrix[i, 1]. Returns ------- bool True if clustering adaptivity is triggered in the adaptive cluster group, False otherwise. """ # Compute average and maximum values of adaptivity feature a = np.average(adapt_data_matrix[:, 1]) b = np.max(adapt_data_matrix[:, 1]) # Compute adaptivity ratio if abs(a) < 1e-10: adapt_ratio = abs(b - a) else: adapt_ratio = abs(b - a)/abs(a) # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # Check adaptivity trigger condition if adapt_ratio > self._adapt_trigger_ratio: return True else: return False
# -------------------------------------------------------------------------
[docs] def _adaptivity_selection_criterion(self, adapt_data_matrix): """Select target clusters from adaptive cluster group. Parameters ---------- adapt_data_matrix : numpy.ndarray (2d) Adaptivity feature data matrix (numpy.ndarray of shape (adapt_phase_n_clusters, 2)) that, for the i-th cluster of the adaptive material phase, contains the cluster label in adapt_data_matrix[i, 0] and the associated adaptive feature value in adapt_data_matrix[i, 1]. Returns ------- target_clusters : list List containing the labels (int) of clusters to be adapted. """ # Check threshold if not ioutil.is_between(self._adapt_split_threshold, lower_bound=0, upper_bound=1): raise RuntimeError('Clustering adaptivity selection criterion\'s ' 'threshold must be between 0 and 1 (included).') # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # Get split boundary adaptivity feature value adapt_boundary = min(adapt_data_matrix[:, 1]) \ + self._adapt_split_threshold*(max(adapt_data_matrix[:, 1]) - min(adapt_data_matrix[:, 1])) # Get indexes of clusters whose adaptivity feature value is greater or # equal than the split boundary value idxs = adapt_data_matrix[:, 1] >= adapt_boundary # Get target clusters target_clusters = [int(x) for x in adapt_data_matrix[idxs, 0]] # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ return target_clusters
# =============================================================================
[docs]class SpatialDiscontinuities(AdaptivityCriterion): """Clustering adaptivity spatial discontinuities criterion. The detailed description of the clustering adaptivity spatial discontinuities criterion can be found in Ferreira et. al (2022) [#]_. .. [#] Ferreira, B.P., Andrade Pires, F.M. and Bessa, M.A. (2022). *Adaptivity for clustering-based reduced-order modeling of localized history-dependent phenomena.* Comp Methods Appl M, 393 (see `here <https://www.sciencedirect.com/science/article/pii/ S0045782522000895?via%3Dihub>`_) Attributes ---------- _clusters_adapt_level : dict Adaptive level (item, int) of each cluster (key, str). _swipe_dims_every : list[int], default=None Swipe frequency (item, int) of each spatial dimension. First and last (boundary) voxels are always considered, irrespective of the swipe frequency. Methods ------- get_parameters() Get clustering adaptivity criterion parameters. get_target_clusters(self, adapt_data_matrix, voxels_clusters) Get clustering adaptivity target clusters. update_clusters_adapt_level(self, adaptive_clustering_map) Update clusters adaptive level after adaptive procedures. get_clusters_adapt_level(self) Get clusters adaptive level. get_min_adapt_feature_val(self) Get minimum significant value of clustering adaptivity feature. _swipe_dimension(self, adapt_data_matrix, voxels_clusters, \ target_clusters, target_clusters_data, dim_loops) Evaluate spatial discontinuities along a given dimension. _update_swipe_dims_init_idx(self) Randomly update the dimensions swipe frequency initial index. """
[docs] def __init__(self, adapt_mat_phase, phase_clusters, adapt_trigger_ratio=None, adapt_max_level=None, adapt_min_voxels=None, adapt_level_max_diff=None, swipe_dim_1_every=None, swipe_dim_2_every=None, swipe_dim_3_every=None, min_adapt_feature_val=None, magnitude_lower_factor=None): """Constructor. Parameters ---------- adapt_mat_phase : str Adaptive material phase label. phase_clusters : dict Clusters labels (item, list[int]) associated with each material phase (key, str). adapt_trigger_ratio : float, default=None Threshold associated with the adaptivity trigger condition. adapt_max_level : int, default=None Maximum cluster adaptive level. adapt_min_voxels : int, default=None Minimum number of voxels that cluster must contain to be targeted for adaptivity. adapt_level_max_diff : int, default=None Maximum adaptive level difference when targeting clusters of adjacent voxels. swipe_dim_1_every : int, default=None Swipe frequency of spatial dimension 1. First and last (boundary) voxels are always considered. swipe_dim_2_every : int, default=None Swipe frequency of spatial dimension 2. First and last (boundary) voxels are always considered. swipe_dim_3_every : int, default=None Swipe frequency of spatial dimension 3. First and last (boundary) voxels are always considered. min_adapt_feature_val : float, default=None Minimum significant value of clustering adaptivity control feature. magnitude_lower_factor : float, default=None Factor which affects the maximum stored magnitude associated with the lower valued targeted cluster. """ # Initialize clusters adaptive level self._clusters_adapt_level = {str(cluster): 0 for cluster in phase_clusters[adapt_mat_phase]} # Get optional parameters optional_parameters = type(self).get_parameters() # Get adaptivity trigger ratio if adapt_trigger_ratio is None: self._adapt_trigger_ratio = \ optional_parameters['adapt_trigger_ratio'] else: self._adapt_trigger_ratio = adapt_trigger_ratio # Get maximum cluster adaptive level if adapt_max_level is None: self._adapt_max_level = optional_parameters['adapt_max_level'] else: self._adapt_max_level = adapt_max_level # Get minimum number of voxels that cluster must contain to be targeted # for adaptivity if adapt_min_voxels is None: self._adapt_min_voxels = optional_parameters['adapt_min_voxels'] else: self._adapt_min_voxels = adapt_min_voxels # Get cluster adaptive level maximum difference if adapt_max_level is None: self._adapt_level_max_diff = \ optional_parameters['adapt_level_max_diff'] else: self._adapt_level_max_diff = adapt_level_max_diff # Get spatial dimensions swipe frequencies self._swipe_dims_every = [] if swipe_dim_1_every is None: self._swipe_dims_every.append( optional_parameters['swipe_dim_1_every']) else: self._swipe_dims_every.append(max(swipe_dim_1_every, 1)) if swipe_dim_2_every is None: self._swipe_dims_every.append( optional_parameters['swipe_dim_2_every']) else: self._swipe_dims_every.append(max(swipe_dim_2_every, 1)) if swipe_dim_3_every is None: self._swipe_dims_every.append( optional_parameters['swipe_dim_3_every']) else: self._swipe_dims_every.append(max(swipe_dim_3_every, 1)) # Set minimum significant value of clustering adaptivity control # feature if min_adapt_feature_val is None: self._min_adapt_feature_val = \ optional_parameters['min_adapt_feature_val'] else: self._min_adapt_feature_val = min_adapt_feature_val # Set magnitude factor associated with lower valued targeted cluster if magnitude_lower_factor is None: self._magnitude_lower_factor = \ optional_parameters['magnitude_lower_factor'] else: self._magnitude_lower_factor = magnitude_lower_factor # Initialize spatial dimensions swipe frequency initial index self._swipe_dims_init_idx = [0, 0, 0]
# -------------------------------------------------------------------------
[docs] @staticmethod def get_parameters(): """Get clustering adaptivity criterion parameters. Besides returning the mandatory and optional adaptivity criterion parameters, this method establishes the default values for the optional parameters. ---- Returns ------- mandatory_parameters : dict Mandatory adaptivity type parameters (str) and associated type (item, type). optional_parameters : dict Optional adaptivity type parameters (key, str) and associated default value (item). """ # Set mandatory adaptivity criterion parameters mandatory_parameters = {} # Set optional adaptivity criterion parameters and associated default # values optional_parameters = {'adapt_trigger_ratio': 0.1, 'adapt_max_level': 15, 'adapt_min_voxels': 1, 'adapt_level_max_diff': 2, 'swipe_dim_1_every': 1, 'swipe_dim_2_every': 1, 'swipe_dim_3_every': 1, 'min_adapt_feature_val': 0.0, 'magnitude_lower_factor': 1.0} # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # Return return mandatory_parameters, optional_parameters
# -------------------------------------------------------------------------
[docs] def get_target_clusters(self, adapt_data_matrix, voxels_clusters): """Get clustering adaptivity target clusters. Parameters ---------- adapt_data_matrix : numpy.ndarray (2d) Adaptivity feature data matrix (numpy.ndarray of shape (adapt_phase_n_clusters, 2)) that, for the i-th cluster of the adaptive material phase, contains the cluster label in adapt_data_matrix[i, 0] and the associated adaptive feature value in adapt_data_matrix[i, 1]. voxels_clusters : numpy.ndarray (2d or 3d) Regular grid of voxels (spatial discretization of the RVE), where each entry contains the cluster label (int) assigned to the corresponding voxel. Returns ------- target_clusters : list[int] List containing the labels (int) of clusters to be adapted. target_clusters_data : dict For each target cluster (key, str), store dictionary (item, dict) containing cluster associated parameters required for the adaptive procedures. """ # Initialize target clusters list target_clusters = [] # Initialize target clusters data target_clusters_data = {} # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # Randomly update the spatial dimensions swipe frequency initial index self._update_swipe_dims_init_idx() # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # Perform required spatial evaluations, update list of target clusters # and associated data if len(voxels_clusters.shape) == 2: self._swipe_dimension(adapt_data_matrix, voxels_clusters, target_clusters, target_clusters_data, '12') self._swipe_dimension(adapt_data_matrix, voxels_clusters, target_clusters, target_clusters_data, '21') elif len(voxels_clusters.shape) == 3: self._swipe_dimension(adapt_data_matrix, voxels_clusters, target_clusters, target_clusters_data, '123') self._swipe_dimension(adapt_data_matrix, voxels_clusters, target_clusters, target_clusters_data, '213') self._swipe_dimension(adapt_data_matrix, voxels_clusters, target_clusters, target_clusters_data, '312') # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ return target_clusters, target_clusters_data
# -------------------------------------------------------------------------
[docs] def update_clusters_adapt_level(self, adaptive_clustering_map): """Update clusters adaptive level after adaptive procedures. Parameters ---------- adaptive_clustering_map : dict List of new cluster labels (item, list[int]) resulting from the adaptive procedures over each target cluster (key, str). """ for old_cluster in adaptive_clustering_map.keys(): # Get parent cluster adaptive level old_cluster_adapt_level = \ self._clusters_adapt_level[str(old_cluster)] # Update child clusters adaptive level for cluster in adaptive_clustering_map[str(old_cluster)]: self._clusters_adapt_level[str(cluster)] = \ old_cluster_adapt_level + 1 # Remove parent cluster self._clusters_adapt_level.pop(old_cluster)
# -------------------------------------------------------------------------
[docs] def get_clusters_adapt_level(self): """Get clusters adaptive level. Returns ------- clusters_adapt_level : dict Adaptive level (item, int) of each cluster (key, str). """ return copy.deepcopy(self._clusters_adapt_level)
# -------------------------------------------------------------------------
[docs] def get_min_adapt_feature_val(self): """Get minimum significant value of clustering adaptivity feature. Returns ------- min_adapt_feature_val : float Minimum significant value of clustering adaptivity control feature. """ return self._min_adapt_feature_val
# -------------------------------------------------------------------------
[docs] def _swipe_dimension(self, adapt_data_matrix, voxels_clusters, target_clusters, target_clusters_data, dim_loops): """Evaluate spatial discontinuities along a given dimension. The spatial dimensions are cycled according to the code provided in `dim_loops`. The dimension where the spatial discontinuities are evaluated is always the first dimension specified in this code (assumed dimension i), while the others cycle the remainder dimensions (assumed dimensions j and k). During this process, both the list of target clusters and the dictionary containing associated data are updated. ---- Parameters ---------- adapt_data_matrix : numpy.ndarray (2d) Adaptivity feature data matrix (numpy.ndarray of shape (adapt_phase_n_clusters, 2)) that, for the i-th cluster of the adaptive material phase, contains the cluster label in adapt_data_matrix[i, 0] and the associated adaptive feature value in adapt_data_matrix[i, 1]. voxels_clusters : numpy.ndarray (2d or 3d) Regular grid of voxels (spatial discretization of the RVE), where each entry contains the cluster label (int) assigned to the corresponding voxel. target_clusters : list[int] List containing the labels (int) of clusters to be adapted. target_clusters_data : dict For each target cluster (key, str), store dictionary (item, dict) containing cluster associated parameters required for the adaptive procedures. dim_loops : str Ordered specification of dimension cycles, being the spatial discontinuities evaluated along dimension `dim_loops[0]`. """ # Get material phase clusters phase_clusters = adapt_data_matrix[:, 0] # Get number of voxels in each dimension n_voxels_dims = [voxels_clusters.shape[i] for i in range(len(voxels_clusters.shape))] # Set numbers of voxels associated with dimension cycles if len(n_voxels_dims) == 2: if dim_loops not in ('12', '21'): raise RuntimeError('Invalid dimension cycles code.') else: # Set cycling dimensions map (ordered as i, j) cycle_spatial_map = [int(dim_loops[0]) - 1, int(dim_loops[1]) - 1] # Set cycling dimensions numbers of voxels n_voxels_i = n_voxels_dims[0] n_voxels_j = n_voxels_dims[1] n_voxels_k = 1 elif len(n_voxels_dims) == 3: if dim_loops not in ('123', '132', '213', '231', '312', '321'): raise RuntimeError('Invalid dimension cycles code.') else: # Set cycling dimensions map (ordered as i, j, k) cycle_spatial_map = [int(dim_loops[0]) - 1, int(dim_loops[1]) - 1, int(dim_loops[2]) - 1] # Set cycling dimensions numbers of voxels n_voxels_i = n_voxels_dims[0] n_voxels_j = n_voxels_dims[1] n_voxels_k = n_voxels_dims[2] else: raise RuntimeError('Invalid number of dimensions.') # Set spatial dimensions map (ordered as 1, 2 [, 3]) spatial_cycle_map = [cycle_spatial_map.index(i) for i in range(len(n_voxels_dims))] # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # Set cycling dimensions swipe frequencies swipe_dim_i_every = self._swipe_dims_every[cycle_spatial_map[0]] swipe_dim_j_every = self._swipe_dims_every[cycle_spatial_map[1]] if len(cycle_spatial_map) == 3: swipe_dim_k_every = self._swipe_dims_every[cycle_spatial_map[2]] else: swipe_dim_k_every = 1 # Set cycling dimensions swipe frequency initial index swipe_dim_i_init = self._swipe_dims_init_idx[cycle_spatial_map[0]] swipe_dim_j_init = self._swipe_dims_init_idx[cycle_spatial_map[1]] if len(cycle_spatial_map) == 3: swipe_dim_k_init = self._swipe_dims_init_idx[2] else: swipe_dim_k_init = 0 # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # Get minimum and maximum value of adaptivity feature min_feature_val = min(adapt_data_matrix[:, 1]) max_feature_val = max(adapt_data_matrix[:, 1]) # Get absolute value of maximum range of adaptivity feature norm_factor = abs(max_feature_val - min_feature_val) # If the maximum range of the adaptivity feature is null, then its # value is uniform in the adaptive material phase if abs(norm_factor) < 1e-10: return # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # Loop over voxels of dimension k for voxel_k in range(n_voxels_k): # Proceed according swiping frequency of dimension k. First and # last voxels are always considered if voxel_k == 0 or voxel_k == n_voxels_k - 1: pass elif (voxel_k - swipe_dim_k_init) % swipe_dim_k_every != 0: continue # Loop over voxels of dimension j for voxel_j in range(n_voxels_j): # Proceed according swiping frequency of dimension j. First and # last voxels are always considered if voxel_j == 0 or voxel_j == n_voxels_j - 1: pass elif (voxel_j - swipe_dim_j_init) % swipe_dim_j_every != 0: continue # Loop over voxels of dimension i (evaluation of spatial # discontinuities) for voxel_i in range(n_voxels_i): # Proceed according swiping frequency of dimension i. First # and last voxels are always considered if voxel_i == 0 or voxel_i == n_voxels_i - 1: pass elif (voxel_i - swipe_dim_i_init) % swipe_dim_i_every != 0: continue # Get voxel (i) ordered spatial indexes idxs = [(voxel_i, voxel_j, voxel_k)[i] for i in spatial_cycle_map] # Get cluster label of voxel (i) cluster = voxels_clusters[tuple(idxs)] # Get next voxel (i+1) ordered spatial indexes. If voxel # (i) is the last one, then the next voxel (i+1) is the # first one if voxel_i == n_voxels_i - 1: idxs[cycle_spatial_map[0]] = 0 else: idxs[cycle_spatial_map[0]] += 1 # Get cluster labels of next voxel (i+1) cluster_next = voxels_clusters[tuple(idxs)] # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # Skip computations if at least one of the clusters does # not belong to the adaptive material phase associated with # the criterion instance if cluster not in phase_clusters \ or cluster_next not in phase_clusters: continue # Skip computations if voxels belong to the same cluster if cluster == cluster_next: continue # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # Get clusters number of voxels if self._adapt_min_voxels > 1: n_cluster_voxels = \ np.count_nonzero(voxels_clusters == cluster) n_cluster_next_voxels = \ np.count_nonzero(voxels_clusters == cluster_next) # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # Evaluate clusters adaptive level is_cond_1 = self._clusters_adapt_level[str(cluster)] \ < self._adapt_max_level is_cond_next_1 = \ self._clusters_adapt_level[str(cluster_next)] \ < self._adapt_max_level # Evaluate clusters number of voxels if self._adapt_min_voxels > 1: is_cond_2 = n_cluster_voxels > self._adapt_min_voxels is_cond_next_2 = n_cluster_next_voxels \ > self._adapt_min_voxels else: is_cond_2 = True is_cond_next_2 = True # Evaluate target conditions is_cluster_targetable = is_cond_1 and is_cond_2 is_cluster_next_targetable = is_cond_next_1 \ and is_cond_next_2 # Skip computations if both clusters are untargetable if not is_cluster_targetable \ and not is_cluster_next_targetable: continue # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # Get cluster index and feature value cluster_idx = list(adapt_data_matrix[:, 0]).index(cluster) value = adapt_data_matrix[cluster_idx, 1] # Get cluster of next voxel and feature value cluster_next_idx = list( adapt_data_matrix[:, 0]).index(cluster_next) value_next = adapt_data_matrix[cluster_next_idx, 1] # Compute normalized spatial discontinuity along # dimension i ratio = abs(value_next - value)/norm_factor # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # Skip computations if normalized spatial discontinuity is # lower than prescribed threshold. Otherwise, compute # associated magnitude if ratio < self._adapt_trigger_ratio: continue else: magnitude = abs(ratio - self._adapt_trigger_ratio) # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # Evaluate if clusters have already been targeted is_cluster_targeted = cluster in target_clusters is_cluster_next_targeted = cluster_next in target_clusters # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # Update previously targeted clusters data if is_cluster_targeted: max_magn = \ target_clusters_data[str(cluster)]['max_magnitude'] if value >= value_next: if magnitude > max_magn: target_clusters_data[str(cluster)][ 'max_magnitude'] = magnitude else: if self._magnitude_lower_factor*magnitude \ > max_magn: target_clusters_data[str(cluster)][ 'max_magnitude'] = \ self._magnitude_lower_factor*magnitude if is_cluster_next_targeted: max_magn = target_clusters_data[str(cluster_next)][ 'max_magnitude'] if value_next > value: if magnitude > max_magn: target_clusters_data[str(cluster_next)][ 'max_magnitude'] = magnitude else: if self._magnitude_lower_factor*magnitude \ > max_magn: target_clusters_data[str(cluster_next)][ 'max_magnitude'] = \ self._magnitude_lower_factor*magnitude # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # Update target clusters list and associated data if not (is_cluster_targeted and is_cluster_next_targeted): # Evaluate clusters adaptive level cluster_adapt_level = \ self._clusters_adapt_level[str(cluster)] cluster_next_adapt_level = \ self._clusters_adapt_level[str(cluster_next)] # Compute differences of clusters adaptive level diff_1 = cluster_next_adapt_level - cluster_adapt_level diff_2 = cluster_adapt_level - cluster_next_adapt_level # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # Update target clusters list according to associated # adaptive and update data of new targeted clusters if diff_1 > self._adapt_level_max_diff: if not is_cluster_targeted \ and is_cluster_targetable: target_clusters.append(cluster) target_clusters_data[str(cluster)] = {} if value > value_next: target_clusters_data[str(cluster)][ 'max_magnitude'] = magnitude else: target_clusters_data[str(cluster)][ 'max_magnitude'] = \ self._magnitude_lower_factor*magnitude elif diff_2 > self._adapt_level_max_diff: if not is_cluster_next_targeted \ and is_cluster_next_targetable: target_clusters.append(cluster_next) target_clusters_data[str(cluster_next)] = {} if value_next > value: target_clusters_data[str(cluster_next)][ 'max_magnitude'] = magnitude else: target_clusters_data[str(cluster_next)][ 'max_magnitude'] = \ self._magnitude_lower_factor*magnitude else: if not is_cluster_targeted \ and is_cluster_targetable: target_clusters.append(cluster) target_clusters_data[str(cluster)] = {} if value > value_next: target_clusters_data[str(cluster)][ 'max_magnitude'] = magnitude else: target_clusters_data[str(cluster)][ 'max_magnitude'] = \ self._magnitude_lower_factor*magnitude if not is_cluster_next_targeted \ and is_cluster_next_targetable: target_clusters.append(cluster_next) target_clusters_data[str(cluster_next)] = {} if value_next > value: target_clusters_data[str(cluster_next)][ 'max_magnitude'] = magnitude else: target_clusters_data[str(cluster_next)][ 'max_magnitude'] = \ self._magnitude_lower_factor*magnitude # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # Loop over target clusters for cluster in target_clusters: # Store adaptive trigger ratio for each target cluster target_clusters_data[str(cluster)]['adapt_trigger_ratio'] = \ self._adapt_trigger_ratio # Set dynamic adaptive clustering split factor flag target_clusters_data[str(cluster)]['is_dynamic_split_factor'] = \ True
# -------------------------------------------------------------------------
[docs] def _update_swipe_dims_init_idx(self): """Randomly update the dimensions swipe frequency initial index. The detailed description of this procedure can be found in Ferreira et. al (2022) [#]_. .. [#] Ferreira, B.P., Andrade Pires, F.M. and Bessa, M.A. (2022). *Adaptivity for clustering-based reduced-order modeling of localized history-dependent phenomena.* Comp Methods Appl M, 393 (see `here <https://www.sciencedirect.com/science/article/pii/ S0045782522000895?via%3Dihub>`_) """ for i in range(len(self._swipe_dims_every)): if self._swipe_dims_every[i] > 1: self._swipe_dims_init_idx[i] = \ random.choice(range(self._swipe_dims_every[i]))