diff --git a/pipeline.py b/pipeline.py index c9d01d0..78559cb 100644 --- a/pipeline.py +++ b/pipeline.py @@ -268,7 +268,48 @@ class Components : else: writer.write(_df[columns],table=args['from']) - # @staticmethod + def finalize(self,args): + """ + This function performs post-processing opertions on a synthetic table i.e : + - remove duplicate keys + - remove orphaned keys i.e + """ + reader = factory.instance(**args['store']['source']) + logger = factory.instance(**args['store']['logs']) + target = args['store']['target']['args']['dataset'] + source = args['store']['source']['args']['dataset'] + table = args['from'] + schema = reader.meta(table=args['from']) + # + # keys : + unique_field = "_".join([args['from'],'id']) if 'unique_fields' not in args else args['unique_fields'] + fields = [ item.name if item.name != unique_field else "y."+item.name for item in schema] + SQL = [ + "SELECT :fields FROM ", + "(SELECT ROW_NUMBER() OVER() AS row_number,* FROM :target.:table) x","INNER JOIN", + "(SELECT ROW_NUMBER() OVER() AS row_number, :unique_field FROM :source.:table) y", + "ON y.row_number = x.row_number" + ] + SQL = " ".join(SQL).replace(":fields",",".join(fields)).replace(":table",table).replace(":source",source).replace(":target",target) + SQL = SQL.replace(":unique_field",unique_field) + # + # Use a native job to get this done ... + # + client = bq.Client.from_service_account_json(args['store']['source']['args']["private_key"]) + job = bq.QueryJobConfig() + job.destination = client.dataset(target).table(table) + job.use_query_cache = True + job.allow_large_results = True + # job.time_partitioning = bq.table.TimePartitioning(type_=bq.table.TimePartitioningType.DAY) + job.write_disposition = "WRITE_TRUNCATE" + job.priority = 'BATCH' + r = client.query(SQL,location='US',job_config=job) + logger.write({"job":r.job_id,"action":"finalize", "args":{"sql":SQL,"source":"".join([source,table]),"destimation":".".join([target,table])}}) + # + # Keep a log of what just happened... + # + otable = ".".join([args['store']['source']['args']['dataset'],args['from']]) + dtable = ".".join([args['store']['target']['args']['dataset'],args['from']]) def generate(self,args): """ This function will generate data and store it to a given, @@ -527,18 +568,7 @@ if __name__ == '__main__' : # @TODO: # Log what was initiated so we have context of this processing ... # - # if 'listen' not in SYS_ARGS : - # if 'file' in args : - # DATA = pd.read_csv(args['file']) ; - # schema = [] - # else: - # DATA = Components().get(args) - # client = bq.Client.from_service_account_json(args["private_key"]) - # schema = client.get_table(client.dataset(args['dataset']).table(args['from'])).schema - # COLUMNS = DATA.columns - # DATA = np.array_split(DATA,PART_SIZE) - # args['schema'] = schema GPU_CHIPS = args['gpu'] if 'gpu' in args else None if GPU_CHIPS and type(GPU_CHIPS) != list : GPU_CHIPS = [int(_id.strip()) for _id in GPU_CHIPS.split(',')] if type(GPU_CHIPS) == str else [GPU_CHIPS] @@ -550,50 +580,6 @@ if __name__ == '__main__' : # Let us see if we have partitions given the log folder content = os.listdir( os.sep.join([args['logs'],'train',args['context']])) - - - # if ''.join(content).isnumeric() : - # # - # # we have partitions we are working with - - # jobs = [] - - # # columns = DATA.columns.tolist() - - # # DATA = np.array_split(DATA,PART_SIZE) - - # for index in range(0,PART_SIZE) : - # if 'focus' in args and int(args['focus']) != index : - # # - # # This handles failures/recoveries for whatever reason - # # If we are only interested in generating data for a given partition - # continue - # # index = id.index(id) - - # args['partition'] = index - # args['data'] = DATA[index] - # if int(args['num_gpu']) > 1 : - # args['gpu'] = index - # else: - # args['gpu']=0 - - # make = lambda _args: (Components()).generate(_args) - # job = Process(target=make,args=(args,)) - # job.name = 'generator # '+str(index) - # job.start() - # jobs.append(job) - # # if len(jobs) == 1 : - # # job.join() - - # print (["Started ",len(jobs),"generators" if len(jobs)>1 else "generator" ]) - # while len(jobs)> 0 : - # jobs = [job for job in jobs if job.is_alive()] - # time.sleep(2) - - # # generator.generate(args) - # else: - # generator.generate(args) - # Components.generate(args) if 'all-chips' in SYS_ARGS and GPU_CHIPS: index = 0 jobs = [] @@ -625,7 +611,7 @@ if __name__ == '__main__' : shuffler = Components() shuffler.shuffle(args) pass - else: + elif 'train' in SYS_ARGS: # DATA = np.array_split(DATA,PART_SIZE) # @@ -657,10 +643,25 @@ if __name__ == '__main__' : # # If we have any obs we should wait till they finish # - while len(jobs)> 0 : - jobs = [job for job in jobs if job.is_alive()] - time.sleep(2) - + DIRTY = 0 + while len(jobs)> 0 : + DIRTY =1 + jobs = [job for job in jobs if job.is_alive()] + time.sleep(2) + if DIRTY: + print (["..:: jobs finished "]) + # + # We need to harmonize the keys if any at all in this case we do this for shuffle or generate operations + # + print (['finalize' in SYS_ARGS, ('generate' in SYS_ARGS or 'shuffle' in SYS_ARGS) ]) + if 'finalize' in SYS_ARGS or ('generate' in SYS_ARGS or 'shuffle' in SYS_ARGS) : + # + # We should pull all the primary keys and regenerate them in order to insure some form of consistency + # + + (Components()).finalize(args) + # finalize(args) + pass # jobs = [] # for index in range(0,PART_SIZE) : # if 'focus' in args and int(args['focus']) != index : diff --git a/setup.py b/setup.py index 4eb869f..d75f1d3 100644 --- a/setup.py +++ b/setup.py @@ -5,7 +5,7 @@ import sys def read(fname): return open(os.path.join(os.path.dirname(__file__), fname)).read() args = {"name":"data-maker", - "version":"1.4.5", + "version":"1.4.6", "author":"Vanderbilt University Medical Center","author_email":"steve.l.nyemba@vanderbilt.edu","license":"MIT", "packages":find_packages(),"keywords":["healthcare","data","transport","protocol"]} args["install_requires"] = ['data-transport@git+https://dev.the-phi.com/git/steve/data-transport.git','tensorflow==1.15','pandas','pandas-gbq','pymongo']