From 523f7364f163a83c84a6ae94efb6a619d7401403 Mon Sep 17 00:00:00 2001
From: Steve Nyemba
Date: Tue, 8 Aug 2023 09:48:22 -0500
Subject: [PATCH 1/2] bug fix with logs

---
 data/maker/__init__.py | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/data/maker/__init__.py b/data/maker/__init__.py
index 3d3788a..67ab13c 100644
--- a/data/maker/__init__.py
+++ b/data/maker/__init__.py
@@ -192,9 +192,10 @@ class Learner(Process):

             finally:
                 pass
-            _log[name] = self._df[name].dtypes.name
-        _log = {'action':'structure','input':_log}
-        self.log(**_log)
+            # _log[name] = self._df[name].dtypes.name
+            # _log[name] = reader.meta()
+        # _log = {'action':'structure','input':_log}
+        # self.log(**_log)
         #
         # convert the data to binary here ...
         _schema = self.get_schema()

From 2ad56dcff667242848a6464f8e4673c5978a1f88 Mon Sep 17 00:00:00 2001
From: Steve Nyemba
Date: Tue, 8 Aug 2023 11:08:18 -0500
Subject: [PATCH 2/2] bug fixes

---
 data/maker/__init__.py | 46 ++++++++++++++++++++++++++++++++++------------
 1 file changed, 34 insertions(+), 12 deletions(-)

diff --git a/data/maker/__init__.py b/data/maker/__init__.py
index 67ab13c..e3af9de 100644
--- a/data/maker/__init__.py
+++ b/data/maker/__init__.py
@@ -452,10 +452,10 @@ class Generator (Learner):

                 FORMAT = '%Y-%m-%-d %H:%M:%S'
                 SIZE = 19
-                if SIZE > 0 :
+                # if SIZE > 0 :

-                    values = pd.to_datetime(_df[name], format=FORMAT).astype(np.datetime64)
-                    # _df[name] = [_date[:SIZE].strip() for _date in values]
+                # values = pd.to_datetime(_df[name], format=FORMAT).astype(np.datetime64)
+                # # _df[name] = [_date[:SIZE].strip() for _date in values]


                 # _df[name] = _df[name].astype(str)
@@ -465,6 +465,7 @@ class Generator (Learner):

                     pass #;_df[name] = _df[name].fillna('').astype('datetime64[ns]')
                 except Exception as e:
+                    print (e)
                     pass
                 finally:
                     pass
@@ -503,12 +504,20 @@ class Generator (Learner):
         else:
             _store = None
         N = 0
+
+
+        _haslist = np.sum([type(_item)==list for _item in self.columns]) > 0
+        _schema = self.get_schema()
+
         for _iodf in _candidates :
             _df = self._df.copy()
-            if self.columns :
+
+            if self.columns and _haslist is False:
                 _df[self.columns] = _iodf[self.columns]
+            else:
+                _df = _iodf

-
+
             N += _df.shape[0]
             if self._states and 'post' in self._states:
                 _df = State.apply(_df,self._states['post'])
@@ -529,19 +538,27 @@ class Generator (Learner):




-            _schema = self.get_schema()
+
             _df = self.format(_df,_schema)
-            _log = [{"name":_schema[i]['name'],"dataframe":_df[_df.columns[i]].dtypes.name,"schema":_schema[i]['type']} for i in np.arange(len(_schema)) ]
-            self.log(**{"action":"consolidate","input":_log})
+            # _log = [{"name":_schema[i]['name'],"dataframe":_df[_df.columns[i]].dtypes.name,"schema":_schema[i]['type']} for i in np.arange(len(_schema)) ]
+            self.log(**{"action":"consolidate","input":{"rows":N,"candidate":_candidates.index(_iodf)}})


-
+
             if _store :
-                writer = transport.factory.instance(**_store)
+                _log = {'action':'write','input':{'table':self.info['from'],'schema':[],'rows':_df.shape[0]}}
+
+                writer = transport.factory.instance(**_store)
                 if _store['provider'] == 'bigquery':
-                    writer.write(_df,schema=[],table=self.info['from'])
+                    try:
+                        _log['schema'] = _schema
+                        writer.write(_df,schema=_schema,table=self.info['from'])
+                    except Exception as e:
+                        _log['schema'] = []
+                        writer.write(_df,table=self.info['from'])
                 else:
                     writer.write(_df,table=self.info['from'])
+                self.log(**_log)
             else:
                 self.cache.append(_df)
@@ -571,17 +588,21 @@ class Shuffle(Generator):
         _invColumns = []
         _colNames = []
         _ucolNames= []
+        _rmColumns = []
         for _item in self.info['columns'] :
             if type(_item) == list :
                 _invColumns.append(_item)
+                _rmColumns += _item
+
             elif _item in self._df.columns.tolist():
                 _colNames.append(_item)
         #
         # At this point we build the matrix of elements we are interested in considering the any unspecified column
         #
+
         if _colNames :
             _invColumns.append(_colNames)
-        _ucolNames = list(set(self._df.columns) - set(_colNames))
+        _ucolNames = list(set(self._df.columns) - set(_colNames) - set(_rmColumns))
         if _ucolNames :
             _invColumns += [ [_name] for _name in _ucolNames]

@@ -608,6 +629,7 @@ class Shuffle(Generator):
             _log = {'action':'io-data','input':{'candidates':1,'rows':int(self._df.shape[0])}}
             self.log(**_log)
             try:
+                self.post([self._df])
                 self.log(**{'action':'completed','input':{'candidates':1,'rows':int(self._df.shape[0])}})
             except Exception as e :
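
Reviewer note (not part of the patch): PATCH 2/2 changes the bigquery branch of the consolidation step so the write is first attempted with the generated schema and falls back to a schema-less write if that raises, recording the outcome in the log entry. The sketch below reproduces only that control flow under assumed names; fake_writer, write_with_fallback and the demo schema are hypothetical stand-ins, not data-maker APIs.

import pandas as pd

# Hypothetical stand-in for the transport writer used in the patch; it exists
# only so the sketch runs end to end.
def fake_writer(df, table, schema=None):
    if schema is not None and len(schema) != df.shape[1]:
        raise ValueError("schema does not match the dataframe")
    print("wrote %d rows to %s (with schema=%s)" % (df.shape[0], table, schema is not None))

def write_with_fallback(writer, df, table, schema):
    # Try the schema-aware write first, then retry without a schema,
    # recording which path was taken in a log-style dict (as the patch does).
    _log = {'action': 'write', 'input': {'table': table, 'schema': [], 'rows': int(df.shape[0])}}
    try:
        _log['schema'] = schema
        writer(df, table=table, schema=schema)
    except Exception:
        _log['schema'] = []
        writer(df, table=table)
    return _log

df = pd.DataFrame({'id': [1, 2], 'value': ['a', 'b']})
print(write_with_fallback(fake_writer, df, 'demo_table', schema=[{'name': 'id', 'type': 'INTEGER'}]))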
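
Also for review context: the _rmColumns bookkeeping added to Shuffle keeps list entries of info['columns'] together as one shuffle group and stops their members from reappearing as single-column groups. A minimal sketch of just that grouping rule, assuming a toy column spec (build_shuffle_groups and the sample columns are illustrative, not project APIs):

import pandas as pd

def build_shuffle_groups(columns_spec, df_columns):
    # List entries stay together, scalar entries form one combined group,
    # and any column not referenced at all becomes its own group.
    _invColumns = []
    _colNames = []
    _rmColumns = []
    for _item in columns_spec:
        if type(_item) == list:
            _invColumns.append(_item)
            _rmColumns += _item
        elif _item in df_columns:
            _colNames.append(_item)
    if _colNames:
        _invColumns.append(_colNames)
    _ucolNames = list(set(df_columns) - set(_colNames) - set(_rmColumns))
    _invColumns += [[_name] for _name in _ucolNames]
    return _invColumns

df = pd.DataFrame(columns=['a', 'b', 'c', 'd', 'e'])
print(build_shuffle_groups(['a', ['b', 'c']], df.columns.tolist()))
# e.g. [['b', 'c'], ['a'], ['d'], ['e']] -- the order of the single-column groups may vary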