diff --git a/healthcareio/analytics.py b/healthcareio/analytics.py index 442ad60..1171998 100644 --- a/healthcareio/analytics.py +++ b/healthcareio/analytics.py @@ -192,18 +192,23 @@ class Apex : @staticmethod def scalar(item): _df = item['data'] - - name = _df.columns.tolist()[0] - value = _df[name].values.round(2)[0] + value = '0' + unit = '' html = '
:value
:label
' - if value > 999 and value < 1000000 : - value = " ".join([str(np.divide(value,1000).round(2)),"K"]) - elif value > 999999 : - #@ Think of considering the case of a billion ... - value = " ".join([str(np.divide(value,1000000).round(2)),"M"]) - else: - value = str(value) - unit = name.replace('_',' ') if 'unit' not in item else item['unit'] + if _df.shape[0] > 0 : + print (_df) + print ('_____________________________________') + name = _df.columns.tolist()[0] + value = _df[name].values[0] + + if value > 999 and value < 1000000 : + value = " ".join([str(np.divide(value,1000).round(2)),"K"]) + elif value > 999999 : + #@ Think of considering the case of a billion ... + value = " ".join([str(np.divide(value,1000000).round(2)),"M"]) + else: + value = str(value) + unit = name.replace('_',' ') if 'unit' not in item else item['unit'] return {'html':html.replace(':value',value).replace(":label",unit)} @staticmethod def column(item): @@ -319,8 +324,9 @@ class Apex : values = df[x_cols].values.round(2).tolist() else: labels = [name.upper().replace('_',' ') for name in df.columns.tolist()] - df = df.astype(float) - values = df.values.round(2).tolist()[0] if df.shape[1] > 1 else df.values.round(2).tolist() + # df = df.astype(float) + # values = df.values.round(2).tolist()[0] if df.shape[1] > 1 else df.values.round(2).tolist() + values = df[[name for name in df.columns if df[name].dtype in [float,int]] ].values.round(2).tolist() colors = COLORS[:len(values)] options = {"series":values,"colors":colors,"labels":labels,"dataLabels":{"enabled":True,"style":{"colors":["#000000"]},"dropShadow":{"enabled":False}},"chart":{"type":"donut","width":200},"plotOptions":{"pie":{"customScale":.9}},"legend":{"position":"right"}} @@ -343,10 +349,10 @@ class engine : self.store_config = _config['store'] self.info = _config['analytics'] _args = self.store_config - if self.store_config['type'] == 'mongo.MongoWriter' : - _args['type'] = 'mongo.MongoReader' - else: - _args['type'] = 'disk.SQLiteReader' + if 'type' not in self.store_config : + # + # This is the newer version of data-transport + self.store_config['context'] = 'read' self.store_config = _args ; def filter (self,**args): @@ -367,8 +373,8 @@ class engine : # conn = lite.connect(self.store_config['args']['path'],isolation_level=None) # conn.create_aggregate("stdev",1,stdev) DB_TYPE = 'mongo' if (type(self.reader) == transport.mongo.MongoReader) else 'sql' - if DB_TYPE == 'mongo' : - self.store_config['args']['doc'] = args['type'] + # if DB_TYPE == 'mongo' : + # self.store_config['args']['doc'] = args['type'] self.reader = transport.factory.instance(**self.store_config) r = [] @@ -414,20 +420,8 @@ class engine : _analytics = [_analytics[index]] _info = list(_analytics) if 'filter' not in args else [item for item in analytics if args['filter'] == item['id']] - # conn = lite.connect(self.store_config['args']['path'],isolation_level=None) - # conn.create_aggregate("stdev",1,stdev) - # - # @TODO: Find a better way to handle database variance - # - # DB_TYPE = 'mongo' if (type(self.reader) == transport.mongo.MongoReader) else 'sql' - - if 'mongo' in self.store_config['type'] : - DB_TYPE='mongo' - else: - DB_TYPE='sql' - self.store_config['args']['table'] = args['type'] - self.reader = transport.factory.instance(**self.store_config) + DB_TYPE = 'mongo' if self.store_config ['provider'] in ['mongodb','mongo'] else 'sql' r = [] for row in _info : pipeline = row['pipeline'] @@ -440,14 +434,22 @@ class engine : continue if DB_TYPE == 'sql' : query = {"sql":query} + else: + query = {DB_TYPE:query} - item['data'] = self.reader.read(**query) #item) + _df = self.reader.read(**query) #item) + print (query) + print (self.reader) if 'serialize' in args : # item['data'] = json.dumps(item['data'].to_dict(orient='record')) if type(item['data']) == pd.DataFrame else item['data'] - item['data'] = json.dumps(item['data'].to_dict('record')) if type(item['data']) == pd.DataFrame else item['data'] + item['data'] = json.dumps(_df.to_dict(orient='record')) else: - item['data'] = (pd.DataFrame(item['data'])) + # item['data'] = (pd.DataFrame(item['data'])) + item['data'] = _df + pass + print (_df.head()) + break pipeline[index] = item index += 1 # diff --git a/healthcareio/server/__init__.py b/healthcareio/server/__init__.py index 861b438..3bf9cb7 100644 --- a/healthcareio/server/__init__.py +++ b/healthcareio/server/__init__.py @@ -225,6 +225,7 @@ if __name__ == '__main__' : PATH= SYS_ARGS['config'] if 'config' in SYS_ARGS else os.sep.join([os.environ['HOME'],'.healthcareio','config.json']) # # Adjusting configuration with parameters (batch,folder,resume) + SYS_ARGS['config'] = json.loads(open(PATH).read()) if 'args' not in SYS_ARGS['config'] : SYS_ARGS['config']["args"] = {"batch":1,"resume":True,"folder":"/data"} diff --git a/healthcareio/server/index.py b/healthcareio/server/index.py index 99f3709..533bb67 100644 --- a/healthcareio/server/index.py +++ b/healthcareio/server/index.py @@ -6,15 +6,16 @@ import json app = Flask(__name__) @app.route("/favicon.ico") def _icon(): - return send_from_directory(os.path.join([app.root_path, 'static','img','logo.svg']), + return send_from_directory(os.path.join([app.root_path, 'static/img/logo.svg']), 'favicon.ico', mimetype='image/vnd.microsoft.icon') @app.route("/") def init(): e = SYS_ARGS['engine'] sections = {"remits":e.info['835'],"claims":e.info['837']} - _args = {"sections":sections,"store":SYS_ARGS['config']['store']} - print (SYS_ARGS['config']['store']) - return render_template("setup.html",**_args) + + _args = {"sections":sections,"store":SYS_ARGS['config']['store'],'args':{'batch':5}} + + return render_template("index.html",**_args) @app.route("/format//",methods=['POST']) def _format(id,index): @@ -73,13 +74,16 @@ def reload(): if __name__ == '__main__' : PORT = int(SYS_ARGS['port']) if 'port' in SYS_ARGS else 5500 - DEBUG= int(SYS_ARGS['debug']) if 'debug' in SYS_ARGS else 0 + DEBUG= int(SYS_ARGS['debug']) if 'debug' in SYS_ARGS else 1 SYS_ARGS['context'] = SYS_ARGS['context'] if 'context' in SYS_ARGS else '' # # PATH= SYS_ARGS['config'] if 'config' in SYS_ARGS else os.sep.join([os.environ['HOME'],'.healthcareio','config.json']) - + # + if os.path.exists(PATH) : + SYS_ARGS['config'] = json.loads(open(PATH).read()) e = healthcareio.analytics.engine(PATH) # e.apply(type='claims',serialize=True) SYS_ARGS['engine'] = e + app.run(host='0.0.0.0',port=PORT,debug=DEBUG,threaded=True) \ No newline at end of file diff --git a/healthcareio/server/proxy.py b/healthcareio/server/proxy.py index 2ad95c6..f50fd34 100644 --- a/healthcareio/server/proxy.py +++ b/healthcareio/server/proxy.py @@ -37,14 +37,17 @@ class get : @staticmethod def processes(_args): - _info = pd.DataFrame(smart.top.read(name='healthcare-io.py'))[['name','cpu','mem']] + APP_NAME ='healthcare-io' + _info = smart.top.read(name=APP_NAME) #pd.DataFrame(smart.top.read(name='healthcare-io'))[['name','cpu','mem']] + if _info.shape[0] == 0 : - _info = pd.DataFrame({"name":["healthcare-io.py"],"cpu":[0],"mem":[0]}) + _info = pd.DataFrame({"name":[APP_NAME],"cpu":[0],"mem":[0]}) # _info = pd.DataFrame(_info.groupby(['name']).sum()) # _info['name'] = ['healthcare-io.py'] m = {'cpu':'CPU','mem':'RAM','name':'name'} - _info.columns = [m[name] for name in _info.columns.tolist()] + _info = _info.rename(columns=m) + # _info.columns = [m[name] for name in _info.columns.tolist() if name in m] _info.index = np.arange(_info.shape[0]) charts = [] @@ -56,23 +59,20 @@ class get : {"data":df, "chart":{"type":"radial","axis":{"x":label,"y":"name"}}} )['apex'] ) - # - # This will update the counts for the processes, upon subsequent requests so as to show the change - # - N = 0 - lprocs = [] - for proc in get.PROCS : - if proc.is_alive() : - lprocs.append(proc) - N = len(lprocs) - get.PROCS = lprocs - return {"process":{"chart":charts,"counts":N}} + + return {"process":{"chart":charts,"counts":_info.shape[0]}} @staticmethod def files (_args): - _info = smart.folder.read(path='/data') + folder = _args['args']['folder'] + _info = smart.folder.read(path=folder) + N = _info.files.tolist()[0] - if 'mongo' in _args['store']['type'] : - store_args = dict(_args['store'].copy(),**{"type":"mongo.MongoReader"}) + store_args = _args['store'].copy() + store_args['context'] = 'read' + + # if 'mongo' in _args['store']['type'] : + if _args['store']['provider'] in ['mongo', 'mongodb']: + # store_args = dict(_args['store'].copy(),**{"type":"mongo.MongoReader"}) # reader = transport.factory.instance(**_args) pipeline = [{"$group":{"_id":"$name","count":{"$sum":{"$cond":[{"$eq":["$completed",True]},1,0]}} }},{"$group":{"_id":None,"count":{"$sum":"$count"}}},{"$project":{"_id":0,"status":"completed","count":1}}] @@ -83,12 +83,15 @@ class get : else: - store_args = dict(_args['store'].copy(),**{"type":"disk.SQLiteReader"}) - store_args['args']['table'] = 'logs' + # store_args = dict(_args['store'].copy(),**{"type":"disk.SQLiteReader"}) + + # store_args['args']['table'] = 'logs' + store_args['table'] = 'logs' query= {"sql":"select count(distinct json_extract(data,'$.name')) as count, 'completed' status from logs where json_extract(data,'$.completed') = true"} _query={"sql":"select json_extract(data,'$.parse') as type,count(distinct json_extract(data,'$.name')) as count from logs group by type"} #-- distribution claim/remits reader = transport.factory.instance(**store_args) _info = pd.DataFrame(reader.read(**query)) + if not _info.shape[0] : _info = pd.DataFrame({"status":["completed"],"count":[0]}) _info['count'] = np.round( (_info['count'] * 100 )/N,2) @@ -97,11 +100,6 @@ class get : # # Let us classify the files now i.e claims / remits # - - - # pipeline = [{"$group":{"_id":"$parse","claims":{"$addToSet":"$name"}}},{"$project":{"_id":0,"type":"$_id","count":{"$size":"$claims"}}}] - # _args = {"aggregate":"logs","cursor":{},"allowDiskUse":True,"pipeline":pipeline} - # r = pd.DataFrame(reader.read(mongo=_args)) r = pd.DataFrame(reader.read(**_query)) #-- distribution claims/remits r = Apex.apply({"chart":{"type":"donut","axis":{"x":"count","y":"type"}},"data":r})['apex'] r['chart']['height'] = '100%' diff --git a/healthcareio/server/templates/setup.html b/healthcareio/server/templates/setup.html index df2fdb8..1cc70cf 100644 --- a/healthcareio/server/templates/setup.html +++ b/healthcareio/server/templates/setup.html @@ -63,10 +63,10 @@ // // We should insure the listeners are enabled if(monitor.listen.handler == null){ - monitor.listen.handler = setInterval( + /*monitor.listen.handler = setInterval( function(){ console.log('running ...') - monitor.data()},5000) + monitor.data()},5000)*/ } }else{ diff --git a/healthcareio/x12/__init__.py b/healthcareio/x12/__init__.py index 3760e69..c46a7e3 100644 --- a/healthcareio/x12/__init__.py +++ b/healthcareio/x12/__init__.py @@ -24,6 +24,7 @@ import sys from itertools import islice from multiprocessing import Process import transport +from transport import providers import jsonmerge import copy @@ -236,7 +237,54 @@ class Parser (Process): _config[_id] = [_config[_id]] config['parser'] = _config return config - + @staticmethod + def init(**_args): + """ + This function allows to initialize the database that will store the claims if need be + :path configuration file + """ + PATH = os.sep.join([os.environ['HOME'],'.healthcareio']) + filename = os.sep.join([PATH,'config.json']) + + filename = _args['path'] if 'path' in _args else filename + info = None + if os.path.exists(filename): + # + # Loading the configuration file (JSON format) + file = open(filename) + info = json.loads(file.read()) + + + OUTPUT_FOLDER = info['out-folder'] + if 'output-folder' not in info and not os.path.exists(OUTPUT_FOLDER) : + os.mkdir(OUTPUT_FOLDER) + elif 'output-folder' in info and not os.path.exists(info['out-folder']) : + os.mkdir(info['out-folder']) + # if 'type' in info['store'] : + lwriter = None + IS_SQL = False + if'type' in info['store'] and info['store']['type'] == 'disk.SQLiteWriter' : + lwriter = transport.factory.instance(**info['store']) + IS_SQL = True + elif 'provider' in info['store'] and info['store']['provider'] == 'sqlite' : + lwriter = transport.instance(**info['store']) ; + IS_SQL = [providers.SQLITE,providers.POSTGRESQL,providers.NETEZZA,providers.MYSQL,providers.MARIADB] + + if lwriter and IS_SQL: + for key in info['schema'] : + if key != 'logs' : + _id = 'claims' if key == '837' else 'remits' + else: + _id = key + + if not lwriter.has(table=_id) : + lwriter.apply(info['schema'][key]['create']) + + # [lwriter.apply( info['schema'][key]['create']) for key in info['schema'] if not lwriter.has(table=key)] + lwriter.close() + + return info + def __init__(self,path): """ :path path of the configuration file (it can be absolute)