From dd7fd5696bf682cd9b70786e2ccaa454aa62d841 Mon Sep 17 00:00:00 2001 From: Steve Nyemba Date: Thu, 5 Mar 2020 11:49:14 -0600 Subject: [PATCH] bug fix with partitions (generation may require it regardless) --- data/gan.py | 4 ++-- pipeline.py | 30 ++++++++++++++++++++++++++---- 2 files changed, 28 insertions(+), 6 deletions(-) diff --git a/data/gan.py b/data/gan.py index b3b9cf8..a591f34 100644 --- a/data/gan.py +++ b/data/gan.py @@ -356,7 +356,7 @@ class Train (GNet): self.meta = self.log_meta() if(self.logger): - self.logger.write( self.meta ) + self.logger.write({"module":"gan-train","action":"start","input":self.meta} ) # self.log (real_shape=list(self._REAL.shape),label_shape = self._LABEL.shape,meta_data=self.meta) def load_meta(self, column): @@ -514,7 +514,7 @@ class Train (GNet): # # if self.logger : - row = {"logs":logs} #,"model":pickle.dump(sess)} + row = {"module":"gan-train","action":"logs","input":logs} #,"model":pickle.dump(sess)} self.logger.write(row) # # @TODO: diff --git a/pipeline.py b/pipeline.py index cd527a5..58b5380 100644 --- a/pipeline.py +++ b/pipeline.py @@ -131,6 +131,7 @@ class Components : log_folder = args['logs'] if 'logs' in args else 'logs' partition = args['partition'] if 'partition' in args else '' log_folder = os.sep.join([log_folder,args['context'],partition]) + _args = {"batch_size":10000,"logs":log_folder,"context":args['context'],"max_epochs":150,"column":args['columns'],"id":"person_id","logger":logger} _args['max_epochs'] = 150 if 'max_epochs' not in args else int(args['max_epochs']) _args['num_gpu'] = int(args['num_gpu']) if 'num_gpu' in args else 1 @@ -143,12 +144,31 @@ class Components : # _args['data'] = pd.read_gbq(SQL,credentials=credentials,dialect='standard').dropna() reader = args['reader'] df = reader() - if 'partition' in args : + bounds = Components.split(df,MAX_ROWS,PART_SIZE) + if partition != '' and os.path.exists(log_folder): bounds = Components.split(df,MAX_ROWS,PART_SIZE) # bounds = list(pd.cut( np.arange(df.shape[0]+1),PART_SIZE).categories) lbound = int(bounds[int(partition)].left) ubound = int(bounds[int(partition)].right) df = df.iloc[lbound:ubound] + else: + # + # We have an implicit partition here + # bounds = Components.split(df,MAX_ROWS,PART_SIZE) + logger.write({"module":"generate","action":"virtual-parititions","input":{"rows":df.shape[0],"max_rows":MAX_ROWS,"part_size":PART_SIZE}}) + for item in bounds : + + lbound = int(item.left) + ubound = int(item.right) + args['reader'] = lambda: df[lbound:ubound] + args['partition'] = bounds.index(item) + + self.generate(args) + return ; + if not os.path.exists(log_folder) : + log_folder = log_folder.replace(partition,'') + _args['logs'] = log_folder + _args['data'] = df # _args['data'] = reader() #_args['data'] = _args['data'].astype(object) @@ -193,7 +213,7 @@ class Components : _fname = table.replace('_io','_full_io') data_comp.to_gbq(if_exists='replace',destination_table=_pname,credentials='credentials',chunk_size=50000) data_comp.to_csv(_pname,index=False) - INSERT_FLAG = 'replace' if 'partition' not in args else 'append' + INSERT_FLAG = 'replace' if 'partition' not in args or 'segment' not in args else 'append' _args['data'].to_gbq(if_exists=INSERT_FLAG,destination_table=_fname,credentials='credentials',chunk_size=50000) info = {"full":{"path":_fname,"rows":_args['data'].shape[0]},"compare":{"name":_pname,"rows":data_comp.shape[0]} } @@ -235,8 +255,9 @@ if __name__ == '__main__' : args = (PIPELINE[index]) args = dict(args,**SYS_ARGS) - args['max_rows'] = int(args['max_rows']) if 'max_rows' in args else 3 - args['part_size']= int(args['part_size']) if 'part_size' in args else 3 + args['max_rows'] = int(args['max_rows']) if 'max_rows' in args else 3 + args['part_size'] = int(args['part_size']) if 'part_size' in args else 4 + args['logs'] = args['logs'] if 'logs' in args else 'logs' if 'dataset' not in args : args['dataset'] = 'combined20191004v2_deid' @@ -257,6 +278,7 @@ if __name__ == '__main__' : content = os.listdir( os.sep.join([args['logs'],args['context']])) generator = Components() + if ''.join(content).isnumeric() : # # we have partitions we are working with