Merge branch 'dev'

master
Steve Nyemba 1 year ago
commit ca09ea0202

@ -192,9 +192,10 @@ class Learner(Process):
finally: finally:
pass pass
_log[name] = self._df[name].dtypes.name # _log[name] = self._df[name].dtypes.name
_log = {'action':'structure','input':_log} # _log[name] = reader.meta()
self.log(**_log) # _log = {'action':'structure','input':_log}
# self.log(**_log)
# #
# convert the data to binary here ... # convert the data to binary here ...
_schema = self.get_schema() _schema = self.get_schema()
@ -451,10 +452,10 @@ class Generator (Learner):
FORMAT = '%Y-%m-%-d %H:%M:%S' FORMAT = '%Y-%m-%-d %H:%M:%S'
SIZE = 19 SIZE = 19
if SIZE > 0 : # if SIZE > 0 :
values = pd.to_datetime(_df[name], format=FORMAT).astype(np.datetime64) # values = pd.to_datetime(_df[name], format=FORMAT).astype(np.datetime64)
# _df[name] = [_date[:SIZE].strip() for _date in values] # # _df[name] = [_date[:SIZE].strip() for _date in values]
# _df[name] = _df[name].astype(str) # _df[name] = _df[name].astype(str)
@ -464,6 +465,7 @@ class Generator (Learner):
pass #;_df[name] = _df[name].fillna('').astype('datetime64[ns]') pass #;_df[name] = _df[name].fillna('').astype('datetime64[ns]')
except Exception as e: except Exception as e:
print (e)
pass pass
finally: finally:
pass pass
@ -502,12 +504,20 @@ class Generator (Learner):
else: else:
_store = None _store = None
N = 0 N = 0
_haslist = np.sum([type(_item)==list for _item in self.columns]) > 0
_schema = self.get_schema()
for _iodf in _candidates : for _iodf in _candidates :
_df = self._df.copy() _df = self._df.copy()
if self.columns :
if self.columns and _haslist is False:
_df[self.columns] = _iodf[self.columns] _df[self.columns] = _iodf[self.columns]
else:
_df = _iodf
N += _df.shape[0] N += _df.shape[0]
if self._states and 'post' in self._states: if self._states and 'post' in self._states:
_df = State.apply(_df,self._states['post']) _df = State.apply(_df,self._states['post'])
@ -528,19 +538,27 @@ class Generator (Learner):
_schema = self.get_schema()
_df = self.format(_df,_schema) _df = self.format(_df,_schema)
_log = [{"name":_schema[i]['name'],"dataframe":_df[_df.columns[i]].dtypes.name,"schema":_schema[i]['type']} for i in np.arange(len(_schema)) ] # _log = [{"name":_schema[i]['name'],"dataframe":_df[_df.columns[i]].dtypes.name,"schema":_schema[i]['type']} for i in np.arange(len(_schema)) ]
self.log(**{"action":"consolidate","input":_log}) self.log(**{"action":"consolidate","input":{"rows":N,"candidate":_candidates.index(_iodf)}})
if _store : if _store :
writer = transport.factory.instance(**_store) _log = {'action':'write','input':{'table':self.info['from'],'schema':[],'rows':_df.shape[0]}}
writer = transport.factory.instance(**_store)
if _store['provider'] == 'bigquery': if _store['provider'] == 'bigquery':
writer.write(_df,schema=[],table=self.info['from']) try:
_log['schema'] = _schema
writer.write(_df,schema=_schema,table=self.info['from'])
except Exception as e:
_log['schema'] = []
writer.write(_df,table=self.info['from'])
else: else:
writer.write(_df,table=self.info['from']) writer.write(_df,table=self.info['from'])
self.log(**_log)
else: else:
self.cache.append(_df) self.cache.append(_df)
@ -570,17 +588,21 @@ class Shuffle(Generator):
_invColumns = [] _invColumns = []
_colNames = [] _colNames = []
_ucolNames= [] _ucolNames= []
_rmColumns = []
for _item in self.info['columns'] : for _item in self.info['columns'] :
if type(_item) == list : if type(_item) == list :
_invColumns.append(_item) _invColumns.append(_item)
_rmColumns += _item
elif _item in self._df.columns.tolist(): elif _item in self._df.columns.tolist():
_colNames.append(_item) _colNames.append(_item)
# #
# At this point we build the matrix of elements we are interested in considering the any unspecified column # At this point we build the matrix of elements we are interested in considering the any unspecified column
# #
if _colNames : if _colNames :
_invColumns.append(_colNames) _invColumns.append(_colNames)
_ucolNames = list(set(self._df.columns) - set(_colNames)) _ucolNames = list(set(self._df.columns) - set(_colNames) - set(_rmColumns))
if _ucolNames : if _ucolNames :
_invColumns += [ [_name] for _name in _ucolNames] _invColumns += [ [_name] for _name in _ucolNames]
@ -607,6 +629,7 @@ class Shuffle(Generator):
_log = {'action':'io-data','input':{'candidates':1,'rows':int(self._df.shape[0])}} _log = {'action':'io-data','input':{'candidates':1,'rows':int(self._df.shape[0])}}
self.log(**_log) self.log(**_log)
try: try:
self.post([self._df]) self.post([self._df])
self.log(**{'action':'completed','input':{'candidates':1,'rows':int(self._df.shape[0])}}) self.log(**{'action':'completed','input':{'candidates':1,'rows':int(self._df.shape[0])}})
except Exception as e : except Exception as e :

Loading…
Cancel
Save