From 904a7d12dba021a6cbb84be1475250d7f6ac6803 Mon Sep 17 00:00:00 2001 From: Steve Nyemba Date: Sun, 7 Feb 2021 16:02:35 -0600 Subject: [PATCH] bug fix: with references --- README.md | 4 ++ healthcareio/export/export.py | 14 ++++-- healthcareio/export/workers.py | 86 ++++++++++++++++++++++++---------- healthcareio/healthcare-io.py | 12 +++-- setup.py | 2 +- 5 files changed, 86 insertions(+), 32 deletions(-) diff --git a/README.md b/README.md index 9fff556..8337542 100644 --- a/README.md +++ b/README.md @@ -59,6 +59,10 @@ We wrote this frame to be used in both command line or as a library within in yo with: --config configuration to support data-store + + **NOTE** + + The output generates a set of tables that are the result of transforming unstructured data to relational structure. The tables can be bound with the attribute **_id** The configuration file needed to implement export is modelled after the following template: diff --git a/healthcareio/export/export.py b/healthcareio/export/export.py index 5d29888..cc3a9e7 100644 --- a/healthcareio/export/export.py +++ b/healthcareio/export/export.py @@ -83,27 +83,35 @@ def meta(config) : if type(config[prefix]) != dict : continue - if '@ref' in config[prefix] and set(['label','field','map']) & set(config[prefix]['@ref'].keys()): + + if '@ref' in config[prefix] : #and set(['label','field','map']) & set(config[prefix]['@ref'].keys()): for subprefix in config[prefix]['@ref'] : _entry = config[prefix]['@ref'][subprefix] - _info += get_field(_entry) + if 'map' in _entry : + _info += get_field(_entry) + else: + _info += list(_entry.keys()) elif set(['label','field','map']) & set(config[prefix].keys()): _entry = config[prefix] if 'map' in _entry : _info += get_field(_entry) + # # We need to organize the fields appropriately here # + fields = {"main":[],"rel":{}} for row in _info : if type(row) == str : fields['main'] += [row] + fields['main'] = list(set(fields['main'])) + fields['main'].sort() else : fields['rel'] = jsonmerge.merge(fields['rel'],row) - + return fields def create (**_args) : skip = [] if 'skip' not in _args else _args['skip'] diff --git a/healthcareio/export/workers.py b/healthcareio/export/workers.py index a9aff6e..177fa31 100644 --- a/healthcareio/export/workers.py +++ b/healthcareio/export/workers.py @@ -8,27 +8,58 @@ """ import transport import os -from multiprocessing import Process +from multiprocessing import Process, Lock import numpy as np import json +import pandas as pd + class Subject (Process): + cache = pd.DataFrame() + lock = Lock() + @staticmethod + def log(_args): + Subject.lock.acquire() + try: + Subject.cache = Subject.cache.append(pd.DataFrame([_args])) + except Exception as e : + print (e) + finally: + Subject.lock.release() def __init__(self,**_args): super().__init__() self.observers = _args['observers'] self.index = 0 self.name = _args['name'] self.table = self.observers[1].table + self.m = {} + + pass def run(self): self.notify() def notify(self): if self.index < len(self.observers) : + observer = self.observers[self.index] _observer = None if self.index == 0 else self.observers[self.index -1] _invalues = None if not _observer else _observer.get() + if _observer is None : + self.m['table'] = self.name + + + + observer.init(caller=self,invalues = _invalues) self.index += 1 observer.execute() + print ({"table":self.table,"module":observer.name(),"status":observer.status}) + # self.m[observer.name()] = observer.status + + else: + pass + + + class Worker : def __init__(self,**_args): @@ -38,6 +69,7 @@ class Worker : self.logs = [] self.schema = _args['schema'] self.prefix = _args['prefix'] + self.status = 0 def name(self): return self.__class__.__name__ @@ -45,7 +77,7 @@ class Worker : """ This function is designed to log to either the console or a data-store """ - print (_args) + # print (_args) pass def init(self,**_args): """ @@ -60,9 +92,10 @@ class Worker : try: self._apply() except Exception as error: - print () - print (error) - print () + pass + # print () + # print (error) + # print () finally: self.caller.notify() @@ -101,10 +134,12 @@ class CreateSQL(Worker) : writer.apply(self._sql.replace(":table",sqltable)) writer.close() log['status'] = 1 + self.status = 1 except Exception as e: log['status'] = 0 log['info'] = {"error":e.args[0]} - print (e) + + # print (e) finally: self.log(**log) @@ -141,25 +176,28 @@ class Reader(Worker): self.rows = [] def _apply(self): - self.reader = transport.factory.instance(**self._info) ; - print() - print (self.table) - print (json.dumps(self.pipeline)) - print () - self.rows = self.reader.read(mongo=self.pipeline) + try: + self.reader = transport.factory.instance(**self._info) ; + self.rows = self.reader.read(mongo=self.pipeline) - N = len(self.rows) / self.MAX_ROWS if len(self.rows) > self.MAX_ROWS else 1 - N = int(N) - # self.rows = rows - _log = {"context":self.name(),"args":self._info['args']['db'], "status":1,"info":{"rows":len(self.rows),"table":self.table,"segments":N}} - self.rows = np.array_split(self.rows,N) + N = len(self.rows) / self.MAX_ROWS if len(self.rows) > self.MAX_ROWS else 1 + N = int(N) + # self.rows = rows + _log = {"context":self.name(),"args":self._info['args']['db'], "status":1,"info":{"rows":len(self.rows),"table":self.table,"segments":N}} + self.rows = np.array_split(self.rows,N) + + + # self.get = lambda : rows #np.array_split(rows,N) + self.reader.close() + self.status = 1 + # + except Exception as e : + log['status'] = 0 + log['info'] = {"error":e.args[0]} + - - # self.get = lambda : rows #np.array_split(rows,N) - self.reader.close() - - # self.log(**_log) + # @TODO: Call the caller and notify it that this here is done def get(self): return self.rows @@ -201,8 +239,8 @@ class Writer(Worker): # for _e in rows : # writer.write(_e) - - + + self.status = 1 else: print ("No data was passed") diff --git a/healthcareio/healthcare-io.py b/healthcareio/healthcare-io.py index 88907ff..4d13204 100644 --- a/healthcareio/healthcare-io.py +++ b/healthcareio/healthcare-io.py @@ -391,14 +391,18 @@ if __name__ == '__main__' : pipes = export.Factory.instance(type=TYPE,write_store=_store) #"inspect":0,"cast":0}}) # pipes[0].run() + for thread in pipes: if 'table' in SYS_ARGS and SYS_ARGS['table'] != thread.table : continue thread.start() - time.sleep(1) - while pipes : - pipes = [thread for thread in pipes if thread.is_alive()] - time.sleep(1) + time.sleep(1) + thread.join() + + # print (Subject.cache) + # while pipes : + # pipes = [thread for thread in pipes if thread.is_alive()] + # time.sleep(1) diff --git a/setup.py b/setup.py index ff85749..545b231 100644 --- a/setup.py +++ b/setup.py @@ -8,7 +8,7 @@ import sys def read(fname): return open(os.path.join(os.path.dirname(__file__), fname)).read() args = { - "name":"healthcareio","version":"1.5.9.1", + "name":"healthcareio","version":"1.5.6", "author":"Vanderbilt University Medical Center", "author_email":"steve.l.nyemba@vumc.org", "include_package_data":True,