import re
import numpy as np
import pandas as pd
from six import string_types
import dtale.global_state as global_state
from dtale.utils import classify_type, find_dtype
[docs]class ColumnReplacement(object):
def __init__(self, data_id, col, replacement_type, cfg, name=None):
self.data_id = data_id
if replacement_type == "spaces":
self.builder = SpaceReplacement(col, cfg, name)
elif replacement_type == "strings":
self.builder = StringReplacement(col, cfg, name)
elif replacement_type == "value":
self.builder = ValueReplacement(col, cfg, name)
elif replacement_type == "imputer": # iterative, knn, simple
self.builder = ImputerReplacement(col, cfg, name)
else:
raise NotImplementedError(
"'{}' replacement not implemented yet!".format(replacement_type)
)
[docs] def build_replacements(self):
return self.builder.build_column(global_state.get_data(self.data_id))
[docs] def build_code(self):
return self.builder.build_code(global_state.get_data(self.data_id))
[docs]def get_inner_replacement_value(val):
return np.nan if isinstance(val, string_types) and val.lower() == "nan" else val
[docs]def get_replacement_value(cfg, prop):
value = (cfg or {}).get(prop) or "nan"
return get_inner_replacement_value(value)
[docs]def get_inner_replacement_value_as_str(val, series):
if isinstance(val, string_types) and val.lower() == "nan":
return "np.nan"
if classify_type(find_dtype(series)) == "S":
return "'{value}'".format(value=val)
return val
[docs]def get_replacement_value_as_str(cfg, prop, series):
value = (cfg or {}).get(prop) or "nan"
return get_inner_replacement_value_as_str(value, series)
[docs]class SpaceReplacement(object):
def __init__(self, col, cfg, name):
self.col = col
self.cfg = cfg
self.name = name
[docs] def build_column(self, data):
value = get_replacement_value(self.cfg, "value")
return data[self.col].replace(r"^\s+$", value, regex=True)
[docs] def build_code(self, data):
value = get_replacement_value_as_str(self.cfg, "value", data[self.col])
return "df.loc[:, '{name}'] = df['{col}'].replace(r'^\\s+$', {value}, regex=True)".format(
name=self.name or self.col, col=self.col, value=value
)
[docs]class StringReplacement(object):
def __init__(self, col, cfg, name):
self.col = col
self.cfg = cfg
self.name = name
[docs] def parse_cfg(self):
return (self.cfg[p] for p in ["value", "ignoreCase", "isChar"])
[docs] def build_column(self, data):
value, ignore_case, is_char = self.parse_cfg()
flags = re.UNICODE
if ignore_case:
flags |= re.IGNORECASE
value = re.escape(value)
if is_char:
value = "[{value}]+".format(value=value)
regex_pat = re.compile(r"^ *{value} *$".format(value=value), flags=flags)
replace_with = get_replacement_value(self.cfg, "replace")
return data[self.col].replace(regex_pat, replace_with, regex=True)
[docs] def build_code(self, data):
value, ignore_case, is_char = self.parse_cfg()
flags = re.UNICODE
if ignore_case:
flags |= re.IGNORECASE
regex_exp = "r'^ *{value} *$'.format(value=re.escape({value}))"
if is_char:
regex_exp = "r'^ *[{value}]+ *$'.format(value=re.escape({value}))"
regex_exp = regex_exp.format(value=value)
replace_with = get_replacement_value_as_str(self.cfg, "replace", data[self.col])
return (
"import re\n\n"
"regex_pat = re.compile({regex_exp}, flags={flags})\n"
"df.loc[:, '{name}'] = df['{col}'].replace(regex_pat, {replace}, regex=True)"
).format(
name=self.name or self.col,
col=self.col,
regex_exp=regex_exp,
flags=flags,
replace=replace_with,
)
[docs]class ValueReplacement(object):
def __init__(self, col, cfg, name):
self.col = col
self.cfg = cfg
self.name = name
[docs] def build_column(self, data):
s = data[self.col]
replacements = {}
col_replacements = []
for replacement in self.cfg.get("value", []):
value = get_replacement_value(replacement, "value")
replacement_type = replacement.get("type")
if replacement_type == "agg":
replace = getattr(s, replacement["replace"])() # min, max, mean, median
if pd.isnull(replace):
raise Exception(
"Running the aggregation, {agg}, on {col} resulted in nan, this would result in a no-op."
)
elif replacement_type == "col":
col_replacements.append(
lambda s2: np.where(s2 == value, data[replacement["replace"]], s2)
)
else:
replace = get_replacement_value(replacement, "replace")
replacements[value] = replace
final_s = s
if len(replacements):
final_s = final_s.replace(replacements)
for col_r in col_replacements:
final_s = col_r(final_s)
return final_s
[docs] def build_code(self, data):
replacements = []
series = data[self.col]
col_replacements = []
for replacement in self.cfg.get("value", []):
value = get_replacement_value_as_str(replacement, "value", series)
replacement_type = self.cfg.get("type")
if replacement_type == "agg":
replace = "getattr(df['{col}'], '{agg}')()".format(
agg=replacement["value"], col=self.col
)
elif replacement_type == "col":
col_replacements.append(
"s = np.where(s == {value}, data['{col2}'], s)".format(
col2=replacement["replace"], value=value
)
)
else:
replace = get_replacement_value_as_str(replacement, "replace", series)
replacements.append(
"\t{value}: {replace}".format(value=value, replace=replace)
)
code = ["s = df['{col}']".format(col=self.col)]
if len(replacements):
replacements = ",\n".join(replacements)
replacements = "{\n" + replacements + "}"
code.append(
"s = s.replace({replacements})".format(replacements=replacements)
)
code += col_replacements
code.append("df.loc[:, '{name}'] = s".format(name=self.name or self.col))
return "\n".join(code)
[docs]class ImputerReplacement(object):
def __init__(self, col, cfg, name):
self.col = col
self.cfg = cfg
self.name = name
[docs] def build_column(self, data):
imputer_type = self.cfg["type"]
if imputer_type == "iterative":
try:
from sklearn.experimental import enable_iterative_imputer # noqa
from sklearn.impute import IterativeImputer
except ImportError:
raise Exception(
"You must have at least scikit-learn 0.21.0 installed in order to use the Iterative Imputer!"
)
imputer = IterativeImputer()
elif imputer_type == "knn":
try:
from sklearn.impute import KNNImputer
except ImportError:
raise Exception(
"You must have at least scikit-learn 0.22.0 installed in order to use the Iterative Imputer!"
)
n_neighbors = self.cfg.get("n_neighbors") or 2
imputer = KNNImputer(n_neighbors=n_neighbors)
elif imputer_type == "simple":
try:
from sklearn.impute import SimpleImputer
except ImportError:
raise Exception(
"You must have at least scikit-learn 0.20.0 installed in order to use the Iterative Imputer!"
)
imputer = SimpleImputer()
else:
raise NotImplementedError(
"'{}' sklearn imputer not implemented yet!".format(imputer_type)
)
output = imputer.fit_transform(data[[self.col]])
return pd.DataFrame(output, columns=[self.col], index=data.index)[self.col]
[docs] def build_code(self, _data):
imputer_type = self.cfg["type"]
code = []
if imputer_type == "iterative":
code.append(
(
"from sklearn.experimental import enable_iterative_imputer\n"
"from sklearn.impute import IterativeImputer\n\n"
"output = IterativeImputer().fit_transform(df[['{col}']])"
).format(col=self.col)
)
elif imputer_type == "knn":
n_neighbors = self.cfg.get("n_neighbors") or 2
code.append(
(
"from sklearn.impute import KNNImputer\n\n"
"output = KNNImputer(n_neighbors={n_neighbors}).fit_transform(df[['{col}']])"
).format(col=self.col, n_neighbors=n_neighbors)
)
elif imputer_type == "simple":
code.append(
(
"from sklearn.impute import SimpleImputer\n\n"
"output = SimpleImputer().fit_transform(df[['{col}']])"
).format(col=self.col)
)
code.append(
"df.loc[:, '{name}'] = pd.DataFrame(output, columns=['{col}'], index=df.index)['{col}']".format(
name=self.name or self.col, col=self.col
)
)
return "\n".join(code)