I have defined a custom tansformer that takes a pandas dataframe, apply a function on only one column and leaves all the remaining columns untouched. The transformer is working fine during testing, but not when I include it as part of a Pipeline.
Here's the transformer:
import re from sklearn.base import BaseEstimator, TransformerMixin class SynopsisCleaner(BaseEstimator, TransformerMixin): def __init__(self): return None def fit(self, X, y=None, **fit_params): # nothing to learn from data. return self def clean_text(self, text): text = text.lower() text = re.sub(r'@[a-zA-Z0-9_]+', '', text) text = re.sub(r'https?://[A-Za-z0-9./]+', '', text) text = re.sub(r'www.[^ ]+', '', text) text = re.sub(r'[a-zA-Z0-9]*www[a-zA-Z0-9]*com[a-zA-Z0-9]*', '', text) text = re.sub(r'[^a-zA-Z]', ' ', text) text = [token for token in text.split() if len(token) > 2] text = ' '.join(text) return text def transform(self, X, y=None, **fit_params): for i in range(X.shape[0]): X[i] = self.clean_text(X[i]) return X When I test it manually like this, it is working just as expected.
train_synopsis = SynopsisCleaner().transform(train_data['Synopsis']) But, when I include it as a part of sklearn pipeline:
from sklearn.compose import ColumnTransformer from sklearn.pipeline import Pipeline # part 1: defining a column transformer that learns on only one column and transforms it synopsis_clean_col_tran = ColumnTransformer(transformers=[('synopsis_clean_col_tran', SynopsisCleaner(), ['Synopsis'])], # set remainder to passthrough to pass along all the un-specified columns untouched to the next steps remainder='passthrough') # make a pipeline now with all the steps pipe_1 = Pipeline(steps=[('synopsis_cleaning', synopsis_clean_col_tran)]) pipe_1.fit(train_data) I get KeyError, like shown below:
--------------------------------------------------------------------------- KeyError Traceback (most recent call last) /usr/local/lib/python3.6/dist-packages/pandas/core/indexes/base.py in get_loc(self, key, method, tolerance) 2890 try: -> 2891 return self._engine.get_loc(casted_key) 2892 except KeyError as err: pandas/_libs/index.pyx in pandas._libs.index.IndexEngine.get_loc() pandas/_libs/index.pyx in pandas._libs.index.IndexEngine.get_loc() pandas/_libs/hashtable_class_helper.pxi in pandas._libs.hashtable.PyObjectHashTable.get_item() pandas/_libs/hashtable_class_helper.pxi in pandas._libs.hashtable.PyObjectHashTable.get_item() KeyError: 0 The above exception was the direct cause of the following exception: KeyError Traceback (most recent call last) 16 frames <ipython-input-10-3396fa5d6092> in <module>() 6 # make a pipeline now with all the steps 7 pipe_1 = Pipeline(steps=[('synopsis_cleaning', synopsis_clean_col_tran)]) ----> 8 pipe_1.fit(train_data) /usr/local/lib/python3.6/dist-packages/sklearn/pipeline.py in fit(self, X, y, **fit_params) 352 self._log_message(len(self.steps) - 1)): 353 if self._final_estimator != 'passthrough': --> 354 self._final_estimator.fit(Xt, y, **fit_params) 355 return self 356 /usr/local/lib/python3.6/dist-packages/sklearn/compose/_column_transformer.py in fit(self, X, y) 482 # we use fit_transform to make sure to set sparse_output_ (for which we 483 # need the transformed data) to have consistent output type in predict --> 484 self.fit_transform(X, y=y) 485 return self 486 /usr/local/lib/python3.6/dist-packages/sklearn/compose/_column_transformer.py in fit_transform(self, X, y) 516 self._validate_remainder(X) 517 --> 518 result = self._fit_transform(X, y, _fit_transform_one) 519 520 if not result: /usr/local/lib/python3.6/dist-packages/sklearn/compose/_column_transformer.py in _fit_transform(self, X, y, func, fitted) 455 message=self._log_message(name, idx, len(transformers))) 456 for idx, (name, trans, column, weight) in enumerate( --> 457 self._iter(fitted=fitted, replace_strings=True), 1)) 458 except ValueError as e: 459 if "Expected 2D array, got 1D array instead" in str(e): /usr/local/lib/python3.6/dist-packages/joblib/parallel.py in __call__(self, iterable) 1027 # remaining jobs. 1028 self._iterating = False -> 1029 if self.dispatch_one_batch(iterator): 1030 self._iterating = self._original_iterator is not None 1031 /usr/local/lib/python3.6/dist-packages/joblib/parallel.py in dispatch_one_batch(self, iterator) 845 return False 846 else: --> 847 self._dispatch(tasks) 848 return True 849 /usr/local/lib/python3.6/dist-packages/joblib/parallel.py in _dispatch(self, batch) 763 with self._lock: 764 job_idx = len(self._jobs) --> 765 job = self._backend.apply_async(batch, callback=cb) 766 # A job can complete so quickly than its callback is 767 # called before we get here, causing self._jobs to /usr/local/lib/python3.6/dist-packages/joblib/_parallel_backends.py in apply_async(self, func, callback) 206 def apply_async(self, func, callback=None): 207 """Schedule a func to be run""" --> 208 result = ImmediateResult(func) 209 if callback: 210 callback(result) /usr/local/lib/python3.6/dist-packages/joblib/_parallel_backends.py in __init__(self, batch) 570 # Don't delay the application, to avoid keeping the input 571 # arguments in memory --> 572 self.results = batch() 573 574 def get(self): /usr/local/lib/python3.6/dist-packages/joblib/parallel.py in __call__(self) 251 with parallel_backend(self._backend, n_jobs=self._n_jobs): 252 return [func(*args, **kwargs) --> 253 for func, args, kwargs in self.items] 254 255 def __reduce__(self): /usr/local/lib/python3.6/dist-packages/joblib/parallel.py in <listcomp>(.0) 251 with parallel_backend(self._backend, n_jobs=self._n_jobs): 252 return [func(*args, **kwargs) --> 253 for func, args, kwargs in self.items] 254 255 def __reduce__(self): /usr/local/lib/python3.6/dist-packages/sklearn/pipeline.py in _fit_transform_one(transformer, X, y, weight, message_clsname, message, **fit_params) 726 with _print_elapsed_time(message_clsname, message): 727 if hasattr(transformer, 'fit_transform'): --> 728 res = transformer.fit_transform(X, y, **fit_params) 729 else: 730 res = transformer.fit(X, y, **fit_params).transform(X) /usr/local/lib/python3.6/dist-packages/sklearn/base.py in fit_transform(self, X, y, **fit_params) 569 if y is None: 570 # fit method of arity 1 (unsupervised transformation) --> 571 return self.fit(X, **fit_params).transform(X) 572 else: 573 # fit method of arity 2 (supervised transformation) <ipython-input-6-004ee595d544> in transform(self, X, y, **fit_params) 20 def transform(self, X, y=None, **fit_params): 21 for i in range(X.shape[0]): ---> 22 X[i] = self.clean_text(X[i]) 23 return X /usr/local/lib/python3.6/dist-packages/pandas/core/frame.py in __getitem__(self, key) 2900 if self.columns.nlevels > 1: 2901 return self._getitem_multilevel(key) -> 2902 indexer = self.columns.get_loc(key) 2903 if is_integer(indexer): 2904 indexer = [indexer] /usr/local/lib/python3.6/dist-packages/pandas/core/indexes/base.py in get_loc(self, key, method, tolerance) 2891 return self._engine.get_loc(casted_key) 2892 except KeyError as err: -> 2893 raise KeyError(key) from err 2894 2895 if tolerance is not None: KeyError: 0 What am I doing wrong here?
EDIT 1: without brackets and the column name specified as string, this is the error I see:
--------------------------------------------------------------------------- ValueError Traceback (most recent call last) <ipython-input-11-bdd42b09e2af> in <module>() 6 # make a pipeline now with all the steps 7 pipe_1 = Pipeline(steps=[('synopsis_cleaning', synopsis_clean_col_tran)]) ----> 8 pipe_1.fit(train_data) 3 frames /usr/local/lib/python3.6/dist-packages/sklearn/pipeline.py in fit(self, X, y, **fit_params) 352 self._log_message(len(self.steps) - 1)): 353 if self._final_estimator != 'passthrough': --> 354 self._final_estimator.fit(Xt, y, **fit_params) 355 return self 356 /usr/local/lib/python3.6/dist-packages/sklearn/compose/_column_transformer.py in fit(self, X, y) 482 # we use fit_transform to make sure to set sparse_output_ (for which we 483 # need the transformed data) to have consistent output type in predict --> 484 self.fit_transform(X, y=y) 485 return self 486 /usr/local/lib/python3.6/dist-packages/sklearn/compose/_column_transformer.py in fit_transform(self, X, y) 536 537 self._update_fitted_transformers(transformers) --> 538 self._validate_output(Xs) 539 540 return self._hstack(list(Xs)) /usr/local/lib/python3.6/dist-packages/sklearn/compose/_column_transformer.py in _validate_output(self, result) 400 raise ValueError( 401 "The output of the '{0}' transformer should be 2D (scipy " --> 402 "matrix, array, or pandas DataFrame).".format(name)) 403 404 def _validate_features(self, n_features, feature_names): ValueError: The output of the 'synopsis_clean_col_tran' transformer should be 2D (scipy matrix, array, or pandas DataFrame).