Source code for dtale.column_analysis

import numpy as np
import pandas as pd
import plotly.express as px
import plotly.graph_objs as go
import pprint
import scipy.stats as sts
import time

import dtale.global_state as global_state

from dtale.code_export import build_code_export, build_final_chart_code
from dtale.column_builders import clean, clean_code
from dtale.describe import load_describe
from dtale.query import build_query, load_filterable_data
from dtale.utils import (
    apply,
    classify_type,
    find_dtype,
    find_dtype_formatter,
    find_selected_column,
    get_bool_arg,
    get_int_arg,
    get_str_arg,
    grid_columns,
    grid_formatter,
    json_float,
    json_timestamp,
)


# Keyword-argument fragment spliced (via str.format) into the exported ``go.Scatter(...)``
# code snippets built by the analysis classes in this module.
LINE_CFG = "line={'shape': 'spline', 'smoothing': 0.3}, mode='lines'"


def handle_top(df, top):
    """
    Limit chart data to a number of rows and describe that limit as exported code.

    :param df: sliceable data (anything supporting ``len`` and slice indexing)
    :param top: requested limit, or None; a positive value keeps the first ``top``
                rows, any other value slices from ``top`` onward (``df[top:]``)
    :return: tuple of (sliced data, limit applied, list of code snippet strings)
    """
    if top is None:
        # No explicit limit: only truncate when there are more than 100 rows.
        row_count = len(df)
        if row_count <= 100:
            return df, row_count, []
        return df[:100], 100, ["chart = chart[:100]"]

    limit = int(top)
    if limit > 0:
        subset, template = df[:limit], "chart = chart[:{}]"
    else:
        subset, template = df[limit:], "chart = chart[{}:]"
    return subset, limit, [template.format(limit)]
def pctsum_updates(df, group, ret_col):
    """
    Compute each group's share of the overall sum of a column.

    :param df: dataframe to aggregate
    :param group: column name to group by
    :param ret_col: column name whose values are summed
    :return: tuple of (single-column dataframe of per-group sums divided by the
             total of those sums, list holding the equivalent code snippet)
    """
    template = (
        "ordinal_data = df.groupby('{col}')[['{ret_col}']].sum()\n"
        "ordinal_data = ordinal_data / ordinal_data.sum()"
    )
    snippet = template.format(col=group, ret_col=ret_col)

    totals = df.groupby(group)[[ret_col]].sum()
    shares = totals / totals.sum()
    return shares, [snippet]
def handle_cleaners(s, cleaners):
    """
    Run a comma-separated list of cleaners over a series, in order.

    :param s: data passed through ``clean`` once per cleaner
    :param cleaners: comma-separated cleaner names, or a falsy value for "none"
    :return: tuple of (cleaned data, list of code snippets from ``clean_code``)
    """
    if not cleaners:
        return s, []

    snippets = []
    for name in cleaners.split(","):
        s = clean(s, name, {})
        snippets.extend(clean_code(name, {}))
    return s, snippets
def build_kde(s, hist_labels, selected_col):
    """
    Evaluate a gaussian kernel density estimate of ``s`` at the histogram bin edges.

    :param s: values to fit the KDE on
    :param hist_labels: points at which the fitted density is evaluated
    :param selected_col: column name used in the exported code snippet
    :return: tuple of (list of density values, list of code snippet lines), or
             ``(None, [])`` when no KDE can be computed for the data
    """
    try:
        kde = sts.gaussian_kde(s)
        densities = kde.pdf(hist_labels)
    except (np.linalg.LinAlgError, ValueError):
        # LinAlgError: singular covariance (e.g. constant data).
        # ValueError: scipy rejects datasets with fewer than two elements.
        # In both cases the histogram is still usable, so just omit the KDE.
        return None, []

    kde_data = [json_float(k, precision=12) for k in densities]
    code = [
        "import scipy.stats as sts",
        "kde = sts.gaussian_kde(s['{}'])".format(selected_col),
        "kde_data = kde.pdf(np.linspace(labels.min(), labels.max()))",
    ]
    return kde_data, code
class ColumnAnalysis(object):
    """
    Loads the (optionally filtered) data for one column and delegates chart
    building to the analysis object matching the requested ``type``.

    :param data_id: identifier used for all ``global_state`` lookups
    :param req: request object the ``type``/``col`` (and analysis-specific)
                arguments are read from
    """

    def __init__(self, data_id, req):
        self.data_id = data_id
        self.analysis_type = get_str_arg(req, "type")

        settings = global_state.get_settings(data_id) or {}
        self.query = build_query(data_id, settings.get("query"))
        frame = load_filterable_data(data_id, req, query=self.query)
        self.selected_col = find_selected_column(
            frame, get_str_arg(req, "col", "values")
        )
        # Rows where the selected column is null are excluded from every analysis.
        self.data = frame[pd.notnull(frame[self.selected_col])]
        self.dtype = find_dtype(self.data[self.selected_col])
        self.classifier = classify_type(self.dtype)

        import_lines = "\n".join(
            [
                "import numpy as np",
                "import pandas as pd",
                "import plotly.graph_objs as go",
            ]
        )
        self.code = build_code_export(data_id, imports="{}\n".format(import_lines))

        if self.analysis_type is None:
            # Default by dtype classification when the request names no type.
            if self.classifier in ("F", "I", "D"):
                self.analysis_type = "histogram"
            else:
                self.analysis_type = "value_counts"

        # Factories are wrapped in lambdas so only the selected one is evaluated.
        factories = {
            "geolocation": lambda: GeolocationAnalysis(req),
            "histogram": lambda: HistogramAnalysis(req),
            "categories": lambda: CategoryAnalysis(req),
            "value_counts": lambda: ValueCountAnalysis(req),
            "word_value_counts": lambda: WordValueCountAnalysis(req),
            "qq": lambda: QQAnalysis(),
        }
        factory = factories.get(self.analysis_type)
        if factory is not None:
            # NOTE: an unrecognised type leaves ``self.analysis`` unset.
            self.analysis = factory()

    def build(self):
        """
        Run the selected analysis and assemble the response payload.

        :return: dict containing the exported code, query, column dtypes, dtype
                 of the selected column, chart type, a millisecond timestamp and
                 every key of the analysis' own return data
        """
        import_lines = "\n".join(
            [
                "import numpy as np",
                "import pandas as pd",
                "import plotly.graph_objs as go",
            ]
        )
        base_code = build_code_export(
            self.data_id, imports="{}\n\n".format(import_lines)
        )
        return_data, analysis_code = self.analysis.build(self)
        return dict(
            code=build_final_chart_code(base_code + analysis_code),
            query=self.query,
            cols=global_state.get_dtypes(self.data_id),
            dtype=self.dtype,
            chart_type=self.analysis_type,
            timestamp=round(time.time() * 1000),
            **return_data
        )
class HistogramAnalysis(object):
    """
    Histogram of the selected column, either plain (with a KDE overlay) or
    split into stacked bars by the values of a target column.

    :param req: request object the ``bins`` (default 20), ``target`` and
                ``density`` arguments are read from
    """

    # Bin labels are never formatted with more decimals than this, even if the
    # labels are still not unique at that precision.
    MAX_LABEL_DECIMALS = 10

    def __init__(self, req):
        self.bins = get_int_arg(req, "bins", 20)
        self.target = get_str_arg(req, "target")
        self.density = get_bool_arg(req, "density")

    @staticmethod
    def _format_labels(hist_labels, max_decimals=MAX_LABEL_DECIMALS):
        """
        Format bin edges as fixed-point strings using the fewest decimals (starting
        at 1) that make every label unique, capped at ``max_decimals``.

        :param hist_labels: bin edges; the first edge is dropped because it is
                            only the minimum
        :param max_decimals: maximum number of decimals to try
        :return: list of label strings, one per edge after the first
        """
        edges = hist_labels[1:]
        decimals = 1
        labels = ["{0:.{1}f}".format(edge, decimals) for edge in edges]
        # The previous condition (`... or decimals > 10`) could never become false
        # once decimals passed 10, so the loop did not terminate for very narrow bins.
        while len(set(labels)) < len(labels) and decimals < max_decimals:
            decimals += 1
            labels = ["{0:.{1}f}".format(edge, decimals) for edge in edges]
        return labels

    def build_histogram_data(self, series):
        """
        Bucket ``series`` with ``np.histogram``.

        :param series: values to bucket
        :return: tuple of (dict with ``labels`` and ``data`` lists, raw bin edges)
        """
        # When density is requested np.histogram is called without ``bins``.
        hist_kwargs = {"density": True} if self.density else {"bins": self.bins}
        hist_data, hist_labels = np.histogram(series, **hist_kwargs)
        hist_data = [json_float(h) for h in hist_data]
        labels = self._format_labels(hist_labels)
        return dict(labels=labels, data=hist_data), hist_labels

    def build(self, parent):
        """
        Build the histogram payload and the matching exported code.

        :param parent: the owning analysis object; its ``data``, ``selected_col``,
                       ``classifier`` and ``data_id`` attributes are read, and
                       ``data`` is modified in place (timestamp conversion and,
                       with a target, an added ``bin`` column)
        :return: tuple of (return data dict, list of code snippet strings)
        """
        if parent.classifier == "D":
            parent.data.loc[:, parent.selected_col] = apply(
                parent.data[parent.selected_col], json_timestamp
            )
        kde_code = []
        if self.target is None:
            return_data, hist_labels = self.build_histogram_data(
                parent.data[parent.selected_col]
            )
            kde, kde_code = build_kde(
                parent.data[parent.selected_col], hist_labels, parent.selected_col
            )
            if kde is not None:
                return_data["kde"] = kde
        else:
            bin_vals = pd.cut(parent.data[parent.selected_col], bins=self.bins)
            labels = ["{}".format(c) for c in bin_vals.dtype.categories]
            parent.data.loc[:, "bin"] = bin_vals.astype("str")
            return_data = {"targets": [], "labels": labels}
            target_dtype = find_dtype(parent.data[self.target])
            target_formatter = find_dtype_formatter(target_dtype)
            for target, target_data in parent.data[[self.target, "bin"]].groupby(
                self.target
            ):
                target_counts = target_data["bin"].value_counts()
                # Reindex so every bin is present (zero-filled) and in label order.
                target_counts = [
                    int(tc)
                    for tc in target_counts.reindex(labels, fill_value=0).values
                ]
                return_data["targets"].append(
                    dict(
                        target=target_formatter(target, as_string=True),
                        data=target_counts,
                    )
                )

        desc, desc_code = load_describe(parent.data[parent.selected_col])
        dtype_info = global_state.get_dtype_info(parent.data_id, parent.selected_col)
        for p in ["skew", "kurt"]:
            if p in dtype_info:
                desc[p] = dtype_info[p]

        return_data["desc"] = desc
        return return_data, self._build_code(parent, kde_code, desc_code)

    def _build_code(self, parent, kde_code, desc_code):
        """
        Build the exported code snippets reproducing this histogram.

        :param parent: the owning analysis object (``selected_col`` and
                       ``classifier`` are read)
        :param kde_code: snippets returned by ``build_kde``
        :param desc_code: snippets returned by ``load_describe``
        :return: list of code snippet strings
        """
        pp = pprint.PrettyPrinter(indent=4)
        code = [
            "s = df[~pd.isnull(df['{col}'])][['{col}'{target}]]".format(
                col=parent.selected_col,
                target=",'{}'".format(self.target) if self.target else "",
            )
        ]
        if parent.classifier == "D":
            # The closing parenthesis of ``int(`` was previously missing, leaving
            # ``apply(`` unclosed in the exported code.
            # NOTE(review): ``old_div`` must be in scope in the exported script --
            # confirm the export preamble provides it.
            code.append(
                (
                    "\nimport time\n\n"
                    "s.loc[:, '{col}'] = s['{col}'].apply(\n"
                    "\tlambda x: int((time.mktime(x.timetuple()) + (old_div(x.microsecond, 1000000.0))) * 1000)\n"
                    ")"
                ).format(col=parent.selected_col)
            )
        if self.target is None:
            hist_kwargs = (
                "density=True" if self.density else "bins={}".format(self.bins)
            )
            code.append(
                "chart, labels = np.histogram(s['{col}'], {hist_kwargs})".format(
                    col=parent.selected_col,
                    hist_kwargs=hist_kwargs,
                )
            )
            code += kde_code + desc_code
            layout = pp.pformat(
                go.Layout(
                    **{
                        "barmode": "group",
                        "legend": {"orientation": "h"},
                        "title": {
                            "text": "{} Histogram (bins: {}) w/ KDE".format(
                                parent.selected_col, self.bins
                            )
                        },
                        "xaxis2": {"anchor": "y", "overlaying": "x", "side": "top"},
                        "yaxis": {"title": {"text": "Frequency"}, "side": "left"},
                        "yaxis2": {
                            "title": {"text": "KDE"},
                            "side": "right",
                            "overlaying": "y",
                        },
                    }
                )
            )
            code += [
                "charts = [",
                "\tgo.Bar(x=labels[1:], y=chart, name='Histogram'),",
                "\tgo.Scatter(",
                # These three literals form a single exported line (tab separated).
                (
                    "\t\tx=list(range(len(kde_data))), y=kde_data, name='KDE',"
                    "\t\tyaxis='y2', xaxis='x2',"
                    "\t\t{}"
                ).format(LINE_CFG),
                "\t)",
                "]",
            ]
        else:
            layout = pp.pformat(
                go.Layout(
                    **{
                        "barmode": "stack",
                        "legend": {"orientation": "h"},
                        "title": {
                            "text": "{} Histogram (bins: {}) w/ target ({})".format(
                                parent.selected_col, self.bins, self.target
                            )
                        },
                        "yaxis": {"title": {"text": "Frequency"}, "side": "left"},
                    }
                )
            )
            # ``charts`` is initialised here because the loop below appends to it;
            # the exported snippet previously used it without defining it.
            code.append(
                (
                    "bin_vals = pd.cut(s['{col}'], bins={bins})\n"
                    "labels = [str(c) for c in bin_vals.dtype.categories]\n"
                    "s.loc[:, 'bin'] = bin_vals.astype('str')\n"
                    "charts = []\n"
                    "for target, target_data in s[['{target}', 'bin']].groupby('{target}'):\n"
                    "\ttarget_counts = target_data['bin'].value_counts()\n"
                    "\ttarget_counts = [int(tc) for tc in target_counts.reindex(labels, fill_value=0).values]\n"
                    "\tcharts.append(go.Bar(x=labels, y=target_counts, name=target))"
                ).format(col=parent.selected_col, bins=self.bins, target=self.target)
            )
        code.append(
            "figure = go.Figure(data=charts, layout=go.{layout})".format(layout=layout)
        )
        return code
class CategoryAnalysis(object):
    """
    Aggregates the selected column per value of a category column, alongside
    the per-category row count.

    :param req: request object the ``categoryCol``, ``categoryAgg`` (default
                ``mean``) and ``top`` arguments are read from
    """

    def __init__(self, req):
        self.category_col = get_str_arg(req, "categoryCol")
        self.category_agg = get_str_arg(req, "categoryAgg", "mean")
        # "pctsum" is computed as a plain sum which is normalised afterwards.
        if self.category_agg == "pctsum":
            data_agg = "sum"
        else:
            data_agg = self.category_agg
        self.aggs = ["count", data_agg]
        self.top = get_int_arg(req, "top")

    def build(self, parent):
        """
        Build the per-category payload and the matching exported code.

        :param parent: the owning analysis object (``data`` and ``selected_col``
                       are read)
        :return: tuple of (return data dict including ``top``, list of code
                 snippet strings)
        """
        grouped = parent.data.groupby(self.category_col)[[parent.selected_col]]
        summary = grouped.agg(self.aggs)
        summary.columns = summary.columns.droplevel(0)
        summary.columns = ["count", "data"]
        if self.category_agg == "pctsum":
            summary["data"] = summary["data"] / summary["data"].sum()
        summary.index.name = "labels"
        summary = summary.reset_index()

        summary, top, top_code = handle_top(summary, self.top)
        formatter = grid_formatter(grid_columns(summary), nan_display=None)
        return_data = formatter.format_lists(summary)
        return_data["top"] = top
        return return_data, self._build_code(parent, top_code)

    def _build_code(self, parent, top_code):
        """
        Build the exported code snippets reproducing this chart.

        :param parent: the owning analysis object (``selected_col`` is read)
        :param top_code: snippets returned by ``handle_top``
        :return: list of code snippet strings
        """
        title_text = "{}({}) Categorized by {}".format(
            parent.selected_col, self.category_agg, self.category_col
        )
        data_axis_text = "{} ({})".format(parent.selected_col, self.category_agg)
        layout_cfg = {
            "barmode": "group",
            "legend": {"orientation": "h"},
            "title": {"text": title_text},
            "xaxis": {"title": {"text": self.category_col}},
            "yaxis": {"title": {"text": data_axis_text}, "side": "left"},
            "yaxis2": {
                "title": {"text": "Frequency"},
                "side": "right",
                "overlaying": "y",
            },
        }
        layout = pprint.PrettyPrinter(indent=4).pformat(go.Layout(**layout_cfg))

        snippets = [
            "chart = df.groupby('{cat}')[['{col}']].agg(['{aggs}'])".format(
                cat=self.category_col,
                col=parent.selected_col,
                aggs="', '".join(self.aggs),
            ),
            "chart.columns = chart.columns.droplevel(0)",
            'chart.columns = ["count", "data"]',
        ]
        if self.category_agg == "pctsum":
            snippets.append("chart['data'] = chart['data'] / chart['data'].sum()")
        snippets.append("chart.index.name = 'labels'")
        snippets.append("chart = chart.reset_index()")
        snippets.extend(top_code)
        snippets.extend(
            [
                "charts = [",
                "\tgo.Bar(x=chart['labels'].values, y=chart['data'].values),",
                "\tgo.Scatter(",
                "\t\tx=chart['labels'].values, y=chart['count'].values, yaxis='y2',",
                "\t\tname='Frequency', {}".format(LINE_CFG),
                "\t)",
                "]",
                "figure = go.Figure(data=charts, layout=go.{layout})".format(
                    layout=layout
                ),
            ]
        )
        return snippets
class ValueCountAnalysis(object):
    """
    Frequency of each distinct value in the selected column, optionally paired
    with an aggregate of a second ("ordinal") column per value.

    :param req: request object the ``top``, ``ordinalCol``, ``ordinalAgg``
                (default ``sum``) and ``cleaner`` arguments are read from
    """

    def __init__(self, req):
        self.top = get_int_arg(req, "top")
        self.ordinal_col = get_str_arg(req, "ordinalCol")
        self.ordinal_agg = get_str_arg(req, "ordinalAgg", "sum")
        self.cleaners = get_str_arg(req, "cleaner")

    def build_hist(self, s, code):
        """
        Count the occurrences of each value.

        Uses ``Series.value_counts`` rather than the deprecated top-level
        ``pd.value_counts``, both here and in the exported snippet.

        :param s: series of values to count
        :param code: list of code snippets; the equivalent snippet is appended
        :return: dataframe indexed by value with a single ``data`` column
        """
        code.append("chart = s.value_counts().to_frame(name='data')")
        return s.value_counts().to_frame(name="data")

    def setup_ordinal_data(self, parent):
        """
        Aggregate the ordinal column per value of the selected column.

        :param parent: the owning analysis object (``data`` and ``selected_col``
                       are read)
        :return: tuple of (aggregated dataframe, list of code snippet strings)
        """
        if self.ordinal_agg == "pctsum":
            return pctsum_updates(parent.data, parent.selected_col, self.ordinal_col)
        # The aggregation name is resolved to the groupby method of the same name.
        ordinal_data = getattr(
            parent.data.groupby(parent.selected_col)[[self.ordinal_col]],
            self.ordinal_agg,
        )()
        ordinal_code = [
            "ordinal_data = df.groupby('{col}')[['{ordinal}']].{agg}()".format(
                col=parent.selected_col, ordinal=self.ordinal_col, agg=self.ordinal_agg
            )
        ]
        return ordinal_data, ordinal_code

    def setup_chart_layout(self, parent):
        """
        Build the pretty-printed plotly layout used in the exported code.

        :param parent: the owning analysis object (``selected_col`` is read)
        :return: string representation of the ``go.Layout``
        """
        pp = pprint.PrettyPrinter(indent=4)
        layout_cfg = {
            "barmode": "group",
            "legend": {"orientation": "h"},
            "title": {"text": "{} Value Counts".format(parent.selected_col)},
            "xaxis": {"title": {"text": parent.selected_col}},
            "yaxis": {"title": {"text": "Frequency"}},
        }
        if self.ordinal_col:
            # Secondary axis for the ordinal aggregate.
            layout_cfg["yaxis2"] = {
                "title": {"text": "{} ({})".format(self.ordinal_col, self.ordinal_agg)},
                "side": "right",
                "overlaying": "y",
            }
        return pp.pformat(go.Layout(**layout_cfg))

    def build(self, parent):
        """
        Build the value-count payload and the matching exported code.

        :param parent: the owning analysis object (``data`` and ``selected_col``
                       are read)
        :return: tuple of (return data dict including ``top``, list of code
                 snippet strings)
        """
        code = [
            "s = df[~pd.isnull(df['{col}'])]['{col}']".format(col=parent.selected_col)
        ]
        s, cleaner_code = handle_cleaners(
            parent.data[parent.selected_col], self.cleaners
        )
        code += cleaner_code
        hist = self.build_hist(s, code)

        if self.ordinal_col is not None:
            ordinal_data, ordinal_code = self.setup_ordinal_data(parent)
            code += ordinal_code
            hist["ordinal"] = ordinal_data
            hist.index.name = "labels"
            hist = hist.reset_index().sort_values("ordinal")
            code += [
                "chart['ordinal'] = ordinal_data",
                "chart.index.name = 'labels'",
                "chart = chart.reset_index().sort_values('ordinal')",
            ]
        else:
            hist.index.name = "labels"
            # Most frequent first; ties broken by label.
            hist = hist.reset_index().sort_values(
                ["data", "labels"], ascending=[False, True]
            )
            code += [
                "chart.index.name = 'labels'",
                "chart = chart.reset_index().sort_values(['data', 'labels'], ascending=[False, True])",
            ]
        hist, top, top_code = handle_top(hist, self.top)
        code += top_code
        col_types = grid_columns(hist)
        f = grid_formatter(col_types, nan_display=None)
        return_data = f.format_lists(hist)
        return_data["top"] = top

        layout = self.setup_chart_layout(parent)
        code.append(
            "charts = [go.Bar(x=chart['labels'].values, y=chart['data'].values, name='Frequency')]"
        )
        if self.ordinal_col:
            code.append(
                (
                    "charts.append(go.Scatter(\n"
                    "\tx=chart['labels'].values, y=chart['ordinal'].values, yaxis='y2',\n"
                    "\tname='{} ({})', {}\n"
                    "))"
                ).format(self.ordinal_col, self.ordinal_agg, LINE_CFG)
            )
        code.append(
            "figure = go.Figure(data=charts, layout=go.{layout})".format(layout=layout)
        )
        return return_data, code
class WordValueCountAnalysis(ValueCountAnalysis):
    """
    Variant of :class:`ValueCountAnalysis` that counts the individual
    whitespace-separated words of a string column instead of whole values.
    """

    def build_hist(self, s, code):
        """
        Count the occurrences of each word.

        Uses ``Series.value_counts`` rather than the deprecated top-level
        ``pd.value_counts``, both here and in the exported snippets.

        :param s: series of strings to split into words
        :param code: list of code snippets; the equivalent snippets are appended
        :return: dataframe indexed by word (sorted) with a single ``data`` column
        """
        code.append("chart = s.str.split(expand=True).stack().value_counts()")
        code.append("chart = chart.to_frame(name='data').sort_index()")
        words = s.str.split(expand=True).stack()
        return words.value_counts().to_frame(name="data").sort_index()

    def setup_ordinal_data(self, parent):
        """
        Aggregate the ordinal column per word of the selected column.

        :param parent: the owning analysis object (``data`` and ``selected_col``
                       are read)
        :return: tuple of (aggregated dataframe, list of code snippet strings)
        """
        # One row per (original row, word); "level_0" is the original row index.
        expanded_words = parent.data[parent.selected_col].str.split(expand=True).stack()
        expanded_words.name = "label"
        expanded_words = expanded_words.reset_index()[["level_0", "label"]]
        expanded_words.columns = ["index", "label"]
        # Attach each word's originating ordinal value by joining on the row index.
        expanded_words = pd.merge(
            parent.data[[self.ordinal_col]],
            expanded_words.set_index("index"),
            how="inner",
            left_index=True,
            right_index=True,
        )

        expanded_word_code = [
            (
                "ordinal_data = df['{col}'].str.split(expand=True).stack()\n"
                "ordinal_data.name = 'label'\n"
                "ordinal_data = ordinal_data.reset_index()[['level_0', 'label']]\n"
                "ordinal_data.columns = ['index', 'label']\n"
                "ordinal_data = pd.merge(\n"
                "\tdf[['{ordinal}']],\n"
                "\tordinal_data.set_index('index'),\n"
                "\thow='inner',\n"
                "\tleft_index=True,\n"
                "\tright_index=True,\n"
                ")"
            ).format(col=parent.selected_col, ordinal=self.ordinal_col)
        ]

        if self.ordinal_agg == "pctsum":
            ordinal_data, ordinal_code = pctsum_updates(
                expanded_words, "label", self.ordinal_col
            )
            return ordinal_data, expanded_word_code + ordinal_code

        ordinal_code = expanded_word_code + [
            "ordinal_data = ordinal_data.groupby('label')[['{ordinal}']].{agg}()".format(
                ordinal=self.ordinal_col, agg=self.ordinal_agg
            )
        ]
        # The aggregation name is resolved to the groupby method of the same name.
        ordinal_data = getattr(
            expanded_words.groupby("label")[[self.ordinal_col]], self.ordinal_agg
        )()
        return ordinal_data, ordinal_code

    def setup_chart_layout(self, parent):
        """
        Build the pretty-printed plotly layout used in the exported code.

        :param parent: the owning analysis object (``selected_col`` is read)
        :return: string representation of the ``go.Layout``
        """
        pp = pprint.PrettyPrinter(indent=4)
        layout_cfg = {
            "barmode": "group",
            "legend": {"orientation": "h"},
            "title": {"text": "{} Word Value Counts".format(parent.selected_col)},
            "xaxis": {"title": {"text": parent.selected_col}},
            "yaxis": {"title": {"text": "Frequency"}},
        }
        if self.ordinal_col:
            # Secondary axis for the ordinal aggregate.
            layout_cfg["yaxis2"] = {
                "title": {"text": "{} ({})".format(self.ordinal_col, self.ordinal_agg)},
                "side": "right",
                "overlaying": "y",
            }
        return pp.pformat(go.Layout(**layout_cfg))
class GeolocationAnalysis(object):
    """
    Latitude/longitude pairs for plotting on a map.

    :param req: request object the ``latCol`` and ``lonCol`` arguments are
                read from
    """

    def __init__(self, req):
        self.lat_col = get_str_arg(req, "latCol")
        self.lon_col = get_str_arg(req, "lonCol")

    def build(self, parent):
        """
        Build the coordinate payload and the matching exported code.

        :param parent: the owning analysis object (``data`` is read)
        :return: tuple of (return data dict, list of code snippet strings)
        """
        coords = parent.data[[self.lat_col, self.lon_col]].dropna()
        coords.columns = ["lat", "lon"]
        formatter = grid_formatter(grid_columns(coords), nan_display=None)
        return formatter.format_lists(coords), self._build_code()

    def _build_code(self):
        """
        Build the exported code snippets reproducing this map.

        :return: list of code snippet strings
        """
        title_text = "Map of Latitude({})/ Longitude({})".format(
            self.lat_col, self.lon_col
        )
        layout_cfg = {
            "autosize": True,
            "geo": {"fitbounds": "locations", "scope": "world"},
            "legend": {"orientation": "h"},
            "margin": {"b": 0, "l": 0, "r": 0},
            "title": {"text": title_text},
        }
        layout = pprint.PrettyPrinter(indent=4).pformat(go.Layout(**layout_cfg))

        select_snippet = "chart = df[['{}', '{}']].dropna()".format(
            self.lat_col, self.lon_col
        )
        scatter_snippet = (
            "chart = go.Scattergeo(\n"
            "\tlon=chart['lon'].values,\n"
            "\tlat=chart['lat'].values,\n"
            "\tmode='markers',\n"
            "\tmarker={'color': 'darkblue'}\n"
            ")"
        )
        figure_snippet = "figure = go.Figure(data=chart, layout=go.{layout})".format(
            layout=layout
        )
        return [
            select_snippet,
            "chart.columns = ['lat', 'lon']",
            scatter_snippet,
            figure_snippet,
        ]
class QQAnalysis(object):
    """
    Q-Q plot of the selected column against a normal distribution, with an
    OLS trend line.
    """

    def build(self, parent):
        """
        Build the Q-Q payload and the matching exported code.

        :param parent: the owning analysis object (``data``, ``selected_col``
                       and ``classifier`` are read)
        :return: tuple of (return data dict with ``x``/``y`` points and
                 ``x2``/``y2`` trend line, list of code snippet strings)
        """
        s = parent.data[parent.selected_col]
        if parent.classifier == "D":
            s = apply(s, json_timestamp)

        qq_x, qq_y = sts.probplot(s, dist="norm", fit=False)
        qq = pd.DataFrame(dict(x=qq_x, y=qq_y))
        f = grid_formatter(grid_columns(qq), nan_display=None)
        return_data = f.format_lists(qq)

        # The second trace of the plotly express figure is the fitted trend line.
        trend_line = px.scatter(x=qq_x, y=qq_y, trendline="ols").data[1]
        trend_line = pd.DataFrame(dict(x=trend_line["x"], y=trend_line["y"]))
        f = grid_formatter(grid_columns(trend_line), nan_display=None)
        trend_line = f.format_lists(trend_line)
        return_data["x2"] = trend_line["x"]
        return_data["y2"] = trend_line["y"]
        return return_data, self._build_code(parent)

    def _build_code(self, parent):
        """
        Build the exported code snippets reproducing this Q-Q plot.

        :param parent: the owning analysis object (``selected_col`` and
                       ``classifier`` are read)
        :return: list of code snippet strings
        """
        pp = pprint.PrettyPrinter(indent=4)
        layout = pp.pformat(
            go.Layout(
                **{
                    "legend": {"orientation": "h"},
                    "title": {"text": "{} QQ Plot".format(parent.selected_col)},
                }
            )
        )
        code = [
            "s = df[~pd.isnull(df['{col}'])]['{col}']".format(col=parent.selected_col)
        ]
        if parent.classifier == "D":
            # ``s`` is already the selected column (a Series) at this point, so the
            # exported code applies directly to it; indexing it by column name
            # again would fail. The parenthesis closing ``int(`` was also missing.
            # NOTE(review): ``old_div`` must be in scope in the exported script --
            # confirm the export preamble provides it.
            code.append(
                "\nimport time\n\n"
                "s = s.apply(\n"
                "\tlambda x: int((time.mktime(x.timetuple()) + (old_div(x.microsecond, 1000000.0))) * 1000)\n"
                ")"
            )
        code += [
            "\nimport scipy.stats as sts\nimport plotly.express as px\n",
            'qq_x, qq_y = sts.probplot(s, dist="norm", fit=False)',
            "chart = px.scatter(x=qq_x, y=qq_y, trendline='ols', trendline_color_override='red')",
            "figure = go.Figure(data=chart, layout=go.{layout})".format(layout=layout),
        ]
        return code