From 670559aef7c5a5986a5526e8d6559aacc272036f Mon Sep 17 00:00:00 2001 From: Steve Nyemba Date: Sat, 26 Sep 2020 15:32:06 -0500 Subject: [PATCH] bug fix with parser --- healthcareio/__init__.py | 48 +--------- healthcareio/analytics.py | 37 ++++++-- healthcareio/healthcare-io.py | 165 +++++++++++++++++++++++++--------- healthcareio/params.py | 4 +- healthcareio/parser.py | 28 +++++- 5 files changed, 185 insertions(+), 97 deletions(-) diff --git a/healthcareio/__init__.py b/healthcareio/__init__.py index 97f80a5..64f7099 100644 --- a/healthcareio/__init__.py +++ b/healthcareio/__init__.py @@ -14,50 +14,6 @@ Usage : Embedded : """ -import healthcareio -import os -import requests -import platform -import sqlite3 as lite -from transport import factory -import json -#import healthcareio.params as params -PATH = os.sep.join([os.environ['HOME'],'.healthcareio']) -OUTPUT_FOLDER = os.sep.join([os.environ['HOME'],'healthcare-io']) - -def register (**args) : - """ - This function will reset/register a user i.e they will download the configuration - :email user's email address - :url url of the provider to register - """ - URL = "https://healthcareio.the-phi.com" if 'url' not in args else args['url'] - - args['out_folder'] = os.sep.join([args['path'],args['out_folder']]) - email = args['email'] - url = args['url'] if 'url' in args else URL - folders = [PATH,OUTPUT_FOLDER] - for path in folders : - if not os.path.exists(path) : - os.mkdir(path) - - # - # - headers = {"email":email,"client":platform.node()} - http = requests.session() - r = http.post(url,headers=headers) - - # - # store = {"type":"disk.DiskWriter","args":{"path":OUTPUT_FOLDER}} - # if 'store' in args : - # store = args['store'] - filename = (os.sep.join([PATH,'config.json'])) - info = r.json() #{"parser":r.json(),"store":store} - info = dict({"owner":email},**info) - info['store']['args']['path'] =os.sep.join([OUTPUT_FOLDER,'healthcare-io.db3']) #-- sql - info['out-folder'] = OUTPUT_FOLDER - - file = open( filename,'w') - file.write( json.dumps(info)) - file.close() +from healthcareio import analytics +# from healthcareio import server diff --git a/healthcareio/analytics.py b/healthcareio/analytics.py index 7ae8b6d..d9f393a 100644 --- a/healthcareio/analytics.py +++ b/healthcareio/analytics.py @@ -175,7 +175,8 @@ class Apex : @staticmethod def scalar(item): _df = item['data'] - name = str(_df.columns[0]) + print (_df) + name = _df.columns.tolist()[0] value = _df[name].values.round(2)[0] html = '
:value
:label
' if value > 999 and value < 1000000 : @@ -240,9 +241,15 @@ class Apex : if type(y) == list : y = y[0] axis['x'] = [axis['x']] if type(axis['x']) != list else axis['x'] + if not set(axis['x']) & set(df.columns.tolist()) : + print (set(axis['x']) & set(df.columns.tolist())) + print (axis['x']) + print (df.columns) + # df.columns = axis['x'] series = [] _min=_max = 0 for x in axis['x'] : + series += [{"data": df[x].values.tolist()[:N],"name":x.upper().replace('_',' ')}] _min = df[x].min() if df[x].min() < _min else _min _max = df[x].max() if df[x].max() > _max else _max @@ -317,6 +324,12 @@ class engine : _config = json.loads(f.read()) self.store_config = _config['store'] self.info = _config['analytics'] + _args = self.store_config + if self.store_config['type'] == 'mongo.MongoWriter' : + _args['type'] = 'mongo.MongoReader' + else: + _args['type'] = 'disk.SQLiteReader' + self.reader = transport.factory.instance(**_args) def apply (self,**args) : """ @@ -332,19 +345,26 @@ class engine : analytics = [analytics[index]] _info = list(analytics) if 'filter' not in args else [item for item in analytics if args['filter'] == item['id']] - conn = lite.connect(self.store_config['args']['path'],isolation_level=None) - conn.create_aggregate("stdev",1,stdev) - + # conn = lite.connect(self.store_config['args']['path'],isolation_level=None) + # conn.create_aggregate("stdev",1,stdev) + DB_TYPE = 'mongo' if (type(self.reader) == transport.mongo.MongoReader) else 'sql' r = [] for row in _info : for item in row['pipeline'] : - item['data'] = pd.read_sql(item['sql'],conn) + # item['data'] = pd.read_sql(item['sql'],conn) + query = {DB_TYPE:item[DB_TYPE]} + item['data'] = self.reader.read(**item) if 'serialize' in args : - item['data'] = json.dumps(item['data'].to_dict(orient='record')) + + item['data'] = json.dumps(item['data'].to_dict(orient='record')) if type(item['data']) == pd.DataFrame else item['data'] + else: + item['data'] = (pd.DataFrame(item['data'])) + + # if 'info' in item: # item['info'] = item['info'].replace(":rows",str(item["data"].shape[0])) - conn.close() + # conn.close() return _info @@ -540,4 +560,5 @@ css = """ # print (p[2]['pipeline'][0]['data']) # e.export (p[0]) # features = ['diagnosis.code'] -# split(folder = folder, features=features) \ No newline at end of file +# split(folder = folder, features=features) + diff --git a/healthcareio/healthcare-io.py b/healthcareio/healthcare-io.py index 9c52009..53f14a1 100644 --- a/healthcareio/healthcare-io.py +++ b/healthcareio/healthcare-io.py @@ -32,10 +32,18 @@ Usage : from healthcareio.params import SYS_ARGS from transport import factory import requests + +from healthcareio import analytics +from healthcareio import server from healthcareio.parser import get_content import os import json import sys +import numpy as np +from multiprocessing import Process +import time + + PATH = os.sep.join([os.environ['HOME'],'.healthcareio']) OUTPUT_FOLDER = os.sep.join([os.environ['HOME'],'healthcare-io']) INFO = None @@ -60,7 +68,8 @@ def register (**args) : # # - headers = {"email":email,"client":platform.node()} + store = args['store'] if 'store' in args else 'sqlite' + headers = {"email":email,"client":platform.node(),"store":store,"db":args['db']} http = requests.session() r = http.post(url,headers=headers) @@ -82,22 +91,6 @@ def register (**args) : # Create the sqlite3 database to -def analytics(**args): - """ - This fucntion will only compute basic distributions of a given feature for a given claim - @args - @param x: vector of features to process - @param apply: operation to be applied {dist} - """ - if args['apply'] in ['dist','distribution'] : - """ - This section of the code will return the distribution of a given space. - It is intended to be applied on several claims/remits - """ - x = pd.DataFrame(args['x'],columns=['x']) - return x.groupby(['x']).size().to_frame().T.to_dict(orient='record') - - def log(**args): """ This function will perform a log of anything provided to it @@ -152,7 +145,39 @@ def parse(**args): return get_content(args['filename'],CONFIG,SECTION) +def apply(files,store_info,logger_info=None): + """ + :files list of files to be processed in this given thread/process + :store_info information about data-store, for now disk isn't thread safe + :logger_info information about where to store the logs + """ + if not logger_info : + logger = factory.instance(type='disk.DiskWriter',args={'path':os.sep.join([info['out-folder'],SYS_ARGS['parse']+'.log'])}) + else: + logger = factory.instance(**logger_info) + + writer = factory.instance(**store_info) + for filename in files : + + if filename.strip() == '': + continue + # content,logs = get_content(filename,CONFIG,CONFIG['SECTION']) + # + try: + content,logs = parse(filename = filename,type=SYS_ARGS['parse']) + if content : + writer.write(content) + if logs : + [logger.write(dict(_row,**{"parse":SYS_ARGS['parse']})) for _row in logs] + else: + logger.write({"parse":SYS_ARGS['parse'],"name":filename,"completed":True,"rows":len(content)}) + except Exception as e: + logger.write({"parse":SYS_ARGS['parse'],"filename":filename,"completed":False,"rows":-1,"msg":e.args[0]}) + # print ([filename,len(content)]) + # + # @TODO: forward this data to the writer and log engine + # def upgrade(**args): """ :email provide us with who you are @@ -175,8 +200,9 @@ if __name__ == '__main__' : email = SYS_ARGS['signup'].strip() if 'signup' in SYS_ARGS else SYS_ARGS['init'] url = SYS_ARGS['url'] if 'url' in SYS_ARGS else 'https://healthcareio.the-phi.com' - - register(email=email,url=url) + store = SYS_ARGS['store'] if 'store' in SYS_ARGS else 'sqlite' + db='healthcareio' if 'db' not in SYS_ARGS else SYS_ARGS['db'] + register(email=email,url=url,store=store,db=db) # else: # m = """ # usage: @@ -218,46 +244,95 @@ if __name__ == '__main__' : # CONFIG = CONFIG[ int(SYS_ARGS['version'])] # else: # CONFIG = CONFIG[-1] + logger = factory.instance(type='disk.DiskWriter',args={'path':os.sep.join([info['out-folder'],SYS_ARGS['parse']+'.log'])}) if info['store']['type'] == 'disk.DiskWriter' : info['store']['args']['path'] += (os.sep + 'healthcare-io.json') elif info['store']['type'] == 'disk.SQLiteWriter' : # info['store']['args']['path'] += (os.sep + 'healthcare-io.db3') pass + + if info['store']['type'] == 'disk.SQLiteWriter' : info['store']['args']['table'] = SYS_ARGS['parse'].strip().lower() else: + # + # if we are working with no-sql we will put the logs in it (performance )? info['store']['args']['doc'] = SYS_ARGS['parse'].strip().lower() + _info = json.loads(json.dumps(info['store'])) + _info['args']['doc'] = 'logs' + logger = factory.instance(**_info) + writer = factory.instance(**info['store']) - logger = factory.instance(type='disk.DiskWriter',args={'path':os.sep.join([info['out-folder'],SYS_ARGS['parse']+'.log'])}) + + # + # we need to have batches ready for this in order to run some of these queries in parallel + # @TODO: Make sure it is with a persistence storage (not disk .. not thread/process safe yet) + # - Make sure we can leverage this on n-cores later on, for now the assumption is a single core + # + BATCH_COUNT = 1 if 'batch' not in SYS_ARGS else int (SYS_ARGS['batch']) + #logger = factory.instance(type='mongo.MongoWriter',args={'db':'healthcareio','doc':SYS_ARGS['parse']+'_logs'}) # schema = info['schema'] # for key in schema : # sql = schema[key]['create'] # writer.write(sql) - for filename in files : + files = np.array_split(files,BATCH_COUNT) + procs = [] + index = 0 + for row in files : + + row = row.tolist() + logger.write({"process":index,"parse":SYS_ARGS['parse'],"file_count":len(row)}) + proc = Process(target=apply,args=(row,info['store'],_info,)) + proc.start() + procs.append(proc) + index = index + 1 + while len(procs) > 0 : + procs = [proc for proc in procs if proc.is_alive()] + time.sleep(2) + # for filename in files : + + # if filename.strip() == '': + # continue + # # content,logs = get_content(filename,CONFIG,CONFIG['SECTION']) + # # + # try: + # content,logs = parse(filename = filename,type=SYS_ARGS['parse']) + # if content : + # writer.write(content) + # if logs : + # [logger.write(dict(_row,**{"parse":SYS_ARGS['parse']})) for _row in logs] + # else: + # logger.write({"parse":SYS_ARGS['parse'],"name":filename,"completed":True,"rows":len(content)}) + # except Exception as e: + # logger.write({"parse":SYS_ARGS['parse'],"filename":filename,"completed":False,"rows":-1,"msg":e.args[0]}) + # # print ([filename,len(content)]) + # # + # # @TODO: forward this data to the writer and log engine + # # + - if filename.strip() == '': - continue - # content,logs = get_content(filename,CONFIG,CONFIG['SECTION']) - # - try: - content,logs = parse(filename = filename,type=SYS_ARGS['parse']) - if content : - writer.write(content) - if logs : - [logger.write(_row) for _row in logs] - else: - logger.write({"name":filename,"completed":True,"rows":len(content)}) - except Exception as e: - logger.write({"filename":filename,"completed":False,"rows":-1,"msg":e.args[0]}) - # print ([filename,len(content)]) - # - # @TODO: forward this data to the writer and log engine - # pass + elif 'analytics' in SYS_ARGS : + PORT = int(SYS_ARGS['port']) if 'port' in SYS_ARGS else 5500 + DEBUG= int(SYS_ARGS['debug']) if 'debug' in SYS_ARGS else 0 + SYS_ARGS['context'] = SYS_ARGS['context'] if 'context' in SYS_ARGS else '' + # + # + + # PATH= SYS_ARGS['config'] if 'config' in SYS_ARGS else os.sep.join([os.environ['HOME'],'.healthcareio','config.json']) + + e = analytics.engine(os.sep.join([PATH,'config.json'])) #--@TODO: make the configuration file globally accessible + e.apply(type='claims',serialize=True) + SYS_ARGS['engine'] = e + + pointer = lambda : server.app.run(host='0.0.0.0',port=PORT,debug=DEBUG,threaded=False) + pthread = Process(target=pointer,args=()) + pthread.start() + elif 'export' in SYS_ARGS: # # this function is designed to export the data to csv @@ -267,7 +342,17 @@ if __name__ == '__main__' : if set([format]) not in ['xls','csv'] : format = 'csv' - + else: + msg = """ + CLI Usage + healthcare-io.py --register --store + healthcare-io.py --parse claims --folder [--batch ] + healthcare-io.py --parse remits --folder [--batch ] + parameters : + --<[signup|init]> signup or get a configuration file from a parsing server + --store data store mongo or sqlite + """ + print(msg) pass # """ # The program was called from the command line thus we are expecting diff --git a/healthcareio/params.py b/healthcareio/params.py index 517d356..a9d2bc9 100644 --- a/healthcareio/params.py +++ b/healthcareio/params.py @@ -8,10 +8,12 @@ if len(sys.argv) > 1: value = None if sys.argv[i].startswith('--'): key = sys.argv[i][2:] #.replace('-','') + SYS_ARGS[key] = 1 + if i + 1 < N: value = sys.argv[i + 1] = sys.argv[i+1].strip() - if key and value: + if key and value and not value.startswith('--'): SYS_ARGS[key] = value diff --git a/healthcareio/parser.py b/healthcareio/parser.py index b499f76..f15ae14 100644 --- a/healthcareio/parser.py +++ b/healthcareio/parser.py @@ -91,6 +91,27 @@ def format_date(value) : return "-".join([year,month,day]) def format_time(value): return ":".join([value[:2],value[2:] ])[:5] +def sv2_parse(value): + # + # @TODO: Sometimes there's a suffix (need to inventory all the variations) + # + if '>' in value or ':' in value: + xchar = '>' if '>' in value else ':' + _values = value.split(xchar) + modifier = {} + + if len(_values) > 2 : + + modifier= {"code":_values[2]} + if len(_values) > 3 : + modifier['type'] = _values[3] + _value = {"code":_values[1],"type":_values[0]} + if modifier : + _value['modifier'] = modifier + + return _value + else: + return value def format_proc(value): for xchar in [':','<'] : if xchar in value and len(value.split(xchar)) > 1 : @@ -110,11 +131,11 @@ def format_pos(value): x = {"code":x[0],"indicator":x[1],"frequency":x[2]} if len(x) == 3 else {"code":x[0],"indicator":None,"frequency":None} return x -def get_map(row,config,version): +def get_map(row,config,version=None): label = config['label'] if 'label' in config else None - omap = config['map'] if version not in config else config[version] + omap = config['map'] if not version or version not in config else config[version] anchors = config['anchors'] if 'anchors' in config else [] if type(row[0]) == str: object_value = {} @@ -136,6 +157,9 @@ def get_map(row,config,version): if type(value) == dict : for objkey in value : + + if type(value[objkey]) == dict : + continue if 'syn' in config and value[objkey] in config['syn'] : value[objkey] = config['syn'][ value[objkey]] value = {key:value} if key not in value else value