Source code for dtale.duplicate_checks

import pandas as pd

import dtale.global_state as global_state
from dtale.utils import (
    dict_merge,
    grid_columns,
    grid_formatter,
    run_query,
    triple_quote,
)
from dtale.charts.utils import build_group_inputs_filter


class NoDuplicatesException(ValueError):
    """Container class for any instance where a user tries to remove duplicates when they don't exist."""


class NoDuplicatesToShowException(ValueError):
    """Container class for any instance where a user tries to show duplicates that don't exist."""


class RemoveAllDataException(ValueError):
    """Container class for any instance where a user tries to remove duplicates and it returns an empty dataframe."""


class DuplicateCheck(object):
    def __init__(self, data_id, check_type, cfg):
        self.data_id = data_id
        if check_type == "columns":
            self.checker = DuplicateColumns(data_id, cfg)
        elif check_type == "column_names":
            self.checker = DuplicateColumnNames(data_id, cfg)
        elif check_type == "rows":
            self.checker = RemoveDuplicateRows(data_id, cfg)
        elif check_type == "show":
            self.checker = ShowDuplicates(cfg)
        else:
            raise NotImplementedError(
                "'{}' duplicate check not implemented yet!".format(check_type)
            )

    def test(self):
        data = global_state.get_data(self.data_id)
        return self.checker.check(data)

    def execute(self):
        from dtale.views import startup

        data = global_state.get_data(self.data_id)
        try:
            df, code = self.checker.remove(data)
            instance = startup(data=df, **self.checker.startup_kwargs)
            curr_settings = global_state.get_settings(instance._data_id)
            global_state.set_settings(
                instance._data_id,
                dict_merge(curr_settings, dict(startup_code=code)),
            )
            return instance._data_id
        except NoDuplicatesException:
            return self.data_id
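

# Illustrative sketch (not part of the original module): driving DuplicateCheck
# end-to-end. The data_id "1" is hypothetical and assumed to already be registered
# with D-Tale's global state; the cfg keys mirror those read by RemoveDuplicateRows below.
def _example_duplicate_check():
    checker = DuplicateCheck("1", "rows", cfg={"subset": ["a"], "keep": "first"})
    summary = checker.test()  # dict of removed/total/remaining row counts
    new_data_id = checker.execute()  # id of a new de-duplicated instance (or the same id if nothing to remove)
    return summary, new_data_id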


def process_keep(input_list, keep):
    if keep == "first":
        return input_list[1:]
    elif keep == "last":
        return input_list[:-1]
    return input_list  # none
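

# Quick illustration (sketch, not part of the module): process_keep returns the
# names that should be *removed* from a list of duplicates, honoring the chosen
# "keep" strategy.
def _example_process_keep():
    dupes = ["a", "a.1", "a.2"]
    assert process_keep(dupes, "first") == ["a.1", "a.2"]  # keep the first occurrence
    assert process_keep(dupes, "last") == ["a", "a.1"]  # keep the last occurrence
    assert process_keep(dupes, "none") == dupes  # keep nothing, remove them all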


class DuplicateColumns(object):
    def __init__(self, data_id, cfg=None):
        self.cfg = cfg
        self.startup_kwargs = dict(ignore_duplicate=True, data_id=data_id)

    def check(self, df):
        duplicate_columns = {}
        keep = self.cfg.get("keep") or "none"
        col_indexes = list(range(df.shape[1]))
        if keep == "last":
            col_indexes = col_indexes[::-1]
        for x_idx, x in enumerate(col_indexes):
            col = df.iloc[:, x]
            col_duplicates = duplicate_columns.get(df.columns.values[x], [])
            for y in col_indexes[x_idx + 1 :]:
                other_col = df.iloc[:, y]
                if col.equals(other_col):
                    col_duplicates.append(df.columns.values[y])
            duplicate_columns[df.columns.values[x]] = col_duplicates
        return {k: v for k, v in duplicate_columns.items() if len(v) > 0}

    def remove(self, df):
        duplicate_cols = self.check(df)
        keep = self.cfg.get("keep") or "none"
        cols_to_remove = []
        for col, dupes in duplicate_cols.items():
            if keep == "none":
                cols_to_remove += [col] + dupes
            else:
                cols_to_remove += dupes
        if not cols_to_remove:
            raise NoDuplicatesException()
        if len(cols_to_remove) == len(df.columns):
            raise RemoveAllDataException("This will remove all data!")
        df = df[[c for c in df.columns if c not in cols_to_remove]]
        code = (
            "duplicate_cols_to_remove = [\n"
            "\t'{cols}'\n"
            "]\n"
            "df = df[[c for c in df.columns if c not in duplicate_cols_to_remove]]"
        ).format(cols="','".join(cols_to_remove))
        return df, code
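

# Illustrative sketch (not part of the module): DuplicateColumns compares column
# *values*, so "b" below is flagged as a duplicate of "a". The data_id "1" is
# hypothetical; check()/remove() only need the dataframe itself.
def _example_duplicate_columns():
    df = pd.DataFrame({"a": [1, 2], "b": [1, 2], "c": [3, 4]})
    checker = DuplicateColumns("1", cfg={"keep": "first"})
    dupes = checker.check(df)  # -> {"a": ["b"]}
    deduped, code = checker.remove(df)  # drops "b", keeps "a" and "c"
    return dupes, deduped, code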


class DuplicateColumnNames(object):
    def __init__(self, data_id, cfg=None):
        self.cfg = cfg
        self.startup_kwargs = dict(ignore_duplicate=True, data_id=data_id)

    def check(self, df):
        distinct_names = {}
        for col in df.columns:
            general_name = col.strip().lower()
            names = distinct_names.get(general_name, [])
            names.append(col)
            distinct_names[general_name] = names
        return {k: v for k, v in distinct_names.items() if len(v) > 1}

    def remove(self, df):
        duplicate_names = self.check(df)
        keep = self.cfg.get("keep") or "none"
        names_to_remove = []
        for _, v in duplicate_names.items():
            names_to_remove += process_keep(v, keep)
        if not names_to_remove:
            raise NoDuplicatesException()
        if len(names_to_remove) == len(df.columns):
            raise RemoveAllDataException("This will remove all data!")
        df = df[[c for c in df.columns if c not in names_to_remove]]
        code = (
            "duplicate_cols_to_remove = [\n"
            "\t'{cols}'\n"
            "]\n"
            "df = df[[c for c in df.columns if c not in duplicate_cols_to_remove]]"
        ).format(cols="','".join(names_to_remove))
        return df, code
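

# Illustrative sketch (not part of the module): DuplicateColumnNames compares
# column *names* case-insensitively and ignoring surrounding whitespace. The
# data_id "1" is hypothetical.
def _example_duplicate_column_names():
    df = pd.DataFrame([[1, 2, 3]], columns=["Foo", "foo ", "bar"])
    checker = DuplicateColumnNames("1", cfg={"keep": "first"})
    dupes = checker.check(df)  # -> {"foo": ["Foo", "foo "]}
    deduped, code = checker.remove(df)  # keeps "Foo" (the first) and "bar"
    return dupes, deduped, code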


class RemoveDuplicateRows(object):
    def __init__(self, data_id, cfg):
        self.cfg = cfg
        self.startup_kwargs = dict(ignore_duplicate=True, data_id=data_id)

    def check(self, df):
        subset, keep = (self.cfg.get(p) for p in ["subset", "keep"])
        dupe_args = {"keep": False if keep == "none" else keep}
        duplicates = df.duplicated(subset, **dupe_args)
        removed = int(duplicates.sum())
        total = len(df)
        return dict(removed=removed, total=total, remaining=total - removed)

    def remove(self, df):
        subset, keep = (self.cfg.get(p) for p in ["subset", "keep"])
        dupe_args = {"keep": False if keep == "none" else keep}
        duplicates = df.duplicated(subset, **dupe_args)
        dupe_ct = int(duplicates.sum())
        if not dupe_ct:
            raise NoDuplicatesException()
        if dupe_ct == len(df):
            raise RemoveAllDataException("This will remove all data!")
        df = df[~duplicates].reset_index(drop=True)
        code = self._build_code()
        return df, code

    def _build_code(self):
        subset, keep = (self.cfg.get(p) for p in ["subset", "keep"])
        keep = "False" if keep == "none" else "'{}'".format(keep)
        return (
            "duplicates = df.duplicated(['{subset}'], keep={keep})\n"
            "df = df[~duplicates]"
        ).format(subset="','".join(subset), keep=keep)
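

# Illustrative sketch (not part of the module): RemoveDuplicateRows delegates to
# pandas' DataFrame.duplicated, so a "keep" of "none" maps to keep=False (drop every
# member of a duplicate group). The data_id "1" is hypothetical.
def _example_remove_duplicate_rows():
    df = pd.DataFrame({"a": [1, 1, 2], "b": [5, 5, 6]})
    checker = RemoveDuplicateRows("1", cfg={"subset": ["a", "b"], "keep": "first"})
    summary = checker.check(df)  # -> {"removed": 1, "total": 3, "remaining": 2}
    deduped, code = checker.remove(df)  # two rows remain; code holds the pandas snippet
    return summary, deduped, code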


class ShowDuplicates(object):
    def __init__(self, cfg):
        self.cfg = cfg
        self.startup_kwargs = dict(ignore_duplicate=True, data_id=None)

    def check(self, df):
        group = self.cfg.get("group")
        duplicates = df[group].reset_index().groupby(group).count()
        duplicates = duplicates.iloc[:, 0]
        duplicates = duplicates[duplicates > 1]
        duplicate_counts = duplicates.values
        duplicates = duplicates.reset_index()[group]
        duplicates = grid_formatter(
            grid_columns(duplicates), as_string=True
        ).format_lists(duplicates)
        check_data = {
            ", ".join([duplicates[col][i] for col in group]): dict(
                count=int(ct), filter=[duplicates[col][i] for col in group]
            )
            for i, ct in enumerate(duplicate_counts)
        }
        return check_data

    def remove(self, df):
        group = self.cfg.get("group")
        duplicates = [g for _, g in df.groupby(group) if len(g) > 1]
        if not duplicates:
            raise NoDuplicatesToShowException("No duplicates to show!")
        duplicates = pd.concat(duplicates)
        group_filter = None
        if self.cfg.get("filter"):
            group_filter = build_group_inputs_filter(
                df, [{col: val for col, val in zip(group, self.cfg["filter"])}]
            )
            duplicates = run_query(duplicates, group_filter)
        code = self._build_code(group_filter)
        self.startup_kwargs["name"] = "{group}_duplicates".format(
            group="_".join(group)
        )
        return duplicates, code

    def _build_code(self, group_filter=None):
        group = self.cfg.get("group")
        group_filter_str = ""
        if group_filter:
            group_filter_str = "\ndf = df.query({filter})".format(
                filter=triple_quote(group_filter)
            )
        return (
            "df = pd.concat(g for _, g in df.groupby(['{group}']) if len(g) > 1)"
            "{filter}"
        ).format(group="','".join(group), filter=group_filter_str)
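

# Illustrative sketch (not part of the module): ShowDuplicates reports and extracts
# the rows whose "group" columns repeat rather than dropping them; the optional
# "filter" cfg key narrows the result to one group combination. The summary shape
# shown in the comment assumes the grid formatter renders the string values as-is.
def _example_show_duplicates():
    df = pd.DataFrame({"grp": ["x", "x", "y"], "val": [1, 2, 3]})
    checker = ShowDuplicates({"group": ["grp"]})
    summary = checker.check(df)  # -> {"x": {"count": 2, "filter": ["x"]}}
    dupes, code = checker.remove(df)  # only the two rows where grp == "x"
    return summary, dupes, code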