4
votes

I'm looking to perform walk forward validation on my time-series data. Extensive document exists on how to perform rolling window:

enter image description here

or expanding window

enter image description here

But this validation does not correspond to what will be in my production system: I want to daily retrain a model that will make prediction 14 days in the future. So I would only add one day of data to my previous training period (where the other methods add on the following training folds an entire set of data of length test_size; 14 days in my case). Therefore, I would like to validate my model with a sliding window:

enter image description here

My question is that I can't come across a Python library that would do the work. TimeSeriesSplit from sklearn has no option of that kind. Basically I want to provide :
test_size, n_fold, min_train_size and

if n_fold > (n_samples - min_train_size) % test_size then next training_set draw data from the previous fold test_set

2
Does my answer solves your problem?Venkatachalam

2 Answers

4
votes

Looks like your requirement is make the test size as more than 1 fold. To make that change you need to tweak these lines.

I have made those changes and added a new param called n_test_folds, so that it can be customizable.

from sklearn.model_selection._split import TimeSeriesSplit
from sklearn.utils.validation import _deprecate_positional_args

from sklearn.utils import indexable
from sklearn.utils.validation import _num_samples

class WindowedTestTimeSeriesSplit(TimeSeriesSplit):
    """
    parameters
    ----------
    n_test_folds: int
        number of folds to be used as testing at each iteration.
        by default, 1.
    """
    @_deprecate_positional_args
    def __init__(self, n_splits=5, *, max_train_size=None, n_test_folds=1):
        super().__init__(n_splits, 
                         max_train_size=max_train_size)
        self.n_test_folds=n_test_folds

    def split(self, X, y=None, groups=None):
        """Generate indices to split data into training and test set.
        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Training data, where n_samples is the number of samples
            and n_features is the number of features.
        y : array-like of shape (n_samples,)
            Always ignored, exists for compatibility.
        groups : array-like of shape (n_samples,)
            Always ignored, exists for compatibility.
        Yields
        ------
        train : ndarray
            The training set indices for that split.
        test : ndarray
            The testing set indices for that split.
        """
        X, y, groups = indexable(X, y, groups)
        n_samples = _num_samples(X)
        n_splits = self.n_splits
        n_folds = n_splits + self.n_test_folds
        if n_folds > n_samples:
            raise ValueError(
                ("Cannot have number of folds ={0} greater"
                 " than the number of samples: {1}.").format(n_folds,
                                                             n_samples))
        indices = np.arange(n_samples)
        fold_size = (n_samples // n_folds)
        test_size = fold_size * self.n_test_folds # test window
        test_starts = range(fold_size + n_samples % n_folds,
                            n_samples-test_size+1, fold_size) # splits based on fold_size instead of test_size
        for test_start in test_starts:
            if self.max_train_size and self.max_train_size < test_start:
                yield (indices[test_start - self.max_train_size:test_start],
                       indices[test_start:test_start + test_size])
            else:
                yield (indices[:test_start],
                       indices[test_start:test_start + test_size])

Example:

import numpy as np
X = np.array([[1, 2], [3, 4], [1, 2], [3, 4], [1, 2], [3, 4]])
y = np.array([1, 2, 3, 4, 5, 6])
tscv = WindowedTestTimeSeriesSplit(n_splits=4, n_test_folds=2)
print(tscv)

for train_index, test_index in tscv.split(X):
    print("TRAIN:", train_index, "TEST:", test_index)
    X_train, X_test = X[train_index], X[test_index]
    y_train, y_test = y[train_index], y[test_index]

# WindowedTestTimeSeriesSplit(max_train_size=None, n_splits=4, n_test_folds=2)
# TRAIN: [0] TEST: [1 2]
# TRAIN: [0 1] TEST: [2 3]
# TRAIN: [0 1 2] TEST: [3 4]
# TRAIN: [0 1 2 3] TEST: [4 5]

Note: TRAIN: [0 1 2 3 4] TEST: [5] was not generated because it doesn't satisfy the requirement of number of test folds.

Using this function, we can visualize the different splits of our CV.

import numpy as np
import matplotlib.pyplot as plt
from matplotlib.patches import Patch
np.random.seed(1338)
cmap_data = plt.cm.Paired
cmap_cv = plt.cm.coolwarm
n_splits = 4


# Generate the class/group data
n_points = 100
X = np.random.randn(100, 10)

percentiles_classes = [.1, .3, .6]
y = np.hstack([[ii] * int(100 * perc)
               for ii, perc in enumerate(percentiles_classes)])

# Evenly spaced groups repeated once
groups = np.hstack([[ii] * 10 for ii in range(10)])

fig, ax = plt.subplots()
cv = WindowedTestTimeSeriesSplit(n_splits=n_splits, n_test_folds=2)
plot_cv_indices(cv, X, y, groups, ax, n_splits)
plt.show()

1
votes

Here is my solution that allows the user to specify the testing horizon and the minimum sample of data for training:

from sklearn.model_selection import TimeSeriesSplit
from sklearn.utils import indexable
from sklearn.utils.validation import _num_samples

class TimeSeriesSplitCustom(TimeSeriesSplit):
    def __init__(self, n_splits=5, max_train_size=None,
                 test_size=1,
                 min_train_size=1):
        super().__init__(n_splits=n_splits, max_train_size=max_train_size)
        self.test_size = test_size
        self.min_train_size = min_train_size

    def overlapping_split(self, X, y=None, groups=None):
        min_train_size = self.min_train_size
        test_size = self.test_size

        n_splits = self.n_splits
        n_samples = _num_samples(X)

        if (n_samples - min_train_size) / test_size >= n_splits:
            print('(n_samples -  min_train_size) / test_size >= n_splits')
            print('default TimeSeriesSplit.split() used')
            yield from super().split(X)

        else:
            shift = int(np.floor(
                (n_samples - test_size - min_train_size) / (n_splits - 1)))

            start_test = n_samples - (n_splits * shift + test_size - shift)

            test_starts = range(start_test, n_samples - test_size + 1, shift)

            if start_test < min_train_size:
                raise ValueError(
                    ("The start of the testing : {0} is smaller"
                     " than the minimum training samples: {1}.").format(start_test,
                                                                        min_train_size))

            indices = np.arange(n_samples)

            for test_start in test_starts:
                if self.max_train_size and self.max_train_size < test_start:
                    yield (indices[test_start - self.max_train_size:test_start],
                           indices[test_start:test_start + test_size])
                else:
                    yield (indices[:test_start],
                           indices[test_start:test_start + test_size])

And with the visualisation:

import numpy as np
import matplotlib.pyplot as plt
from matplotlib.patches import Patch
from ModelEvaluation import TimeSeriesSplitCustom
np.random.seed(1338)
cmap_data = plt.cm.Paired
cmap_cv = plt.cm.coolwarm
n_splits = 13

# Generate the class/group data
n_points = 100
X = np.random.randn(100, 10)

percentiles_classes = [.1, .3, .6]
y = np.hstack([[ii] * int(100 * perc)
               for ii, perc in enumerate(percentiles_classes)])

# Evenly spaced groups repeated once
groups = np.hstack([[ii] * 10 for ii in range(10)])

fig, ax = plt.subplots()

cv = TimeSeriesSplitCustom(n_splits=n_splits, test_size=20, min_train_size=12)
plot_cv_indices(cv, X, y, groups, ax, n_splits)
plt.show()

enter image description here

(To have the same result, make sure to change the
for ii, (tr, tt) in enumerate(**cv.overlapping_split**(X=X, y=y, groups=group)):
in the plot_cv_indices function.

Cheers!