Add support for imblearn's Pipeline and Samplers. #638


Closed
wants to merge 23 commits

Conversation


@xKHUNx xKHUNx commented Apr 10, 2020

I'm using dask-ml to parallelize my sklearn pipeline, which consists of imblearn pipeline components. I hacked the code to make it work.

This would be extremely useful for people that are using both libraries.

Member

@TomAugspurger TomAugspurger left a comment


Seems like there are some test failures. I haven't looked closely at them yet. Can you reproduce them locally?

cc @glemaitre, if you have any thoughts on how this should be supported.

@@ -326,7 +330,7 @@ def do_fit_and_score(
scorer,
return_train_score,
):
if not isinstance(est, Pipeline):
Member


Does imblearn.pipeline.Pipeline not subclass sklearn.pipeline.Pipeline?

Contributor


We do.

@glemaitre
Contributor

cc @glemaitre, if you have any thoughts on how this should be supported.

Basically, we just inherit from Pipeline and allow fit_resample to be called during fit of the Pipeline.
Actually, it would be interesting to have some supported resamplers in dask: random under- and over-sampling. The other samplers would not make much sense with large datasets, and I am not sure you can generate samples without a full view of the dataset. However, a random sampler could be handy for resampling on each node.
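The fit_resample contract described here can be illustrated with a toy random under-sampler. This is a hedged sketch, not imblearn or dask-ml code; ToyRandomUnderSampler is a made-up name:

```python
# A minimal sketch of the fit_resample contract: unlike fit_transform,
# a sampler returns a modified (X, y) pair, so y can change length.
import numpy as np

class ToyRandomUnderSampler:
    """Hypothetical sampler: keep only as many rows per class as the
    smallest class has, chosen at random."""
    def __init__(self, random_state=0):
        self.random_state = random_state

    def fit_resample(self, X, y):
        rng = np.random.RandomState(self.random_state)
        classes, counts = np.unique(y, return_counts=True)
        n_min = counts.min()
        # Randomly keep n_min indices from each class.
        keep = np.concatenate([
            rng.choice(np.flatnonzero(y == c), size=n_min, replace=False)
            for c in classes
        ])
        keep.sort()
        return X[keep], y[keep]

X = np.arange(20).reshape(10, 2)
y = np.array([0] * 8 + [1] * 2)  # imbalanced: 8 vs 2
Xr, yr = ToyRandomUnderSampler().fit_resample(X, y)
# yr now holds 2 samples of each class, so len(yr) == 4
```

Because such a sampler only needs row indices, a random version could in principle run per-partition on a dask array, which is the case glemaitre suggests is feasible.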

Member

@TomAugspurger TomAugspurger left a comment


@xKHUNx can you summarize the high-level changes? I'm a little concerned that this is affecting so many small places? For example, why does the return type of fit_transform need to be changed? Can that be limited to a new fit_reshape method that returns the transformed yt as well?

Comment on lines +34 to +37
try:
from imblearn.pipeline import Pipeline
except:
from sklearn.pipeline import Pipeline
Member


Why is this needed? Can we always use sklearn.pipeline.Pipeline and check for hasattr(x, "fit_resample")?
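The duck-typed check suggested here can be sketched with toy steps. This is an illustration, not dask-ml's actual code; fit_step and the two toy classes are made-up names:

```python
# A minimal sketch of dispatching on hasattr(step, "fit_resample")
# instead of importing imblearn's Pipeline class.
import numpy as np

class DoubleTransformer:
    """Ordinary transformer: returns a new X, leaves y untouched."""
    def fit_transform(self, X, y=None):
        return X * 2

class DropLastSampler:
    """Toy sampler: may change both X and y, including y's length."""
    def fit_resample(self, X, y):
        return X[:-1], y[:-1]

def fit_step(step, X, y):
    """Fit one pipeline step, resampling when the step supports it."""
    if hasattr(step, "fit_resample"):
        return step.fit_resample(X, y)
    # Ordinary transformers pass y through unchanged.
    return step.fit_transform(X, y), y

X = np.array([[1.0], [2.0], [3.0]])
y = np.array([0, 1, 1])
Xt, yt = fit_step(DoubleTransformer(), X, y)  # y unchanged
Xs, ys = fit_step(DropLastSampler(), X, y)    # one row dropped from both
```

With this kind of dispatch, only the base sklearn.pipeline.Pipeline would be needed, which is the point of the question above.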

Author

@xKHUNx xKHUNx Apr 13, 2020


@xKHUNx can you summarize the high-level changes? I'm a little concerned that this is affecting so many small places? For example, why does the return type of fit_transform need to be changed? Can that be limited to a new fit_reshape method that returns the transformed yt as well?

I assume you meant fit_resample; correct me if I am wrong. My high-level idea is that yt needs to be propagated throughout the pipeline, since a component that implements fit_resample might change the y values. The code works for my personal use case, where I run a RandomizedSearchCV with an imblearn component in my pipeline. I can't post my code here as it is related to my work.

Why is this needed? Can we always use sklearn.pipeline.Pipeline and check for hasattr(x, "fit_resample")?

This is needed when the refit option is selected. If I understand correctly, with refit selected, dask-ml fits the pipeline as a whole, without going through each component in the pipeline individually. In this scenario, a plain sklearn pipeline won't handle a pipeline component that implements fit_resample instead of fit_transform. If imblearn's pipeline is used instead, it behaves as it should.

@stsievert
Member

Would this PR close #317? It certainly seems related.

@TomAugspurger
Member

TomAugspurger commented May 1, 2020

@xKHUNx can you include a test for these changes (it will need to include imblearn in the environment file)? It'd be helpful to have an example to play with.

@sephib

sephib commented May 10, 2020

Would this PR close #317? It certainly seems related.

Yes

@TomAugspurger
Member

@xKHUNx are you able to provide a simple test / example using this? That'd help me with reviewing.

@TomAugspurger
Member

@xKHUNx are you able to add tests here?

@xKHUNx
Author

xKHUNx commented Jul 28, 2020

I have not tested it myself, but this should do:

```python
import pandas as pd
import numpy as np
from sklearn.compose import ColumnTransformer
from imblearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split, GridSearchCV
from imblearn.over_sampling import SMOTE
from scipy.stats import loguniform
from dask_ml.model_selection import RandomizedSearchCV
from dask.distributed import Client

# Initiate Dask client
client = Client('127.0.0.1:8786')

# Fix random seed
np.random.seed(0)

# Read data from Titanic dataset.
titanic_url = ('https://github.com/raw/amueller/'
               'scipy-2017-sklearn/091d371/notebooks/datasets/titanic3.csv')
data = pd.read_csv(titanic_url)

# We create the preprocessing pipelines for both numeric and categorical data.
numeric_features = ['age', 'fare']
numeric_transformer = Pipeline(steps=[
    ('imputer', SimpleImputer(strategy='median')),
    ('scaler', StandardScaler())])

categorical_features = ['embarked', 'sex', 'pclass']
categorical_transformer = Pipeline(steps=[
    ('imputer', SimpleImputer(strategy='constant', fill_value='missing')),
    ('onehot', OneHotEncoder(handle_unknown='ignore'))])

preprocessor = ColumnTransformer(
    transformers=[
        ('num', numeric_transformer, numeric_features),
        ('cat', categorical_transformer, categorical_features)])

# Append classifier to preprocessing pipeline.
# Now we have a full prediction pipeline.
clf = Pipeline(steps=[('preprocessor', preprocessor),
                      ('smote', SMOTE(sampling_strategy='all')),
                      ('classifier', LogisticRegression(solver='lbfgs'))])

X = data.drop('survived', axis=1)
y = data['survived']

params = {
    'classifier__C': loguniform(0.001, 1)
}

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)

final_clf = RandomizedSearchCV(clf, n_iter=10, param_distributions=params,
                               cv=3, n_jobs=-1, refit=True)
final_clf.fit(X_train, y_train)
print("model score: %.3f" % final_clf.score(X_test, y_test))
```

I'm not familiar with writing test scripts, let alone Dask's. So it would be great if someone could turn this into a test script.
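A condensed version of the example above could serve as the requested test. This is a rough sketch under stated assumptions: the function name is my own, imblearn availability is guarded by a try/except (a real suite would use a pytest skip), and sklearn's GridSearchCV stands in for dask_ml.model_selection.GridSearchCV, which is what the actual dask-ml test would use once this PR's changes apply:

```python
# Sketch of a test exercising an imblearn Pipeline inside a CV search.
try:
    from imblearn.over_sampling import RandomOverSampler
    from imblearn.pipeline import Pipeline
    HAVE_IMBLEARN = True
except ImportError:
    HAVE_IMBLEARN = False


def test_imblearn_pipeline_search():
    # In a real suite this guard would be pytest.importorskip("imblearn").
    if not HAVE_IMBLEARN:
        return
    from sklearn.datasets import make_classification
    from sklearn.linear_model import LogisticRegression
    # Stand-in searcher; the dask-ml test would import
    # dask_ml.model_selection.GridSearchCV instead.
    from sklearn.model_selection import GridSearchCV

    # Small imbalanced problem so the sampler actually does something.
    X, y = make_classification(n_samples=200, weights=[0.8, 0.2],
                               random_state=0)
    clf = Pipeline(steps=[
        ("sample", RandomOverSampler(random_state=0)),
        ("classifier", LogisticRegression()),
    ])
    search = GridSearchCV(clf, {"classifier__C": [0.1, 1.0]},
                          cv=3, refit=True)
    search.fit(X, y)
    # refit=True must produce a fitted best_estimator_ that can score.
    assert hasattr(search, "best_estimator_")
    assert 0.0 <= search.score(X, y) <= 1.0
```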

@TomAugspurger
Member

I won't have time, unfortunately. Are you interested in working on this PR still?

@xKHUNx
Author

xKHUNx commented Jul 29, 2020

Are you interested in working on this PR still?

This PR is good enough for my personal use case; I don't see the need to improve it in the near future.

@TomAugspurger
Member

OK. We would need to add tests, clean up a few things, and clarify a couple of changes before this can be merged. Let me know if you want to pick it up again.

5 participants