import json
import pandas as pd
import dtale.global_state as global_state
class CombineData(object):
    """
    Entry point for combining several datasets. Parses the request and hands the
    work to a :class:`MergeBuilder` or a :class:`StackBuilder`.

    :param cfg: mapping exposing ``get``; the keys ``action``, ``config`` and
                ``datasets`` are read, the latter two being JSON-encoded strings
    """

    def __init__(self, cfg):
        action = cfg.get("action")
        raw_config = cfg.get("config")
        raw_datasets = cfg.get("datasets")
        parsed_config = json.loads(raw_config)
        parsed_datasets = json.loads(raw_datasets)
        # Any action other than "merge" is handled as a stack request.
        if action == "merge":
            self.builder = MergeBuilder(parsed_config, parsed_datasets)
        else:
            self.builder = StackBuilder(parsed_config, parsed_datasets)

    def build_data(self):
        """
        :return: whatever the selected builder's ``build_data`` produces
        """
        return self.builder.build_data()

    def build_code(self):
        """
        :return: the selected builder's code lines joined by newlines into one string
        """
        lines = self.builder.build_code()
        return "\n".join(lines)
def build_df(dataset, is_merge=False):
    """
    Load the data for one dataset entry, optionally narrow it to the requested
    columns and, for merges, move the index columns into the index.

    :param dataset: dict holding ``dataId`` and optionally ``columns`` and ``index``
                    (lists of column names)
    :param is_merge: when True the ``index`` columns are added to the selection and
                     then set as the index of the result
    :return: the object returned by ``global_state.get_data`` after column selection
             and ``set_index`` have been applied
    """
    data = global_state.get_data(dataset["dataId"])
    # A missing or null "index" is treated the same as an empty one.
    index = list(dataset.get("index") or []) if is_merge else []
    cols = dataset.get("columns")
    if cols:
        # De-duplicate while keeping the caller's ordering; a plain set() would
        # make the resulting column order arbitrary from run to run.
        selected, seen = [], set()
        for col in list(cols) + index:
            if col not in seen:
                seen.add(col)
                selected.append(col)
        data = data[selected]
    if index:
        data = data.set_index(index)
    return data
def build_code_data(datasets, is_merge=False):
    """
    Build the code lines that load each dataset into ``df1``, ``df2``, ... in the
    same way :func:`build_df` loads the data itself.

    :param datasets: list of dicts holding ``dataId`` and optionally ``columns`` and
                     ``index`` (lists of column names)
    :param is_merge: when True each snippet also sets the dataset's ``index`` columns
                     as the index (after column selection, mirroring ``build_df``)
    :return: list of code lines, starting with the ``import dtale`` line
    """
    code = ["import dtale\n"]

    def quote_list(names):
        # Renders ['a','b'] from an iterable of column names.
        return "['{}']".format("','".join(names))

    for idx, dataset in enumerate(datasets, 1):
        # Index handling applies to merges only; a missing/empty index emits nothing.
        index = list(dataset.get("index") or []) if is_merge else []
        columns = dataset.get("columns")

        cols_code = ""
        if columns:
            # Keep index columns in the selection so the following set_index can
            # find them; de-duplicate without disturbing the caller's ordering.
            selected, seen = [], set()
            for col in list(columns) + index:
                if col not in seen:
                    seen.add(col)
                    selected.append(col)
            cols_code = "[{}]".format(quote_list(selected))

        index_code = ".set_index({})".format(quote_list(index)) if index else ""

        code.append(
            "df{idx} = dtale.get_instance('{data_id}').data{cols}{index}".format(
                idx=idx,
                data_id=dataset["dataId"],
                cols=cols_code,
                index=index_code,
            )
        )
    return code
class MergeBuilder(object):
    """
    Merges two or more datasets on their indexes and produces the equivalent code.

    :param config: dict read for the keys ``how``, ``indicator`` and ``sort``
    :param datasets: list of dataset dicts (``dataId``, ``index``, optional ``columns``
                     and ``suffix``); at least two are required
    """

    def __init__(self, config, datasets):
        self.config = config
        self.datasets = datasets

    def build_data(self):
        """
        Merge the first two datasets, then merge every further dataset into the
        running result.

        :return: the merged data
        :raises IndexError: if fewer than two datasets were supplied
        """
        dfs = [
            (build_df(dataset, is_merge=True), dataset.get("suffix") or None)
            for dataset in self.datasets
        ]
        how, indicator, sort = (
            self.config.get(prop) for prop in ["how", "indicator", "sort"]
        )
        (left_df, left_suffix), (right_df, right_suffix) = dfs[0], dfs[1]

        kwargs = dict(how=how, left_index=True, right_index=True)
        # Only override pandas' default suffixes when one was actually provided,
        # matching both the later merges below and the code emitted by build_code.
        if left_suffix or right_suffix:
            kwargs["suffixes"] = [left_suffix, right_suffix]
        if indicator:
            kwargs["indicator"] = "merge_1"
        if sort:
            kwargs["sort"] = sort
        final_df = left_df.merge(right_df, **kwargs)

        for idx, (df, suffix) in enumerate(dfs[2:], 2):
            if suffix:
                # The accumulated frame keeps its column names; only the new one is suffixed.
                kwargs["suffixes"] = [None, suffix]
            else:
                kwargs.pop("suffixes", None)
            if indicator:
                # Each merge needs its own indicator column name.
                kwargs["indicator"] = "merge_{}".format(idx)
            final_df = final_df.merge(df, **kwargs)
        return final_df

    def build_code(self):
        """
        Build the code lines reproducing :meth:`build_data`.

        :return: list of code lines (dataset loading followed by one merge per line)
        """
        code = build_code_data(self.datasets, is_merge=True)
        how, indicator, sort = (
            self.config.get(prop) for prop in ["how", "indicator", "sort"]
        )

        def suffix_str(suffix):
            return "'{}'".format(suffix) if suffix else "None"

        def build_merge(df1, df2, left, right, ind_id=1):
            suffixes = ""
            if left.get("suffix") or right.get("suffix"):
                suffixes = ", suffixes=[{}, {}]".format(
                    suffix_str(left.get("suffix")), suffix_str(right.get("suffix"))
                )
            sort_param = ", sort=True" if sort else ""
            # The closing quote after the indicator name is required for the
            # generated line to be valid Python.
            indicator_param = (
                ", indicator='merge_{}'".format(ind_id) if indicator else ""
            )
            return (
                "df = {df1}.merge({df2}, how='{how}', left_index=True, right_index=True"
                "{sort}{indicator}{suffixes})"
            ).format(
                df1=df1,
                df2=df2,
                how=how,
                sort=sort_param,
                indicator=indicator_param,
                suffixes=suffixes,
            )

        code.append(build_merge("df1", "df2", self.datasets[0], self.datasets[1]))
        # Further datasets are merged into the running "df"; the left side gets no suffix.
        for idx, dataset in enumerate(self.datasets[2:], 3):
            code.append(build_merge("df", "df{}".format(idx), {}, dataset, idx - 1))
        return code
class StackBuilder(object):
    """
    Stacks (concatenates) datasets with ``pd.concat`` and produces the equivalent code.

    :param config: dict read for the key ``ignoreIndex``
    :param datasets: list of dataset dicts (``dataId`` and optional ``columns``)
    """

    def __init__(self, config, datasets):
        self.config = config
        self.datasets = datasets

    def build_data(self):
        """
        :return: the result of ``pd.concat`` over every dataset, passing
                 ``ignore_index`` only when ``ignoreIndex`` is truthy
        """
        ignore_index = self.config.get("ignoreIndex")
        kwargs = {}
        if ignore_index:
            kwargs["ignore_index"] = ignore_index
        return pd.concat([build_df(dataset) for dataset in self.datasets], **kwargs)

    def build_code(self):
        """
        Build the code lines reproducing :meth:`build_data`.

        :return: list of code lines: imports, dataset loading and the ``pd.concat`` call
        """
        # The emitted snippet calls pd.concat, so it has to import pandas itself.
        # NOTE(review): build_data loads frames with build_df's default is_merge=False
        # while the snippet helper is called with is_merge=True; the flag is left
        # unchanged here -- confirm which value is intended.
        code = ["import pandas as pd"] + build_code_data(self.datasets, is_merge=True)
        ignore_index = self.config.get("ignoreIndex")
        ignore_index_param = ", ignore_index=True" if ignore_index else ""
        df_names = ["df{}".format(idx) for idx, _ in enumerate(self.datasets, 1)]
        code.append(
            "df = pd.concat([{dfs}]{ignore_index})".format(
                dfs=",".join(df_names),
                ignore_index=ignore_index_param,
            )
        )
        return code