from __future__ import division
import decimal
import json
import os
import socket
import sys
import time
from builtins import map, object
from logging import getLogger
from flask import jsonify as _jsonify
import numpy as np
import pandas as pd
from past.utils import old_div
from six import PY3
import dtale.global_state as global_state
logger = getLogger(__name__)
def running_with_pytest():
"""
Checks to see if D-Tale has been initiated from test
:return: `True` if executed from test, `False` otherwise
:rtype: bool
"""
return hasattr(sys, '_called_from_test')
def running_with_flask_debug():
"""
Checks to see if D-Tale has been initiated from Flask with debug mode enabled (the Werkzeug reloader)
:return: `True` if executed from Flask in debug mode, `False` otherwise
:rtype: bool
"""
return os.environ.get('WERKZEUG_RUN_MAIN') == 'true'
def get_host(host=None):
"""
Returns the host input if it exists, otherwise the output of :func:`python:socket.gethostname`
:param host: hostname, can start with 'http://', 'https://' or just the hostname itself
:type host: str, optional
:return: str
"""
def is_valid_host(host):
try:
socket.gethostbyname(host.split('://')[-1])
return True
except BaseException:
return False
if host is None:
socket_host = socket.gethostname()
if is_valid_host(socket_host):
return socket_host
return 'localhost'
if is_valid_host(host):
return host
raise Exception('Hostname ({}) is not recognized'.format(host))
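# Illustrative usage (editorial sketch, not part of the original module; assumes the hostname resolves via DNS):
#   get_host('http://my-server')  ->  'http://my-server'
#   get_host()                    ->  socket.gethostname() if it resolves, else 'localhost'
#   get_host('bad host name!')    ->  raises Exception('Hostname (bad host name!) is not recognized')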
def build_url(port, host):
"""
Returns the full URL combining host (if not specified, the output of :func:`python:socket.gethostname` will be used) & port
:param port: integer string for the port to be used by the :class:`flask:flask.Flask` process
:type port: str
:param host: hostname, can start with 'http://', 'https://' or just the hostname itself
:type host: str, optional
:return: str
"""
final_port = ':{}'.format(port) if port is not None else ''
if (host or '').startswith('http'):
return '{}{}'.format(host, final_port)
return 'http://{}{}'.format(host, final_port)
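# Illustrative usage (editorial sketch, not part of the original module):
#   build_url('40000', 'localhost')             ->  'http://localhost:40000'
#   build_url(None, 'http://dtale-server')      ->  'http://dtale-server'
#   build_url('40000', 'https://dtale-server')  ->  'https://dtale-server:40000'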
def build_shutdown_url(base):
"""
Builds the shutdown endpoint for the specified D-Tale instance
:param base: base URL of a D-Tale process
:type base: str
:return: URL string of the shutdown endpoint for the base URL passed
"""
return '{}/shutdown'.format(base)
def get_str_arg(r, name, default=None):
"""
Retrieve argument from :attr:`flask:flask.request` and convert to string
:param r: :attr:`flask:flask.request`
:param name: argument name
:type name: str
:param default: default value if parameter is non-existent, defaults to None
:return: string argument value
"""
val = r.args.get(name)
if val is None or val == '':
return default
else:
try:
return str(val)
except BaseException:
return default
def get_json_arg(r, name, default=None):
"""
Retrieve argument from :attr:`flask:flask.request` and parse JSON to python data structure
:param r: :attr:`flask:flask.request`
:param name: argument name
:type name: str
:param default: default value if parameter is non-existent, defaults to None
:return: parsed JSON
"""
val = r.args.get(name)
if val is None or val == '':
return default
else:
return json.loads(val)
def get_int_arg(r, name, default=None):
"""
Retrieve argument from :attr:`flask:flask.request` and convert to integer
:param r: :attr:`flask:flask.request`
:param name: argument name
:type name: str
:param default: default value if parameter is non-existent, defaults to None
:return: integer argument value
"""
val = r.args.get(name)
if val is None or val == '':
return default
else:
try:
return int(val)
except BaseException:
return default
def get_float_arg(r, name, default=None):
"""
Retrieve argument from :attr:`flask:flask.request` and convert to float
:param r: :attr:`flask:flask.request`
:param name: argument name
:type name: str
:param default: default value if parameter is non-existent, defaults to None
:return: float argument value
"""
val = r.args.get(name)
if val is None or val == '':
return default
else:
try:
return float(val)
except BaseException:
return default
def get_bool_arg(r, name):
"""
Retrieve argument from :attr:`flask:flask.request` and convert to boolean
:param r: :attr:`flask:flask.request`
:param name: argument name
:type name: str
:return: `True` if lowercase value equals 'true', `False` otherwise
"""
return r.args.get(name, 'false').lower() == 'true'
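# Illustrative behaviour of the get_*_arg helpers (editorial sketch, not part of
# the original module; assumes a Flask request whose query string decodes to
# rows=10, factor=1.5, flag=TRUE, ids=[1,2]):
#   get_int_arg(request, 'rows')                    ->  10
#   get_float_arg(request, 'factor')                ->  1.5
#   get_bool_arg(request, 'flag')                   ->  True   (case-insensitive 'true' check)
#   get_json_arg(request, 'ids')                    ->  [1, 2]
#   get_str_arg(request, 'missing', default='n/a')  ->  'n/a'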
def json_string(x, nan_display='', **kwargs):
"""
Convert value to string to be used within JSON output
If a :class:`python:UnicodeEncodeError` occurs then :func:`python:str.encode` will be called on input
:param x: value to be converted to string
:param nan_display: if `x` is :attr:`numpy:numpy.nan` then return this value
:return: string value
:rtype: str
"""
if x:
try:
return str(x)
except UnicodeEncodeError:
return x.encode('utf-8')
except BaseException as ex:
logger.exception(ex)
return nan_display
def json_int(x, nan_display='', as_string=False, fmt='{:,d}'):
"""
Convert value to integer to be used within JSON output
:param x: value to be converted to integer
:param nan_display: if `x` is :attr:`numpy:numpy.nan` then return this value
:param as_string: return integer as a formatted string (EX: 1,000,000)
:param fmt: format string applied when `as_string` is `True`, defaults to '{:,d}'
:return: integer value
:rtype: int
"""
try:
if not np.isnan(x) and not np.isinf(x):
return fmt.format(int(x)) if as_string else int(x)
return nan_display
except BaseException:
return nan_display
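# Illustrative usage (editorial sketch, not part of the original module):
#   json_int(1000000)                          ->  1000000
#   json_int(1000000, as_string=True)          ->  '1,000,000'
#   json_int(float('nan'), nan_display='N/A')  ->  'N/A'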
# hack to solve issues with formatting floats with a precision more than 4 decimal points
# https://stackoverflow.com/questions/38847690/convert-float-to-string-without-scientific-notation-and-false-precision
DECIMAL_CTX = decimal.Context()
DECIMAL_CTX.prec = 20
def json_float(x, precision=2, nan_display='nan', inf_display='inf', as_string=False):
"""
Convert value to float to be used within JSON output
:param x: value to be converted to float
:param precision: precision of float to be returned
:param nan_display: if `x` is :attr:`numpy:numpy.nan` then return this value
:param inf_display: if `x` is :attr:`numpy:numpy.inf` then return this value
:param as_string: return float as a formatted string (EX: 1,234.5643)
:return: float value
:rtype: float
"""
try:
if np.isinf(x):
return inf_display
if not np.isnan(x):
output = float(round(x, precision))
if as_string:
str_output = format(DECIMAL_CTX.create_decimal(repr(x)), ',.{}f'.format(str(precision)))
# drop trailing zeroes off & trailing decimal points if necessary
return str_output.rstrip('0').rstrip('.')
return output
return nan_display
except BaseException:
return nan_display
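# Illustrative usage (editorial sketch, not part of the original module):
#   json_float(1234.56789)                            ->  1234.57
#   json_float(1234.56789, as_string=True)            ->  '1,234.57'
#   json_float(float('inf'), inf_display='Infinity')  ->  'Infinity'
#   json_float(float('nan'), nan_display='N/A')       ->  'N/A'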
def json_date(x, fmt='%Y-%m-%d %H:%M:%S', nan_display='', **kwargs):
"""
Convert value to date string to be used within JSON output
:param x: value to be converted to date string
:param fmt: the date string format to be applied
:param nan_display: if `x` is :attr:`numpy:numpy.nan` then return this value
:return: date string value
:rtype: str (YYYY-MM-DD)
"""
try:
# calling unique on a pandas datetime column returns numpy datetime64
output = (pd.Timestamp(x) if isinstance(x, np.datetime64) else x).strftime(fmt)
empty_time = ' 00:00:00'
if output.endswith(empty_time):
return output[:-1 * len(empty_time)]
return output
except BaseException:
return nan_display
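# Illustrative usage (editorial sketch, not part of the original module):
#   json_date(pd.Timestamp('2020-01-01'))           ->  '2020-01-01'            (empty time component trimmed)
#   json_date(pd.Timestamp('2020-01-01 12:30:45'))  ->  '2020-01-01 12:30:45'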
def json_timestamp(x, nan_display='', **kwargs):
"""
Convert value to timestamp (milliseconds) to be used within JSON output
:param x: value to be converted to milliseconds
:param nan_display: if `x` is :attr:`numpy:numpy.nan` then return this value
:return: millisecond value
:rtype: int (epoch milliseconds)
"""
try:
output = (pd.Timestamp(x) if isinstance(x, np.datetime64) else x)
output = int((time.mktime(output.timetuple()) + (old_div(output.microsecond, 1000000.0))) * 1000)
return output
except BaseException:
return nan_display
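# Editorial note: because the conversion above relies on time.mktime, the
# resulting millisecond value depends on the local timezone of the process.
# For example, json_timestamp(pd.Timestamp('1970-01-01')) is 0 only when the
# process is running in UTC.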
def classify_type(type_name):
"""
:param type_name: string label for value from :meth:`pandas:pandas.DataFrame.dtypes`
:return: shortened string label for dtype
S = str
B = bool
F = float
I = int
D = timestamp or datetime
TD = timedelta
:rtype: str
"""
lower_type_name = (type_name or '').lower()
if lower_type_name.startswith('str'):
return 'S'
if lower_type_name.startswith('bool'):
return 'B'
if lower_type_name.startswith('float'):
return 'F'
if lower_type_name.startswith('int'):
return 'I'
if any(lower_type_name.startswith(t) for t in ('timestamp', 'datetime')):
return 'D'
if lower_type_name.startswith('timedelta'):
return 'TD'
return 'S'
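# Illustrative mappings (editorial sketch, not part of the original module):
#   classify_type('int64')            ->  'I'
#   classify_type('float32')          ->  'F'
#   classify_type('bool')             ->  'B'
#   classify_type('datetime64[ns]')   ->  'D'
#   classify_type('timedelta64[ns]')  ->  'TD'
#   classify_type('object')           ->  'S'  (anything unrecognized falls back to string)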
def retrieve_grid_params(req, props=None):
"""
Pull out grid parameters from :attr:`flask:flask.request` arguments and return as a `dict`
:param req: :attr:`flask:flask.request`
:param props: argument names
:type props: list
:return: dictionary of argument/value pairs
:rtype: dict
"""
params = dict()
filters = get_str_arg(req, 'filters')
if filters:
params['filters'] = json.loads(filters)
params['query'] = get_str_arg(req, 'query')
params['page'] = get_int_arg(req, 'page', 1)
params['page_size'] = get_int_arg(req, 'page_size')
params['sort_column'] = get_str_arg(req, 'sortColumn')
params['sort_direction'] = get_str_arg(req, 'sortDirection')
sort = get_str_arg(req, 'sort')
if sort:
params['sort'] = json.loads(sort)
if props:
return filter_params(params, props)
return params
def filter_params(params, props):
"""
Return list of values from dictionary for list of keys
"""
return list(map(params.get, props))
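# Illustrative usage (editorial sketch, not part of the original module; assumes
# a Flask request whose query string is '?page=2&page_size=50&sortColumn=a&sortDirection=ASC'):
#   retrieve_grid_params(request)
#     ->  {'query': None, 'page': 2, 'page_size': 50, 'sort_column': 'a', 'sort_direction': 'ASC'}
#   filter_params({'a': 1, 'b': 2}, ['a', 'b'])  ->  [1, 2]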
def sort_df_for_grid(df, params):
"""
Sort dataframe based on 'sort' property in parameter dictionary. Sort
configuration is of the following shape:
{
sort: [
[col1, ASC],
[col2, DESC],
...
]
}
:param df: dataframe
:type df: :class:`pandas:pandas.DataFrame`
:param params: arguments from :attr:`flask:flask.request`
:type params: dict
:return: sorted dataframe
:rtype: :class:`pandas:pandas.DataFrame`
"""
if 'sort' in params:
cols, dirs = [], []
for col, dir in params['sort']:
cols.append(col)
dirs.append(dir == 'ASC')
return df.sort_values(cols, ascending=dirs)
return df.sort_index()
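# Illustrative usage (editorial sketch, not part of the original module):
#   df = pd.DataFrame({'a': [2, 1], 'b': [3, 4]})
#   sort_df_for_grid(df, {'sort': [['a', 'ASC'], ['b', 'DESC']]})
#   # equivalent to df.sort_values(['a', 'b'], ascending=[True, False])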
def filter_df_for_grid(df, params, context_vars):
"""
Filter dataframe based on 'filters' property in parameter dictionary. Filter
configuration is of the following shape:
{
filters: {
col1: {
value: {
type: 1, # Equals
value: 1.0
}
type: 'NumericFilter'
},
col2: {
value: {
type: 2, # Range
begin: 1.0,
end: 2.0
}
type: 'NumericFilter'
},
col3: {
value: {
type: 3, # GreaterThan
value: 1.0
}
type: 'NumericFilter'
},
col4: {
value: {
type: 4, # LessThan
value: 1.0
}
type: 'NumericFilter'
},
col5: {
value: 'aaaa' # contains 'aaaa'
type: 'StringFilter'
},
col6: {
value: '2000-01-01' # contains '2000-01-01'
type: 'StringFilter'
},
col7: {
value: '=aaaa' # equals 'aaaa'
type: 'StringFilter'
}
},
query: 'col1 === 1'
}
:param df: dataframe
:type df: :class:`pandas:pandas.DataFrame`
:param params: arguments from :attr:`flask:flask.request`
:type params: dict
:param context_vars: a dictionary of the variables that will be available for use in user-defined expressions
:type context_vars: dict
:return: filtered dataframe
:rtype: :class:`pandas:pandas.DataFrame`
"""
data_type_info = get_dtypes(df)
for col, filter_cfg in params.get('filters', {}).items():
filter_val = filter_cfg['value']
if filter_cfg.get('type') == 'NumericFilter':
for numeric_operation in filter_val:
operation_type = numeric_operation['type']
df_filter = None
if operation_type == 2: # Range
begin = numeric_operation.get('begin')
end = numeric_operation.get('end')
if begin is not None and end is not None:
df_filter = ((df[col] >= begin) & (df[col] <= end))
elif numeric_operation.get('value') is not None:
operation_val = numeric_operation['value']
if operation_type == 1: # Equals
df_filter = df[col] == operation_val
elif operation_type == 3: # GreaterThan
df_filter = df[col] > operation_val
elif operation_type == 4: # LessThan
df_filter = df[col] < operation_val
if df_filter is not None:
df = df[df_filter]
else: # this catches StringFilter values
stringified_col = df[col]
if classify_type(data_type_info[col]) == 'D':
stringified_col = df[col].apply(lambda d: d.strftime('%Y-%m-%d'))
if filter_val.startswith('='):
df = df[stringified_col.astype(str) == filter_val[1:]]
else:
df = df[stringified_col.astype(str).str.lower().str.contains(filter_val.lower(), na=False)]
df = run_query(df, params.get('query'), context_vars)
return df
def find_dtype(s):
"""
Helper function to determine the dtype of a :class:`pandas:pandas.Series`
"""
if s.dtype.name == 'object':
return pd.api.types.infer_dtype(s, skipna=True)
else:
return s.dtype.name
def get_dtypes(df):
"""
Build dictionary of column/dtype name pairs from :class:`pandas:pandas.DataFrame`
"""
def _load():
for col in df.columns:
yield col, find_dtype(df[col])
return dict(list(_load()))
def grid_columns(df):
"""
Build list of {name, dtype} dictionaries for columns in :class:`pandas:pandas.DataFrame`
"""
data_type_info = get_dtypes(df)
return [dict(name=c, dtype=data_type_info[c]) for c in df.columns]
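# Illustrative usage (editorial sketch, not part of the original module):
#   df = pd.DataFrame({'a': [1, 2], 'b': ['x', 'y']})
#   get_dtypes(df)    ->  {'a': 'int64', 'b': 'string'}
#   grid_columns(df)  ->  [{'name': 'a', 'dtype': 'int64'}, {'name': 'b', 'dtype': 'string'}]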
DF_MAPPINGS = {
'I': lambda f, i, c: f.add_int(i, c),
'D': lambda f, i, c: f.add_date(i, c),
'F': lambda f, i, c: f.add_float(i, c),
'S': lambda f, i, c: f.add_string(i, c)
}
def handle_error(error_info):
"""
Boilerplate exception messaging
"""
logger.exception("Exception occurred while processing request: {}".format(error_info.get('error')))
def jsonify(return_data={}, **kwargs):
"""
Overriding Flask's jsonify method to account for extra error handling
:param return_data: dictionary of data to be passed to :meth:`flask:flask.jsonify`
:param kwargs: Optional keyword arguments merged into return_data
:return: output of :meth:`flask:flask.jsonify`
"""
if isinstance(return_data, dict) and return_data.get('error'):
handle_error(return_data)
return _jsonify(dict_merge(dict(success=False), dict_merge(kwargs, return_data)))
if len(kwargs):
return _jsonify(dict_merge(kwargs, return_data))
return _jsonify(return_data)
def find_selected_column(data, col):
"""
Handles the case where we come across a series which, after `reset_index()`,
has columns like [date, security_id, values]; in that case we want the last column
:param data: dataframe
:type data: :class:`pandas:pandas.DataFrame`
:param col: column name
:type col: str
:return: column name if it exists within the dataframe's columns, the last column within the dataframe otherwise
:rtype: str
"""
return col if col in data.columns else data.columns[-1]
def make_list(vals):
"""
Convert a value that is optionally list or scalar
into a list
"""
if vals is None:
return []
elif isinstance(vals, (list, tuple)):
return vals
else:
return [vals]
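# Illustrative usage (editorial sketch, not part of the original module):
#   make_list(None)    ->  []
#   make_list('a')     ->  ['a']
#   make_list([1, 2])  ->  [1, 2]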
def dict_merge(d1, d2, *args):
"""
Merges two dictionaries. Items of the second dictionary will
replace items of the first dictionary if there are any overlaps.
Either dictionary can be None. An empty dictionary {} will be
returned if both dictionaries are None.
:param d1: First dictionary can be None
:type d1: dict
:param d2: Second dictionary can be None
:type d2: dict
:return: new dictionary with the contents of d2 overlaying the contents of d1
:rtype: dict
"""
def _dict_merge(d11, d12):
if not d11:
return d12 or {}
elif not d12:
return d11 or {}
return dict(list(d11.items()) + list(d12.items()))
ret = _dict_merge(d1, d2)
for d in args:
ret = _dict_merge(ret, d)
return ret
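# Illustrative usage (editorial sketch, not part of the original module):
#   dict_merge({'a': 1}, {'a': 2, 'b': 3})  ->  {'a': 2, 'b': 3}   (later values win)
#   dict_merge(None, {'a': 1}, {'b': 2})    ->  {'a': 1, 'b': 2}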
def flatten_lists(lists):
"""
Take an iterable containing iterables and flatten them into one list.
- [[1], [2], [3, 4]] => [1, 2, 3, 4]
"""
return [item for sublist in lists for item in sublist]
def divide_chunks(l, n):
"""
Break list input 'l' up into smaller lists of size 'n'
"""
# loop over 'l' in steps of 'n', yielding successive slices
for i in range(0, len(l), n):
yield l[i:i + n]
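# Illustrative usage (editorial sketch, not part of the original module):
#   list(divide_chunks([1, 2, 3, 4, 5], 2))  ->  [[1, 2], [3, 4], [5]]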
def run_query(df, query, context_vars=None):
"""
Utility function for running :func:`pandas:pandas.DataFrame.query` . This function contains extra logic to
handle when column names contain special characters. Looks like pandas will be handling this in a future
version: https://github.com/pandas-dev/pandas/issues/27017
The logic to handle these special characters in the meantime is only available in Python 3+
:param df: input dataframe
:type df: :class:`pandas:pandas.DataFrame`
:param query: query string
:type query: str
:param context_vars: dictionary of user-defined variables which can be referenced by name in query strings
:type context_vars: dict, optional
:return: filtered dataframe
"""
if (query or '') == '':
return df
# https://stackoverflow.com/a/40083013/12616360 only supporting filter cleanup for python 3+
if PY3:
invalid_column_names = [x for x in df.columns.values if not x.isidentifier()]
# Make replacements in the query and keep track
# NOTE: This method fails if the frame has columns called REPL_0 etc.
replacements = dict()
final_query = str(query)
for cn in invalid_column_names:
r = 'REPL_{}'.format(str(invalid_column_names.index(cn)))
final_query = final_query.replace(cn, r)
replacements[cn] = r
inv_replacements = {replacements[k]: k for k in replacements.keys()}
df = df.rename(columns=replacements)
df = df.query(final_query, local_dict=context_vars or {})
df = df.rename(columns=inv_replacements)
else:
df = df.query(query, local_dict=context_vars or {})
if not len(df):
raise Exception('query "{}" found no data, please alter'.format(query))
return df
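# Illustrative usage (editorial sketch, not part of the original module): on
# Python 3, columns whose names are not valid identifiers are temporarily
# renamed (REPL_0, REPL_1, ...) so pandas' query engine accepts them:
#   df = pd.DataFrame({'a b': [1, 2, 3]})
#   run_query(df, 'a b == 1')  # internally rewritten to "REPL_0 == 1"; returns the single matching row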
class DuplicateDataError(Exception):
"""
Exception for signalling that data similar to an already-loaded dataset is being loaded to D-Tale again
"""
def __init__(self, data_id):
super(DuplicateDataError, self).__init__("Duplicate Data")
self.data_id = data_id
def build_code_export(data_id, imports='import pandas as pd\n\n', query=None):
"""
Helper function for building a string representing the code that was run to get the data you are viewing to that
point.
:param data_id: integer string identifier for a D-Tale process's data
:type data_id: str
:param imports: string representing the imports at the top of the code string
:type imports: string, optional
:param query: pandas dataframe query string
:type query: str, optional
:return: python code string
"""
history = global_state.get_history(data_id) or []
settings = global_state.get_settings(data_id) or {}
ctxt_vars = global_state.get_context_variables(data_id)
startup_code = settings.get('startup_code')
startup_code = '# Data Re-shaping\n{}\n\n'.format(startup_code) if startup_code else ''
startup_str = (
"# DISCLAIMER: 'df' refers to the data you passed in when calling 'dtale.show'\n\n"
'{imports}'
'{startup}'
'if isinstance(df, (pd.DatetimeIndex, pd.MultiIndex)):\n'
'\tdf = df.to_frame(index=False)\n\n'
'# remove any pre-existing indices for ease of use in the D-Tale code, but this is not required\n'
"df = df.reset_index().drop('index', axis=1, errors='ignore')\n"
'df.columns = [str(c) for c in df.columns] # update columns to strings in case they are numbers\n'
).format(imports=imports, startup=startup_code)
final_history = [startup_str] + history
final_query = query
if final_query is None:
final_query = settings.get('query')
if final_query is not None:
if len(ctxt_vars or {}):
final_history.append((
"\n# this is injecting any context variables you may have passed into 'dtale.show'\n"
"import dtale.global_state as dtale_global_state\n"
"\n# DISCLAIMER: running this line in a different process than the one it originated will produce\n"
"# differing results\n"
"ctxt_vars = dtale_global_state.get_context_variables('{data_id}')\n\n"
"df = df.query('{query}', local_dict=ctxt_vars)\n"
).format(query=final_query, data_id=data_id))
else:
final_history.append("df = df.query('{}')\n".format(final_query))
elif 'query' in settings:
final_history.append("df = df.query('{}')\n".format(settings['query']))
if 'sort' in settings:
cols, dirs = [], []
for col, dir in settings['sort']:
cols.append(col)
dirs.append('True' if dir == 'ASC' else 'False')
final_history.append("df = df.sort_values(['{cols}'], ascending=[{dirs}])\n".format(
cols=', '.join(cols), dirs="', '".join(dirs)
))
return final_history
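# Illustrative usage (editorial sketch, not part of the original module; '1' is a
# hypothetical data_id): the return value is a list of code snippets which can be
# joined into a single script reproducing the current state of the data:
#   code = '\n'.join(build_code_export('1'))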