From ea30cd1c0009eddf5dd4a9082de0eb8213af5fd9 Mon Sep 17 00:00:00 2001
From: Steve Nyemba
Date: Wed, 9 Aug 2023 15:23:33 -0500
Subject: [PATCH] bug fix: attempt

---
Review notes (this section is ignored by git am):
 - the line structure of the patch was restored; leading indentation and the
   position of blank context lines are reconstructed and should be checked
   against the target file before applying
 - Generator._format was amended: cast failures are reported instead of being
   silently discarded, the unreachable STRING branches and the commented-out
   code were dropped, the type test uses isinstance, the docstring was completed
 - hunk headers and the diffstat below were recomputed for the amended hunk;
   the blob ids on the index line predate the amendment
 - the first removed/added pair of the last hunk differs only by white space

 data/maker/__init__.py | 77 ++++++++++++++++++++++++++++++++----------
 1 file changed, 58 insertions(+), 19 deletions(-)

diff --git a/data/maker/__init__.py b/data/maker/__init__.py
index 9d1008d..70c1807 100644
--- a/data/maker/__init__.py
+++ b/data/maker/__init__.py
@@ -432,7 +432,49 @@ class Generator (Learner):
             return _date.strftime(FORMAT)
 
         pass
-
+    def _format(self,_df,_schema):
+        """
+        Cast the columns of a data-frame to the types declared in the schema, so as
+        to minimize errors when the data-frame is written to the target store.
+
+        Only INT64, FLOAT64, DATE and TIMESTAMP fields are cast, other fields are left
+        untouched. The cast is best-effort i.e a column that can not be cast is
+        reported and kept as-is.
+
+        :_df        data-frame being processed (its columns are overwritten in place)
+        :_schema    table schema with types i.e list of {'name':...,'type':...}
+        :return     data-frame restricted to the columns of the schema (in schema order)
+        """
+        _columns = [_item['name'] for _item in _schema]
+        _map = {'INT64':np.int64,'FLOAT64':np.float64,'DATE':np.datetime64,'TIMESTAMP':(lambda field: pd.to_datetime(field).dt.tz_localize(None))}
+        #
+        # Missing values are replaced with zero, an INT64 column can not hold NaN
+        #
+        _novalue = {'INT64':np.int64(0),'FLOAT64':np.float64(0)}
+        for _item in _schema :
+            _name,_type = _item['name'],_item['type']
+            if _type not in _map :
+                continue
+            _pointer = _map[_type]
+            try:
+                if isinstance(_pointer,type) :
+                    if _type in _novalue :
+                        _df[_name] = _df[_name].fillna(_novalue[_type])
+                    #
+                    # This is not guaranteed to work but may help ...
+                    #
+                    _df[_name] = _df[_name].values.astype(_pointer)
+                else:
+                    #
+                    # TIMESTAMP fields are parsed and stripped of their time-zone
+                    #
+                    _df[_name] = _pointer(_df[_name])
+            except Exception as e:
+                #
+                # The column is left as-is, we report the error instead of hiding it
+                #
+                print (['format error',_name,_type,str(e)])
+        return _df[_columns]
     def post(self,_candidates):
 
         if 'target' in self.store :
@@ -455,27 +497,23 @@ class Generator (Learner):
             if self.columns and _haslist is False:
                 _df[self.columns] = _iodf[self.columns]
             else:
+                #
+                # In here we have the case of all attributes have undergone random permutations
+                #
                 _df = _iodf
 
 
             N += _df.shape[0]
             if self._states and 'post' in self._states:
                 _df = State.apply(_df,self._states['post'])
-            # #
-            # #@TODO:
-            # # Improve formatting with better post-processing pipeline
-            # if 'approximate' in self.info :
-            #     _df = self.approximate(_df)
-            # if 'make_date' in self.info :
-            #     for name in self.info['make_date'] :
-            #         # iname = self.info['make_date']['init_field']
-            #         iname = self.info['make_date'][name]
-
-            #         years = _df[iname]
-            #         _dates = [self.make_date(year=_year,field=name) for _year in years]
-            #         if _dates :
-            #             _df[name] = _dates
-
+
+            #
+            # Let us format the data frame so as to be able to minimize write errors
+            #
+            if _schema :
+                _df = self._format(_df,_schema)
+
+
 
 
 
@@ -488,13 +526,14 @@ class Generator (Learner):
             if _store :
                 _log = {'action':'write','input':{'table':self.info['from'],'schema':[],'rows':_df.shape[0]}}
 
-                writer = transport.factory.instance(**_store)
-                if _store['provider'] == 'bigquery':
+                writer = transport.factory.instance(**_store)
+
+                if _store['provider'] == 'bigquery' and _schema:
                     try:
                         _log['schema'] = _schema
                         writer.write(_df,schema=_schema)
                     except Exception as e:
-                        _log['schema'] = []
+                        print (e)
                         writer.write(_df)
                 else:
                     writer.write(_df)