import torch
from scipy.optimize import curve_fit
from functools import partial
from . import PCA
from ....DeepLearning.Optimisers import SGD
from ...SupervisedLearning.Kernels import RBF
class UMAP:
"""
Uniform Manifold Approximation and Projection (UMAP) class for dimensionality reduction. This implementation is based on `this paper <https://arxiv.org/pdf/1802.03426>`_ and `this article <https://github.com/NikolayOskolkov/HowUMAPWorks/blob/master/HowUMAPWorks.ipynb>`_.
Args:
n_components (int): The dimension of the low-dimensional embedding. Must be a positive integer.
init (str, optional): The method for initializing the embedding. Must be in ``["spectral", "pca", "random"]``. Defaults to ``"spectral"``.
p (int, optional): The order of the chosen metric. Must be a positive integer. Defaults to 2, which corresponds to the Euclidean metric.
n_neighbor (int, optional): Controls how UMAP balances local and global structure in the data. The larger this parameter is, the better the global structure is preserved. A small value preserves fine details well, but may lose global structure. Must be a positive integer. Defaults to 15.
min_dist (float | int, optional): Controls the minimum distance between samples in the low-dimensional space. Must be a non-negative real number. Defaults to 0.25.
learning_rate (float | int, optional): Determines the step size of each gradient descent update. Must be a positive real number. Defaults to 1.
Attributes:
history (list[float]): The value of the cross-entropy loss at each epoch. Available after fitting the model.
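Example:
    A minimal usage sketch; ``fit_transform`` is assumed to return the embedding and to take the same arguments as :meth:`fit`::

        import torch

        X = torch.randn(100, 10)  # 100 samples with 10 features each
        umap = UMAP(n_components=2, n_neighbor=15, min_dist=0.25)
        embedding = umap.fit(X, epochs=100)  # expected shape: (100, 2)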
"""
def __init__(self, n_components=2, init="spectral", p=2, n_neighbor=15, min_dist=0.25, learning_rate=1):
if not isinstance(n_components, int) or n_components < 1:
raise ValueError("n_components must be a positive integer.")
if init not in ["spectral", "pca", "random"]:
raise ValueError('init must be in ["spectral", "pca", "random"].')
if not isinstance(p, int) or p <= 0:
raise ValueError("p must be a positive integer.")
if not isinstance(n_neighbor, int) or n_neighbor <= 0:
raise ValueError("n_neighbor must be a positive integer.")
if not isinstance(min_dist, int | float) or min_dist < 0:
raise ValueError("min_dist must be a non-negative real number.")
if not isinstance(learning_rate, int | float) or learning_rate <= 0:
raise ValueError("learning_rate must be a positive real number.")
self.n_components = n_components
self.init = init
self.p = p
self.learning_rate = learning_rate
self.n_neighbor = n_neighbor
self.min_dist = min_dist
self.a, self.b = self._solve_params()
def _solve_params(self):
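# Fit a and b so that the low-dimensional similarity curve 1 / (1 + a * d^(2b))
# approximates the target: 1 for d <= min_dist, exp(-(d - min_dist)) for d > min_dist.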
def func(d, a, b):
return 1 / (1 + a * d ** (2 * b))
x = torch.linspace(0, 3, 300)
y = torch.full_like(x, 1)
mask = x > self.min_dist
y[mask] = torch.exp(-x[mask] + self.min_dist)
p, _ = curve_fit(func, x.numpy(), y.numpy())
return torch.tensor(p[0], dtype=torch.float32), torch.tensor(p[1], dtype=torch.float32) # a, b
def _initialize(self, X):
if self.init == "pca":
return PCA(n_components=self.n_components).fit_transform(X)
elif self.init == "spectral":
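# Spectral initialization: embed with the first nontrivial eigenvectors of the
# unnormalized graph Laplacian L = D - W of an RBF affinity graph.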
W = RBF()(X, X)
D = torch.diag(W.sum(dim=1))
L = D - W
_, eigenvectors = torch.linalg.eigh(L)
embedding = eigenvectors[:, 1:self.n_components+1]
embedding /= torch.linalg.norm(embedding, dim=1, keepdim=True) + 1e-10
return embedding
else:
return torch.normal(0, 1e-4, size=(len(X), self.n_components))
def _pairwise_affinities(self, norm, rho, sigma, row):
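# One row of directed membership strengths exp(-(d_ij - rho_i) / sigma_i);
# differences below zero are clamped so the nearest neighbor always gets strength 1.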
d = norm[row] - rho[row]
d[d < 0] = 0
return torch.exp(- d / sigma)
def _k(self, prob):
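# Effective number of neighbors encoded by a row of membership strengths: k = 2^(sum_j p_ij).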
return 2 ** torch.sum(prob)
def _sigma(self, k_of_sigma, fixed_k):
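# Binary search for the bandwidth sigma whose effective neighbor count k(sigma) matches fixed_k.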
sigma_lower_limit = 0
sigma_upper_limit = 1000
for _ in range(20):
approx_sigma = (sigma_lower_limit + sigma_upper_limit) / 2
if k_of_sigma(approx_sigma) < fixed_k:
sigma_lower_limit = approx_sigma
else:
sigma_upper_limit = approx_sigma
if torch.abs(fixed_k - k_of_sigma(approx_sigma)) <= 1e-5:
break
return approx_sigma
def _local_fuzzy_simplicial_set(self, norm, rho):
n = len(norm)
prob = torch.zeros((n, n))
for dist_row in range(n):
_func = partial(self._k_of_sigma, norm, rho, dist_row)
prob[dist_row] = self._pairwise_affinities(norm, rho, self._sigma(_func, self.n_neighbor), dist_row)
return prob
def _k_of_sigma(self, norm, rho, dist_row, sigma):
return self._k(self._pairwise_affinities(norm, rho, sigma, dist_row))
def _symmetric_affinities(self, X):
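# High-dimensional graph: squared pairwise distances -> per-row fuzzy simplicial sets -> symmetrization.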
diff = X.unsqueeze(1) - X.unsqueeze(0)
norm = torch.norm(diff, dim=2, p=self.p) ** 2
rho = [torch.sort(norm[i]).values[1] for i in range(len(norm))] # squared distance to the nearest neighbor (norm holds squared distances)
affinities = self._local_fuzzy_simplicial_set(norm, rho)
return (affinities + affinities.T) / 2 # suggested by article
# return affinities + affinities.T - affinities * affinities.T # suggested by paper
def _low_dimensional_affinities(self, low_dimensional_representation):
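# Low-dimensional similarities q_ij = 1 / (1 + a * ||y_i - y_j||^(2b)), with a and b from _solve_params.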
diff = low_dimensional_representation.unsqueeze(1) - low_dimensional_representation.unsqueeze(0)
norm = torch.norm(diff, dim=2, p=self.p)
inv_distances = 1 / (1 + self.a * norm ** (2 * self.b))
return inv_distances
def _CE(self, symmetric_affinities, low_dimensional_representation):
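# Fuzzy-set cross entropy between the high- and low-dimensional affinities; 1e-5 keeps the logs finite.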
low_dimensional_affinities = self._low_dimensional_affinities(low_dimensional_representation)
return torch.mean(-symmetric_affinities * torch.log(low_dimensional_affinities + 1e-5) - (1 - symmetric_affinities) * torch.log(1 - low_dimensional_affinities + 1e-5))
def _CE_gradient(self, symmetric_affinities, low_dimensional_representation):
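# Analytic gradient of the cross entropy with respect to the embedding, following the notebook
# cited in the class docstring; note that norm holds squared pairwise distances here.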
diff = low_dimensional_representation.unsqueeze(1) - low_dimensional_representation.unsqueeze(0)
norm = torch.norm(diff, dim=2, p=self.p) ** 2
inv_dist = 1 / (1 + self.a * norm ** self.b)
Q = (1 - symmetric_affinities) @ (1 / (norm + 1e-5))
Q.fill_diagonal_(0)
Q /= torch.sum(Q, dim=1, keepdim=True)
fact = (self.a * symmetric_affinities * (1e-8 + norm) ** (self.b - 1) - Q).unsqueeze(2)
return 2 * self.b * torch.sum(fact * diff * inv_dist.unsqueeze(2), dim=1)
def fit(self, X, epochs=100, verbose=False):
"""
Wrapper for the UMAP.fit_transform(X) method.
Args:
X (torch.Tensor of shape (n_samples, n_features)): The input data, where each row is a sample and each column is a feature.
epochs (int, optional): The number of training epochs. Must be a positive integer. Defaults to 100.
verbose (bool, optional): Determines if the loss is printed each epoch. Must be a boolean. Defaults to ``False``.
"""
return self.fit_transform(X, epochs, verbose)