# Source code for DLL.MachineLearning.UnsupervisedLearning.Clustering._SpectralClustering

import torch

from ...SupervisedLearning.Kernels import RBF, _Base
from . import KMeansClustering
from ....Exceptions import NotFittedError


class SpectralClustering:
    """
    SpectralClustering implements the spectral clustering algorithm, which partitions data points into `k` clusters.

    Args:
        kernel (:ref:`kernel_section_label`, optional): The similarity function for fitting the model. Defaults to RBF(correlation_length=0.1).
        k (int, optional): The number of clusters. Defaults to 3. Must be a positive integer.
        max_iters (int, optional): The maximum number of iterations for training the model. Defaults to 100. Must be a positive integer.
        normalise (bool, optional): Determines if the laplacian matrix is calculated using L = I - sqrt(inv(D)) A sqrt(inv(D)) or just L = D - A. Defaults to True.
        use_kmeans (bool, optional): Determines if the clustering in embedded space is done using kmeans or discretisation. Defaults to True.
        **kwargs: Other arguments are passed into the `KMeansClustering` algorithm. They are only used when `use_kmeans` is True.

    Raises:
        ValueError: If `kernel` is not a kernel instance, or if `k` or `max_iters` is not a positive integer.
        TypeError: If `normalise` or `use_kmeans` is not a boolean.

    Note:
        The result depends heavily on the chosen kernel function. Especially the correlation_length parameter should be fine-tuned for optimal performance.
    """

    def __init__(self, kernel=None, k=3, max_iters=100, normalise=True, use_kmeans=True, **kwargs):
        # The default kernel is created per instance instead of once in the
        # signature, so separate models never share one kernel object.
        if kernel is None:
            kernel = RBF(correlation_length=0.1)
        if not isinstance(kernel, _Base):
            raise ValueError("kernel must be from DLL.MachineLearning.SupervisedLearning.Kernels")
        if not isinstance(k, int) or k < 1:
            raise ValueError("k must be a positive integer.")
        if not isinstance(max_iters, int) or max_iters < 1:
            raise ValueError("max_iters must be a positive integer.")
        if not isinstance(normalise, bool):
            raise TypeError("normalise must be a boolean.")
        if not isinstance(use_kmeans, bool):
            raise TypeError("use_kmeans must be a boolean.")

        self.kernel = kernel
        self.k = k
        self.max_iters = max_iters
        self.normalise = normalise
        self.use_kmeans = use_kmeans
        if self.use_kmeans:
            self.kmeans = KMeansClustering(k=self.k, max_iters=self.max_iters, **kwargs)

    def _transform_data(self, X):
        """
        Embeds the samples using the eigenvectors of the graph laplacian.

        Args:
            X (torch.Tensor of shape (n_samples, n_features)): The input data.

        Returns:
            torch.Tensor of shape (n_samples, k): The eigenvectors belonging to the `k` smallest eigenvalues of the laplacian matrix.
        """
        similarity_matrix = self.kernel(X, X)
        degrees = similarity_matrix.sum(dim=1)
        if self.normalise:
            inv_sqrt_degrees = 1.0 / torch.sqrt(degrees)
            # Scaling rows and columns is equivalent to D^(-1/2) A D^(-1/2)
            # but avoids two dense matrix products with diagonal matrices.
            scaled_similarity = inv_sqrt_degrees[:, None] * similarity_matrix * inv_sqrt_degrees[None, :]
            # The identity follows the dtype and device of the similarity
            # matrix so that inputs living on another device still work.
            identity = torch.eye(X.shape[0], dtype=similarity_matrix.dtype, device=similarity_matrix.device)
            laplacian_matrix = identity - scaled_similarity
        else:
            laplacian_matrix = torch.diag(degrees) - similarity_matrix

        # eigh returns the eigenvalues in ascending order, hence the first k
        # columns span the embedding space.
        _, eigenvectors = torch.linalg.eigh(laplacian_matrix)
        return eigenvectors[:, :self.k]

    def _discretize_spectral_embedding(self, selected_eigenvectors):
        """
        Assigns a cluster to every sample by alternating between fitting an orthogonal rotation of the embedding and recomputing the one-hot assignments.

        Args:
            selected_eigenvectors (torch.Tensor of shape (n_samples, k)): The spectral embedding of the samples.

        Returns:
            torch.Tensor of shape (n_samples,): The cluster index of every sample.
        """
        row_norms = torch.linalg.norm(selected_eigenvectors, dim=1, keepdim=True)
        # Guard against an all-zero row, which would otherwise become NaN
        # and poison the singular value decomposition below.
        smallest_norm = torch.finfo(selected_eigenvectors.dtype).tiny
        embedding = selected_eigenvectors / row_norms.clamp_min(smallest_norm)

        n_samples = len(embedding)
        sample_indices = torch.arange(n_samples, device=embedding.device)
        labels = torch.argmax(embedding, dim=1)
        for _ in range(self.max_iters):
            indicator = torch.zeros((n_samples, self.k), dtype=embedding.dtype, device=embedding.device)
            indicator[sample_indices, labels] = 1
            # U @ Vt is the orthogonal matrix that best aligns the embedding
            # with the current one-hot assignment.
            U, _, Vt = torch.linalg.svd(embedding.T @ indicator)
            new_labels = torch.argmax(embedding @ (U @ Vt), dim=1)
            # The update is deterministic, so once the labels repeat every
            # remaining iteration would reproduce the same result.
            if torch.equal(new_labels, labels):
                break
            labels = new_labels
        return labels

    def fit(self, X):
        """
        Fits the algorithm to the given data. Transforms the data into the embedding space using the kernel function and clusters the data in the space.

        Args:
            X (torch.Tensor of shape (n_samples, n_features)): The input data, where each row is a sample and each column is a feature. The number of samples must be at least `k`.

        Raises:
            TypeError: If the input matrix is not a PyTorch tensor.
            ValueError: If the input matrix is not 2 dimensional or has fewer than `k` samples.
        """
        if not isinstance(X, torch.Tensor):
            raise TypeError("The input matrix must be a PyTorch tensor.")
        if X.ndim != 2 or X.shape[0] < self.k:
            raise ValueError("The input matrix must be a 2 dimensional tensor with atleast k samples.")

        self.selected_eigenvectors = self._transform_data(X)
        if self.use_kmeans:
            self.kmeans.fit(self.selected_eigenvectors)
        else:
            self._labels = self._discretize_spectral_embedding(self.selected_eigenvectors)

    def predict(self):
        """
        Applies the fitted SpectralClustering model to the input data, partitioning it to `k` clusters.

        Returns:
            labels (torch.Tensor of shape (n_samples,)): The cluster corresponding to each sample.

        Raises:
            NotFittedError: If the SpectralClustering model has not been fitted before predicting.
        """
        if not hasattr(self, "selected_eigenvectors"):
            raise NotFittedError("SpectralClustering.fit() must be called before predicting.")

        if self.use_kmeans:
            return self.kmeans.predict(self.selected_eigenvectors)
        return self._labels