from concurrent.futures import ProcessPoolExecutor
from typing import Optional
from typing import TypeVar
import numpy as np
from sklearn.base import BaseEstimator # type: ignore
from sklearn.base import TransformerMixin # type: ignore
from sklearn.utils.validation import check_array # type: ignore
from sklearn.utils.validation import check_is_fitted # type: ignore
from fracdiff.fdiff import fdiff
from .fracdiff import Fracdiff
from .stat import StatTester
# Type variable bound to ``FracdiffStat`` so that ``fit`` can be annotated as
# returning the same (sub)class instance it was called on.
T = TypeVar("T", bound="FracdiffStat")
[docs]class FracdiffStat(TransformerMixin, BaseEstimator):
"""A scikit-learn transformer to compute fractional differentiation,
where the order is chosen as the minimum order that makes fracdiff stationary.
Parameters
----------
window : int > 0 or None, default 10
Number of observations to compute each element in the output.
mode : {"same", "valid"}, default "same"
See :func:`fracdiff.fdiff` for details.
window_policy : {"fixed"}, default "fixed"
If "fixed" :
Fixed window method.
Every term in the output is evaluated using `window` observations.
In other words, a fracdiff operator, which is a polynomial of a backshift
operator, is truncated up to the `window`-th term.
The beginning `window - 1` elements in output are filled with
``numpy.nan``.
If "expanding" (not available) :
Expanding window method.
Every term in fracdiff time-series is evaluated using at least `window`
observations.
The beginning `window - 1` elements in output are filled with
``numpy.nan``.
stattest : {"ADF"}, default "ADF"
Method of stationarity test.
pvalue : float, default 0.05
Threshold of p-value to judge stationarity.
precision : float, default .01
Precision for the order of differentiation.
upper : float, default 1.0
Upper limit of the range to search the order.
lower : float, default 0.0
Lower limit of the range to search the order.
n_jobs : int, default None
The number of jobs to run `fit` in parallel. ``None`` or 1 runs serially;
-1 means using all processors.
Attributes
----------
d_ : numpy.array, shape (n_features,)
Minimum order of fractional differentiation
that makes time-series stationary.
Note
----
If the ``upper``-th differentiation of a series is still non-stationary,
the corresponding element of ``d_`` is set to ``numpy.nan``.
If the ``lower``-th differentiation of a series is already stationary,
the corresponding element of ``d_`` is set to ``lower``, but the true value
may be smaller.
Examples
--------
>>> from fracdiff.sklearn import FracdiffStat
>>> np.random.seed(42)
>>> X = np.random.randn(100, 4).cumsum(0)
>>> f = FracdiffStat().fit(X)
>>> f.d_
array([0.140625 , 0.5078125, 0.3984375, 0.140625 ])
>>> X = f.transform(X)
"""
def __init__(
    self,
    window: int = 10,
    mode: str = "same",
    window_policy: str = "fixed",
    stattest: str = "ADF",
    pvalue: float = 0.05,
    precision: float = 0.01,
    upper: float = 1.0,
    lower: float = 0.0,
    n_jobs: Optional[int] = None,
) -> None:
    """
    Store the hyper-parameters on the instance.

    Every argument is kept verbatim under an attribute of the same name;
    nothing is validated or computed here.
    """
    # Differentiation settings (`window` and `mode` are handed to `fdiff`).
    self.window, self.mode, self.window_policy = window, mode, window_policy
    # Stationarity test and its p-value threshold.
    self.stattest, self.pvalue = stattest, pvalue
    # Resolution and bounds of the search for the order.
    self.precision, self.upper, self.lower = precision, upper, lower
    # Degree of parallelism used while fitting.
    self.n_jobs = n_jobs
def fit(self: T, X: np.ndarray, y: None = None) -> T:
    """
    Fit the model with `X`.

    The order of differentiation is searched independently for every
    feature (column) of `X` and the result is stored in ``d_``.

    Parameters
    ----------
    X : array_like, shape (n_samples, n_features)
        Time-series to perform fractional differentiation.
        Here `n_samples` is the number of samples and `n_features` is the
        number of features.
    y : array_like, optional
        Ignored.

    Returns
    -------
    self : object
        Returns the instance itself.

    Raises
    ------
    RuntimeWarning
        If the order of any feature is ``numpy.nan``.
        ``d_`` has already been assigned when this is raised.
    """
    # Validation only: the return value of `check_array` is not used.
    check_array(X)

    orders = self._find_features_d(np.asarray(X))
    self.d_ = orders

    if np.any(np.isnan(orders)):
        raise RuntimeWarning("d_ has nan. You may want to increase `upper`.")

    return self
def _is_stat(self, x: np.ndarray) -> bool:
    """
    Return whether `x` is judged stationary.

    The judgement is delegated to ``StatTester`` configured with
    ``self.stattest`` and evaluated at the threshold ``self.pvalue``.
    """
    tester = StatTester(method=self.stattest)
    return tester.is_stat(x, pvalue=self.pvalue)
def _find_features_d(self, X: np.ndarray) -> np.ndarray:
    """
    Find the order of differentiation for every feature (column) of `X`.

    Each column is passed to ``self._find_d`` independently.  When
    ``n_jobs`` is ``None`` or 1 the columns are processed serially in the
    current process; otherwise they are dispatched to a
    ``ProcessPoolExecutor``.

    Parameters
    ----------
    X : array, shape (n_samples, n_features)

    Returns
    -------
    d : numpy.array, shape (n_features,)
        The value returned by ``self._find_d`` for each column, in column
        order.
    """
    features = (X[:, i] for i in range(X.shape[1]))

    if self.n_jobs is None or self.n_jobs == 1:
        return np.array([self._find_d(x) for x in features])

    # n_jobs == -1 means "use all CPUs": ProcessPoolExecutor picks the
    # number of workers itself when max_workers is None.
    max_workers = None if self.n_jobs == -1 else self.n_jobs
    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        # `Executor.map` returns a lazy iterator; collect the results while
        # the pool is still alive rather than after it has been shut down.
        orders = list(executor.map(self._find_d, features))
    return np.array(orders)
def _find_d(self, x: np.ndarray) -> float:
    """
    Carry out binary search of minimum order of fractional
    differentiation to make the time-series stationary.

    The ``upper`` order is tested first and ``numpy.nan`` is returned if
    the series is not stationary even there.  The ``lower`` order is
    tested next and returned as-is if it already yields a stationary
    series.  Otherwise the interval is bisected until it is no wider than
    ``precision`` and its upper end is returned.

    Parameters
    ----------
    x : array, shape (n,)

    Returns
    -------
    d : float
    """

    def stationary_at(order: float) -> bool:
        # Differentiate `x` to the given order and run the stationarity test.
        series = fdiff(x, order, window=self.window, mode=self.mode)
        return self._is_stat(series)

    lo, hi = self.lower, self.upper

    if not stationary_at(hi):
        return np.nan
    if stationary_at(lo):
        return lo

    # Invariant: `hi` is stationary and `lo` is not.
    # NOTE(review): with a non-positive `precision` this loop may never
    # terminate -- confirm that callers always pass a positive value.
    while hi - lo > self.precision:
        mid = (hi + lo) / 2
        if stationary_at(mid):
            hi = mid
        else:
            lo = mid

    return hi