"""
This file contains default functions applied to a data-frame/dataset as
pre/post processing jobs. The functions are organized in a pipeline, i.e. the
data is passed through each function in turn.

Custom functions:
    Functions must take 2 arguments (_data, _args), where _data is a data
    frame and _args is an object describing the input parameters.
"""
import calendar
from datetime import datetime, timedelta

import numpy as np
import pandas as pd

# Type names accepted in configuration entries, mapped to the Python type
# used when casting a column.
_TYPE_MAP = {'int': int, 'float': float, 'integer': int, 'double': float}


def limit(_data: pd.DataFrame, size: int) -> pd.DataFrame:
    """
    Return the first ``size`` rows of the data-frame.

    Pipeline configuration: ...,{limit:size}

    :_data  data-frame to truncate
    :size   maximum number of rows to keep
    """
    return _data.iloc[:size]


def format(_data: pd.DataFrame, _schema) -> pd.DataFrame:
    """
    Placeholder for enforcing a schema against a data-frame; whether this can
    work depends on the persistence storage.

    The function currently performs no work and returns ``_data`` unchanged.

    :_data      data-frame containing all data
    :_schema    schema to enforce, expected as a list of
                {name,type,description}
    """
    return _data


def approximate(_data: pd.DataFrame, _args: dict) -> pd.DataFrame:
    """
    Replace the values of the configured fields with random draws and cast
    the result to the configured type.

    Fields listed in ``_args`` that are not columns of ``_data`` are skipped.
    Type names that are not one of int/integer/float/double fall back to str.
    The data-frame is modified in place and returned.

    :_data  data-frame
    :_args  object of {field:type}
    """
    for _name, _typename in _args.items():
        if _name not in _data:
            continue
        otype = _TYPE_MAP.get(_typename, str)
        # NOTE(review): uniform() receives the column as its ``low`` bound and
        # the default 1.0 as ``high`` -- confirm this is the intended
        # approximation before relying on the distribution.
        values = np.random.uniform(_data[_name].values).astype(otype)
        # Plain column assignment replaces the column so that the dtype
        # produced by astype() is kept (``.loc[:, col] = ...`` writes into the
        # existing column and keeps the old dtype on pandas >= 2.0).
        _data[_name] = values
    return _data


def split_date(_data: pd.DataFrame, _args: dict) -> pd.DataFrame:
    """
    Build columns by re-formatting the dates found in other columns.

    For every entry the source column is parsed with the ``in`` format,
    rendered with the ``out`` format and cast to ``type`` (str when absent).
    The type may be given as one of the names int/integer/float/double or as
    anything ``Series.astype`` accepts. The data-frame is modified in place
    and returned.

    :_data  data-frame
    :_args  configuration entry {column:{format:{in,out},column,type}}
    """
    for _name, _spec in _args.items():
        _iformat = _spec['format']['in']
        _oformat = _spec['format']['out']
        _otype = _spec.get('type', str)
        # Resolve the textual aliases (e.g. 'integer') that astype() itself
        # does not understand; anything else is passed through untouched.
        _otype = _TYPE_MAP.get(_otype, _otype)

        def _reformat(_date, _in=_iformat, _out=_oformat):
            return datetime.strptime(str(_date), _in).strftime(_out)

        _data[_name] = _data[_spec['column']].apply(_reformat).astype(_otype)
    return _data


def newdate(_data: pd.DataFrame, _args: dict) -> pd.DataFrame:
    """
    Fill columns with synthetic dates, one date per row.

    The year of each date is read from the column named by ``column`` when
    that key is present; otherwise it is drawn at random from the 90 years
    preceding the current year. Month and day are always random (see
    ``_makedate``). The target column does not need to exist beforehand. The
    data-frame is modified in place and returned.

    :_data  data frame
    :_args  configuration column:{format,column}
    """
    # Derived from the frame itself so the target column may be a new one.
    row_count = len(_data)
    this_year = datetime.now().year
    for _name, _spec in _args.items():
        _format = _spec['format']
        if 'column' in _spec:
            years = _data[_spec['column']].values
        else:
            years = np.random.choice(
                np.arange(this_year - 90, this_year), row_count)
        _data[_name] = [
            _makedate(year=_year, format=_format) for _year in years]
    return _data


def _makedate(**_args):
    """
    Create a random date within a given year and return it as text.

    Keyword arguments:
    :year       year of the date (anything ``int()`` accepts), required
    :format     strftime format of the output, defaults to '%Y-%m-%d'
    :offset     optional list of day deltas; each delta is added to the
                previously generated date (i.e. deltas accumulate)

    Returns a single formatted string when no offset is given, otherwise a
    list holding the formatted base date followed by one formatted date per
    offset.
    """
    year = int(_args['year'])
    offset = _args.get('offset', 0)
    _format = _args.get('format', '%Y-%m-%d')

    month = np.random.randint(1, 13)
    # monthrange() gives the real month length, including century leap-year
    # rules; randint's upper bound is exclusive, hence the + 1.
    last_day = calendar.monthrange(year, month)[1]
    day = np.random.randint(1, last_day + 1)

    # -- synthetic date
    _date = datetime(year=year, month=month, day=day)
    if not offset:
        return _date.strftime(_format)

    r = [_date.strftime(_format)]
    for _delta in offset:
        _date = _date + timedelta(_delta)
        r.append(_date.strftime(_format))
    return r