Source code for DLL.MachineLearning.SupervisedLearning.LinearModels._Elasticnet

import torch
from math import floor

from ....Data.Metrics import calculate_metrics, _round_dictionary
from ....Exceptions import NotFittedError


[docs] class ElasticNetRegression: """ Implements a linear regression model with L1 and L2 regularization. Args: alpha (int | float, optional): The regularization parameter. Larger alpha will force the l1 and l2 norms of the weights to be lower. Must be a non-negative real number. Defaults to 1. l1_ratio (int | float, optional): The proportion of l1 regularisation compared to l2 regularisation. Must be in the range [0, 1]. Defaults to 0.5. loss (:ref:`losses_section_label`, optional): A loss function used for training the model. Defaults to the mean squared error. Attributes: n_features (int): The number of features. Available after fitting. weights (torch.Tensor of shape (n_features + 1,)): The weights of the linear regression model. The first element is the bias of the model. Available after fitting. residuals (torch.Tensor of shape (n_samples,)): The residuals of the fitted model. For a good fit, the residuals should be normally distributed with zero mean and constant variance. Available after fitting. """ def __init__(self, alpha=1.0, l1_ratio=0.5): if not isinstance(alpha, int | float) or alpha < 0: raise ValueError("alpha must be a non-negative real number.") if not isinstance(l1_ratio, int | float) or not ((0 <= l1_ratio) and (l1_ratio <= 1)): raise ValueError("l1_ratio must be in range [0, 1].") self.alpha = alpha self.l1_ratio = l1_ratio
[docs] def fit(self, X, y, sample_weight=None, val_data=None, epochs=100, callback_frequency=1, metrics=["loss"], verbose=False): """ Fits the ElasticNetRegression model to the input data by minimizing the mean squared error loss function using cyclic coordinate-wise descent from `this paper <https://aaltodoc.aalto.fi/server/api/core/bitstreams/091c41eb-7348-48d3-b25d-c8de25fbab03/content>`_. Args: X (torch.Tensor of shape (n_samples, n_features)): The input data, where each row is a sample and each column is a feature. y (torch.Tensor of shape (n_samples,)): The target values corresponding to each sample. val_data (tuple[X_val, y_val] | None, optional): Optional validation samples. If None, no validation data is used. Defaults to None. epochs (int, optional): The number of training iterations. Must be a positive integer. Defaults to 100. callback_frequency (int, optional): The number of iterations between printing info from training. Must be a positive integer. Defaults to 1, which means that every iteration, info is printed assuming verbose=True. metrics (list[str], optional): The metrics that will be tracked during training. Defaults to ["loss"]. verbose (bool, optional): If True, prints info of the chosen metrics during training. Defaults to False. Returns: history (dict[str, torch.Tensor], each tensor is floor(epochs / callback_frequency) long.): A dictionary tracking the evolution of selected metrics at intervals defined by callback_frequency. Raises: TypeError: If the input matrix or the target matrix is not a PyTorch tensor or if other parameters are of wrong type. ValueError: If the input matrix or the target matrix is not the correct shape or if other parameters have incorrect values. """ if not isinstance(X, torch.Tensor) or not isinstance(y, torch.Tensor): raise TypeError("The input matrix and the target matrix must be a PyTorch tensor.") if X.ndim != 2: raise ValueError("The input matrix must be a 2 dimensional tensor.") if y.ndim != 1 or y.shape[0] != X.shape[0]: raise ValueError("The targets must be 1 dimensional with the same number of samples as the input data") if not isinstance(val_data, tuple) and val_data is not None: raise TypeError("val_data must either be a tuple containing validation samples or None.") if isinstance(val_data, tuple) and len(val_data) != 2: raise ValueError("val_data must contain both X_val and y_val.") if isinstance(val_data, tuple) and len(val_data) == 2 and (val_data[0].ndim != 2 or val_data[1].ndim != 1 or val_data[0].shape[1] != X.shape[1] or len(val_data[0]) != len(val_data[1])): raise ValueError("X_val and y_val must be of correct shape.") if not isinstance(epochs, int) or epochs <= 0: raise ValueError("epochs must be a positive integer.") if not isinstance(callback_frequency, int) or callback_frequency <= 0: raise ValueError("callback_frequency must be a positive integer.") if not isinstance(metrics, list | tuple): raise TypeError("metrics must be a list or a tuple containing the strings of wanted metrics.") if not isinstance(verbose, bool): raise TypeError("verbose must be a boolean.") if not isinstance(sample_weight, torch.Tensor) and sample_weight is not None: raise TypeError("sample_weight must be torch.Tensor or None.") if isinstance(sample_weight, torch.Tensor) and (sample_weight.ndim != 1 or len(X) != len(sample_weight)): raise ValueError("sample_weight must be of shape (n_samples,)") # self.n_features = X.shape[1] # history = {metric: torch.zeros(floor(epochs / callback_frequency)) for metric in metrics} # batch_size = len(X) if batch_size is None else batch_size # data_reader = DataReader(X, y, batch_size=batch_size, shuffle=shuffle_data, shuffle_every_epoch=shuffle_every_epoch) # self.weights = torch.randn((self.n_features,)) # self.bias = torch.zeros((1,)) # optimiser = ADAM() if optimiser is None else optimiser # optimiser.initialise_parameters([self.weights, self.bias]) # for epoch in range(epochs): # for x_batch, y_batch in data_reader.get_data(): # predictions = self.predict(x_batch) # dCdy = self.loss.gradient(predictions, y_batch) # dCdweights = (x_batch.T @ dCdy) + self.alpha * self.l1_ratio * torch.sign(self.weights) + self.alpha * (1 - self.l1_ratio) * self.weights # dCdbias = dCdy.mean(dim=0, keepdim=True) # self.weights.grad = dCdweights # self.bias.grad = dCdbias # optimiser.update_parameters() # if epoch % callback_frequency == 0: # values = calculate_metrics(data=(self.predict(X), y), metrics=metrics, loss=self.loss.loss) # if val_data is not None: # val_values = calculate_metrics(data=(self.predict(val_data[0]), val_data[1]), metrics=metrics, loss=self.loss.loss, validation=True) # values |= val_values # for metric, value in values.items(): # history[metric][int(epoch / callback_frequency)] = value # if verbose: print(f"Epoch: {epoch + 1} - Metrics: {_round_dictionary(values)}") # self.residuals = y - self.predict(X) # return history n_samples, self.n_features = X.shape X = torch.cat((torch.ones(size=(X.shape[0], 1)), X), dim=1) history = {metric: torch.zeros(floor(epochs / callback_frequency)) for metric in metrics} self.weights = torch.randn((self.n_features + 1)) if sample_weight is None: sample_weight = torch.ones(n_samples) normalisation_const = torch.sum(sample_weight[:, None] * X**2, axis=0) for epoch in range(epochs): for j in range(self.n_features + 1): residual = y - X @ self.weights + self.weights[j] * X[:, j] weighted_prod = torch.dot(X[:, j], sample_weight * residual) if normalisation_const[j] == 0: continue soft_thresholding = torch.sign(weighted_prod) * max(abs(weighted_prod) - self.alpha * self.l1_ratio, 0) / (normalisation_const[j] + self.alpha * (1 - self.l1_ratio)) self.weights[j] = soft_thresholding if epoch % callback_frequency == 0: values = calculate_metrics(data=(self.predict(X), y), metrics=metrics, loss=self._loss) if val_data is not None: val_values = calculate_metrics(data=(self.predict(val_data[0]), val_data[1]), metrics=metrics, loss=self._loss, validation=True) values |= val_values for metric, value in values.items(): history[metric][int(epoch / callback_frequency)] = value if verbose: print(f"Epoch: {epoch + 1} - Metrics: {_round_dictionary(values)}") # self.weights *= 1 + self.alpha * (1 - self.l1_ratio) # author of paper suggests this for a correction to the double shrinkage effect, however, the results are not that good. self.residuals = y - self.predict(X) return history
def _loss(self, prediction, true_outputs): return torch.sum((prediction - true_outputs) ** 2) + self.alpha * (2 * self.l1_ratio * torch.linalg.norm(self.weights, ord=1) + (1 - self.l1_ratio) * torch.linalg.norm(self.weights, ord=2) ** 2)
[docs] def predict(self, X): """ Applies the fitted ElasticNetRegression model to the input data, predicting the correct values. Args: X (torch.Tensor of shape (n_samples, n_features)): The input data to be regressed. Returns: target values (torch.Tensor of shape (n_samples,)): The predicted values corresponding to each sample. Raises: NotFittedError: If the ElasticNetRegression model has not been fitted before predicting. TypeError: If the input matrix is not a PyTorch tensor. ValueError: If the input matrix is not the correct shape. """ if not hasattr(self, "weights"): raise NotFittedError("ElasticNetRegression.fit() must be called before predicting.") if not isinstance(X, torch.Tensor): raise TypeError("The input matrix must be a PyTorch tensor.") if X.ndim != 2 or (X.shape[1] != self.n_features and (X.shape[1] != self.n_features + 1 or torch.any(X[:, 0] != torch.ones(len(X))))): raise ValueError("The input matrix must be a 2 dimensional tensor with the same number of features as the fitted tensor.") if X.shape[1] != self.n_features + 1: X = torch.cat((torch.ones(size=(X.shape[0], 1)), X), dim=1) return X @ self.weights