Source code for clustering.clusteringalgs

"""Clustering analysis and algorithms.

This module includes the interface to perform a cluster analysis as well as
the interface to implement any clustering algorithm. It also includes several
wrappers over clustering algorithms available in open-source libraries (e.g.,
`SciPy <https://docs.scipy.org/doc/scipy/reference/cluster.html#>`_,
`Scikit-Learn <https://scikit-learn.org/stable/modules/clustering.html>`_).

Classes
-------
ClusterAnalysis
    Interface to perform a cluster analysis.
ClusteringAlgorithm
    Clustering algorithm interface.
AgglomerativeAlgorithm
    Hierarchical agglomerative clustering interface.
KMeansSK
    K-Means clustering algorithm (wrapper).
MiniBatchKMeansSK
    Mini-Batch K-Means clustering algorithm (wrapper).
AgglomerativeSP
    Agglomerative clustering algorithm (wrapper).
"""
#
#                                                                       Modules
# =============================================================================
# Standard
from abc import ABC, abstractmethod
# Third-party
import numpy as np
import sklearn.cluster as skclst
import scipy.cluster.hierarchy as sciclst
#
#                                                          Authorship & Credits
# =============================================================================
__author__ = 'Bernardo Ferreira (bernardo_ferreira@brown.edu)'
__credits__ = ['Bernardo Ferreira', ]
__status__ = 'Stable'
# =============================================================================
#
# =============================================================================
#
#                                                              Cluster Analysis
# =============================================================================
class ClusterAnalysis:
    """Interface to perform a cluster analysis.

    Attributes
    ----------
    available_clustering_alg : dict
        Available clustering algorithms (item, str) and associated
        identifiers (key, str).

    Methods
    -------
    get_fitted_estimator(self, data_matrix, clust_alg_id, n_clusters):
        Get cluster labels and clustering fitted estimator.
    """
    available_clustering_alg = {'1': 'K-Means (scikit-learn)',
                                '2': 'Mini-Batch K-Means (scikit-learn)',
                                '3': 'Agglomerative (scipy)'}
    # -------------------------------------------------------------------------
    def __init__(self):
        """Constructor."""
        pass
    # -------------------------------------------------------------------------
    def get_fitted_estimator(self, data_matrix, clust_alg_id, n_clusters):
        """Get cluster labels and clustering fitted estimator.

        Parameters
        ----------
        data_matrix : numpy.ndarray (2d)
            Data matrix containing the required data to perform the cluster
            analysis (numpy.ndarray of shape (n_items, n_features)).
        clust_alg_id : str
            Clustering algorithm identifier.
        n_clusters : int
            The number of clusters to find.

        Returns
        -------
        cluster_labels : numpy.ndarray (1d)
            Cluster label (int) assigned to each dataset item.
        clust_alg : ClusteringAlgorithm
            Clustering fitted estimator.
        is_n_clusters_satisfied : bool
            True if the number of clusters obtained from the cluster analysis
            matches the prescribed number of clusters, False otherwise.
        """
        # Get number of dataset items
        n_items = data_matrix.shape[0]
        # Initialize cluster labels
        cluster_labels = np.full(n_items, -1, dtype=int)
        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        # Instantiate clustering algorithm
        if clust_alg_id == '1':
            # Set number of full batch K-Means clusterings (with different
            # initializations)
            n_init = 10
            # Instantiate K-Means
            clust_alg = KMeansSK(init='k-means++', n_init=n_init,
                                 max_iter=300, tol=1e-4, random_state=None,
                                 algorithm='lloyd', n_clusters=n_clusters)
        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        elif clust_alg_id == '2':
            # Set size of the mini-batches
            batch_size = 100
            # Set number of random initializations
            n_init = 3
            # Instantiate Mini-Batch K-Means
            clust_alg = MiniBatchKMeansSK(init='k-means++', max_iter=100,
                                          tol=0.0, random_state=None,
                                          batch_size=batch_size,
                                          max_no_improvement=10,
                                          init_size=None, n_init=n_init,
                                          reassignment_ratio=0.01,
                                          n_clusters=n_clusters)
        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        elif clust_alg_id == '3':
            # Instantiate Agglomerative clustering
            clust_alg = AgglomerativeSP(0, method='ward', metric='euclidean',
                                        criterion='maxclust',
                                        n_clusters=n_clusters)
        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        else:
            raise RuntimeError('Unknown clustering algorithm.')
        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        # Perform cluster analysis
        cluster_labels = clust_alg.perform_clustering(data_matrix)
        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        # Check if all the dataset items have been labeled
        if np.any(cluster_labels == -1):
            raise RuntimeError('At least one dataset item has not been '
                               'labeled during the cluster analysis.')
        # Check number of clusters formed
        is_n_clusters_satisfied = len(set(cluster_labels)) == n_clusters
        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        return cluster_labels, clust_alg, is_n_clusters_satisfied
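# =============================================================================
# Illustrative usage sketch (hypothetical helper, not part of the documented
# API): shows how the ClusterAnalysis interface may be used, assuming a small
# random data matrix and the K-Means algorithm identifier ('1').
def _demo_cluster_analysis():
    """Sketch: cluster analysis through the ClusterAnalysis interface."""
    # Build a small random data matrix (20 items, 3 features)
    data_matrix = np.random.default_rng(seed=0).random((20, 3))
    # Perform cluster analysis with the K-Means wrapper and 4 clusters
    cluster_labels, clust_alg, is_n_clusters_satisfied = \
        ClusterAnalysis().get_fitted_estimator(data_matrix, '1', n_clusters=4)
    return cluster_labels, clust_alg, is_n_clusters_satisfied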
#
#                                               Interface: Clustering algorithm
# =============================================================================
class ClusteringAlgorithm(ABC):
    """Clustering algorithm interface.

    Methods
    -------
    perform_clustering(self, data_matrix):
        *abstract*: Perform cluster analysis and get cluster label of each
        dataset item.
    """
    @abstractmethod
    def __init__(self):
        """Constructor."""
        pass
    # -------------------------------------------------------------------------
    @abstractmethod
    def perform_clustering(self, data_matrix):
        """Perform cluster analysis and get cluster label of each dataset item.

        Parameters
        ----------
        data_matrix : numpy.ndarray (2d)
            Data matrix containing the required data to perform the cluster
            analysis (numpy.ndarray of shape (n_items, n_features)).

        Returns
        -------
        cluster_labels : numpy.ndarray (1d)
            Cluster label (int) assigned to each dataset item.
        """
        pass
# -----------------------------------------------------------------------------
class AgglomerativeAlgorithm(ClusteringAlgorithm):
    """Hierarchical agglomerative clustering interface.

    Methods
    -------
    get_linkage_matrix(self):
        *abstract*: Get hierarchical agglomerative clustering linkage matrix.
    """
    @abstractmethod
    def get_linkage_matrix(self):
        """Get hierarchical agglomerative clustering linkage matrix.

        Returns
        -------
        linkage_matrix : numpy.ndarray (2d)
            Linkage matrix associated with the hierarchical agglomerative
            clustering (numpy.ndarray of shape (n-1, 4)). At the i-th
            iteration the clusters with indices Z[i, 0] and Z[i, 1], with
            distance Z[i, 2], are merged, forming a new cluster that contains
            Z[i, 3] original dataset items. All cluster indices j >= n refer
            to the cluster formed in Z[j-n, :].

        Notes
        -----
        The hierarchical agglomerative clustering linkage matrix follows the
        definition of the `SciPy <https://docs.scipy.org/>`_ agglomerative
        clustering algorithm (see `here <https://docs.scipy.org/doc/scipy/
        reference/generated/scipy.cluster.hierarchy.linkage.html#scipy.
        cluster.hierarchy.linkage>`_).
        """
        pass
#
#                                                         Clustering algorithms
# =============================================================================
class KMeansSK(ClusteringAlgorithm):
    """K-Means clustering algorithm (wrapper).

    Documentation: see `here <https://scikit-learn.org/stable/modules/
    generated/sklearn.cluster.KMeans.html#sklearn.cluster.KMeans>`_.

    Methods
    -------
    perform_clustering(self, data_matrix):
        Perform cluster analysis and get cluster label of each dataset item.
    """
    def __init__(self, init='k-means++', n_init=10, max_iter=300, tol=1e-4,
                 random_state=None, algorithm='lloyd', n_clusters=None):
        """Constructor.

        Parameters
        ----------
        n_clusters : int, default=None
            Number of clusters to find.
        init : {'k-means++', 'random', ndarray, callable}, \
               default='k-means++'
            Method for centroid initialization.
        n_init : int, default=10
            Number of times K-Means is run with different centroid seeds.
        max_iter : int, default=300
            Maximum number of iterations.
        tol : float, default=1e-4
            Convergence tolerance (based on the Frobenius norm of the
            difference in the cluster centers of two consecutive iterations).
        random_state : {int, RandomState}, default=None
            Determines random number generation for centroid initialization.
            Use an int to make the randomness deterministic.
        algorithm : {'lloyd', 'elkan'}, default='lloyd'
            K-Means algorithm to use. 'lloyd' is the classical EM-style
            algorithm, while 'elkan' uses the triangle inequality to speed up
            convergence.
""" self.n_clusters = n_clusters self._init = init self._n_init = n_init self._max_iter = max_iter self._tol = tol self._random_state = random_state self._algorithm = algorithm # ------------------------------------------------------------------------- def perform_clustering(self, data_matrix): """Perform cluster analysis and get cluster label of each dataset item. Parameters ---------- data_matrix : numpy.ndarray (2d) Data matrix containing the required data to perform the cluster analysis (numpy.ndarray of shape (n_items, n_features)). Returns ------- cluster_labels : numpy.ndarray (1d) Cluster label (int) assigned to each dataset item. """ # Instantiate scikit-learn K-Means clustering algorithm self._clst_alg = skclst.KMeans(n_clusters=self.n_clusters, init=self._init, n_init=self._n_init, max_iter=self._max_iter, tol=self._tol, random_state=self._random_state, algorithm=self._algorithm) # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # Compute cluster centers (fitted estimator) and predict cluster label # (prediction) for each dataset item cluster_labels = self._clst_alg.fit_predict(data_matrix, sample_weight=None) # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ return cluster_labels # ============================================================================= class MiniBatchKMeansSK(ClusteringAlgorithm): """Mini-Batch K-Means clustering algorithm (wrapper). Documentation: see `here <https://scikit-learn.org/stable/modules/ generated/sklearn.cluster.MiniBatchKMeans.html#sklearn.cluster. MiniBatchKMeans>`_. Methods ------- perform_clustering(self, data_matrix): Perform cluster analysis and get cluster label of each dataset item. Notes ----- The Mini-Batch K-Means clustering algorithm is taken from scikit-learn (https://scikit-learn.org). Further information can be found in there. """ def __init__(self, init='k-means++', max_iter=100, tol=0.0, random_state=None, batch_size=100, max_no_improvement=10, init_size=None, n_init=3, reassignment_ratio=0.01, n_clusters=None): """Constructor. Parameters ---------- n_clusters : int, default=None Number of clusters to find. init: {‘k-means++’, ‘random’, numpy.ndarray, callable}, \ default=’k-means++’ Method for centroid initialization. n_init : int, default=10 Number of times K-Means is run with different centroid seeds. max_iter : int, default=300 Maximum number of iterations. tol : float, default=1e-4 Convergence tolerance (based on Frobenius norm of the different in the cluster centers of two consecutive iterations). random_state : int, RandomState instance, default=None Determines random number generation for centroid initialization. Use an int to make the randomness deterministic. init_size : int, default=None Number of samples to randomly sample for speeding up the initialization (sometimes at the expense of accuracy): the only algorithm is initialized by running a batch KMeans on a random subset of the data. n_init : int, default=3 Number of random initializations that are tried (best of initializations is used to run the algorithm). reassignment_ratio : float, default=0.01 Control the fraction of the maximum number of counts for a center to be reassigned. 
""" self.n_clusters = n_clusters self._init = init self._n_init = n_init self._max_iter = max_iter self._tol = tol self._random_state = random_state self._init_size = init_size self._reassignment_ratio = reassignment_ratio # ------------------------------------------------------------------------- def perform_clustering(self, data_matrix): """Perform cluster analysis and get cluster label of each dataset item. Parameters ---------- data_matrix : numpy.ndarray (2d) Data matrix containing the required data to perform the cluster analysis (numpy.ndarray of shape (n_items, n_features)). Returns ------- cluster_labels : numpy.ndarray (1d) Cluster label (int) assigned to each dataset item. """ # Instantiate scikit-learn Mini-Batch K-Means clustering algorithm self._clst_alg = skclst.MiniBatchKMeans( n_clusters=self.n_clusters, init=self._init, n_init=self._n_init, max_iter=self._max_iter, tol=self._tol, random_state=self._random_state, init_size=self._init_size, reassignment_ratio=self._reassignment_ratio) # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # Compute cluster centers (fitted estimator) and predict cluster label # (prediction) for each dataset item cluster_labels = self._clst_alg.fit_predict(data_matrix, sample_weight=None) # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ return cluster_labels # ============================================================================= class AgglomerativeSP(AgglomerativeAlgorithm): """Agglomerative clustering algorithm (wrapper). Documentation: see `here <https://docs.scipy.org/doc/scipy/reference/ cluster.hierarchy.html>`_. Methods ------- perform_clustering(self, data_matrix): Perform cluster analysis and get cluster label of each dataset item. Attributes ---------- _linkage_matrix : numpy.ndarray (2d) Linkage matrix associated with the hierarchical agglomerative clustering (numpy.ndarray of shape (n-1, 4)). At the i-th iteration the clusterings with indices Z[i, 0] and Z[i, 1], with distance Z[i, 2], are merged, forming a new cluster that contains Z[i, 3] original dataset items. All cluster indices j >= n refer to the cluster formed in Z[j-n, :]. """ def __init__(self, t, method='ward', metric='euclidean', criterion='maxclust', n_clusters=None): """Constructor. Parameters ---------- n_clusters : int, default=None The number of clusters to find. t : {int, float} Scalar parameter associated to the criterion used to form a flat clustering. Threshold (float) with criterion in {'inconsistent', 'distance', 'monocrit'} or maximum number of clusters with criterion in {'maxclust', 'maxclust_monocrit'}. method : {'single', 'complete', 'average', 'weighted', 'centroid', \ 'median', 'ward'}, default='ward' Linkage criterion. metric : {str, function}, default='euclidean' Distance metric to use when the input data matrix is a numpy.ndarray of observation vectors, otherwise ignored. Options: {'cityblock', 'euclidean', 'cosine', ...}. criterion : str, {'inconsistent', 'distance', 'maxclust', 'monocrit', \ 'maxclust_monocrit'}, default='maxclust' Criterion used to form a flat clustering (i.e., perform a horizontal cut in the hierarchical tree). """ self._t = t self.n_clusters = n_clusters self._method = method self._metric = metric self._criterion = criterion self._linkage_matrix = None # ------------------------------------------------------------------------- def perform_clustering(self, data_matrix): """Perform cluster analysis and get cluster label of each dataset item. 
# =============================================================================
class AgglomerativeSP(AgglomerativeAlgorithm):
    """Agglomerative clustering algorithm (wrapper).

    Documentation: see `here <https://docs.scipy.org/doc/scipy/reference/
    cluster.hierarchy.html>`_.

    Attributes
    ----------
    _linkage_matrix : numpy.ndarray (2d)
        Linkage matrix associated with the hierarchical agglomerative
        clustering (numpy.ndarray of shape (n-1, 4)). At the i-th iteration
        the clusters with indices Z[i, 0] and Z[i, 1], with distance Z[i, 2],
        are merged, forming a new cluster that contains Z[i, 3] original
        dataset items. All cluster indices j >= n refer to the cluster formed
        in Z[j-n, :].

    Methods
    -------
    perform_clustering(self, data_matrix):
        Perform cluster analysis and get cluster label of each dataset item.
    get_linkage_matrix(self):
        Get hierarchical agglomerative clustering linkage matrix.
    """
    def __init__(self, t, method='ward', metric='euclidean',
                 criterion='maxclust', n_clusters=None):
        """Constructor.

        Parameters
        ----------
        n_clusters : int, default=None
            The number of clusters to find.
        t : {int, float}
            Scalar parameter associated with the criterion used to form a
            flat clustering. Threshold (float) with criterion in
            {'inconsistent', 'distance', 'monocrit'} or maximum number of
            clusters (int) with criterion in {'maxclust',
            'maxclust_monocrit'}.
        method : {'single', 'complete', 'average', 'weighted', 'centroid', \
                  'median', 'ward'}, default='ward'
            Linkage criterion.
        metric : {str, function}, default='euclidean'
            Distance metric to use when the input data matrix is a
            numpy.ndarray of observation vectors, otherwise ignored. Options:
            {'cityblock', 'euclidean', 'cosine', ...}.
        criterion : {'inconsistent', 'distance', 'maxclust', 'monocrit', \
                     'maxclust_monocrit'}, default='maxclust'
            Criterion used to form a flat clustering (i.e., perform a
            horizontal cut in the hierarchical tree).
        """
        self._t = t
        self.n_clusters = n_clusters
        self._method = method
        self._metric = metric
        self._criterion = criterion
        self._linkage_matrix = None
    # -------------------------------------------------------------------------
    def perform_clustering(self, data_matrix):
        """Perform cluster analysis and get cluster label of each dataset item.

        Parameters
        ----------
        data_matrix : numpy.ndarray (2d)
            Data matrix containing the required data to perform the cluster
            analysis (numpy.ndarray of shape (n_items, n_features)).

        Returns
        -------
        cluster_labels : numpy.ndarray (1d)
            Cluster label (int) assigned to each dataset item.
        """
        # Perform hierarchical clustering and encode it in a linkage matrix
        self._linkage_matrix = sciclst.linkage(data_matrix,
                                               method=self._method,
                                               metric=self._metric)
        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        # Perform horizontal cut in the hierarchical tree and return cluster
        # labels (form a flat clustering)
        cluster_labels = sciclst.fcluster(self._linkage_matrix,
                                          self.n_clusters,
                                          criterion=self._criterion)
        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        return cluster_labels
    # -------------------------------------------------------------------------
    def get_linkage_matrix(self):
        """Get hierarchical agglomerative clustering linkage matrix.

        Returns
        -------
        linkage_matrix : numpy.ndarray (2d)
            Linkage matrix associated with the hierarchical agglomerative
            clustering (numpy.ndarray of shape (n-1, 4)). At the i-th
            iteration the clusters with indices Z[i, 0] and Z[i, 1], with
            distance Z[i, 2], are merged, forming a new cluster that contains
            Z[i, 3] original dataset items. All cluster indices j >= n refer
            to the cluster formed in Z[j-n, :].

        Notes
        -----
        The hierarchical agglomerative clustering linkage matrix follows the
        definition of the `SciPy <https://docs.scipy.org/>`_ agglomerative
        clustering algorithm (see `here <https://docs.scipy.org/doc/scipy/
        reference/generated/scipy.cluster.hierarchy.linkage.html#scipy.
        cluster.hierarchy.linkage>`_).
        """
        if self._linkage_matrix is None:
            raise ValueError('The hierarchical agglomerative clustering '
                             'linkage matrix has not been computed yet.')
        return self._linkage_matrix
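# =============================================================================
# Illustrative usage sketch (hypothetical helper, not part of the documented
# API): direct use of the AgglomerativeSP wrapper, assuming a small random
# data matrix; the linkage matrix is retrieved after fitting.
def _demo_agglomerative_sp():
    """Sketch: agglomerative clustering wrapper on a random data matrix."""
    # Build a small random data matrix (20 items, 3 features)
    data_matrix = np.random.default_rng(seed=0).random((20, 3))
    # Instantiate wrapper (the flat clustering is formed from n_clusters with
    # criterion='maxclust') and get the cluster label of each dataset item
    clust_alg = AgglomerativeSP(0, method='ward', metric='euclidean',
                                criterion='maxclust', n_clusters=4)
    cluster_labels = clust_alg.perform_clustering(data_matrix)
    # Retrieve the linkage matrix of the fitted hierarchical clustering
    linkage_matrix = clust_alg.get_linkage_matrix()
    return cluster_labels, linkage_matrix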