Source code for dtale.column_replacements

import re

import numpy as np
import pandas as pd
from six import string_types

import dtale.global_state as global_state
from dtale.utils import classify_type, find_dtype


[docs]class ColumnReplacement(object): def __init__(self, data_id, col, replacement_type, cfg, name=None): self.data_id = data_id if replacement_type == "spaces": self.builder = SpaceReplacement(col, cfg, name) elif replacement_type == "strings": self.builder = StringReplacement(col, cfg, name) elif replacement_type == "value": self.builder = ValueReplacement(col, cfg, name) elif replacement_type == "imputer": # iterative, knn, simple self.builder = ImputerReplacement(col, cfg, name) else: raise NotImplementedError( "'{}' replacement not implemented yet!".format(replacement_type) )
[docs] def build_replacements(self): return self.builder.build_column(global_state.get_data(self.data_id))
[docs] def build_code(self): return self.builder.build_code(global_state.get_data(self.data_id))
[docs]def get_inner_replacement_value(val): return np.nan if isinstance(val, string_types) and val.lower() == "nan" else val
[docs]def get_replacement_value(cfg, prop): value = (cfg or {}).get(prop) or "nan" return get_inner_replacement_value(value)
[docs]def get_inner_replacement_value_as_str(val, series): if isinstance(val, string_types) and val.lower() == "nan": return "np.nan" if classify_type(find_dtype(series)) == "S": return "'{value}'".format(value=val) return val
[docs]def get_replacement_value_as_str(cfg, prop, series): value = (cfg or {}).get(prop) or "nan" return get_inner_replacement_value_as_str(value, series)
[docs]class SpaceReplacement(object): def __init__(self, col, cfg, name): self.col = col self.cfg = cfg self.name = name
[docs] def build_column(self, data): value = get_replacement_value(self.cfg, "value") return data[self.col].replace(r"^\s+$", value, regex=True)
[docs] def build_code(self, data): value = get_replacement_value_as_str(self.cfg, "value", data[self.col]) return "df.loc[:, '{name}'] = df['{col}'].replace(r'^\\s+$', {value}, regex=True)".format( name=self.name or self.col, col=self.col, value=value )
[docs]class StringReplacement(object): def __init__(self, col, cfg, name): self.col = col self.cfg = cfg self.name = name
[docs] def parse_cfg(self): return (self.cfg[p] for p in ["value", "ignoreCase", "isChar"])
[docs] def build_column(self, data): value, ignore_case, is_char = self.parse_cfg() flags = re.UNICODE if ignore_case: flags |= re.IGNORECASE value = re.escape(value) if is_char: value = "[{value}]+".format(value=value) regex_pat = re.compile(r"^ *{value} *$".format(value=value), flags=flags) replace_with = get_replacement_value(self.cfg, "replace") return data[self.col].replace(regex_pat, replace_with, regex=True)
[docs] def build_code(self, data): value, ignore_case, is_char = self.parse_cfg() flags = re.UNICODE if ignore_case: flags |= re.IGNORECASE regex_exp = "r'^ *{value} *$'.format(value=re.escape({value}))" if is_char: regex_exp = "r'^ *[{value}]+ *$'.format(value=re.escape({value}))" regex_exp = regex_exp.format(value=value) replace_with = get_replacement_value_as_str(self.cfg, "replace", data[self.col]) return ( "import re\n\n" "regex_pat = re.compile({regex_exp}, flags={flags})\n" "df.loc[:, '{name}'] = df['{col}'].replace(regex_pat, {replace}, regex=True)" ).format( name=self.name or self.col, col=self.col, regex_exp=regex_exp, flags=flags, replace=replace_with, )
[docs]class ValueReplacement(object): def __init__(self, col, cfg, name): self.col = col self.cfg = cfg self.name = name
[docs] def build_column(self, data): s = data[self.col] replacements = {} col_replacements = [] for replacement in self.cfg.get("value", []): value = get_replacement_value(replacement, "value") replacement_type = replacement.get("type") if replacement_type == "agg": replace = getattr(s, replacement["replace"])() # min, max, mean, median if pd.isnull(replace): raise Exception( "Running the aggregation, {agg}, on {col} resulted in nan, this would result in a no-op." ) elif replacement_type == "col": col_replacements.append( lambda s2: np.where(s2 == value, data[replacement["replace"]], s2) ) else: replace = get_replacement_value(replacement, "replace") replacements[value] = replace final_s = s if len(replacements): final_s = final_s.replace(replacements) for col_r in col_replacements: final_s = col_r(final_s) return final_s
[docs] def build_code(self, data): replacements = [] series = data[self.col] col_replacements = [] for replacement in self.cfg.get("value", []): value = get_replacement_value_as_str(replacement, "value", series) replacement_type = self.cfg.get("type") if replacement_type == "agg": replace = "getattr(df['{col}'], '{agg}')()".format( agg=replacement["value"], col=self.col ) elif replacement_type == "col": col_replacements.append( "s = np.where(s == {value}, data['{col2}'], s)".format( col2=replacement["replace"], value=value ) ) else: replace = get_replacement_value_as_str(replacement, "replace", series) replacements.append( "\t{value}: {replace}".format(value=value, replace=replace) ) code = ["s = df['{col}']".format(col=self.col)] if len(replacements): replacements = ",\n".join(replacements) replacements = "{\n" + replacements + "}" code.append( "s = s.replace({replacements})".format(replacements=replacements) ) code += col_replacements code.append("df.loc[:, '{name}'] = s".format(name=self.name or self.col)) return "\n".join(code)
[docs]class ImputerReplacement(object): def __init__(self, col, cfg, name): self.col = col self.cfg = cfg self.name = name
[docs] def build_column(self, data): imputer_type = self.cfg["type"] if imputer_type == "iterative": try: from sklearn.experimental import enable_iterative_imputer # noqa from sklearn.impute import IterativeImputer except ImportError: raise Exception( "You must have at least scikit-learn 0.21.0 installed in order to use the Iterative Imputer!" ) imputer = IterativeImputer() elif imputer_type == "knn": try: from sklearn.impute import KNNImputer except ImportError: raise Exception( "You must have at least scikit-learn 0.22.0 installed in order to use the Iterative Imputer!" ) n_neighbors = self.cfg.get("n_neighbors") or 2 imputer = KNNImputer(n_neighbors=n_neighbors) elif imputer_type == "simple": try: from sklearn.impute import SimpleImputer except ImportError: raise Exception( "You must have at least scikit-learn 0.20.0 installed in order to use the Iterative Imputer!" ) imputer = SimpleImputer() else: raise NotImplementedError( "'{}' sklearn imputer not implemented yet!".format(imputer_type) ) output = imputer.fit_transform(data[[self.col]]) return pd.DataFrame(output, columns=[self.col], index=data.index)[self.col]
[docs] def build_code(self, _data): imputer_type = self.cfg["type"] code = [] if imputer_type == "iterative": code.append( ( "from sklearn.experimental import enable_iterative_imputer\n" "from sklearn.impute import IterativeImputer\n\n" "output = IterativeImputer().fit_transform(df[['{col}']])" ).format(col=self.col) ) elif imputer_type == "knn": n_neighbors = self.cfg.get("n_neighbors") or 2 code.append( ( "from sklearn.impute import KNNImputer\n\n" "output = KNNImputer(n_neighbors={n_neighbors}).fit_transform(df[['{col}']])" ).format(col=self.col, n_neighbors=n_neighbors) ) elif imputer_type == "simple": code.append( ( "from sklearn.impute import SimpleImputer\n\n" "output = SimpleImputer().fit_transform(df[['{col}']])" ).format(col=self.col) ) code.append( "df.loc[:, '{name}'] = pd.DataFrame(output, columns=['{col}'], index=df.index)['{col}']".format( name=self.name or self.col, col=self.col ) ) return "\n".join(code)