Source code for DLL.MachineLearning.SupervisedLearning.LinearModels.TimeSeries

import torch

from . import LinearRegression


[docs] class SARIMA: """ The Seasonal auto regressive moving average model for time series analysis. Args: series (torch.Tensor of shape (n_samples,)): The time series for fitting. Must be one dimensional. order (tuple of ints): The orders of the non-seasonal parts. Follows the format (p, d, q). seasonal_order (tuple of ints): The orders of the seasonal parts. Follows the format (P, D, Q, S). If a seasonal component is not needed, the seasonal order should be put as (0, 0, 0, 1). """ def __init__(self, series, order, seasonal_order): if not isinstance(series, torch.Tensor) or series.ndim != 1: raise TypeError("series must be a one-dimensional torch tensor.") if not isinstance(order, tuple | list) or len(order) != 3: raise TypeError("order must be a tuple of length 3.") if any([not isinstance(val, int) or val < 0 for val in order]): raise ValueError("order must only contain non-negative integers.") if not isinstance(seasonal_order, tuple | list) or len(seasonal_order) != 4: raise TypeError("seasonal_order must be a tuple of length 4.") if any([not isinstance(val, int) or val < 0 for val in seasonal_order]): raise ValueError("seasonal_order must only contain non-negative integers.") self.p, self.d, self.q = order self.P, self.D, self.Q, self.S = seasonal_order self.series = series self._discarded = {} if self.d > 0: series = self._differentiate(series, order=self.d) if self.D > 0: series = self._differentiate(series, lag=self.S, order=self.D) self.diff_series = series min_length = max(self.p, self.q, self.P * self.S, self.Q * self.S) if len(self.diff_series) <= min_length: raise ValueError(f"Differentiated series' length {len(self.diff_series)} is less than or equal minimum required length {min_length} for the given orders.")
[docs] def fit(self): """ Fits the ARMA model to the given time series. Currently, the function fits two linear regression models separately for the AR and MA components. Note: This approach is suboptimal for the MA component, as it should be fitted using Kalman filters for correctness. """ residuals = self._fit_ar() self._fit_ma(residuals)
def _fit_ar(self): X, y = self._train_data(part="ar") self.ar_model = LinearRegression() self.ar_model.fit(X, y) return y - self.ar_model.predict(X) def _fit_ma(self, residuals): X, _ = self._train_data(part="ma") length = min(len(X), len(residuals)) X, residuals = X[-length:], residuals[-length:] self.ma_model = LinearRegression() self.ma_model.fit(X, residuals) def _train_data(self, part): if part == "ar": X_series, X_targets = self._lagged_terms(order=self.p) seasonal_X_series, seasonal_X_targets = self._lagged_terms(order=self.P, lag=self.S) elif part == "ma": X_series, X_targets = self._lagged_terms(order=self.q) seasonal_X_series, seasonal_X_targets = self._lagged_terms(order=self.Q, lag=self.S) length = min(len(X_series), len(seasonal_X_series)) X = torch.cat((X_series[-length:], seasonal_X_series[-length:]), dim=1) y = X_targets if len(X_targets) < len(seasonal_X_targets) else seasonal_X_targets return X, y def _lagged_terms(self, order, lag=1): points = [] for i in range(lag * order, len(self.diff_series)): indicies = i - lag * torch.arange(1, order + 1) features = self.diff_series[indicies] points.append(features) targets = self.diff_series[lag * order:] return torch.stack(points, dim=0), targets def _differentiate(self, series, lag=1, order=1): discarded = [] for _ in range(order): discarded.append(series[:lag]) series = series[lag:] - series[:-lag] self._discarded[lag] = torch.stack(discarded, dim=0) return series def _integrate(self, differenced, lag=1, order=1): for j in range(order): restored = torch.zeros(len(differenced) + lag) restored[:lag] = self._discarded[lag][-j - 1] for i in range(lag, len(restored)): restored[i] = differenced[i - lag] + restored[i - lag] differenced = restored return differenced
[docs] def predict(self, steps=1, fit_between_steps=False): """ Predicts the next values of the given time series. Args: steps (int, optional): The number of next values to predict. Must be a positive integer. Defaults to 1. fit_between_steps (bool, optional): Determines if the model should be refitted between each prediction. Defaults to False. Returns: torch.Tensor: The predicted values as a one-dimensional torch Tensor. """ if not isinstance(steps, int) or steps <= 0: raise ValueError("steps must be a positive integer.") if not isinstance(fit_between_steps, bool): raise TypeError("fit_between_steps must be a boolean.") diff_series = self.diff_series.clone() original_diff_series = self.diff_series.clone() for _ in range(steps): pred = self._predict_next(diff_series) diff_series = torch.cat((diff_series, pred), dim=0) if fit_between_steps: self.diff_series = diff_series residuals = self._fit_ar() self._fit_ma(residuals) if self.D > 0: diff_series = self._integrate(diff_series, lag=self.S, order=self.D) if self.d > 0: diff_series = self._integrate(diff_series, order=self.d) self.diff_series = original_diff_series result = diff_series return result[-steps:]
def _predict_next(self, diff_series): indicies = -torch.arange(1, self.p + 1) indicies = torch.cat((-self.S * torch.arange(1, self.P + 1), indicies), dim=0) X_ar = diff_series[indicies].unsqueeze(0) indicies = -torch.arange(1, self.q + 1) indicies = torch.cat((-self.S * torch.arange(1, self.Q + 1), indicies), dim=0) X_ma = diff_series[indicies].unsqueeze(0) ar_pred = self.ar_model.predict(X_ar) ma_correction = self.ma_model.predict(X_ma) return ar_pred + ma_correction