bug fix: finalize to remove duplicate keys

dev
Steve L. Nyemba 4 years ago
parent 3eb28dd798
commit 94798fd9a2

@ -268,7 +268,48 @@ class Components :
else:
writer.write(_df[columns],table=args['from'])
# @staticmethod
def finalize(self,args):
"""
This function performs post-processing opertions on a synthetic table i.e :
- remove duplicate keys
- remove orphaned keys i.e
"""
reader = factory.instance(**args['store']['source'])
logger = factory.instance(**args['store']['logs'])
target = args['store']['target']['args']['dataset']
source = args['store']['source']['args']['dataset']
table = args['from']
schema = reader.meta(table=args['from'])
#
# keys :
unique_field = "_".join([args['from'],'id']) if 'unique_fields' not in args else args['unique_fields']
fields = [ item.name if item.name != unique_field else "y."+item.name for item in schema]
SQL = [
"SELECT :fields FROM ",
"(SELECT ROW_NUMBER() OVER() AS row_number,* FROM :target.:table) x","INNER JOIN",
"(SELECT ROW_NUMBER() OVER() AS row_number, :unique_field FROM :source.:table) y",
"ON y.row_number = x.row_number"
]
SQL = " ".join(SQL).replace(":fields",",".join(fields)).replace(":table",table).replace(":source",source).replace(":target",target)
SQL = SQL.replace(":unique_field",unique_field)
#
# Use a native job to get this done ...
#
client = bq.Client.from_service_account_json(args['store']['source']['args']["private_key"])
job = bq.QueryJobConfig()
job.destination = client.dataset(target).table(table)
job.use_query_cache = True
job.allow_large_results = True
# job.time_partitioning = bq.table.TimePartitioning(type_=bq.table.TimePartitioningType.DAY)
job.write_disposition = "WRITE_TRUNCATE"
job.priority = 'BATCH'
r = client.query(SQL,location='US',job_config=job)
logger.write({"job":r.job_id,"action":"finalize", "args":{"sql":SQL,"source":"".join([source,table]),"destimation":".".join([target,table])}})
#
# Keep a log of what just happened...
#
otable = ".".join([args['store']['source']['args']['dataset'],args['from']])
dtable = ".".join([args['store']['target']['args']['dataset'],args['from']])
def generate(self,args):
"""
This function will generate data and store it to a given,
@ -527,18 +568,7 @@ if __name__ == '__main__' :
# @TODO:
# Log what was initiated so we have context of this processing ...
#
# if 'listen' not in SYS_ARGS :
# if 'file' in args :
# DATA = pd.read_csv(args['file']) ;
# schema = []
# else:
# DATA = Components().get(args)
# client = bq.Client.from_service_account_json(args["private_key"])
# schema = client.get_table(client.dataset(args['dataset']).table(args['from'])).schema
# COLUMNS = DATA.columns
# DATA = np.array_split(DATA,PART_SIZE)
# args['schema'] = schema
GPU_CHIPS = args['gpu'] if 'gpu' in args else None
if GPU_CHIPS and type(GPU_CHIPS) != list :
GPU_CHIPS = [int(_id.strip()) for _id in GPU_CHIPS.split(',')] if type(GPU_CHIPS) == str else [GPU_CHIPS]
@ -550,50 +580,6 @@ if __name__ == '__main__' :
# Let us see if we have partitions given the log folder
content = os.listdir( os.sep.join([args['logs'],'train',args['context']]))
# if ''.join(content).isnumeric() :
# #
# # we have partitions we are working with
# jobs = []
# # columns = DATA.columns.tolist()
# # DATA = np.array_split(DATA,PART_SIZE)
# for index in range(0,PART_SIZE) :
# if 'focus' in args and int(args['focus']) != index :
# #
# # This handles failures/recoveries for whatever reason
# # If we are only interested in generating data for a given partition
# continue
# # index = id.index(id)
# args['partition'] = index
# args['data'] = DATA[index]
# if int(args['num_gpu']) > 1 :
# args['gpu'] = index
# else:
# args['gpu']=0
# make = lambda _args: (Components()).generate(_args)
# job = Process(target=make,args=(args,))
# job.name = 'generator # '+str(index)
# job.start()
# jobs.append(job)
# # if len(jobs) == 1 :
# # job.join()
# print (["Started ",len(jobs),"generators" if len(jobs)>1 else "generator" ])
# while len(jobs)> 0 :
# jobs = [job for job in jobs if job.is_alive()]
# time.sleep(2)
# # generator.generate(args)
# else:
# generator.generate(args)
# Components.generate(args)
if 'all-chips' in SYS_ARGS and GPU_CHIPS:
index = 0
jobs = []
@ -625,7 +611,7 @@ if __name__ == '__main__' :
shuffler = Components()
shuffler.shuffle(args)
pass
else:
elif 'train' in SYS_ARGS:
# DATA = np.array_split(DATA,PART_SIZE)
#
@ -657,10 +643,25 @@ if __name__ == '__main__' :
#
# If we have any obs we should wait till they finish
#
DIRTY = 0
while len(jobs)> 0 :
DIRTY =1
jobs = [job for job in jobs if job.is_alive()]
time.sleep(2)
if DIRTY:
print (["..:: jobs finished "])
#
# We need to harmonize the keys if any at all in this case we do this for shuffle or generate operations
#
print (['finalize' in SYS_ARGS, ('generate' in SYS_ARGS or 'shuffle' in SYS_ARGS) ])
if 'finalize' in SYS_ARGS or ('generate' in SYS_ARGS or 'shuffle' in SYS_ARGS) :
#
# We should pull all the primary keys and regenerate them in order to insure some form of consistency
#
(Components()).finalize(args)
# finalize(args)
pass
# jobs = []
# for index in range(0,PART_SIZE) :
# if 'focus' in args and int(args['focus']) != index :

@ -5,7 +5,7 @@ import sys
def read(fname):
return open(os.path.join(os.path.dirname(__file__), fname)).read()
args = {"name":"data-maker",
"version":"1.4.5",
"version":"1.4.6",
"author":"Vanderbilt University Medical Center","author_email":"steve.l.nyemba@vanderbilt.edu","license":"MIT",
"packages":find_packages(),"keywords":["healthcare","data","transport","protocol"]}
args["install_requires"] = ['data-transport@git+https://dev.the-phi.com/git/steve/data-transport.git','tensorflow==1.15','pandas','pandas-gbq','pymongo']

Loading…
Cancel
Save