Source code for clustering.clusteringphase
"""Cluster-Reduced Material Phase.
This module includes the interface to implement any Cluster-Reduced Material
Phase and several cluster-reduced material phases, namely the most basic Static
Cluster-Reduced Material Phase (SCRMP). Each class contains all the required
methods to manage the material phase clustering.
The concept of Cluster-Reduced Material Phase was coined by
Ferreira et. al (2022) [#]_ and arises in the context of clustering-based
reduced order modeling (see Chapter 4 of Ferreira (2022) [#]_).
.. [#] Ferreira, B.P., Andrade Pires, F.M. and Bessa, M.A. (2022).
*Adaptivity for clustering-based reduced-order modeling of
localized history-dependent phenomena.* Comp Methods Appl M, 393
(see `here <https://www.sciencedirect.com/science/article/pii/
S0045782522000895?via%3Dihub>`_)
.. [#] Ferreira, B.P. (2022). *Towards Data-driven Multi-scale
Optimization of Thermoplastic Blends: Microstructural
Generation, Constitutive Development and Clustering-based
Reduced-Order Modeling.* PhD Thesis, University of Porto
(see `here <https://repositorio-aberto.up.pt/handle/10216/
146900?locale=en>`_)
Classes
-------
CRMP
Cluster-Reduced Material Phase interface.
ACRMP
Adaptive Cluster-Reduced Material Phase interface.
SCRMP
Static Cluster-Reduced Material Phase (SCRMP).
GACRMP
Generalized Adaptive Cluster-Reduced Material Phase.
HAACRMP
Hierarchical Agglomerative Adaptive Cluster-Reduced Material Phase.
"""
#
# Modules
# =============================================================================
# Standard
from abc import ABC, abstractmethod
import time
import copy
# Third-party
import numpy as np
import scipy.cluster.hierarchy as sciclst
import anytree
# import anytree.exporter
# Local
import clustering.clusteringalgs as clstalgs
from clustering.clusteringalgs import ClusterAnalysis
import tensor.matrixoperations as mop
#
# Authorship & Credits
# =============================================================================
__author__ = 'Bernardo Ferreira (bernardo_ferreira@brown.edu)'
__credits__ = ['Bernardo Ferreira', ]
__status__ = 'Stable'
# =============================================================================
#
# =============================================================================
#
# Interface: Cluster-reduced material phase
# =============================================================================
class CRMP(ABC):
"""Cluster-Reduced Material Phase interface.
Methods
-------
perform_base_clustering(self)
*abstract*: Perform CRMP base clustering.
get_n_clusters(self)
*abstract*: Get current number of clusters.
get_clustering_type(self)
*abstract*: Get cluster-reduced material phase adaptivity type.
get_valid_clust_algs()
*abstract*: Get valid clustering algorithms to compute the CRMP.
_update_cluster_labels(labels, min_label=0)
Update cluster labels starting with the provided minimum label.
"""
@abstractmethod
def __init__(self):
"""Constructor."""
pass
# -------------------------------------------------------------------------
@abstractmethod
def perform_base_clustering(self):
"""Perform CRMP base clustering."""
pass
# -------------------------------------------------------------------------
@abstractmethod
def get_n_clusters(self):
"""Get current number of clusters.
Returns
-------
n_clusters : int
Number of material phase clusters.
"""
pass
# -------------------------------------------------------------------------
@abstractmethod
def get_clustering_type(self):
"""Get cluster-reduced material phase adaptivity type.
Returns
-------
clustering_type : str
Type of cluster-reduced material phase.
"""
pass
# -------------------------------------------------------------------------
@staticmethod
@abstractmethod
def get_valid_clust_algs():
"""Get valid clustering algorithms to compute the CRMP.
Returns
----------
clust_algs : list[str]
Clustering algorithms identifiers (str).
"""
pass
# -------------------------------------------------------------------------
@staticmethod
def _update_cluster_labels(labels, min_label=0):
"""Update cluster labels starting with the provided minimum label.
Parameters
----------
labels : numpy.ndarray (1d)
Cluster labels (numpy.ndarray[int] of shape (n_items,)).
min_label : int, default=0
Minimum cluster label.
Returns
-------
new_labels : numpy.ndarray (1d)
Updated cluster labels (numpy.ndarray[int] of shape (n_items,)).
max_label : int
Maximum cluster label.
"""
# Get sorted set of original labels
unique_labels = sorted(list(set(labels)))
# Initialize updated labels array
new_labels = np.full(len(labels), -1, dtype=int)
# Initialize new label
new_label = min_label
# Loop over sorted original labels
for label in unique_labels:
# Get original labels indexes
indexes = np.where(labels == label)
# Set updated labels
new_labels[indexes] = new_label
# Increment new label
new_label += 1
# Get maximum cluster label
max_label = max(new_labels)
# Check cluster updated labels
if len(unique_labels) != len(set(new_labels)):
raise RuntimeError('Number of clusters differs between original '
'and updated labels.')
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Return
return new_labels, max_label
# =============================================================================
class ACRMP(CRMP):
"""Adaptive Cluster-Reduced Material Phase interface.
Methods
-------
perform_adaptive_clustering(self, target_clusters, target_clusters_data)
*abstract*: Perform ACRMP adaptive clustering step.
get_adaptive_output(self)
*abstract*: Get adaptivity metrics for clustering adaptivity output.
_check_adaptivity_lock(self)
*abstract*: Check ACRMP adaptivity locking conditions.
get_adaptivity_type_parameters()
*abstract*: Get ACRMP mandatory and optional adaptivity type
parameters.
_set_adaptivity_type_parameters(self, adaptivity_type)
*abstract*: Set clustering adaptivity parameters.
_dynamic_split_factor(ref_split_factor, adapt_trigger_ratio, magnitude, \
dynamic_amp=0)
Compute dynamic adaptive clustering split factor.
"""
@abstractmethod
def perform_adaptive_clustering(self, target_clusters,
target_clusters_data):
"""Perform ACRMP adaptive clustering step.
Parameters
----------
target_clusters : list[int]
List with the labels (int) of clusters to be adapted.
target_clusters_data : dict
For each target cluster (key, str), store dictionary (item, dict)
containing cluster associated parameters relevant for the adaptive
procedures.
Returns
-------
adaptive_clustering_map : dict
List of new cluster labels (item, list[int] resulting from the
adaptivity of each target cluster (key, str).
"""
pass
# -------------------------------------------------------------------------
@abstractmethod
def get_adaptive_output(self):
"""Get adaptivity metrics for clustering adaptivity output.
Returns
-------
adaptivity_output : list[int or float]
List containing the adaptivity metrics associated with the
clustering adaptivity output file.
"""
pass
# -------------------------------------------------------------------------
@abstractmethod
def _check_adaptivity_lock(self):
"""Check ACRMP adaptivity locking conditions.
Check conditions that may deactivate the adaptive procedures in the
ACRMP. Once the ACRMP adaptivity is locked, it is treated as a SCRMP
for the remainder of the problem numerical solution.
"""
pass
# -------------------------------------------------------------------------
@staticmethod
@abstractmethod
def get_adaptivity_type_parameters():
"""Get ACRMP mandatory and optional adaptivity type parameters.
Besides returning the ACRMP mandatory and optional adaptivity type
parameters, this method establishes the default values for the optional
parameters.
Returns
----------
adapt_type_man_parameters : list[str]
Mandatory adaptivity type parameters (str).
adapt_type_opt_parameters : dict
Optional adaptivity type parameters (key, str) and associated
default value (item).
"""
pass
# -------------------------------------------------------------------------
@abstractmethod
def _set_adaptivity_type_parameters(self, adaptivity_type):
"""Set clustering adaptivity parameters.
Parameters
----------
adaptivity_type : dict
Clustering adaptivity parameters.
"""
pass
# -------------------------------------------------------------------------
@staticmethod
def _dynamic_split_factor(ref_split_factor, adapt_trigger_ratio, magnitude,
dynamic_amp=0):
"""Compute dynamic adaptive clustering split factor.
A detailed description of the dynamic adaptive clustering split factor
can be found in Ferreira et. al (2022) [#]_.
.. [#] Ferreira, B.P., Andrade Pires, F.M. and Bessa, M.A. (2022).
*Adaptivity for clustering-based reduced-order modeling of
localized history-dependent phenomena.* Comp Methods Appl M, 393
(see `here <https://www.sciencedirect.com/science/article/pii/
S0045782522000895?via%3Dihub>`_)
----
Parameters
----------
ref_split_factor : float
Reference (centered) adaptive clustering split factor. The adaptive
clustering split factor must be contained between 0 and 1
(included). The lower bound (0) enforces a single split, while the
upper bound (1) performs the maximum number splits of each cluster
(leading to single-voxel clusters).
adapt_trigger_ratio : float
Threshold associated with the adaptivity trigger condition.
magnitude : float
Difference between cluster ratio and adaptive trigger ratio. Given
that the cluster ratio ranges between 0 and 1 and only clusters
with a ratio greater or equal than the adaptive trigger ratio are
targeted, the magnitude ranges between 0 and 1 - trigger ratio.
dynamic_amp : float, default=0
Dynamic split factor amplitude centered around the reference
adaptive clustering split factor.
Returns
-------
adapt_split_factor : float
Adaptive clustering split factor. The adaptive clustering split
factor must be contained between 0 and 1 (included). The lower
bound (0) enforces a single split, i.e., 2 new clusters, while the
upper bound (1) is associated with a maximum defined number of new
voxels.
"""
# Check provided parameters
if not (ref_split_factor >= 0 and ref_split_factor <= 1):
raise RuntimeError('Invalid reference adaptive clustering split '
'factor.')
if not (adapt_trigger_ratio >= 0 and adapt_trigger_ratio <= 1):
raise RuntimeError('Invalid adaptive trigger ratio.')
if not (dynamic_amp >= 0):
raise RuntimeError('Invalid dynamic split factor amplitude.')
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# If the dynamic split factor amplitude is null, skip computations and
# return reference adaptive clustering split factor. Otherwise, compute
# adaptive clustering split factor lower and upper bounds
if abs(dynamic_amp) < 1e-10:
return ref_split_factor
else:
lower_bound = max(0, ref_split_factor - 0.5*dynamic_amp)
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Choose dynamic function type
dynamic_type = 'power'
# Set dynamic function
if dynamic_type == 'power':
# Set dynamic function power
n = 1.0
# Check power admissibility
if n < 0:
raise RuntimeError('Dynamic function power must be greater or '
'equal than zero.')
# Power dynamic function
dynamic_function = \
lambda magnitude: (1/((1 - adapt_trigger_ratio)**n))*(
magnitude**n)
else:
# Linear dynamic function
dynamic_function = \
lambda magnitude: (1/(1 - adapt_trigger_ratio))*magnitude
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Compute adaptive clustering split factor
adapt_split_factor = min(1, lower_bound
+ dynamic_function(magnitude)*dynamic_amp)
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Return
return adapt_split_factor
#
# Cluster-Reduced Material Phases
# =============================================================================
[docs]class SCRMP(CRMP):
"""Static Cluster-Reduced Material Phase (SCRMP).
Attributes
----------
_clustering_type : str
Type of cluster-reduced material phase.
max_label : int
Clustering maximum label.
cluster_labels : numpy.ndarray (1d)
Material phase cluster labels (numpy.ndarray[int] of shape
(n_phase_voxels,)).
Methods
-------
perform_base_clustering(self, base_clustering_scheme, min_label=0)
Perform SCRMP base clustering.
get_n_clusters(self)
Get current number of clusters.
get_clustering_type(self)
Get cluster-reduced material phase adaptivity type.
get_valid_clust_algs()
Get valid clustering algorithms to compute the CRMP.
"""
[docs] def __init__(self, mat_phase, cluster_data_matrix, n_clusters):
"""Constructor.
Parameters
----------
mat_phase : str
Material phase label.
cluster_data_matrix: numpy.ndarray (2d)
Data matrix (numpy.ndarray of shape
(n_phase_voxels, n_features_dims)) containing the clustering
features data to perform the material phase cluster analyses.
n_clusters : int
Number of material phase clusters.
"""
self._mat_phase = mat_phase
self._cluster_data_matrix = cluster_data_matrix
self._clustering_type = 'static'
self._n_clusters = n_clusters
self.max_label = 0
self.cluster_labels = None
# -------------------------------------------------------------------------
[docs] def perform_base_clustering(self, base_clustering_scheme, min_label=0):
"""Perform SCRMP base clustering.
Parameters
----------
base_clustering_scheme : dict
Prescribed base clustering scheme (item, numpy.ndarray of shape
(n_clusterings, 3)) for each material phase (key, str). Each row is
associated with a unique clustering characterized by a clustering
algorithm (col 1, int), a list of features (col 2, list[int]) and a
list of the features data matrix' indexes (col 3, list[int]).
min_label : int, default=0
Minimum cluster label.
"""
# Get number of material phase voxels
n_phase_voxels = self._cluster_data_matrix.shape[0]
# Get number of prescribed clusterings
n_clusterings = base_clustering_scheme.shape[0]
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Initialize collection of clustering solutions
clustering_solutions = []
# Loop over prescribed clustering solutions
for i in range(n_clusterings):
# Get base clustering algorithm and check validity
clust_alg_id = str(base_clustering_scheme[i, 0])
if clust_alg_id not in self.get_valid_clust_algs():
raise RuntimeError('Invalid base clustering algorithm.')
# Get clustering features' column indexes
indexes = base_clustering_scheme[i, 2]
# Get base clustering data matrix
data_matrix = mop.get_condensed_matrix(
self._cluster_data_matrix, list(range(n_phase_voxels)),
indexes)
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Perform cluster analysis
cluster_labels, _, is_n_clusters_satisfied = \
ClusterAnalysis().get_fitted_estimator(data_matrix,
clust_alg_id,
self._n_clusters)
# Check if prescribed number of clusters is satisfied
if not is_n_clusters_satisfied:
raise RuntimeError('The number of clusters ('
+ str(len(set(cluster_labels)))
+ ') obtained is different from the '
+ 'prescribed number of clusters ('
+ str(self._n_clusters) + ').')
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Add clustering to collection of clustering solutions
clustering_solutions.append(cluster_labels)
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Get consensus clustering
if n_clusterings > 1:
raise RuntimeError('No clustering ensemble method has been '
'implemented yet.')
else:
self.cluster_labels = cluster_labels
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Update cluster labels
self.cluster_labels, self.max_label = \
self._update_cluster_labels(self.cluster_labels, min_label)
# -------------------------------------------------------------------------
[docs] def get_n_clusters(self):
"""Get current number of clusters.
Returns
-------
n_clusters : int
Number of material phase clusters.
"""
return self._n_clusters
# -------------------------------------------------------------------------
[docs] def get_clustering_type(self):
"""Get cluster-reduced material phase adaptivity type.
Returns
-------
clustering_type : str
Type of cluster-reduced material phase.
"""
return self._clustering_type
# -------------------------------------------------------------------------
[docs] @staticmethod
def get_valid_clust_algs():
"""Get valid clustering algorithms to compute the CRMP.
Returns
----------
clust_algs : list
Clustering algorithms identifiers (str).
"""
return list(ClusterAnalysis.available_clustering_alg.keys())
# =============================================================================
[docs]class GACRMP(ACRMP):
"""Generalized Adaptive Cluster-Reduced Material Phase.
A detailed description of a Generalized Adaptive Cluster-Reduced Material
Phase can be found in Ferreira et. al (2022) [#]_.
.. [#] Ferreira, B.P., Andrade Pires, F.M. and Bessa, M.A. (2022).
*Adaptivity for clustering-based reduced-order modeling of
localized history-dependent phenomena.* Comp Methods Appl M, 393
(see `here <https://www.sciencedirect.com/science/article/pii/
S0045782522000895?via%3Dihub>`_)
Attributes
----------
_clustering_type : str
Type of cluster-reduced material phase.
_adaptive_step : int
Counter of adaptive clustering steps, with 0 associated with the base
clustering.
_adapt_split_factor : float
Adaptive clustering split factor. The adaptive clustering split factor
must be contained between 0 and 1 (included). The lower bound (0)
enforces a single split, i.e., 2 new clusters, while the upper bound
(1) is associated with a maximum defined number of new voxels.
_threshold_n_clusters : int
Threshold number of adaptive material phase number of clusters. Once
this threshold is surpassed, the adaptive procedures of the adaptive
material phase are deactivated.
_is_dynamic_split_factor : bool
True if adaptive clustering split factor is to be computed dynamically.
Otherwise, the adaptive clustering split factor is always set equal to
`_adapt_split_factor`.
_clustering_tree_nodes : dict
Clustering tree node (item, anytree.Node) associated with each material
cluster (key, str).
_root_cluster_node : anytree.Node
Clustering tree root node.
max_label : int
Clustering maximum label.
cluster_labels : numpy.ndarray (1d)
Material phase cluster labels (numpy.ndarray[int] of shape
(n_phase_voxels,)).
adaptive_time : float
Total amount of time (s) spent in the adaptive procedures.
adaptivity_lock : bool
True if the adaptive procedures are deactivated, False otherwise.
Methods
-------
perform_base_clustering(self, base_clustering_scheme, min_label=0)
Perform GACRMP base clustering.
get_valid_clust_algs():
Get valid clustering algorithms to compute the CRMP.
perform_adaptive_clustering(self, target_clusters, target_clusters_data, \
adaptive_clustering_scheme=None, min_label=0)
Perform GACRMP adaptive clustering step.
_check_adaptivity_lock(self)
Check ACRMP adaptivity locking conditions.
get_n_clusters(self)
Get current number of clusters.
get_clustering_type(self)
Get cluster-reduced material phase adaptivity type.
get_clustering_tree_nodes(self)
Get clustering tree nodes.
get_adaptivity_type_parameters()
Get ACRMP mandatory and optional adaptivity type parameters.
_set_adaptivity_type_parameters(self, adaptivity_type)
Set clustering adaptivity parameters.
get_adaptive_output(self)
Get adaptivity metrics for clustering adaptivity output.
reset_adaptive_parameters(self)
Reset clustering adaptive progress parameters.
update_adaptivity_type(self, adaptivity_type)
Update clustering adaptivity parameters.
"""
[docs] def __init__(self, mat_phase, cluster_data_matrix, n_clusters,
adaptivity_type):
"""Constructor.
Parameters
----------
mat_phase : str
Material phase label.
cluster_data_matrix: numpy.ndarray (2d)
Data matrix (numpy.ndarray of shape
(n_phase_voxels, n_features_dims)) containing the clustering
features data to perform the material phase cluster analyses.
n_clusters : int
Number of material phase clusters.
adaptivity_type : dict
Clustering adaptivity parameters.
"""
self._mat_phase = mat_phase
self._cluster_data_matrix = cluster_data_matrix
self._clustering_type = 'adaptive'
self._n_clusters = n_clusters
self._adaptivity_type = adaptivity_type
self._adaptive_step = 0
self._clustering_tree_nodes = {}
self.max_label = 0
self.cluster_labels = None
self.adaptive_time = 0
self.adaptivity_lock = False
# Set clustering adaptivity parameters
self._set_adaptivity_type_parameters(self._adaptivity_type)
# Set dynamic adaptive split factor
if abs(self._dynamic_split_factor_amp) < 1e-10:
self._is_dynamic_split_factor = False
else:
self._is_dynamic_split_factor = True
# Set clustering tree root node
root_cluster = -1
self._root_cluster_node = anytree.Node(-1)
self._clustering_tree_nodes[str(root_cluster)] = \
self._root_cluster_node
# -------------------------------------------------------------------------
[docs] def perform_base_clustering(self, base_clustering_scheme, min_label=0):
"""Perform GACRMP base clustering.
Parameters
----------
base_clustering_scheme : dict
Prescribed base clustering scheme (item, numpy.ndarray of shape
(n_clusterings, 3)) for each material phase (key, str). Each row is
associated with a unique clustering characterized by a clustering
algorithm (col 1, int), a list of features (col 2, list[int]) and a
list of the features data matrix' indexes (col 3, list[int]).
min_label : int, default=0
Minimum cluster label.
"""
# Get number of material phase voxels
n_phase_voxels = self._cluster_data_matrix.shape[0]
# Get number of prescribed clusterings
n_clusterings = base_clustering_scheme.shape[0]
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Initialize collection of clustering solutions
clustering_solutions = []
# Loop over prescribed clustering solutions
for i in range(n_clusterings):
# Get base clustering algorithm and check validity
clust_alg_id = str(base_clustering_scheme[i, 0])
if clust_alg_id not in self.get_valid_clust_algs():
raise RuntimeError('Invalid base clustering algorithm.')
# Get clustering features' column indexes
indexes = base_clustering_scheme[i, 2]
# Get base clustering data matrix
data_matrix = mop.get_condensed_matrix(
self._cluster_data_matrix, list(range(n_phase_voxels)),
indexes)
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Perform cluster analysis
cluster_labels, _, is_n_clusters_satisfied = \
ClusterAnalysis().get_fitted_estimator(data_matrix,
clust_alg_id,
self._n_clusters)
# Check if prescribed number of clusters is satisfied
if not is_n_clusters_satisfied:
raise RuntimeError('The number of clusters ('
+ str(len(set(cluster_labels)))
+ ') obtained is different from the '
+ 'prescribed number of clusters ('
+ str(self._n_clusters) + ').')
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Add clustering to collection of clustering solutions
clustering_solutions.append(cluster_labels)
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Get consensus clustering
if n_clusterings > 1:
raise RuntimeError('No clustering ensemble method has been '
'implemented yet.')
else:
self.cluster_labels = cluster_labels
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Update cluster labels
self.cluster_labels, self.max_label = \
self._update_cluster_labels(self.cluster_labels, min_label)
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Loop over base clustering clusters
for cluster in set(self.cluster_labels):
# Update clustering tree
self._clustering_tree_nodes[str(cluster)] = \
anytree.Node(cluster, parent=self._root_cluster_node)
# -------------------------------------------------------------------------
[docs] @staticmethod
def get_valid_clust_algs():
"""Get valid clustering algorithms to compute the CRMP.
Returns
----------
clust_algs : list
Clustering algorithms identifiers (str).
"""
return list(ClusterAnalysis.available_clustering_alg.keys())
# -------------------------------------------------------------------------
[docs] def perform_adaptive_clustering(
self, target_clusters, target_clusters_data,
adaptive_clustering_scheme=None, min_label=0):
"""Perform GACRMP adaptive clustering step.
Refine the provided target clusters by splitting them according to the
prescribed adaptive clustering scheme.
----
Parameters
----------
target_clusters : list[int]
List with the labels (int) of clusters to be adapted.
target_clusters_data : dict
For each target cluster (key, str), store dictionary (item, dict)
containing cluster associated parameters required for the adaptive
procedures.
adaptive_clustering_scheme : dict
Prescribed adaptive clustering scheme (item, numpy.ndarray of shape
(n_clusterings, 3)) for each material phase (key, str). Each row is
associated with a unique clustering characterized by a clustering
algorithm (col 1, int), a list of features (col 2, list[int]) and a
list of the features data matrix' indexes (col 3, list[int]).
min_label : int, default=0
Minimum cluster label.
Returns
-------
adaptive_clustering_map : dict
List of new cluster labels (item, list[int]) resulting from the
adaptivity of each target cluster (key, str).
"""
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Check for duplicated target clusters
if len(target_clusters) != len(np.unique(target_clusters)):
raise RuntimeError('List of target clusters contains duplicated '
'labels.')
# Check for unexistent target clusters
for target_cluster in target_clusters:
if target_cluster not in self.cluster_labels:
raise RuntimeError('Target cluster ' + str(target_cluster)
+ ' does not exist in material phase '
+ str(self._mat_phase))
# Check adaptive clustering scheme
if adaptive_clustering_scheme is None:
raise RuntimeError('An adaptive clustering scheme must be '
'prescribed to perform the GACRMP clustering '
'adaptivity.')
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
init_time = time.time()
# Increment adaptive clustering step counter
self._adaptive_step += 1
# Initialize adaptive clustering mapping dictionary
adaptive_clustering_map = {}
# Get material phase original clustering
original_cluster_labels = copy.deepcopy(self.cluster_labels)
# Initialize new cluster label
new_cluster_label = min_label
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Initialize sorted target clusters flag
is_sorted_target_clusters = False
# Check if target clusters magnitude is available
if 'max_magnitude' in \
target_clusters_data[str(target_clusters[0])].keys():
# Set sorted target clusters flag
is_sorted_target_clusters = True
# Get target clusters magnitude
target_clusters_magnitude = {
str(cluster):
target_clusters_data[str(cluster)]['max_magnitude']
for cluster in target_clusters}
# Get target clusters in descending order of magnitude
target_clusters_sorted = \
[int(x[0]) for x in sorted(target_clusters_magnitude.items(),
key=lambda x: x[1], reverse=True)]
# Set sorted target clusters
if set(target_clusters) == set(target_clusters_sorted):
target_clusters = copy.deepcopy(target_clusters_sorted)
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Loop over target clusters
for i in range(len(target_clusters)):
# Get target cluster label
target_cluster = target_clusters[i]
# Get total number of voxels associated with target cluster.
# If target cluster is already single-voxeled, skip to the next
# target cluster. Otherwise compute the number of child clusters.
# If the target cluster number of voxels is lower or equal than the
# number of child clusters, skip to the next target cluster.
n_cluster_voxels = \
np.count_nonzero(original_cluster_labels == target_cluster)
if n_cluster_voxels == 1:
continue
else:
# Get referece adaptive clustering split factor
ref_split_factor = self._adapt_split_factor
# Get target cluster dynamic split factor ability
is_cluster_dynamic_split_factor = target_clusters_data[
str(target_cluster)]['is_dynamic_split_factor']
# Set adaptive clustering split factor
if self._is_dynamic_split_factor \
and is_cluster_dynamic_split_factor:
# Get adaptive trigger ratio and magnitude
adapt_trigger_ratio = target_clusters_data[
str(target_cluster)]['adapt_trigger_ratio']
magnitude = target_clusters_data[
str(target_cluster)]['max_magnitude']
# Compute dynamic adaptive clustering split factor
adapt_split_factor = super()._dynamic_split_factor(
ref_split_factor, adapt_trigger_ratio, magnitude,
dynamic_amp=self._dynamic_split_factor_amp)
else:
adapt_split_factor = ref_split_factor
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Compute number of child clusters, enforcing at least two
# clusters
n_new_clusters = max(
2, int(round(adapt_split_factor*int(
round(1.0/self._child_cluster_vol_fraction)))))
# Compare number of child clusters and number of target cluster
# number of voxels
if n_cluster_voxels <= n_new_clusters:
continue
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Initialize target cluster mapping
adaptive_clustering_map[str(target_cluster)] = []
# Get target cluster indexes
target_cluster_idxs = \
list(*np.nonzero(original_cluster_labels == target_cluster))
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Get number of prescribed clusterings
n_clusterings = adaptive_clustering_scheme.shape[0]
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Initialize collection of clustering solutions
clustering_solutions = []
# Loop over prescribed clustering solutions
for i in range(n_clusterings):
# Get adaptive clustering algorithm and check validity
clust_alg_id = str(adaptive_clustering_scheme[i, 0])
if clust_alg_id not in self.get_valid_clust_algs():
raise RuntimeError('Invalid adaptive clustering '
'algorithm.')
# Get clustering features' column indexes
indexes = adaptive_clustering_scheme[i, 2]
# Get adaptive clustering data matrix
data_matrix = mop.get_condensed_matrix(
self._cluster_data_matrix, target_cluster_idxs, indexes)
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Perform cluster analysis
cluster_labels, _, is_n_clusters_satisfied = \
ClusterAnalysis().get_fitted_estimator(data_matrix,
clust_alg_id,
n_new_clusters)
# Check if prescribed number of clusters is satisfied
if not is_n_clusters_satisfied:
# If the prescribed number of clusters is not satisfied,
# proceed with the number of clusters obtained
pass
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Add clustering to collection of clustering solutions
clustering_solutions.append(cluster_labels)
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Get consensus clustering
if n_clusterings > 1:
raise RuntimeError('No clustering ensemble method has been '
'implemented yet.')
else:
child_cluster_labels = cluster_labels
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Update cluster labels
child_cluster_labels, max_label = self._update_cluster_labels(
child_cluster_labels, new_cluster_label)
# Update new cluster label
new_cluster_label = max_label + 1
# Add new clusters to target cluster mapping
adaptive_clustering_map[str(target_cluster)] += \
list(set(child_cluster_labels))
# Update material phase clustering
self.cluster_labels[target_cluster_idxs] = child_cluster_labels
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Check if number of clusters threshold has been surpassed
if is_sorted_target_clusters:
if len(set(self.cluster_labels)) > self._threshold_n_clusters:
break
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Update number of material phase clusters
self._n_clusters = len(set(self.cluster_labels))
# Update material phase maximum cluster label
self.max_label = max(self.cluster_labels)
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Loop over target clusters
for target_cluster in adaptive_clustering_map.keys():
# Get target cluster node
parent_node = self._clustering_tree_nodes[str(target_cluster)]
# Loop over child clusters
for child_cluster in adaptive_clustering_map[target_cluster]:
# Set child cluster tree node
self._clustering_tree_nodes[str(child_cluster)] = \
anytree.Node(child_cluster, parent=parent_node)
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Update total amount of time spent in the adaptive procedures
self.adaptive_time += time.time() - init_time
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Check adaptivity threshold conditions
self._check_adaptivity_lock()
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Return
return adaptive_clustering_map
# -------------------------------------------------------------------------
[docs] def _check_adaptivity_lock(self):
"""Check ACRMP adaptivity locking conditions.
Check conditions that may deactivate the adaptive procedures in the
ACRMP. Once the ACRMP adaptivity is locked, it is treated as a SCRMP
for the remainder of the problem numerical solution.
"""
# Check if the number of clusters threshold as been surpassed
if self._n_clusters > self._threshold_n_clusters:
self.adaptivity_lock = True
# -------------------------------------------------------------------------
[docs] def get_n_clusters(self):
"""Get current number of clusters.
Returns
-------
n_clusters : int
Number of material phase clusters.
"""
return self._n_clusters
# -------------------------------------------------------------------------
[docs] def get_clustering_type(self):
"""Get cluster-reduced material phase adaptivity type.
Returns
-------
clustering_type : str
Type of cluster-reduced material phase.
"""
return self._clustering_type
# -------------------------------------------------------------------------
[docs] def get_clustering_tree_nodes(self):
"""Get clustering tree nodes.
Returns
-------
clustering_tree_nodes : dict
Clustering tree node (item, anytree.Node) associated with each
material cluster (key, str).
root_cluster_node : anytree.Node
Clustering tree root node.
"""
# Output clustering tree
# anytree.exporter.DotExporter(self._root_cluster_node).to_picture(
# 'clustering_tree_nodes_phase_' + self._mat_phase + '.png')
return self._clustering_tree_nodes, self._root_cluster_node
# -------------------------------------------------------------------------
[docs] @staticmethod
def get_adaptivity_type_parameters():
"""Get ACRMP mandatory and optional adaptivity type parameters.
Besides returning the ACRMP mandatory and optional adaptivity type
parameters, this method establishes the default values for the optional
parameters.
----
Returns
----------
mandatory_parameters : dict
Mandatory adaptivity type parameters (str) and associated type
(item, type).
optional_parameters : dict
Optional adaptivity type parameters (key, str) and associated
default value (item).
"""
# Set mandatory adaptivity type parameters
mandatory_parameters = {}
# Set optional adaptivity type parameters and associated default values
optional_parameters = {'adapt_split_factor': 0.01,
'child_cluster_vol_fraction': 0.5,
'dynamic_split_factor_amp': 0.0,
'threshold_n_clusters': 10**6}
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Return
return mandatory_parameters, optional_parameters
# -------------------------------------------------------------------------
[docs] def _set_adaptivity_type_parameters(self, adaptivity_type):
"""Set clustering adaptivity parameters.
Parameters
----------
adaptivity_type : dict
Clustering adaptivity parameters.
"""
# Set mandatory adaptivity type parameters
pass
# Set optional adaptivity type parameters
self._adapt_split_factor = adaptivity_type['adapt_split_factor']
self._child_cluster_vol_fraction = \
adaptivity_type['child_cluster_vol_fraction']
self._dynamic_split_factor_amp = \
adaptivity_type['dynamic_split_factor_amp']
self._threshold_n_clusters = adaptivity_type['threshold_n_clusters']
# Set dynamic adaptive split factor
if abs(self._dynamic_split_factor_amp) < 1e-10:
self._is_dynamic_split_factor = False
else:
self._is_dynamic_split_factor = True
# -------------------------------------------------------------------------
[docs] def get_adaptive_output(self):
"""Get adaptivity metrics for clustering adaptivity output.
Returns
-------
adaptivity_output : list
List containing the adaptivity metrics associated with the
clustering adaptivity output file.
"""
# Build adaptivity output
adaptivity_output = [self._n_clusters, self._adaptive_step,
self.adaptive_time]
# Return
return adaptivity_output
# -------------------------------------------------------------------------
[docs] def reset_adaptive_parameters(self):
"""Reset clustering adaptive progress parameters."""
# Reset counter of adaptive clustering steps
self._adaptive_step = 0
# Reset time spent in adaptive procedures
self.adaptive_time = 0
# -------------------------------------------------------------------------
[docs] def update_adaptivity_type(self, adaptivity_type):
"""Update clustering adaptivity parameters.
Parameters
----------
adaptivity_type : dict
Clustering adaptivity parameters.
"""
# Update clustering adaptivity parameters
self._adaptivity_type = adaptivity_type
# Set clustering adaptivity parameters
self._set_adaptivity_type_parameters(self._adaptivity_type)
# =============================================================================
[docs]class HAACRMP(ACRMP):
"""Hierarchical Agglomerative Adaptive Cluster-Reduced Material Phase.
Attributes
----------
_clustering_type : str
Type of cluster-reduced material phase.
_linkage_matrix : numpy.ndarray (2d)
Linkage matrix associated with the hierarchical agglomerative
clustering (numpy.ndarray of shape (n_phase_voxels - 1, 4)).
_cluster_node_map : dict
Tree node id (item, int) associated with each cluster label (key, str).
_adaptive_step : int
Counter of adaptive clustering steps, with 0 associated with the base
clustering.
_adapt_split_factor : float
Adaptive clustering split factor. The adaptive clustering split factor
must be contained between 0 and 1 (included). The lower bound (0)
enforces a single split, i.e., 2 new clusters, while the upper bound
(1) is associated with a maximum defined number of new voxels.
_threshold_n_clusters : int
Threshold number of adaptive material phase number of clusters. Once
this threshold is surpassed, the adaptive procedures of the adaptive
material phase are deactivated.
_is_dynamic_split_factor : bool
True if adaptive clustering split factor is to be computed dynamically.
Otherwise, the adaptive clustering split factor is always set equal to
`_adapt_split_factor`.
max_label : int
Clustering maximum label.
cluster_labels : numpy.ndarray (1d)
Material phase cluster labels (numpy.ndarray[int] of shape
(n_phase_voxels,)).
adaptive_time : float
Total amount of time (s) spent in the adaptive procedures.
adaptivity_lock : bool
True if the adaptive procedures are deactivated, False otherwise.
Methods
-------
perform_base_clustering(self, base_clustering_scheme, min_label=0)
Perform HAACRMP base clustering.
perform_adaptive_clustering(self, target_clusters, target_clusters_data, \
adaptive_clustering_scheme=None, \
min_label=0)
Perform HAACRMP adaptive clustering step.
add_to_tree_node_list(node_list, node)
Add node to tree node list and sort by descending linkage distance.
_check_adaptivity_lock(self)
Check ACRMP adaptivity locking conditions.
print_adaptive_clustering(self, adaptive_clustering_map, \
adaptive_tree_node_map)
get_valid_clust_algs()
Get valid clustering algorithms to compute the CRMP.
get_n_clusters(self)
Get current number of clusters.
get_clustering_type(self)
Get cluster-reduced material phase adaptivity type.
get_adaptivity_type_parameters()
Get ACRMP mandatory and optional adaptivity type parameters.
_set_adaptivity_type_parameters(self, adaptivity_type)
Set clustering adaptivity parameters.
get_adaptive_output(self)
Get adaptivity metrics for clustering adaptivity output.
"""
[docs] def __init__(self, mat_phase, cluster_data_matrix, n_clusters,
adaptivity_type):
"""Constructor.
Parameters
----------
mat_phase : str
Material phase label.
cluster_data_matrix: numpy.ndarray (2d)
Data matrix (numpy.ndarray of shape
(n_phase_voxels, n_features_dims)) containing the clustering
features data to perform the material phase cluster analyses.
n_clusters : int
Number of material phase clusters.
adaptivity_type : dict
Clustering adaptivity parameters.
"""
self._mat_phase = mat_phase
self._cluster_data_matrix = cluster_data_matrix
self._clustering_type = 'adaptive'
self._n_clusters = n_clusters
self._adaptivity_type = adaptivity_type
self._linkage_matrix = None
self._cluster_node_map = None
self._adaptive_step = 0
self.max_label = 0
self.cluster_labels = None
self.adaptive_time = 0
self.adaptivity_lock = False
# Set clustering adaptivity parameters
self._set_adaptivity_type_parameters(self._adaptivity_type)
# Set dynamic adaptive split factor
if abs(self._dynamic_split_factor_amp) < 1e-10:
self._is_dynamic_split_factor = False
else:
self._is_dynamic_split_factor = True
# -------------------------------------------------------------------------
[docs] def perform_base_clustering(self, base_clustering_scheme, min_label=0):
"""Perform HAACRMP base clustering.
Parameters
----------
base_clustering_scheme : dict
Prescribed base clustering scheme (item, numpy.ndarray of shape
(n_clusterings, 3)) for each material phase (key, str). Each row is
associated with a unique clustering characterized by a clustering
algorithm (col 1, int), a list of features (col 2, list[int]) and a
list of the features data matrix' indexes (col 3, list[int]).
min_label : int, default=0
Minimum cluster label.
"""
# Get number of prescribed clusterings
n_clusterings = base_clustering_scheme.shape[0]
if n_clusterings > 1:
raise RuntimeError('A HAACRMP only accepts a single '
'hierarchical agglomerative prescribed '
'clustering.')
# Get number of material phase voxels
n_phase_voxels = self._cluster_data_matrix.shape[0]
# Get clustering algorithm and check validity
clust_alg_id = str(base_clustering_scheme[0, 0])
if clust_alg_id not in self.get_valid_clust_algs():
raise RuntimeError('An invalid clustering algorithm has been '
'prescribed.')
# Get base clustering features' column indexes
indexes = base_clustering_scheme[0, 2]
# Get base clustering data matrix
data_matrix = mop.get_condensed_matrix(self._cluster_data_matrix,
list(range(n_phase_voxels)),
indexes)
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Perform cluster analysis
cluster_analysis = clstalgs.ClusterAnalysis()
cluster_labels, clust_alg, is_n_clusters_satisfied = \
cluster_analysis.get_fitted_estimator(data_matrix, clust_alg_id,
self._n_clusters)
# Check if prescribed number of clusters is satisfied
if not is_n_clusters_satisfied:
raise RuntimeError('The number of clusters ('
+ str(len(set(cluster_labels)))
+ ') obtained is different from the '
'prescribed number of clusters ('
+ str(self._n_clusters) + ').')
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Set cluster labels
self.cluster_labels = cluster_labels
# Get hierarchical agglomerative base clustering linkage matrix
self._linkage_matrix = clust_alg.get_linkage_matrix()
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Update cluster labels
self.cluster_labels, self.max_label = \
self._update_cluster_labels(self.cluster_labels, min_label)
# -------------------------------------------------------------------------
[docs] def perform_adaptive_clustering(self, target_clusters,
target_clusters_data,
adaptive_clustering_scheme=None,
min_label=0):
"""Perform HAACRMP adaptive clustering step.
Refine the provided target clusters by splitting them according to the
hierarchical agglomerative tree, prioritizing child nodes by descending
order of linkage distance.
----
Parameters
----------
target_clusters : list[int]
List with the labels (int) of clusters to be adapted.
target_clusters_data : dict
For each target cluster (key, str), store dictionary (item, dict)
containing cluster associated parameters relevant for the adaptive
procedures.
adaptive_clustering_scheme : dict
Prescribed adaptive clustering scheme (item, numpy.ndarray of shape
(n_clusterings, 3)) for each material phase (key, str). Each row is
associated with a unique clustering characterized by a clustering
algorithm (col 1, int), a list of features (col 2, list[int]) and a
list of the features data matrix' indexes (col 3, list[int]).
min_label : int, default=0
Minimum cluster label.
Returns
-------
adaptive_clustering_map : dict
List of new cluster labels (item, list[int]) resulting from the
adaptivity of each target cluster (key, str).
adaptive_tree_node_map : dict
List of new cluster tree node ids (item, list[int]) resulting from
the split of each target cluster tree node id (key, str).
Validation purposes only (not returned otherwise).
"""
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Check for duplicated target clusters
if len(target_clusters) != len(np.unique(target_clusters)):
raise RuntimeError('List of target clusters contains duplicated '
'labels.')
# Check for unexistent target clusters
for target_cluster in target_clusters:
if target_cluster not in self.cluster_labels:
raise RuntimeError('Target cluster ' + str(target_cluster)
+ ' does not exist in material phase '
+ str(self._mat_phase))
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
init_time = time.time()
# Increment adaptive clustering step counter
self._adaptive_step += 1
# Initialize adaptive clustering mapping dictionary
adaptive_clustering_map = {}
# Initialize adaptive tree node mapping dictionary (validation purposes
# only)
adaptive_tree_node_map = {}
# Get current hierarchical agglomerative clustering
cluster_labels = copy.deepcopy(self.cluster_labels)
# Initialize new cluster label
new_cluster_label = min_label
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Get cluster labels (conversion to int32 is required to avoid raising
# a TypeError in scipy hierarchy leaders function)
labels = cluster_labels.astype('int32')
# Convert hierarchical agglomerative base clustering linkage matrix
# into tree object
rootnode, nodelist = sciclst.to_tree(self._linkage_matrix, rd=True)
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Build initial cluster-node mapping between cluster labels and tree
# nodes associated with the hierarchical agglomerative base clustering
if self._cluster_node_map is None:
# Get root nodes of hierarchical clustering corresponding to an
# horizontal cut defined by a flat clustering assignment vector.
# L contains the tree nodes ids while M contains the corresponding
# cluster labels
L, M = sciclst.leaders(self._linkage_matrix, labels)
# Build initial cluster-node mapping
self._cluster_node_map = dict(zip([str(x) for x in M], L))
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Loop over target clusters
for i in range(len(target_clusters)):
# Get target cluster label
target_cluster = target_clusters[i]
# Get target cluster tree node instance
target_node = nodelist[self._cluster_node_map[str(target_cluster)]]
# Get total number of leaf nodes associated with target node. If
# target node is a leaf itself (not splitable), skip to the next
# target cluster
if target_node.is_leaf():
continue
# else:
# n_leaves = target_node.get_count()
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Get referece adaptive clustering split factor
ref_split_factor = self._adapt_split_factor
# Get target cluster dynamic split factor ability
is_cluster_dynamic_split_factor = target_clusters_data[
str(target_cluster)]['is_dynamic_split_factor']
# Set adaptive clustering split factor
if self._is_dynamic_split_factor \
and is_cluster_dynamic_split_factor:
# Get adaptive trigger ratio and magnitude
adapt_trigger_ratio = target_clusters_data[
str(target_cluster)]['adapt_trigger_ratio']
magnitude = target_clusters_data[
str(target_cluster)]['max_magnitude']
# Compute dynamic adaptive clustering split factor
adapt_split_factor = super()._dynamic_split_factor(
ref_split_factor, adapt_trigger_ratio, magnitude,
dynamic_amp=self._dynamic_split_factor_amp)
else:
adapt_split_factor = ref_split_factor
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Compute total number of tree node splits, enforcing at least one
# split
n_splits = max(
1, int(round(adapt_split_factor*int(int(
round(1.0/self._child_cluster_vol_fraction)) - 1))))
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Initialize child nodes list
child_nodes = []
# Initialize child target nodes list
child_target_nodes = []
# Split loop
for i_split in range(n_splits):
# Set node to be splitted
if i_split == 0:
# In the first split operation, the node to be splitted is
# the target cluster tree node
node_to_split = target_node
else:
# Get maximum linkage distance child target node and remove
# it from the child target nodes list
node_to_split = child_target_nodes[0]
child_target_nodes.pop(0)
# Loop over child target node's left and right child nodes
for node in [node_to_split.get_left(),
node_to_split.get_right()]:
if node.is_leaf():
# Append to child nodes list if leaf node
child_nodes.append(node)
else:
# Append to child target nodes list if non-leaf node
child_target_nodes = self.add_to_tree_node_list(
child_target_nodes, node)
# Add remaining child target nodes to child nodes list
child_nodes += child_target_nodes
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Initialize target cluster mapping
adaptive_clustering_map[str(target_cluster)] = []
# Remove target cluster from cluster node mapping
self._cluster_node_map.pop(str(target_cluster))
# Update target cluster mapping and flat clustering labels
for node in child_nodes:
# Add new cluster to target cluster mapping
adaptive_clustering_map[str(target_cluster)].append(
new_cluster_label)
# Update flat clustering labels
labels[node.pre_order()] = new_cluster_label
# Update cluster-node mapping
self._cluster_node_map[str(new_cluster_label)] = node.id
# Increment new cluster label
new_cluster_label += 1
# Update adaptive tree node mapping dictionary (validation purposes
# only)
adaptive_tree_node_map[str(target_node.id)] = \
[x.id for x in child_nodes]
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Update RVE hierarchical agglomerative clustering
self.cluster_labels = labels
# Update number of material phase clusters
self._n_clusters = len(set(self.cluster_labels))
# Update clustering maximum label
self.max_label = max(self.cluster_labels)
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Update total amount of time spent in the adaptive procedures
self.adaptive_time += time.time() - init_time
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Check adaptivity threshold conditions
self._check_adaptivity_lock()
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Return
return adaptive_clustering_map
# -------------------------------------------------------------------------
[docs] @staticmethod
def add_to_tree_node_list(node_list, node):
"""Add node to tree node list and sort by descending linkage distance.
Parameters
----------
node_list : list[ClusterNode]
List of ClusterNode instances.
node : ClusterNode
ClusterNode to be added to list[ClusterNode].
"""
# Check parameters
if not isinstance(node, sciclst.ClusterNode):
raise TypeError('Node must be of type ClusterNode, not '
+ str(type(node)) + '.')
if any([not isinstance(node, sciclst.ClusterNode)
for node in node_list]):
raise TypeError('Node list can only contain elements of the type '
'ClusterNode.')
# Append tree node to node list
node_list.append(node)
# Sort tree node list by descending order of linkage distance
node_list = sorted(node_list, reverse=True, key=lambda x: x.dist)
# Return sorted tree node list
return node_list
# -------------------------------------------------------------------------
[docs] def _check_adaptivity_lock(self):
"""Check ACRMP adaptivity locking conditions.
Check conditions that may deactivate the adaptive procedures in the
ACRMP. Once the ACRMP adaptivity is locked, it is treated as a SCRMP
for the remainder of the problem numerical solution.
"""
# Check if the number of clusters threshold as been surpassed
if self._n_clusters > self._threshold_n_clusters:
self.adaptivity_lock = True
# -------------------------------------------------------------------------
[docs] def print_adaptive_clustering(self, adaptive_clustering_map,
adaptive_tree_node_map):
"""Print hierarchical adaptive clustering report (validation)."""
# Print report header
print(3*'\n' + 'Hierarchical adaptive clustering report\n' + 80*'-'
+ '\n\n' + 'Material phase: ' + str(self._mat_phase))
# Print adaptive clustering adaptive step
print('\nAdaptive refinement step: ', self._adaptive_step)
# Print hierarchical adaptive CRVE
print('\n\n' + 'Adaptive clustering: ' + '('
+ str(len(np.unique(self.cluster_labels))) + ' clusters)'
+ '\n\n', self.cluster_labels)
# Print adaptive clustering mapping
print('\n\n' + 'Adaptive cluster mapping: ')
for old_cluster in adaptive_clustering_map.keys():
print(' Old cluster: ' + '{:>4s}'.format(old_cluster)
+ ' -> '
+ 'New clusters: ',
adaptive_clustering_map[str(old_cluster)])
# Print adaptive tree node mapping
print('\n\n' + 'Adaptive tree node mapping (validation): ')
for old_node in adaptive_tree_node_map.keys():
print(' Old node: ' + '{:>4s}'.format(old_node)
+ ' -> '
+ 'New nodes: ', adaptive_tree_node_map[str(old_node)])
# Print cluster-node mapping
print('\n\n' + 'Cluster-Node mapping: ')
for new_cluster in self._cluster_node_map.keys():
print(' Cluster: ' + '{:>4s}'.format(new_cluster)
+ ' -> '
+ 'Tree node: ',
self._cluster_node_map[str(new_cluster)])
# -------------------------------------------------------------------------
[docs] @staticmethod
def get_valid_clust_algs():
"""Get valid clustering algorithms to compute the CRMP.
Returns
----------
clust_algs : list[str]
Clustering algorithms identifiers (str).
"""
return ['3', ]
# -------------------------------------------------------------------------
[docs] def get_n_clusters(self):
"""Get current number of clusters.
Returns
-------
n_clusters : int
Number of material phase clusters.
"""
return self._n_clusters
# -------------------------------------------------------------------------
[docs] def get_clustering_type(self):
"""Get cluster-reduced material phase adaptivity type.
Returns
-------
clustering_type : str
Type of cluster-reduced material phase.
"""
return self._clustering_type
# -------------------------------------------------------------------------
[docs] @staticmethod
def get_adaptivity_type_parameters():
"""Get ACRMP mandatory and optional adaptivity type parameters.
Besides returning the ACRMP mandatory and optional adaptivity type
parameters, this method establishes the default values for the optional
parameters.
----
Returns
----------
mandatory_parameters : dict
Mandatory adaptivity type parameters (str) and associated type
(item, type).
optional_parameters : dict
Optional adaptivity type parameters (key, str) and associated
default value (item).
"""
# Set mandatory adaptivity type parameters
mandatory_parameters = {}
# Set optional adaptivity type parameters and associated default values
optional_parameters = {'adapt_split_factor': 0.01,
'child_cluster_vol_fraction': 0.5,
'dynamic_split_factor_amp': 0.0,
'threshold_n_clusters': 10**6}
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Return
return mandatory_parameters, optional_parameters
# -------------------------------------------------------------------------
[docs] def _set_adaptivity_type_parameters(self, adaptivity_type):
"""Set clustering adaptivity parameters.
Parameters
----------
adaptivity_type : dict
Clustering adaptivity parameters.
"""
# Set mandatory adaptivity type parameters
pass
# Set optional adaptivity type parameters
self._adapt_split_factor = adaptivity_type['adapt_split_factor']
self._child_cluster_vol_fraction = \
adaptivity_type['child_cluster_vol_fraction']
self._dynamic_split_factor_amp = \
adaptivity_type['dynamic_split_factor_amp']
self._threshold_n_clusters = adaptivity_type['threshold_n_clusters']
# -------------------------------------------------------------------------
[docs] def get_adaptive_output(self):
"""Get adaptivity metrics for clustering adaptivity output.
Returns
-------
adaptivity_output : list
List containing the adaptivity metrics associated with the
clustering adaptivity output file.
"""
# Build adaptivity output
adaptivity_output = [self._n_clusters, self._adaptive_step,
self.adaptive_time]
# Return
return adaptivity_output