import json
import dtale.global_state as global_state
from dtale.utils import classify_type, find_dtype, make_list
[docs]class ColumnFilter(object):
def __init__(self, data_id, column, cfg):
self.data_id = data_id
self.column = column
s = global_state.get_data(data_id)[column]
dtype = find_dtype(s)
self.classification = classify_type(dtype)
self.cfg = cfg
if self.cfg is not None:
self.cfg = json.loads(self.cfg)
if self.cfg['type'] == 'string':
self.builder = StringFilter(column, self.classification, self.cfg)
if self.cfg['type'] in ['int', 'float']:
self.builder = NumericFilter(column, self.classification, self.cfg)
if self.cfg['type'] == 'date':
self.builder = DateFilter(column, self.classification, self.cfg)
if self.cfg['type'] == 'outliers':
self.builder = OutlierFilter(column, self.classification, self.cfg)
[docs] def save_filter(self):
curr_settings = global_state.get_settings(self.data_id)
filters_key = '{}Filters'.format('outlier' if self.cfg['type'] == 'outliers' else 'column')
curr_filters = curr_settings.get(filters_key) or {}
fltr = self.builder.build_filter()
if fltr is None:
curr_filters.pop(self.column, None)
else:
curr_filters[self.column] = fltr
curr_settings[filters_key] = curr_filters
global_state.set_settings(self.data_id, curr_settings)
return curr_filters
[docs]class OutlierFilter(object):
def __init__(self, column, classification, cfg):
self.column = column
self.classification = classification
self.cfg = cfg
[docs] def build_filter(self):
if self.cfg.get("query") is None:
return None
return self.cfg
[docs]class MissingFilter(object):
def __init__(self, column, classification, cfg):
self.column = column
self.classification = classification
self.cfg = cfg
[docs] def handle_missing(self, fltr):
if self.cfg is None or not self.cfg.get('missing', False):
return fltr
return {'missing': True, 'query': '{col} != {col}'.format(col=self.column)}
[docs]class StringFilter(MissingFilter):
def __init__(self, column, classification, cfg):
super(StringFilter, self).__init__(column, classification, cfg)
[docs] def build_filter(self):
if self.cfg is None or not len(self.cfg.get('value', [])):
return super(StringFilter, self).handle_missing(None)
state = self.cfg.get('value', [])
operand = self.cfg.get('operand', '=')
fltr = dict(value=state)
if len(state) == 1:
val_str = ("'{}'" if self.classification == 'S' else '{}').format(state[0])
fltr['query'] = "{} {} {}".format(self.column, '==' if operand == '=' else '!=', val_str)
else:
val_str = ("'{}'".format("', '".join(state)) if self.classification == 'S' else ','.join(state))
fltr['query'] = "{} {} ({})".format(self.column, 'in' if operand == '=' else 'not in', val_str)
return super(StringFilter, self).handle_missing(fltr)
[docs]class NumericFilter(MissingFilter):
def __init__(self, column, classification, cfg):
super(NumericFilter, self).__init__(column, classification, cfg)
[docs] def build_filter(self):
if self.cfg is None:
return super(NumericFilter, self).handle_missing(None)
cfg_val, cfg_operand, cfg_min, cfg_max = (self.cfg.get(p) for p in ['value', 'operand', 'min', 'max'])
if cfg_operand in ['=', 'ne']:
state = make_list(cfg_val or [])
if not len(state):
return super(NumericFilter, self).handle_missing(None)
fltr = dict(value=cfg_val, operand=cfg_operand)
if len(state) == 1:
fltr['query'] = "{} {} {}".format(self.column, '==' if cfg_operand == '=' else '!=', state[0])
else:
fltr['query'] = "{} {} ({})".format(
self.column, 'in' if cfg_operand == '=' else 'not in', ", ".join(state)
)
return super(NumericFilter, self).handle_missing(fltr)
if cfg_operand in ['<', '>', '<=', '>=']:
if cfg_val is None:
return super(NumericFilter, self).handle_missing(None)
fltr = dict(value=cfg_val, operand=cfg_operand, query='{} {} {}'.format(self.column, cfg_operand, cfg_val))
return super(NumericFilter, self).handle_missing(fltr)
if cfg_operand in ['[]', '()']:
fltr = dict(operand=cfg_operand)
queries = []
if cfg_min is not None:
fltr['min'] = cfg_min
queries.append('{} >{} {}'.format(self.column, '=' if cfg_operand == '[]' else '', cfg_min))
if cfg_max is not None:
fltr['max'] = cfg_max
queries.append('{} <{} {}'.format(self.column, '=' if cfg_operand == '[]' else '', cfg_max))
if len(queries) == 2 and cfg_max == cfg_min:
queries = ["{} == {}".format(self.column, cfg_max)]
if not len(queries):
return super(NumericFilter, self).handle_missing(None)
fltr['query'] = ' and '.join(queries)
return super(NumericFilter, self).handle_missing(fltr)
return super(NumericFilter, self).handle_missing(None)
[docs]class DateFilter(MissingFilter):
def __init__(self, column, classification, cfg):
super(DateFilter, self).__init__(column, classification, cfg)
[docs] def build_filter(self):
if self.cfg is None:
return super(DateFilter, self).handle_missing(None)
start, end = (self.cfg.get(p) for p in ['start', 'end'])
fltr = dict(start=start, end=end)
queries = []
if start:
queries.append("{} >= '{}'".format(self.column, start))
if end:
queries.append("{} <= '{}'".format(self.column, end))
if len(queries) == 2 and start == end:
queries = ["{} == '{}'".format(self.column, start)]
if not len(queries):
return super(DateFilter, self).handle_missing(None)
fltr['query'] = ' and '.join(queries)
return super(DateFilter, self).handle_missing(fltr)