Source code for DLL.MachineLearning.SupervisedLearning.LinearModels._RANSAC

import torch
from math import ceil
from copy import deepcopy

from . import LinearRegression
from ....Exceptions import NotFittedError


[docs] class RANSACRegression: """ Implements the random sample consensus (RANSAC) regression model. Args: estimator (A regression model with fit and predict methods): A base model which is fit to random samples of the data. Defaults to LinearRegression. Attributes: best_estimator (estimator): The best model. Available after fitting. """ def __init__(self, estimator=LinearRegression()): if not hasattr(estimator, "fit") or not hasattr(estimator, "predict"): raise TypeError("estimator must have fit and predict functions.") self.estimator = estimator
[docs] def fit(self, X, y, min_samples=None, residual_threshold=None, max_trials=100, stop_inliers_prob=1, **kwargs): """ Samples random subsamples of the datapoints and fits base estimators to the subsamples. Args: X (torch.Tensor of shape (n_samples, n_features)): The input data, where each row is a sample and each column is a feature. y (torch.Tensor of shape (n_samples,)): The target values corresponding to each sample. min_samples (int | float | None, optional): The number of samples used to fit the base estimators. If float, ceil(n_samples * min_samples) is used and if None, n_features + 1 is used. Defaults to None residual_threshold (int | float | None, optional): The threshold for which larger absolute errors are considered outliers. If None, the median absolute deviation of y is used. Defaults to None. max_trials (int, optional): The number of tries to sample the data. Must be a positive integer. Defaults to 100. stop_inliers_prob (int | float, optional): If the proportion of inliers on an iteration exceeds this value, the random sampling is stopped early. Defaults to 1, i.e. the process is never stopped early as the max(n_inliers / number_of_samples_in_subsample) == 1. kwargs: Other parameters are passed to estimator.fit() """ if not isinstance(X, torch.Tensor) or not isinstance(y, torch.Tensor): raise TypeError("The input matrix and the target matrix must be a PyTorch tensor.") if X.ndim != 2: raise ValueError("The input matrix must be a 2 dimensional tensor.") if y.ndim != 1 or y.shape[0] != X.shape[0]: raise ValueError("The targets must be 1 dimensional with the same number of samples as the input data") if min_samples is not None and not isinstance(min_samples, float) and not isinstance(min_samples, int): raise TypeError("min_samples must be one of None, float or int.") if isinstance(min_samples, float) and (min_samples < 0 or min_samples > 1): raise ValueError("If min_samples is a float, it must be in range [0, 1].") if residual_threshold is not None and not isinstance(residual_threshold, float) and not isinstance(residual_threshold, int): raise TypeError("residual_threshold must be one of None, float or int.") if not isinstance(max_trials, int) or max_trials < 1: raise ValueError("max_trials must be a positive integer.") if not isinstance(stop_inliers_prob, float | int) or (stop_inliers_prob < 0 or stop_inliers_prob > 1): raise ValueError("stop_inliers_prob must be a float in range [0, 1].") n_samples, self.n_features = X.shape if isinstance(min_samples, float): min_samples = ceil(n_samples * min_samples) min_samples = X.shape[1] + 1 if min_samples is None else min_samples residual_threshold = torch.median(torch.abs(y - torch.median(y))) if residual_threshold is None else residual_threshold max_inliers = -1 for _ in range(max_trials): indicies = torch.randperm(n_samples)[:min_samples] X_ = X[indicies] y_ = y[indicies] self.estimator.fit(X_, y_, **kwargs) preds = self.estimator.predict(X_) n_inliers = torch.sum(torch.abs(y_ - preds) < residual_threshold) if n_inliers > max_inliers: max_inliers = n_inliers best_model = deepcopy(self.estimator) if n_inliers / min_samples > stop_inliers_prob: break self.best_estimator = best_model
[docs] def predict(self, X, **kwargs): """ Predicts the values of the samples using the best estimator determined in the fit method. Args: X (torch.Tensor of shape (n_samples, n_features)): The input data, where each row is a sample and each column is a feature. kwargs: Other parameters are passed to estimator.predict() """ if not hasattr(self, "best_estimator"): raise NotFittedError("RANSACRegression.predict() must be called before predicting.") if not isinstance(X, torch.Tensor): raise TypeError("The input matrix must be a PyTorch tensor.") if X.ndim != 2 or X.shape[1] != self.n_features: raise ValueError("The input matrix must be a 2 dimensional tensor with the same number of features as the fitted tensor.") return self.best_estimator.predict(X, **kwargs)