From 278d639fbfddedfe502d782fea4e684330b3bb44 Mon Sep 17 00:00:00 2001
From: Steve Nyemba
Date: Fri, 16 Feb 2024 11:54:05 -0600
Subject: [PATCH] bug fix ...

---
 data/maker/__init__.py       | 17 +++++++++++++----
 data/maker/state/__init__.py |  2 +-
 2 files changed, 14 insertions(+), 5 deletions(-)

diff --git a/data/maker/__init__.py b/data/maker/__init__.py
index 70c1807..894ed65 100644
--- a/data/maker/__init__.py
+++ b/data/maker/__init__.py
@@ -320,7 +320,7 @@ class Generator (Learner):
         self.network_args['candidates'] = int(_args['candidates']) if 'candidates' in _args else 1
         # filename = os.sep.join([self.network_args['logs'],'output',self.network_args['context'],'map.json'])
         _suffix = self.network_args['context']
-        filename = os.sep.join([self.network_args['logs'],'output',self.network_args['context'],'meta-',_suffix,'.json'])
+        filename = os.sep.join([self.network_args['logs'],'output',self.network_args['context'],'meta-',_suffix+'.json'])
         self.log(**{'action':'init-map','input':{'filename':filename,'exists':os.path.exists(filename)}})
 
         if os.path.exists(filename):
@@ -439,7 +439,7 @@ class Generator (Learner):
         """
 
 
-        _columns = [_item['name'] for _item in _schema]
+        _columns = [_item['name'] for _item in _schema if _item['name'] in _df.columns]
         _map = {'INT64':np.int64,'FLOAT64':np.float64,'DATE':np.datetime64,'TIMESTAMP':(lambda field: pd.to_datetime(field).dt.tz_localize(None))}
         # pd.to_datetime(_df.measurement_datetime).dt.tz_localize(None)
 
@@ -476,7 +476,7 @@ class Generator (Learner):
             # bqw = transport.factory.instance(**_store['target'])
             # bqw.write(_df,schema=_schema)
 
-        return _df[_columns]
+        return _df #[_columns]
 
     def post(self,_candidates):
         if 'target' in self.store :
@@ -492,7 +492,16 @@ class Generator (Learner):
         _haslist = np.sum([type(_item)==list for _item in self.columns]) > 0
         _schema = self.get_schema()
-
+        #
+        # If the schema doesn't match the data we need to skip it
+        # This happens when the data comes from a query, the post processing needs to handle this
+        #
+
+        # _names = [_field['name'] for _field in _schema]
+        # _columns = _candidates[0].columns.tolist()
+        # _common = list( set(_columns) & set(_names) )
+        # if not (len(_common) == len(_columns) and len(_names) == len(_common)) :
+        #     _schema = None
         for _iodf in _candidates :
             _df = self._df.copy()
 
 
diff --git a/data/maker/state/__init__.py b/data/maker/state/__init__.py
index f1b8da0..265a57a 100644
--- a/data/maker/state/__init__.py
+++ b/data/maker/state/__init__.py
@@ -20,7 +20,7 @@ import os
 
 class State :
     @staticmethod
-    def apply(_data,lpointers):
+    def apply(_data,lpointers,_config={}):
         """
         This function applies a pipeline against a given data-frame, the calling code must decide whether it is a pre/post
         :_data data-frame