Module automl_alex.cross_validation
Cross-validation is a method for evaluating an analytical model and its behavior on independent data. When evaluating the model, the available data is split into k parts. Then the model is trained on k − 1 pieces of data, and the rest of the data is used for testing. The procedure is repeated k times; in the end, each of the k pieces of data is used for testing. The result is an assessment of the effectiveness of the selected model with the most even use of the available data.
Expand source code
'''
Cross-validation is a method for evaluating an analytical model and its behavior on independent data.
When evaluating the model, the available data is split into k parts.
Then the model is trained on k − 1 pieces of data, and the rest of the data is used for testing.
The procedure is repeated k times; in the end, each of the k pieces of data is used for testing.
The result is an assessment of the effectiveness of the selected model with the most even use of the available data.
'''
from typing import Callable
from typing import List
from typing import Optional
from typing import Union
import pandas as pd
import numpy as np
import copy
import os
import shutil
import optuna
from pathlib import Path
import joblib
import automl_alex
import sklearn
from sklearn.base import clone
from sklearn.model_selection import RepeatedKFold, RepeatedStratifiedKFold
from automl_alex._logger import *
from ._encoders import *
predict_proba_metrics = ['roc_auc_score', 'log_loss', 'brier_score_loss']
TMP_FOLDER = '.automl-alex_tmp/'
class CrossValidation(object):
'''Allows you to wrap any of your models in a Cross-validation.
Examples
--------
>>> from automl_alex import LightGBMClassifier, DataPrepare, CrossValidation
>>> import sklearn
>>> # Get Dataset
>>> dataset = sklearn.datasets.fetch_openml(name='adult', version=1, as_frame=True)
>>> dataset.target = dataset.target.astype('category').cat.codes
>>> X_train, X_test, y_train, y_test = sklearn.model_selection.train_test_split(
>>> dataset.data,
>>> dataset.target,
>>> test_size=0.2,)
>>> # clean up data before use ModelsReview
>>> de = DataPrepare()
>>> clean_X_train = de.fit_transform(X_train)
>>> clean_X_test = de.transform(X_test)
>>>
>>> # Model
>>> model = LightGBMClassifier()
>>>
>>> # CrossValidation
>>> cv = CrossValidation(estimator=model,)
>>> score, score_std = cv.fit_score(X_train, y_train, print_metric=True)
>>> cv.fit(X_train, y_train)
>>> predicts = cv.predict_test(X_test)
>>> print('Test AUC: ', round(sklearn.metrics.roc_auc_score(y_test, predicts),4))
'''
__name__ = 'CrossValidation'
_fit_models = False
fited_models = {}
'''dictionary with trained models'''
estimator = None
'''model'''
_fit_target_enc = {}
def __init__(
self,
estimator: Callable, # model
target_encoders_names: List[str] = [],
folds: int = 7,
score_folds: int = 5,
n_repeats: int = 1,
metric: Optional[Callable] = None,
print_metric: bool = False,
metric_round: int = 4,
random_state: int = 42
):
'''
Parameters
----------
estimator : Callable
model object from automl_alex.models
The object to use to fit model.
target_encoders_names : List[str]
name encoders (from automl_alex._encoders.target_encoders_names)
folds : int, optional
Number of folds., by default 7
score_folds : int, optional
Number of score folds, by default 5
n_repeats : int, optional
Number of times cross-validator needs to be repeated, by default 1
metric : Optional[Callable], optional
you can use standard metrics from sklearn.metrics or add custom metrics.
If None, the metric is selected from the type of estimator:
classifier: sklearn.metrics.roc_auc_score
regression: sklearn.metrics.mean_squared_error., by default None
print_metric : bool, optional
print metric, by default False
metric_round : int, optional
round metric score, by default 4
random_state : int, optional
Controls the generation of the random states for each repetition, by default 42
'''
self.estimator = estimator
self.folds = folds
self.score_folds = score_folds
self.n_repeats = n_repeats
self.print_metric = print_metric
self.metric_round = metric_round
self.target_encoders_names = target_encoders_names
if metric is None:
if estimator._type_of_estimator == 'classifier':
self.metric = sklearn.metrics.roc_auc_score
elif estimator._type_of_estimator == 'regression':
self.metric = sklearn.metrics.mean_squared_error
else:
self.metric = metric
if estimator._type_of_estimator == 'classifier':
self.skf = RepeatedStratifiedKFold(
n_splits=folds,
n_repeats=n_repeats,
random_state=random_state,
)
else:
self.skf = RepeatedKFold(
n_splits=folds,
n_repeats=n_repeats,
random_state=random_state,
)
def _clean_temp_folder(self):
Path(TMP_FOLDER).mkdir(parents=True, exist_ok=True)
if os.path.isdir(TMP_FOLDER+'cross-v_tmp'):
shutil.rmtree(TMP_FOLDER+'cross-v_tmp')
os.mkdir(TMP_FOLDER+'cross-v_tmp')
def fit(self,
X: pd.DataFrame,
y: Union[list, np.array, pd.DataFrame],
cat_features: Optional[List[str]] = None,
):
'''
Fit the model to the data.
Parameters
----------
X : pd.DataFrame
data (pd.DataFrame, shape (n_samples, n_features))
y : Union[list, np.array, pd.DataFrame]
target
cat_features : Optional[List[str]], optional
features name list, by default None
'''
self._clean_temp_folder()
self.cv_split_idx = [(train_idx, valid_idx) for (train_idx, valid_idx) in self.skf.split(X, y)]
for i, (train_idx, valid_idx) in enumerate(self.cv_split_idx):
train_x, train_y = X.iloc[train_idx], y.iloc[train_idx]
# Target Encoder
if len(self.target_encoders_names) > 0:
train_x_copy = train_x.copy()
for target_enc_name in self.target_encoders_names:
self._fit_target_enc[f'{target_enc_name} _fold_{i}'] = \
copy.deepcopy(target_encoders_names[target_enc_name](drop_invariant=True))
self._fit_target_enc[f'{target_enc_name} _fold_{i}'] = \
self._fit_target_enc[f'{target_enc_name} _fold_{i}'].fit(train_x_copy, train_y)
data_encodet = self._fit_target_enc[f'{target_enc_name} _fold_{i}'].transform(train_x_copy)
data_encodet = data_encodet.add_prefix(target_enc_name + '_')
train_x = train_x.join(data_encodet.reset_index(drop=True))
train_x_copy = None
train_x.fillna(0, inplace=True)
# Fit
self.estimator.fit(X_train=train_x, y_train=train_y, cat_features=cat_features)
self.fited_models[f'model_{self.estimator.__name__}_fold_{i}'] = copy.deepcopy(self.estimator)
self._fit_models = True
return(self)
def predict_test(self, X_test):
if not self._fit_models:
raise Exception("No fit models")
stacking_y_pred_test = np.zeros(len(X_test))
for i in range(self.folds*self.n_repeats):
X_test_tmp = X_test.copy()
# Target Encoder
if len(self.target_encoders_names) > 0:
for target_enc_name in self.target_encoders_names:
data_encodet = self._fit_target_enc[f'{target_enc_name} _fold_{i}'].transform(X_test)
data_encodet = data_encodet.add_prefix(target_enc_name + '_')
X_test_tmp = X_test_tmp.join(data_encodet.reset_index(drop=True))
X_test_tmp.fillna(0, inplace=True)
# Predict
y_pred_test = self.fited_models[f'model_{self.estimator.__name__}_fold_{i}'].predict_or_predict_proba(X_test_tmp)
stacking_y_pred_test += y_pred_test
predict = stacking_y_pred_test / (self.folds*self.n_repeats)
return(predict)
def predict_train(self, X):
if not self._fit_models:
raise Exception("No fit models")
stacking_y_pred_train = np.zeros(len(X))
for i, (train_idx, valid_idx) in enumerate(self.cv_split_idx):
val_x = X.iloc[valid_idx]
# Target Encoder
if len(self.target_encoders_names) > 0:
val_x_copy = val_x.copy()
for target_enc_name in self.target_encoders_names:
data_encodet = self._fit_target_enc[f'{target_enc_name} _fold_{i}'].transform(val_x_copy)
data_encodet = data_encodet.add_prefix(target_enc_name + '_')
val_x = val_x.join(data_encodet.reset_index(drop=True))
val_x_copy = None
val_x.fillna(0, inplace=True)
y_pred = self.fited_models[f'model_{self.estimator.__name__}_fold_{i}'].predict_or_predict_proba(val_x)
stacking_y_pred_train[valid_idx] += y_pred
predict = stacking_y_pred_train / self.n_repeats
return(predict)
def get_feature_importance(self, X):
if not self._fit_models:
raise Exception("No fit models")
if not self.estimator._is_possible_feature_importance():
raise Exception("Can't get the feature importance for this estimator")
feature_importance_df = pd.DataFrame(np.zeros(len(X.columns)), index=X.columns)
for i in range(self.folds*self.n_repeats):
X_tmp = X.copy()
# Target Encoder
if len(self.target_encoders_names) > 0:
for target_enc_name in self.target_encoders_names:
data_encodet = self._fit_target_enc[f'{target_enc_name} _fold_{i}'].transform(X)
data_encodet = data_encodet.add_prefix(target_enc_name + '_')
X_tmp = X_tmp.join(data_encodet.reset_index(drop=True))
X_tmp.fillna(0, inplace=True)
# Get feature_importance
if i == 0:
feature_importance_df = \
self.fited_models[f'model_{self.estimator.__name__}_fold_{i}'].get_feature_importance(X_tmp)
feature_importance_df['value'] += \
self.fited_models[f'model_{self.estimator.__name__}_fold_{i}'].get_feature_importance(X_tmp)['value']
return(feature_importance_df)
def fit_score(self,
X: pd.DataFrame,
y: Union[list, np.array, pd.DataFrame],
print_metric=None,
trial=None,):
self._pruned_cv = False
if print_metric is None:
print_metric = self.print_metric
self.cv_split_idx = [(train_idx, valid_idx) for (train_idx, valid_idx) in self.skf.split(X, y)]
folds_scores = []
for i, (train_idx, valid_idx) in enumerate(self.cv_split_idx):
train_x, train_y = X.iloc[train_idx], y.iloc[train_idx]
val_x, val_y = X.iloc[valid_idx], y.iloc[valid_idx]
# Target Encoder
if len(self.target_encoders_names) > 0:
val_x_copy = val_x.copy()
train_x_copy = train_x.copy()
for target_enc_name in self.target_encoders_names:
target_enc = target_encoders_names[target_enc_name](drop_invariant=True)
data_encodet = target_enc.fit_transform(train_x_copy, train_y)
data_encodet = data_encodet.add_prefix(target_enc_name + '_')
train_x = train_x.join(data_encodet.reset_index(drop=True))
data_encodet = None
val_x_data_encodet = target_enc.transform(val_x_copy)
val_x_data_encodet = val_x_data_encodet.add_prefix(target_enc_name + '_')
val_x = val_x.join(val_x_data_encodet.reset_index(drop=True))
val_x_data_encodet = None
val_x_copy = None
train_x_copy = None
train_x.fillna(0, inplace=True)
val_x.fillna(0, inplace=True)
# Fit
score_model = self.estimator.fit_score(
X_train=train_x,
y_train=train_y,
X_test=val_x,
y_test=val_y,
metric=self.metric,
print_metric=False,
metric_round=self.metric_round,
)
folds_scores.append(score_model)
if (trial is not None) and i < 1:
trial.report(score_model, i)
# Handle pruning based on the intermediate value.
if trial.should_prune():
self._pruned_cv=True
break
# score_folds
if i+1 >= self.score_folds:
break
if self._pruned_cv:
score = score_model
score_std = 0
else:
if self.score_folds > 1:
score = round(np.mean(folds_scores), self.metric_round)
score_std = round(np.std(folds_scores), self.metric_round+2)
else:
score = round(score_model, self.metric_round)
score_std = 0
if print_metric:
print(f'\n Mean Score {self.metric.__name__} on {self.score_folds} Folds: {score} std: {score_std}')
return(score, score_std)
def save(self, name='cv_dump', folder='./', verbose=1):
if not self._fit_models:
raise Exception("No fit models")
dir_tmp = TMP_FOLDER+'cross-v_tmp/'
self._clean_temp_folder()
for i in range(self.folds*self.n_repeats):
# Target Encoder
if len(self.target_encoders_names) > 0:
for target_enc_name in self.target_encoders_names:
joblib.dump(self._fit_target_enc[f'{target_enc_name} _fold_{i}'], \
f'{dir_tmp}{target_enc_name} _fold_{i}.pkl')
# Models
self.fited_models[f'model_{self.estimator.__name__}_fold_{i}'].save(f'{dir_tmp}model_{self.estimator.__name__}_fold_{i}', verbose=0)
joblib.dump(self, dir_tmp+'CV'+'.pkl')
shutil.make_archive(folder+name, 'zip', dir_tmp)
shutil.rmtree(dir_tmp)
if verbose>0:
print('Save CrossValidation')
def load(self, name='cv_dump', folder='./', verbose=1):
self._clean_temp_folder()
dir_tmp = TMP_FOLDER+'cross-v_tmp/'
shutil.unpack_archive(folder+name+'.zip', dir_tmp)
cv = joblib.load(dir_tmp+'CV'+'.pkl')
for i in range(cv.folds*cv.n_repeats):
# Target Encoder
if len(self.target_encoders_names) > 0:
for target_enc_name in self.target_encoders_names:
self._fit_target_enc[f'{target_enc_name} _fold_{i}'] = joblib.load(f'{dir_tmp}{target_enc_name} _fold_{i}.pkl')
# Models
cv.fited_models[f'model_{self.estimator.__name__}_fold_{i}'] = \
copy.deepcopy(cv.estimator.load(f'{dir_tmp}model_{self.estimator.__name__}_fold_{i}', verbose=0))
shutil.rmtree(dir_tmp)
if verbose>0:
print('Load CrossValidation')
return(cv)
Classes
class CrossValidation (estimator: Callable, target_encoders_names: List[str] = [], folds: int = 7, score_folds: int = 5, n_repeats: int = 1, metric: Union[Callable, NoneType] = None, print_metric: bool = False, metric_round: int = 4, random_state: int = 42)
-
Allows you to wrap any of your models in a Cross-validation.
Examples
>>> from automl_alex import LightGBMClassifier, DataPrepare, CrossValidation >>> import sklearn >>> # Get Dataset >>> dataset = sklearn.datasets.fetch_openml(name='adult', version=1, as_frame=True) >>> dataset.target = dataset.target.astype('category').cat.codes >>> X_train, X_test, y_train, y_test = sklearn.model_selection.train_test_split( >>> dataset.data, >>> dataset.target, >>> test_size=0.2,) >>> # clean up data before use ModelsReview >>> de = DataPrepare() >>> clean_X_train = de.fit_transform(X_train) >>> clean_X_test = de.transform(X_test) >>> >>> # Model >>> model = LightGBMClassifier() >>> >>> # CrossValidation >>> cv = CrossValidation(estimator=model,) >>> score, score_std = cv.fit_score(X_train, y_train, print_metric=True) >>> cv.fit(X_train, y_train) >>> predicts = cv.predict_test(X_test) >>> print('Test AUC: ', round(sklearn.metrics.roc_auc_score(y_test, predicts),4))
Parameters
estimator
:Callable
- model object from automl_alex.models The object to use to fit model.
target_encoders_names
:List[str]
- name encoders (from automl_alex._encoders.target_encoders_names)
folds
:int
, optional- Number of folds., by default 7
score_folds
:int
, optional- Number of score folds, by default 5
n_repeats
:int
, optional- Number of times cross-validator needs to be repeated, by default 1
metric
:Optional[Callable]
, optional- you can use standard metrics from sklearn.metrics or add custom metrics. If None, the metric is selected from the type of estimator: classifier: sklearn.metrics.roc_auc_score regression: sklearn.metrics.mean_squared_error., by default None
print_metric
:bool
, optional- print metric, by default False
metric_round
:int
, optional- round metric score, by default 4
random_state
:int
, optional- Controls the generation of the random states for each repetition, by default 42
Expand source code
class CrossValidation(object): '''Allows you to wrap any of your models in a Cross-validation. Examples -------- >>> from automl_alex import LightGBMClassifier, DataPrepare, CrossValidation >>> import sklearn >>> # Get Dataset >>> dataset = sklearn.datasets.fetch_openml(name='adult', version=1, as_frame=True) >>> dataset.target = dataset.target.astype('category').cat.codes >>> X_train, X_test, y_train, y_test = sklearn.model_selection.train_test_split( >>> dataset.data, >>> dataset.target, >>> test_size=0.2,) >>> # clean up data before use ModelsReview >>> de = DataPrepare() >>> clean_X_train = de.fit_transform(X_train) >>> clean_X_test = de.transform(X_test) >>> >>> # Model >>> model = LightGBMClassifier() >>> >>> # CrossValidation >>> cv = CrossValidation(estimator=model,) >>> score, score_std = cv.fit_score(X_train, y_train, print_metric=True) >>> cv.fit(X_train, y_train) >>> predicts = cv.predict_test(X_test) >>> print('Test AUC: ', round(sklearn.metrics.roc_auc_score(y_test, predicts),4)) ''' __name__ = 'CrossValidation' _fit_models = False fited_models = {} '''dictionary with trained models''' estimator = None '''model''' _fit_target_enc = {} def __init__( self, estimator: Callable, # model target_encoders_names: List[str] = [], folds: int = 7, score_folds: int = 5, n_repeats: int = 1, metric: Optional[Callable] = None, print_metric: bool = False, metric_round: int = 4, random_state: int = 42 ): ''' Parameters ---------- estimator : Callable model object from automl_alex.models The object to use to fit model. target_encoders_names : List[str] name encoders (from automl_alex._encoders.target_encoders_names) folds : int, optional Number of folds., by default 7 score_folds : int, optional Number of score folds, by default 5 n_repeats : int, optional Number of times cross-validator needs to be repeated, by default 1 metric : Optional[Callable], optional you can use standard metrics from sklearn.metrics or add custom metrics. If None, the metric is selected from the type of estimator: classifier: sklearn.metrics.roc_auc_score regression: sklearn.metrics.mean_squared_error., by default None print_metric : bool, optional print metric, by default False metric_round : int, optional round metric score, by default 4 random_state : int, optional Controls the generation of the random states for each repetition, by default 42 ''' self.estimator = estimator self.folds = folds self.score_folds = score_folds self.n_repeats = n_repeats self.print_metric = print_metric self.metric_round = metric_round self.target_encoders_names = target_encoders_names if metric is None: if estimator._type_of_estimator == 'classifier': self.metric = sklearn.metrics.roc_auc_score elif estimator._type_of_estimator == 'regression': self.metric = sklearn.metrics.mean_squared_error else: self.metric = metric if estimator._type_of_estimator == 'classifier': self.skf = RepeatedStratifiedKFold( n_splits=folds, n_repeats=n_repeats, random_state=random_state, ) else: self.skf = RepeatedKFold( n_splits=folds, n_repeats=n_repeats, random_state=random_state, ) def _clean_temp_folder(self): Path(TMP_FOLDER).mkdir(parents=True, exist_ok=True) if os.path.isdir(TMP_FOLDER+'cross-v_tmp'): shutil.rmtree(TMP_FOLDER+'cross-v_tmp') os.mkdir(TMP_FOLDER+'cross-v_tmp') def fit(self, X: pd.DataFrame, y: Union[list, np.array, pd.DataFrame], cat_features: Optional[List[str]] = None, ): ''' Fit the model to the data. Parameters ---------- X : pd.DataFrame data (pd.DataFrame, shape (n_samples, n_features)) y : Union[list, np.array, pd.DataFrame] target cat_features : Optional[List[str]], optional features name list, by default None ''' self._clean_temp_folder() self.cv_split_idx = [(train_idx, valid_idx) for (train_idx, valid_idx) in self.skf.split(X, y)] for i, (train_idx, valid_idx) in enumerate(self.cv_split_idx): train_x, train_y = X.iloc[train_idx], y.iloc[train_idx] # Target Encoder if len(self.target_encoders_names) > 0: train_x_copy = train_x.copy() for target_enc_name in self.target_encoders_names: self._fit_target_enc[f'{target_enc_name} _fold_{i}'] = \ copy.deepcopy(target_encoders_names[target_enc_name](drop_invariant=True)) self._fit_target_enc[f'{target_enc_name} _fold_{i}'] = \ self._fit_target_enc[f'{target_enc_name} _fold_{i}'].fit(train_x_copy, train_y) data_encodet = self._fit_target_enc[f'{target_enc_name} _fold_{i}'].transform(train_x_copy) data_encodet = data_encodet.add_prefix(target_enc_name + '_') train_x = train_x.join(data_encodet.reset_index(drop=True)) train_x_copy = None train_x.fillna(0, inplace=True) # Fit self.estimator.fit(X_train=train_x, y_train=train_y, cat_features=cat_features) self.fited_models[f'model_{self.estimator.__name__}_fold_{i}'] = copy.deepcopy(self.estimator) self._fit_models = True return(self) def predict_test(self, X_test): if not self._fit_models: raise Exception("No fit models") stacking_y_pred_test = np.zeros(len(X_test)) for i in range(self.folds*self.n_repeats): X_test_tmp = X_test.copy() # Target Encoder if len(self.target_encoders_names) > 0: for target_enc_name in self.target_encoders_names: data_encodet = self._fit_target_enc[f'{target_enc_name} _fold_{i}'].transform(X_test) data_encodet = data_encodet.add_prefix(target_enc_name + '_') X_test_tmp = X_test_tmp.join(data_encodet.reset_index(drop=True)) X_test_tmp.fillna(0, inplace=True) # Predict y_pred_test = self.fited_models[f'model_{self.estimator.__name__}_fold_{i}'].predict_or_predict_proba(X_test_tmp) stacking_y_pred_test += y_pred_test predict = stacking_y_pred_test / (self.folds*self.n_repeats) return(predict) def predict_train(self, X): if not self._fit_models: raise Exception("No fit models") stacking_y_pred_train = np.zeros(len(X)) for i, (train_idx, valid_idx) in enumerate(self.cv_split_idx): val_x = X.iloc[valid_idx] # Target Encoder if len(self.target_encoders_names) > 0: val_x_copy = val_x.copy() for target_enc_name in self.target_encoders_names: data_encodet = self._fit_target_enc[f'{target_enc_name} _fold_{i}'].transform(val_x_copy) data_encodet = data_encodet.add_prefix(target_enc_name + '_') val_x = val_x.join(data_encodet.reset_index(drop=True)) val_x_copy = None val_x.fillna(0, inplace=True) y_pred = self.fited_models[f'model_{self.estimator.__name__}_fold_{i}'].predict_or_predict_proba(val_x) stacking_y_pred_train[valid_idx] += y_pred predict = stacking_y_pred_train / self.n_repeats return(predict) def get_feature_importance(self, X): if not self._fit_models: raise Exception("No fit models") if not self.estimator._is_possible_feature_importance(): raise Exception("Can't get the feature importance for this estimator") feature_importance_df = pd.DataFrame(np.zeros(len(X.columns)), index=X.columns) for i in range(self.folds*self.n_repeats): X_tmp = X.copy() # Target Encoder if len(self.target_encoders_names) > 0: for target_enc_name in self.target_encoders_names: data_encodet = self._fit_target_enc[f'{target_enc_name} _fold_{i}'].transform(X) data_encodet = data_encodet.add_prefix(target_enc_name + '_') X_tmp = X_tmp.join(data_encodet.reset_index(drop=True)) X_tmp.fillna(0, inplace=True) # Get feature_importance if i == 0: feature_importance_df = \ self.fited_models[f'model_{self.estimator.__name__}_fold_{i}'].get_feature_importance(X_tmp) feature_importance_df['value'] += \ self.fited_models[f'model_{self.estimator.__name__}_fold_{i}'].get_feature_importance(X_tmp)['value'] return(feature_importance_df) def fit_score(self, X: pd.DataFrame, y: Union[list, np.array, pd.DataFrame], print_metric=None, trial=None,): self._pruned_cv = False if print_metric is None: print_metric = self.print_metric self.cv_split_idx = [(train_idx, valid_idx) for (train_idx, valid_idx) in self.skf.split(X, y)] folds_scores = [] for i, (train_idx, valid_idx) in enumerate(self.cv_split_idx): train_x, train_y = X.iloc[train_idx], y.iloc[train_idx] val_x, val_y = X.iloc[valid_idx], y.iloc[valid_idx] # Target Encoder if len(self.target_encoders_names) > 0: val_x_copy = val_x.copy() train_x_copy = train_x.copy() for target_enc_name in self.target_encoders_names: target_enc = target_encoders_names[target_enc_name](drop_invariant=True) data_encodet = target_enc.fit_transform(train_x_copy, train_y) data_encodet = data_encodet.add_prefix(target_enc_name + '_') train_x = train_x.join(data_encodet.reset_index(drop=True)) data_encodet = None val_x_data_encodet = target_enc.transform(val_x_copy) val_x_data_encodet = val_x_data_encodet.add_prefix(target_enc_name + '_') val_x = val_x.join(val_x_data_encodet.reset_index(drop=True)) val_x_data_encodet = None val_x_copy = None train_x_copy = None train_x.fillna(0, inplace=True) val_x.fillna(0, inplace=True) # Fit score_model = self.estimator.fit_score( X_train=train_x, y_train=train_y, X_test=val_x, y_test=val_y, metric=self.metric, print_metric=False, metric_round=self.metric_round, ) folds_scores.append(score_model) if (trial is not None) and i < 1: trial.report(score_model, i) # Handle pruning based on the intermediate value. if trial.should_prune(): self._pruned_cv=True break # score_folds if i+1 >= self.score_folds: break if self._pruned_cv: score = score_model score_std = 0 else: if self.score_folds > 1: score = round(np.mean(folds_scores), self.metric_round) score_std = round(np.std(folds_scores), self.metric_round+2) else: score = round(score_model, self.metric_round) score_std = 0 if print_metric: print(f'\n Mean Score {self.metric.__name__} on {self.score_folds} Folds: {score} std: {score_std}') return(score, score_std) def save(self, name='cv_dump', folder='./', verbose=1): if not self._fit_models: raise Exception("No fit models") dir_tmp = TMP_FOLDER+'cross-v_tmp/' self._clean_temp_folder() for i in range(self.folds*self.n_repeats): # Target Encoder if len(self.target_encoders_names) > 0: for target_enc_name in self.target_encoders_names: joblib.dump(self._fit_target_enc[f'{target_enc_name} _fold_{i}'], \ f'{dir_tmp}{target_enc_name} _fold_{i}.pkl') # Models self.fited_models[f'model_{self.estimator.__name__}_fold_{i}'].save(f'{dir_tmp}model_{self.estimator.__name__}_fold_{i}', verbose=0) joblib.dump(self, dir_tmp+'CV'+'.pkl') shutil.make_archive(folder+name, 'zip', dir_tmp) shutil.rmtree(dir_tmp) if verbose>0: print('Save CrossValidation') def load(self, name='cv_dump', folder='./', verbose=1): self._clean_temp_folder() dir_tmp = TMP_FOLDER+'cross-v_tmp/' shutil.unpack_archive(folder+name+'.zip', dir_tmp) cv = joblib.load(dir_tmp+'CV'+'.pkl') for i in range(cv.folds*cv.n_repeats): # Target Encoder if len(self.target_encoders_names) > 0: for target_enc_name in self.target_encoders_names: self._fit_target_enc[f'{target_enc_name} _fold_{i}'] = joblib.load(f'{dir_tmp}{target_enc_name} _fold_{i}.pkl') # Models cv.fited_models[f'model_{self.estimator.__name__}_fold_{i}'] = \ copy.deepcopy(cv.estimator.load(f'{dir_tmp}model_{self.estimator.__name__}_fold_{i}', verbose=0)) shutil.rmtree(dir_tmp) if verbose>0: print('Load CrossValidation') return(cv)
Class variables
var estimator
-
model
var fited_models
-
dictionary with trained models
Methods
def fit(self, X: pandas.core.frame.DataFrame, y: Union[list,
, pandas.core.frame.DataFrame], cat_features: Union[List[str], NoneType] = None) -
Fit the model to the data.
Parameters
X
:pd.DataFrame
- data (pd.DataFrame, shape (n_samples, n_features))
y
:Union[list, np.array, pd.DataFrame]
- target
cat_features
:Optional[List[str]]
, optional- features name list, by default None
Expand source code
def fit(self, X: pd.DataFrame, y: Union[list, np.array, pd.DataFrame], cat_features: Optional[List[str]] = None, ): ''' Fit the model to the data. Parameters ---------- X : pd.DataFrame data (pd.DataFrame, shape (n_samples, n_features)) y : Union[list, np.array, pd.DataFrame] target cat_features : Optional[List[str]], optional features name list, by default None ''' self._clean_temp_folder() self.cv_split_idx = [(train_idx, valid_idx) for (train_idx, valid_idx) in self.skf.split(X, y)] for i, (train_idx, valid_idx) in enumerate(self.cv_split_idx): train_x, train_y = X.iloc[train_idx], y.iloc[train_idx] # Target Encoder if len(self.target_encoders_names) > 0: train_x_copy = train_x.copy() for target_enc_name in self.target_encoders_names: self._fit_target_enc[f'{target_enc_name} _fold_{i}'] = \ copy.deepcopy(target_encoders_names[target_enc_name](drop_invariant=True)) self._fit_target_enc[f'{target_enc_name} _fold_{i}'] = \ self._fit_target_enc[f'{target_enc_name} _fold_{i}'].fit(train_x_copy, train_y) data_encodet = self._fit_target_enc[f'{target_enc_name} _fold_{i}'].transform(train_x_copy) data_encodet = data_encodet.add_prefix(target_enc_name + '_') train_x = train_x.join(data_encodet.reset_index(drop=True)) train_x_copy = None train_x.fillna(0, inplace=True) # Fit self.estimator.fit(X_train=train_x, y_train=train_y, cat_features=cat_features) self.fited_models[f'model_{self.estimator.__name__}_fold_{i}'] = copy.deepcopy(self.estimator) self._fit_models = True return(self)
def fit_score(self, X: pandas.core.frame.DataFrame, y: Union[list,
, pandas.core.frame.DataFrame], print_metric=None, trial=None) -
Expand source code
def fit_score(self, X: pd.DataFrame, y: Union[list, np.array, pd.DataFrame], print_metric=None, trial=None,): self._pruned_cv = False if print_metric is None: print_metric = self.print_metric self.cv_split_idx = [(train_idx, valid_idx) for (train_idx, valid_idx) in self.skf.split(X, y)] folds_scores = [] for i, (train_idx, valid_idx) in enumerate(self.cv_split_idx): train_x, train_y = X.iloc[train_idx], y.iloc[train_idx] val_x, val_y = X.iloc[valid_idx], y.iloc[valid_idx] # Target Encoder if len(self.target_encoders_names) > 0: val_x_copy = val_x.copy() train_x_copy = train_x.copy() for target_enc_name in self.target_encoders_names: target_enc = target_encoders_names[target_enc_name](drop_invariant=True) data_encodet = target_enc.fit_transform(train_x_copy, train_y) data_encodet = data_encodet.add_prefix(target_enc_name + '_') train_x = train_x.join(data_encodet.reset_index(drop=True)) data_encodet = None val_x_data_encodet = target_enc.transform(val_x_copy) val_x_data_encodet = val_x_data_encodet.add_prefix(target_enc_name + '_') val_x = val_x.join(val_x_data_encodet.reset_index(drop=True)) val_x_data_encodet = None val_x_copy = None train_x_copy = None train_x.fillna(0, inplace=True) val_x.fillna(0, inplace=True) # Fit score_model = self.estimator.fit_score( X_train=train_x, y_train=train_y, X_test=val_x, y_test=val_y, metric=self.metric, print_metric=False, metric_round=self.metric_round, ) folds_scores.append(score_model) if (trial is not None) and i < 1: trial.report(score_model, i) # Handle pruning based on the intermediate value. if trial.should_prune(): self._pruned_cv=True break # score_folds if i+1 >= self.score_folds: break if self._pruned_cv: score = score_model score_std = 0 else: if self.score_folds > 1: score = round(np.mean(folds_scores), self.metric_round) score_std = round(np.std(folds_scores), self.metric_round+2) else: score = round(score_model, self.metric_round) score_std = 0 if print_metric: print(f'\n Mean Score {self.metric.__name__} on {self.score_folds} Folds: {score} std: {score_std}') return(score, score_std)
def get_feature_importance(self, X)
-
Expand source code
def get_feature_importance(self, X): if not self._fit_models: raise Exception("No fit models") if not self.estimator._is_possible_feature_importance(): raise Exception("Can't get the feature importance for this estimator") feature_importance_df = pd.DataFrame(np.zeros(len(X.columns)), index=X.columns) for i in range(self.folds*self.n_repeats): X_tmp = X.copy() # Target Encoder if len(self.target_encoders_names) > 0: for target_enc_name in self.target_encoders_names: data_encodet = self._fit_target_enc[f'{target_enc_name} _fold_{i}'].transform(X) data_encodet = data_encodet.add_prefix(target_enc_name + '_') X_tmp = X_tmp.join(data_encodet.reset_index(drop=True)) X_tmp.fillna(0, inplace=True) # Get feature_importance if i == 0: feature_importance_df = \ self.fited_models[f'model_{self.estimator.__name__}_fold_{i}'].get_feature_importance(X_tmp) feature_importance_df['value'] += \ self.fited_models[f'model_{self.estimator.__name__}_fold_{i}'].get_feature_importance(X_tmp)['value'] return(feature_importance_df)
def load(self, name='cv_dump', folder='./', verbose=1)
-
Expand source code
def load(self, name='cv_dump', folder='./', verbose=1): self._clean_temp_folder() dir_tmp = TMP_FOLDER+'cross-v_tmp/' shutil.unpack_archive(folder+name+'.zip', dir_tmp) cv = joblib.load(dir_tmp+'CV'+'.pkl') for i in range(cv.folds*cv.n_repeats): # Target Encoder if len(self.target_encoders_names) > 0: for target_enc_name in self.target_encoders_names: self._fit_target_enc[f'{target_enc_name} _fold_{i}'] = joblib.load(f'{dir_tmp}{target_enc_name} _fold_{i}.pkl') # Models cv.fited_models[f'model_{self.estimator.__name__}_fold_{i}'] = \ copy.deepcopy(cv.estimator.load(f'{dir_tmp}model_{self.estimator.__name__}_fold_{i}', verbose=0)) shutil.rmtree(dir_tmp) if verbose>0: print('Load CrossValidation') return(cv)
def predict_test(self, X_test)
-
Expand source code
def predict_test(self, X_test): if not self._fit_models: raise Exception("No fit models") stacking_y_pred_test = np.zeros(len(X_test)) for i in range(self.folds*self.n_repeats): X_test_tmp = X_test.copy() # Target Encoder if len(self.target_encoders_names) > 0: for target_enc_name in self.target_encoders_names: data_encodet = self._fit_target_enc[f'{target_enc_name} _fold_{i}'].transform(X_test) data_encodet = data_encodet.add_prefix(target_enc_name + '_') X_test_tmp = X_test_tmp.join(data_encodet.reset_index(drop=True)) X_test_tmp.fillna(0, inplace=True) # Predict y_pred_test = self.fited_models[f'model_{self.estimator.__name__}_fold_{i}'].predict_or_predict_proba(X_test_tmp) stacking_y_pred_test += y_pred_test predict = stacking_y_pred_test / (self.folds*self.n_repeats) return(predict)
def predict_train(self, X)
-
Expand source code
def predict_train(self, X): if not self._fit_models: raise Exception("No fit models") stacking_y_pred_train = np.zeros(len(X)) for i, (train_idx, valid_idx) in enumerate(self.cv_split_idx): val_x = X.iloc[valid_idx] # Target Encoder if len(self.target_encoders_names) > 0: val_x_copy = val_x.copy() for target_enc_name in self.target_encoders_names: data_encodet = self._fit_target_enc[f'{target_enc_name} _fold_{i}'].transform(val_x_copy) data_encodet = data_encodet.add_prefix(target_enc_name + '_') val_x = val_x.join(data_encodet.reset_index(drop=True)) val_x_copy = None val_x.fillna(0, inplace=True) y_pred = self.fited_models[f'model_{self.estimator.__name__}_fold_{i}'].predict_or_predict_proba(val_x) stacking_y_pred_train[valid_idx] += y_pred predict = stacking_y_pred_train / self.n_repeats return(predict)
def save(self, name='cv_dump', folder='./', verbose=1)
-
Expand source code
def save(self, name='cv_dump', folder='./', verbose=1): if not self._fit_models: raise Exception("No fit models") dir_tmp = TMP_FOLDER+'cross-v_tmp/' self._clean_temp_folder() for i in range(self.folds*self.n_repeats): # Target Encoder if len(self.target_encoders_names) > 0: for target_enc_name in self.target_encoders_names: joblib.dump(self._fit_target_enc[f'{target_enc_name} _fold_{i}'], \ f'{dir_tmp}{target_enc_name} _fold_{i}.pkl') # Models self.fited_models[f'model_{self.estimator.__name__}_fold_{i}'].save(f'{dir_tmp}model_{self.estimator.__name__}_fold_{i}', verbose=0) joblib.dump(self, dir_tmp+'CV'+'.pkl') shutil.make_archive(folder+name, 'zip', dir_tmp) shutil.rmtree(dir_tmp) if verbose>0: print('Save CrossValidation')