diff --git a/README.md b/README.md index 37fd19d..9fff556 100644 --- a/README.md +++ b/README.md @@ -22,16 +22,26 @@ We wrote this frame to be used in both command line or as a library within in yo pip install --upgrade git+https://hiplab.mc.vanderbilt.edu/git/lab/parse-edi.git ## Usage - + **cli :** 1. signup to get parsing configuration + The parser is driven by a configuration file that specifies fields to parse and how to parse them. You need by signing up, to get a copy of the configuration file. + healthcare-io.py --signup [--store ] - -2. parsing claims in a folder - healthcare-io.py --parse --folder [--batch ] [--resume] +2. check version + + Occasionally the attributes in the configuration file may change, This function will determine if there is a new version available. + + healthcare-io.py --check-update + +3. parsing data in a folder + + The parser will recursively traverse a directory with claims and or remittances + + healthcare-io.py --parse --folder [--batch ] [--resume] with : --parse tells the engine what to parse claims or remits @@ -39,12 +49,42 @@ We wrote this frame to be used in both command line or as a library within in yo --batch number of processes to spawn to parse the files --resume tells the parser to resume parsing if all files weren't processed or new files were added into the folder -3. dashboard - - There is a built-in dashboard that has displays descriptive analytics in a web browser - - healthcare-io.py --server [--context ] + +4. export data to a relational data-store + + The parser will export data into other data-stores as a relational tables allowing users to construct views to support a variety of studies. + + healthcare-io.py --export <835|837> --config + + with: + --config configuration to support data-store + + The configuration file needed to implement export is modelled after the following template: + + + { + "provider":"", + "db":"mydatabase", + [ + "host":"server-name","port":5432, + "user":"me","password":"!@#z4qm", + "schema":"target-schema" + ] + } + + **parameters:** + + provider postgresql,redshift,mysql or mariadb (supported providers) + db name of the database + + **optional:** + schema name of the target schema. If not provided we will assume the default + host host of the database. If not provided assuming localhost + port port value of the database if not provided the default will be used + user database user name. If not provided we assume security settings to trust + password password of database user. If not set we assume security settings to trust + **Embedded in Code :** The Healthcare/IO **parser** can be used within your code base as a library and handle storing data in a data store of choice diff --git a/healthcareio/Dockerfile b/healthcareio/Dockerfile index 5cf8fd8..23464ef 100644 --- a/healthcareio/Dockerfile +++ b/healthcareio/Dockerfile @@ -1,4 +1,8 @@ -FROM ubuntu:bionic-20200403 +# +# Let us create an image for healthcareio +# The image will contain the {X12} Parser and the +# FROM ubuntu:bionic-20200403 +FROM ubuntu:focal RUN ["apt","update","--fix-missing"] RUN ["apt-get","upgrade","-y"] @@ -6,9 +10,22 @@ RUN ["apt-get","-y","install","apt-utils"] RUN ["apt","update","--fix-missing"] RUN ["apt-get","upgrade","-y"] -RUN ["apt-get","install","-y","sqlite3","sqlite3-pcre","libsqlite3-dev","python3-dev","python3","python3-pip","git","python3-virtualenv"] +RUN ["apt-get","install","-y","mongo","sqlite3","sqlite3-pcre","libsqlite3-dev","python3-dev","python3","python3-pip","git","python3-virtualenv","wget"] # # +RUN ["pip3","install","--upgrade","pip"] +# RUN ["pip3","install","git+https://healthcare.the-phi.com/git/code/parser.git","botocore"] USER health-user +# +# This volume is where the data will be loaded from (otherwise it is assumed the user will have it in the container somehow) +# +VOLUME ["/data"] +# +# This is the port from which some degree of monitoring can/will happen +EXPOSE 80 +# wget https://healthcareio.the-phi.com/git/code/parser.git/bootup.sh +COPY bootup.sh bootup.sh +ENTRYPOINT ["bash","-C"] +CMD ["bootup.sh"] # VOLUME ["/home/health-user/healthcare-io/","/home-healthuser/.healthcareio"] -# RUN ["pip3","install","git+https://healthcareio.the-phi.com/git"] \ No newline at end of file +# RUN ["pip3","install","git+https://healthcareio.the-phi.com/git"] diff --git a/healthcareio/__init__.py b/healthcareio/__init__.py index d63c74f..30e8159 100644 --- a/healthcareio/__init__.py +++ b/healthcareio/__init__.py @@ -16,5 +16,9 @@ Usage : """ from healthcareio import analytics +from healthcareio import server +from healthcareio import export import healthcareio.x12 as x12 +import healthcareio.params as params + # from healthcareio import server diff --git a/healthcareio/analytics.py b/healthcareio/analytics.py index 875a8cd..442ad60 100644 --- a/healthcareio/analytics.py +++ b/healthcareio/analytics.py @@ -11,7 +11,7 @@ import transport import matplotlib.pyplot as plt import re, base64 # from weasyprint import HTML, CSS -COLORS = ["#f79256","#7dcfb6","#fbd1a2","#00b2ca","#1d4e89","#4682B4","#c5c3c6","#4c5c68","#1985a1","#f72585","#7209b7","#3a0ca3","#4361ee","#4cc9f0","#ff595e","#ffca3a","#8ac926","#1982c4","#6a4c93"] +COLORS = ["#fbd1a2","#00b2ca","#1d4e89","#4682B4","#c5c3c6","#4c5c68","#1985a1","#f72585","#7209b7","#3a0ca3","#4361ee","#4cc9f0","#ff595e","#ffca3a","#8ac926","#1982c4","#6a4c93"] class stdev : def __init__(self) : self.values = [] @@ -149,11 +149,16 @@ class Apex : This class will format a data-frame to work with Apex charting engine """ @staticmethod - def apply(item): + def apply(item,theme={'mode':'light','palette':'palette6'}): pointer = item['chart']['type'] if hasattr(Apex,pointer) : pointer = getattr(Apex,pointer) + options = pointer(item) + if 'apex' in options and 'colors' in options['apex'] : + del options['apex']['colors'] + if 'apex' in options : + options['apex']['theme'] = theme options['responsive']= [ { 'breakpoint': 1, @@ -168,6 +173,18 @@ class Apex : print ("Oops") pass @staticmethod + def radial(item): + df = item['data'] + x = item['chart']['axis']['x'] + y = item['chart']['axis']['y'] + + labels = df[y].tolist() + values = [float(np.round(value,2)) for value in df[x].tolist()] + chart = {"type":"radialBar","height":200} + option = {"chart":chart,"series":values,"labels":labels,"plotOptions":{"radialBar":{"hollow":{"size":"70%"}}}} + return {'apex':option} + + @staticmethod def scatter(item): options = Apex.spline(item) options['apex']['chart']['type'] = 'scatter' @@ -175,7 +192,7 @@ class Apex : @staticmethod def scalar(item): _df = item['data'] - print (_df) + name = _df.columns.tolist()[0] value = _df[name].values.round(2)[0] html = '
:value
:label
' @@ -235,16 +252,17 @@ class Apex : @TODO: alias this with bar (!= column) """ df = item['data'] + N = df.shape[0] if df.shape[0] < 10 else 10 axis = item['chart']['axis'] y = axis['y'] if type(y) == list : y = y[0] axis['x'] = [axis['x']] if type(axis['x']) != list else axis['x'] - if not set(axis['x']) & set(df.columns.tolist()) : - print (set(axis['x']) & set(df.columns.tolist())) - print (axis['x']) - print (df.columns) + # if not set(axis['x']) & set(df.columns.tolist()) : + # print (set(axis['x']) & set(df.columns.tolist())) + # print (axis['x']) + # print (df.columns) # df.columns = axis['x'] series = [] _min=_max = 0 @@ -294,7 +312,6 @@ class Apex : values are x-axis """ df = item['data'] - if df.shape [0]> 1 : y_cols,x_cols = item['chart']['axis']['y'],item['chart']['axis']['x'] labels = df[y_cols].values.tolist() @@ -302,10 +319,11 @@ class Apex : values = df[x_cols].values.round(2).tolist() else: labels = [name.upper().replace('_',' ') for name in df.columns.tolist()] + df = df.astype(float) values = df.values.round(2).tolist()[0] if df.shape[1] > 1 else df.values.round(2).tolist() colors = COLORS[:len(values)] - options = {"series":values,"colors":colors,"labels":labels,"chart":{"type":"donut"},"plotOptions":{"pie":{"customScale":.8}},"legend":{"position":"right"}} + options = {"series":values,"colors":colors,"labels":labels,"dataLabels":{"enabled":True,"style":{"colors":["#000000"]},"dropShadow":{"enabled":False}},"chart":{"type":"donut","width":200},"plotOptions":{"pie":{"customScale":.9}},"legend":{"position":"right"}} return {"apex":options} pass @@ -329,43 +347,117 @@ class engine : _args['type'] = 'mongo.MongoReader' else: _args['type'] = 'disk.SQLiteReader' - self.reader = transport.factory.instance(**_args) + self.store_config = _args ; + + def filter (self,**args): + """ + type: claims or remits + filter optional identifier claims, procedures, taxonomy, ... + """ + + + _m = {'claim':'837','claims':'837','remits':'835','remit':'835'} + table = _m[ args['type']] + _analytics = self.info[table] + if 'index' in args : + index = int(args['index']) + _analytics = [_analytics[index]] + + _info = list(_analytics) #if 'filter' not in args else [item for item in analytics if args['filter'] == item['id']] + # conn = lite.connect(self.store_config['args']['path'],isolation_level=None) + # conn.create_aggregate("stdev",1,stdev) + DB_TYPE = 'mongo' if (type(self.reader) == transport.mongo.MongoReader) else 'sql' + if DB_TYPE == 'mongo' : + self.store_config['args']['doc'] = args['type'] + + self.reader = transport.factory.instance(**self.store_config) + r = [] + for row in _info : + pipeline = row['pipeline'] + + index = 0 + for item in pipeline: + if not item[DB_TYPE] : + continue + query = {DB_TYPE:item[DB_TYPE]} + + df = pd.DataFrame(self.reader.read(**query)) #item) + df = df.fillna('N/A') + # item['data'] = df + chart = item['chart'] + pipe = {"data":df,"chart":chart} + for key in list(item.keys()) : + if key not in ["chart","data","mongo","sql","couch"] : + pipe[key] = item[key] + + + + r.append(pipe) + self.reader.close() + return {"id":_info[0]['id'],'pipeline':r} + def apply (self,**args) : """ type: claims or remits filter optional identifier claims, procedures, taxonomy, ... """ + + _m = {'claim':'837','claims':'837','remits':'835','remit':'835'} # key = '837' if args['type'] == 'claims' else '835' table = _m[ args['type']] - analytics = self.info[table] + + _analytics = self.info[table] if 'index' in args : index = int(args['index']) - analytics = [analytics[index]] + _analytics = [_analytics[index]] - _info = list(analytics) if 'filter' not in args else [item for item in analytics if args['filter'] == item['id']] + _info = list(_analytics) if 'filter' not in args else [item for item in analytics if args['filter'] == item['id']] # conn = lite.connect(self.store_config['args']['path'],isolation_level=None) # conn.create_aggregate("stdev",1,stdev) - DB_TYPE = 'mongo' if (type(self.reader) == transport.mongo.MongoReader) else 'sql' + # + # @TODO: Find a better way to handle database variance + # + # DB_TYPE = 'mongo' if (type(self.reader) == transport.mongo.MongoReader) else 'sql' + + if 'mongo' in self.store_config['type'] : + DB_TYPE='mongo' + else: + DB_TYPE='sql' + self.store_config['args']['table'] = args['type'] + + self.reader = transport.factory.instance(**self.store_config) r = [] for row in _info : - - for item in row['pipeline'] : + pipeline = row['pipeline'] + index = 0 + for item in pipeline: # item['data'] = pd.read_sql(item['sql'],conn) - query = {DB_TYPE:item[DB_TYPE]} - item['data'] = self.reader.read(**item) + # query = {DB_TYPE:item[DB_TYPE]} + query = item[DB_TYPE] + if not query : + continue + if DB_TYPE == 'sql' : + query = {"sql":query} + + item['data'] = self.reader.read(**query) #item) if 'serialize' in args : - item['data'] = json.dumps(item['data'].to_dict(orient='record')) if type(item['data']) == pd.DataFrame else item['data'] + # item['data'] = json.dumps(item['data'].to_dict(orient='record')) if type(item['data']) == pd.DataFrame else item['data'] + item['data'] = json.dumps(item['data'].to_dict('record')) if type(item['data']) == pd.DataFrame else item['data'] else: item['data'] = (pd.DataFrame(item['data'])) - + pipeline[index] = item + index += 1 + # + # + row['pipeline']= pipeline # if 'info' in item: # item['info'] = item['info'].replace(":rows",str(item["data"].shape[0])) # conn.close() - + self.reader.close() return _info def _html(self,item) : diff --git a/healthcareio/docker/Dockerfile b/healthcareio/docker/Dockerfile new file mode 100644 index 0000000..4ff6d14 --- /dev/null +++ b/healthcareio/docker/Dockerfile @@ -0,0 +1,37 @@ +# +# Let us create an image for healthcareio +# The image will contain the {X12} Parser and the +# FROM ubuntu:bionic-20200403 +FROM ubuntu:focal +RUN ["apt-get","update","--fix-missing"] +RUN ["apt-get","upgrade","-y"] + +RUN ["apt-get","-y","install","apt-utils"] + +RUN ["apt-get","update","--fix-missing"] +RUN ["apt-get","upgrade","-y"] +RUN ["apt-get","install","-y","mongodb","sqlite3","sqlite3-pcre","libsqlite3-dev","python3-dev","python3","python3-pip","git","python3-virtualenv","wget"] +# +# +RUN ["pip3","install","--upgrade","pip"] +RUN ["pip3","install","numpy","pandas","git+https://dev.the-phi.com/git/steve/data-transport","botocore","matplotlib"] +# RUN ["pip3","install","git+https://healthcare.the-phi.com/git/code/parser.git","botocore"] +# RUN ["useradd", "-ms", "/bin/bash", "health-user"] +# USER health-user +# +# This volume is where the data will be loaded from (otherwise it is assumed the user will have it in the container somehow) +# +VOLUME ["/data","/app/healthcareio"] +WORKDIR /app +ENV PYTHONPATH="/app" + +# +# This is the port from which some degree of monitoring can/will happen +EXPOSE 80 +EXPOSE 27017 +# wget https://healthcareio.the-phi.com/git/code/parser.git/bootup.sh +COPY bootup.sh bootup.sh +ENTRYPOINT ["bash","-C"] +CMD ["bootup.sh"] +# VOLUME ["/home/health-user/healthcare-io/","/home-healthuser/.healthcareio"] +# RUN ["pip3","install","git+https://healthcareio.the-phi.com/git"] diff --git a/healthcareio/docker/bootup.sh b/healthcareio/docker/bootup.sh new file mode 100644 index 0000000..1ee561f --- /dev/null +++ b/healthcareio/docker/bootup.sh @@ -0,0 +1,10 @@ +set -e +/etc/init.d/mongodb start +cd /app +export +export PYTHONPATH=$PWD +ls +# python3 healthcareio/healthcare-io.py --signup $EMAIL --store mongo +# python3 healthcareio/healthcare-io.py --analytics --port 80 --debug + +bash diff --git a/healthcareio/docs/835-relational-structure.jpeg b/healthcareio/docs/835-relational-structure.jpeg new file mode 100644 index 0000000..465ba2f Binary files /dev/null and b/healthcareio/docs/835-relational-structure.jpeg differ diff --git a/healthcareio/docs/837-relational-structure.jpeg b/healthcareio/docs/837-relational-structure.jpeg new file mode 100644 index 0000000..a9e3171 Binary files /dev/null and b/healthcareio/docs/837-relational-structure.jpeg differ diff --git a/healthcareio/docs/claims-837-relational-structure.xmi b/healthcareio/docs/claims-837-relational-structure.xmi new file mode 100644 index 0000000..2a317e7 --- /dev/null +++ b/healthcareio/docs/claims-837-relational-structure.xmi @@ -0,0 +1,314 @@ + + + + + umbrello uml modeller http://umbrello.kde.org + 1.6.17 + UnicodeUTF8 + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/healthcareio/docs/remits-835-relational-structure.xmi b/healthcareio/docs/remits-835-relational-structure.xmi new file mode 100644 index 0000000..5ec1a7c --- /dev/null +++ b/healthcareio/docs/remits-835-relational-structure.xmi @@ -0,0 +1,331 @@ + + + + + umbrello uml modeller http://umbrello.kde.org + 1.6.17 + UnicodeUTF8 + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/healthcareio/healthcare-io.py b/healthcareio/healthcare-io.py index 037586c..61d4d8d 100644 --- a/healthcareio/healthcare-io.py +++ b/healthcareio/healthcare-io.py @@ -32,9 +32,10 @@ Usage : from healthcareio.params import SYS_ARGS from transport import factory import requests - from healthcareio import analytics from healthcareio import server + + from healthcareio.parser import get_content import os import json @@ -43,6 +44,10 @@ import numpy as np from multiprocessing import Process import time from healthcareio import x12 +from healthcareio.export import export +import smart +from healthcareio.server import proxy +import pandas as pd PATH = os.sep.join([os.environ['HOME'],'.healthcareio']) OUTPUT_FOLDER = os.sep.join([os.environ['HOME'],'healthcare-io']) @@ -53,10 +58,28 @@ if not os.path.exists(PATH) : import platform import sqlite3 as lite # PATH = os.sep.join([os.environ['HOME'],'.edi-parser']) -def register (**args) : +HELP_MESSAGE = """ + cli: + + healthcare-io.py --<[signup|init]> --store [--batch ] + healthcare-io.py --parse --folder [--batch ] [--resume] + healthcare-io.py --check-update + healthcare-io.py --export <835|837> --config + action : + --signup|init signup user and get configuration file + --parse starts parsing + --check checks for updates + --export export data of a 835 or 837 into another database + parameters : + --<[signup|init]> signup or get a configuration file from a parsing server + --folder location of the files (the program will recursively traverse it) + --store data store mongo or sqlite or mongodb + --resume will attempt to resume if there was an interruption + """ +def signup (**args) : """ :email user's email address - :url url of the provider to register + :url url of the provider to signup """ email = args['email'] @@ -121,77 +144,77 @@ def init(): # # Global variables that load the configuration files -def parse(**args): - """ - This function will parse the content of a claim or remittance (x12 format) give the following parameters - :filename absolute path of the file to be parsed - :type claims|remits in x12 format - """ - global INFO - if not INFO : - INFO = init() - if args['type'] == 'claims' : - CONFIG = INFO['parser']['837'] - elif args['type'] == 'remits' : - CONFIG = INFO['parser']['835'] - else: - CONFIG = None - if CONFIG : - # CONFIG = CONFIG[-1] if 'version' not in args and (args['version'] < len(CONFIG)) else CONFIG[0] - CONFIG = CONFIG[int(args['version'])-1] if 'version' in SYS_ARGS and int(SYS_ARGS['version']) < len(CONFIG) else CONFIG[-1] - SECTION = CONFIG['SECTION'] - os.environ['HEALTHCAREIO_SALT'] = INFO['owner'] +# def parse(**args): +# """ +# This function will parse the content of a claim or remittance (x12 format) give the following parameters +# :filename absolute path of the file to be parsed +# :type claims|remits in x12 format +# """ +# global INFO +# if not INFO : +# INFO = init() +# if args['type'] == 'claims' : +# CONFIG = INFO['parser']['837'] +# elif args['type'] == 'remits' : +# CONFIG = INFO['parser']['835'] +# else: +# CONFIG = None +# if CONFIG : +# # CONFIG = CONFIG[-1] if 'version' not in args and (args['version'] < len(CONFIG)) else CONFIG[0] +# CONFIG = CONFIG[int(args['version'])-1] if 'version' in SYS_ARGS and int(SYS_ARGS['version']) < len(CONFIG) else CONFIG[-1] +# SECTION = CONFIG['SECTION'] +# os.environ['HEALTHCAREIO_SALT'] = INFO['owner'] - return get_content(args['filename'],CONFIG,SECTION) -def resume (files,id,config): - _args = config['store'].copy() - if 'mongo' in config['store']['type'] : - _args['type'] = 'mongo.MongoReader' - reader = factory.instance(**_args) - _files = [] - if 'resume' in config['analytics'] : - _args = config['analytics']['resume'][id] - _files = reader.read(**_args) - _files = [item['name'] for item in _files if item['name'] != None] - return list(set(files) - set(_files)) +# return get_content(args['filename'],CONFIG,SECTION) +# def resume (files,id,config): +# _args = config['store'].copy() +# if 'mongo' in config['store']['type'] : +# _args['type'] = 'mongo.MongoReader' +# reader = factory.instance(**_args) +# _files = [] +# if 'resume' in config['analytics'] : +# _args = config['analytics']['resume'][id] +# _files = reader.read(**_args) +# _files = [item['name'] for item in _files if item['name'] != None] +# return list(set(files) - set(_files)) - return files - pass -def apply(files,store_info,logger_info=None): - """ - :files list of files to be processed in this given thread/process - :store_info information about data-store, for now disk isn't thread safe - :logger_info information about where to store the logs - """ + # return files + # pass +# def apply(files,store_info,logger_info=None): +# """ +# :files list of files to be processed in this given thread/process +# :store_info information about data-store, for now disk isn't thread safe +# :logger_info information about where to store the logs +# """ - if not logger_info : - logger = factory.instance(type='disk.DiskWriter',args={'path':os.sep.join([info['out-folder'],SYS_ARGS['parse']+'.log'])}) - else: - logger = factory.instance(**logger_info) +# if not logger_info : +# logger = factory.instance(type='disk.DiskWriter',args={'path':os.sep.join([info['out-folder'],SYS_ARGS['parse']+'.log'])}) +# else: +# logger = factory.instance(**logger_info) - writer = factory.instance(**store_info) - for filename in files : +# writer = factory.instance(**store_info) +# for filename in files : - if filename.strip() == '': - continue - # content,logs = get_content(filename,CONFIG,CONFIG['SECTION']) - # - try: - content,logs = parse(filename = filename,type=SYS_ARGS['parse']) +# if filename.strip() == '': +# continue +# # content,logs = get_content(filename,CONFIG,CONFIG['SECTION']) +# # +# try: +# content,logs = parse(filename = filename,type=SYS_ARGS['parse']) - if content : - writer.write(content) - if logs : - [logger.write(dict(_row,**{"parse":SYS_ARGS['parse']})) for _row in logs] - else: - logger.write({"parse":SYS_ARGS['parse'],"name":filename,"completed":True,"rows":len(content)}) - except Exception as e: +# if content : +# writer.write(content) +# if logs : +# [logger.write(dict(_row,**{"parse":SYS_ARGS['parse']})) for _row in logs] +# else: +# logger.write({"parse":SYS_ARGS['parse'],"name":filename,"completed":True,"rows":len(content)}) +# except Exception as e: - logger.write({"parse":SYS_ARGS['parse'],"filename":filename,"completed":False,"rows":-1,"msg":e.args[0]}) - # print ([filename,len(content)]) - # - # @TODO: forward this data to the writer and log engine +# logger.write({"parse":SYS_ARGS['parse'],"filename":filename,"completed":False,"rows":-1,"msg":e.args[0]}) +# # print ([filename,len(content)]) +# # +# # @TODO: forward this data to the writer and log engine # def upgrade(**args): """ @@ -200,13 +223,28 @@ def upgrade(**args): """ url = args['url'] if 'url' in args else URL+"/upgrade" headers = {"key":args['key'],"email":args["email"],"url":url} +def check(**_args): + """ + This function will check if there is an update available (versions are in the configuration file) + :param url + """ + url = _args['url'][:-1] if _args['url'].endswith('/') else _args['url'] + url = url + "/version" + if 'version' not in _args : + version = {"_id":"version","current":0.0} + else: + version = _args['version'] + http = requests.session() + r = http.get(url) + return r.json() if __name__ == '__main__' : info = init() if 'out-folder' in SYS_ARGS : OUTPUT_FOLDER = SYS_ARGS['out-folder'] - + SYS_ARGS['url'] = SYS_ARGS['url'] if 'url' in SYS_ARGS else URL + if set(list(SYS_ARGS.keys())) & set(['signup','init']): # # This command will essentially get a new copy of the configurations @@ -214,10 +252,10 @@ if __name__ == '__main__' : # email = SYS_ARGS['signup'].strip() if 'signup' in SYS_ARGS else SYS_ARGS['init'] - url = SYS_ARGS['url'] if 'url' in SYS_ARGS else 'https://healthcareio.the-phi.com' + url = SYS_ARGS['url'] if 'url' in SYS_ARGS else URL store = SYS_ARGS['store'] if 'store' in SYS_ARGS else 'sqlite' db='healthcareio' if 'db' not in SYS_ARGS else SYS_ARGS['db'] - register(email=email,url=url,store=store,db=db) + signup(email=email,url=url,store=store,db=db) # else: # m = """ # usage: @@ -241,51 +279,31 @@ if __name__ == '__main__' : if 'file' in SYS_ARGS : files = [SYS_ARGS['file']] if not os.path.isdir(SYS_ARGS['file']) else [] if 'folder' in SYS_ARGS and os.path.exists(SYS_ARGS['folder']): - names = os.listdir(SYS_ARGS['folder']) - files += [os.sep.join([SYS_ARGS['folder'],name]) for name in names if not os.path.isdir(os.sep.join([SYS_ARGS['folder'],name]))] + for root,_dir,f in os.walk(SYS_ARGS['folder']) : + + if f : + files += [os.sep.join([root,name]) for name in f] + + # names = os.listdir(SYS_ARGS['folder']) + # files += [os.sep.join([SYS_ARGS['folder'],name]) for name in names if not os.path.isdir(os.sep.join([SYS_ARGS['folder'],name]))] else: # - # raise an erro + # raise an error + pass # # if the user has specified to resume, we should look into the logs and pull the files processed and those that haven't # if 'resume' in SYS_ARGS : - files = resume(files,SYS_ARGS['parse'],info) - print (["Found ",len(files)," files unprocessed"]) + store_config = json.loads( (open(os.sep.join([PATH,'config.json']))).read() ) + files = proxy.get.resume(files,store_config ) + # print (["Found ",len(files)," files unprocessed"]) # # @TODO: Log this here so we know what is being processed or not SCOPE = None if files : #and ('claims' in SYS_ARGS['parse'] or 'remits' in SYS_ARGS['parse']): - # logger = factory.instance(type='disk.DiskWriter',args={'path':os.sep.join([info['out-folder'],SYS_ARGS['parse']+'.log'])}) - # if info['store']['type'] == 'disk.DiskWriter' : - # info['store']['args']['path'] += (os.sep + 'healthcare-io.json') - # elif info['store']['type'] == 'disk.SQLiteWriter' : - # # info['store']['args']['path'] += (os.sep + 'healthcare-io.db3') - # pass - - - # if info['store']['type'] == 'disk.SQLiteWriter' : - # info['store']['args']['table'] = SYS_ARGS['parse'].strip().lower() - # _info = json.loads(json.dumps(info['store'])) - # _info['args']['table']='logs' - # else: - # # - # # if we are working with no-sql we will put the logs in it (performance )? - - # info['store']['args']['doc'] = SYS_ARGS['parse'].strip().lower() - # _info = json.loads(json.dumps(info['store'])) - # _info['args']['doc'] = 'logs' - # logger = factory.instance(**_info) - # writer = factory.instance(**info['store']) - - # - # we need to have batches ready for this in order to run some of these queries in parallel - # @TODO: Make sure it is with a persistence storage (not disk .. not thread/process safe yet) - # - Make sure we can leverage this on n-cores later on, for now the assumption is a single core - # BATCH_COUNT = 1 if 'batch' not in SYS_ARGS else int (SYS_ARGS['batch']) files = np.array_split(files,BATCH_COUNT) @@ -304,27 +322,7 @@ if __name__ == '__main__' : while len(procs) > 0 : procs = [proc for proc in procs if proc.is_alive()] time.sleep(2) - # for filename in files : - - # if filename.strip() == '': - # continue - # # content,logs = get_content(filename,CONFIG,CONFIG['SECTION']) - # # - # try: - # content,logs = parse(filename = filename,type=SYS_ARGS['parse']) - # if content : - # writer.write(content) - # if logs : - # [logger.write(dict(_row,**{"parse":SYS_ARGS['parse']})) for _row in logs] - # else: - # logger.write({"parse":SYS_ARGS['parse'],"name":filename,"completed":True,"rows":len(content)}) - # except Exception as e: - # logger.write({"parse":SYS_ARGS['parse'],"filename":filename,"completed":False,"rows":-1,"msg":e.args[0]}) - # # print ([filename,len(content)]) - # # - # # @TODO: forward this data to the writer and log engine - # # - + pass @@ -337,103 +335,71 @@ if __name__ == '__main__' : # PATH= SYS_ARGS['config'] if 'config' in SYS_ARGS else os.sep.join([os.environ['HOME'],'.healthcareio','config.json']) - e = analytics.engine(os.sep.join([PATH,'config.json'])) #--@TODO: make the configuration file globally accessible - e.apply(type='claims',serialize=True) - SYS_ARGS['engine'] = e + if os.path.exists(os.sep.join([PATH,'config.json'])) : + e = analytics.engine(os.sep.join([PATH,'config.json'])) #--@TODO: make the configuration file globally accessible + e.apply(type='claims',serialize=True) + SYS_ARGS['engine'] = e + SYS_ARGS['config'] = json.loads(open(os.sep.join([PATH,'config.json'])).read()) + else: + SYS_ARGS['config'] = {"owner":None,"store":None} + + if 'args' not in SYS_ARGS['config'] : + SYS_ARGS['config']["args"] = {"batch":1,"resume":True,"folder":"/data"} + me = pd.DataFrame(smart.top.read(name='healthcare-io.py')).args.unique().tolist() + SYS_ARGS['me'] = me[0] #-- This key will identify the current process + pointer = lambda : server.app.run(host='0.0.0.0',port=PORT,debug=DEBUG,threaded=False) pthread = Process(target=pointer,args=()) pthread.start() + elif 'check-update' in SYS_ARGS : + _args = {"url":SYS_ARGS['url']} + try: + if os.path.exists(os.sep.join([PATH,'config.json'])) : + SYS_ARGS['config'] = json.loads((open(os.sep.join([PATH,'config.json']))).read()) + else: + SYS_ARGS['config'] = {} + if 'version' in SYS_ARGS['config'] : + _args['version'] = SYS_ARGS['config']['version'] + version = check(**_args) + _version = {"current":0.0}if 'version' not in SYS_ARGS['config'] else SYS_ARGS['config']['version'] + if _version['current'] != version['current'] : + print () + print ("You need to upgrade your system to version to ",version['current']) + print ("\t- signup (for new configuration)") + print ("\t- use pip to upgrade the codebase") + else: + print () + print ("You are running the current configuraiton version ",_version['current']) + except Exception as e: + print (e) + pass elif 'export' in SYS_ARGS: # # this function is designed to export the data to csv # - format = SYS_ARGS['format'] if 'format' in SYS_ARGS else 'csv' - format = format.lower() - if set([format]) not in ['xls','csv'] : - format = 'csv' - - else: - msg = """ - cli: - - healthcare-io.py --<[signup|init]> --store [--batch ] - healthcare-io.py --parse claims --folder [--batch ] - healthcare-io.py --parse remits --folder [--batch ] [--resume] - - parameters : - --<[signup|init]> signup or get a configuration file from a parsing server - --store data store mongo or sqlite or mongodb - --resume will attempt to resume if there was an interruption - """ - print(msg) - pass - # """ - # The program was called from the command line thus we are expecting - # parse in [claims,remits] - # config os.sep.path.exists(path) - # folder os.sep.path.exists(path) - # store store () - # """ - # p = len( set(['store','config','folder']) & set(SYS_ARGS.keys())) == 3 and ('db' in SYS_ARGS or 'path' in SYS_ARGS) - # TYPE = { - # 'mongo':'mongo.MongoWriter', - # 'couch':'couch.CouchWriter', - # 'disk':'disk.DiskWriter' - # } - # INFO = { - # '837':{'scope':'claims','section':'HL'}, - # '835':{'scope':'remits','section':'CLP'} - # } - # if p : - # args = {} - # scope = SYS_ARGS['config'][:-5].split(os.sep)[-1] - # CONTEXT = INFO[scope]['scope'] - # # - # # @NOTE: - # # improve how database and data stores are handled. - # if SYS_ARGS['store'] == 'couch' : - # args = {'url': SYS_ARGS['url'] if 'url' in SYS_ARGS else 'http://localhost:5984'} - # args['dbname'] = SYS_ARGS['db'] + path = SYS_ARGS['config'] + TYPE = SYS_ARGS['export'] if 'export' in SYS_ARGS else '835' + if not os.path.exists(path) or TYPE not in ['835','837']: + print (HELP_MESSAGE) + else: + # + # Let's run the export function ..., This will push files into a data-store of choice Redshift, PostgreSQL, MySQL ... + # + _store = {"type":"sql.SQLWriter","args":json.loads( (open(path) ).read())} - # elif SYS_ARGS ['store'] == 'mongo': - # args = {'host':SYS_ARGS['host']if 'host' in SYS_ARGS else 'localhost:27017'} - # if SYS_ARGS['store'] in ['mongo','couch']: - # args['dbname'] = SYS_ARGS['db'] if 'db' in SYS_ARGS else 'claims_outcomes' - # args['doc'] = CONTEXT + pipes = export.Factory.instance(type=TYPE,write_store=_store) #"inspect":0,"cast":0}}) + # pipes[0].run() + for thread in pipes: + thread.start() + time.sleep(1) + while pipes : + pipes = [thread for thread in pipes if thread.is_alive()] + time.sleep(1) + - # TYPE = TYPE[SYS_ARGS['store']] - # writer = factory.instance(type=TYPE,args=args) - # if SYS_ARGS['store'] == 'disk': - # writer.init(path = 'output-claims.json') - # logger = factory.instance(type=TYPE,args= dict(args,**{"doc":"logs"})) - # files = os.listdir(SYS_ARGS['folder']) - # CONFIG = json.loads(open(SYS_ARGS['config']).read()) - # SECTION = INFO[scope]['section'] - # for file in files : - # if 'limit' in SYS_ARGS and files.index(file) == int(SYS_ARGS['limit']) : - # break - # else: - # filename = os.sep.join([SYS_ARGS['folder'],file]) - - # try: - # content,logs = get_content(filename,CONFIG,SECTION) - # except Exception as e: - # if sys.version_info[0] > 2 : - # logs = [{"filename":filename,"msg":e.args[0]}] - # else: - # logs = [{"filename":filename,"msg":e.message}] - # content = None - # if content : - - # writer.write(content) - # if logs: - - # logger.write(logs) - - - # pass - # else: - # print (__doc__) + else: + + print(HELP_MESSAGE) diff --git a/healthcareio/server/__init__.py b/healthcareio/server/__init__.py index 3910930..165e93e 100644 --- a/healthcareio/server/__init__.py +++ b/healthcareio/server/__init__.py @@ -3,7 +3,78 @@ from healthcareio.params import SYS_ARGS import healthcareio.analytics import os import json +import time +import smart +import transport +import pandas as pd +import numpy as np +from healthcareio import x12 +from healthcareio.export import export +from multiprocessing import Process +# from flask_socketio import SocketIO, emit, disconnect,send +from healthcareio.server import proxy +PATH = os.sep.join([os.environ['HOME'],'.healthcareio','config.json']) app = Flask(__name__) +# socket_ = SocketIO(app) + +def resume (files): + _args = SYS_ARGS['config']['store'].copy() + if 'mongo' in SYS_ARGS['config']['store']['type'] : + _args['type'] = 'mongo.MongoReader' + reader = transport.factory.instance(**_args) + _files = [] + try: + pipeline = [{"$match":{"completed":{"$eq":True}}},{"$group":{"_id":"$name"}},{"$project":{"name":"$_id","_id":0}}] + _args = {"aggregate":"logs","cursor":{},"allowDiskUse":True,"pipeline":pipeline} + _files = reader.read(mongo = _args) + _files = [item['name'] for item in _files] + except Exception as e : + pass + + return list(set(files) - set(_files)) + + + +def run (): + # + # let's get the files in the folder (perhaps recursively traverse them) + # + FILES = [] + BATCH = int(SYS_ARGS['config']['args']['batch']) #-- number of processes (poorly named variable) + + for root,_dir,f in os.walk(SYS_ARGS['config']['args']['folder']) : + if f : + FILES += [os.sep.join([root,name]) for name in f] + FILES = resume(FILES) + FILES = np.array_split(FILES,BATCH) + procs = [] + for FILE_GROUP in FILES : + + FILE_GROUP = FILE_GROUP.tolist() + # logger.write({"process":index,"parse":SYS_ARGS['parse'],"file_count":len(row)}) + # proc = Process(target=apply,args=(row,info['store'],_info,)) + parser = x12.Parser(PATH) #os.sep.join([PATH,'config.json'])) + parser.set.files(FILE_GROUP) + parser.start() + procs.append(parser) + SYS_ARGS['procs'] = procs +# @socket_.on('data',namespace='/stream') +def push() : + _args = dict(SYS_ARGS['config']['store'].copy(),**{"type":"mongo.MongoReader"}) + reader = transport.factory.instance(**_args) + pipeline = [{"$group":{"_id":"$parse","claims":{"$addToSet":"$name"}}},{"$project":{"_id":0,"type":"$_id","count":{"$size":"$claims"}}}] + _args = {"aggregate":"logs","cursor":{},"allowDiskUse":True,"pipeline":pipeline} + r = pd.DataFrame(reader.read(mongo=_args)) + r = healthcareio.analytics.Apex.apply({"chart":{"type":"donut","axis":{"x":"count","y":"type"}},"data":r}) + # emit("update",r,json=True) + return r +# @socket_.on('connect') +# def client_connect(**r): +# print ('Connection received') +# print (r) +# push() +# pass + @app.route("/favicon.ico") def _icon(): return send_from_directory(os.path.join([app.root_path, 'static','img','logo.svg']), @@ -12,7 +83,7 @@ def _icon(): def init(): e = SYS_ARGS['engine'] sections = {"remits":e.info['835'],"claims":e.info['837']} - _args = {"sections":sections} + _args = {"sections":sections,"store":SYS_ARGS["config"]["store"],"owner":SYS_ARGS['config']['owner'],"args":SYS_ARGS["config"]["args"]} return render_template("index.html",**_args) @app.route("/format//",methods=['POST']) def _format(id,index): @@ -21,43 +92,117 @@ def _format(id,index): key = '837' if id == 'claims' else '835' index = int(index) # p = e.info[key][index] - p = e.apply(type=id,index=index) - - # + p = e.filter(type=id,index=index) + r = [] - for item in p[0]['pipeline'] : - _item= dict(item) - del _item['sql'] - del _item ['data'] - + for item in p['pipeline'] : + _item= dict(item) _item = dict(_item,**healthcareio.analytics.Apex.apply(item)) + del _item['data'] if 'apex' in _item or 'html' in _item: r.append(_item) - r = {"id":p[0]['id'],"pipeline":r} + + r = {"id":p['id'],"pipeline":r} return json.dumps(r),200 + @app.route("/get//",methods=['GET']) def get(id,index): e = SYS_ARGS['engine'] key = '837' if id == 'claims' else '835' index = int(index) # p = e.info[key][index] - p = e.apply(type=id,index=index) + p = e.filter(type=id,index=index) r = {} for item in p[0]['pipeline'] : _item= [dict(item)] - r[item['label']] = item['data'].to_dict(orient='record') - # del _item['sql'] - # del _item ['data'] - # print (item['label']) - # _item['apex'] = healthcareio.analytics.Apex.apply(item) - # if _item['apex']: - # r.append(_item) - - # r = {"id":p[0]['id'],"pipeline":r} + # r[item['label']] = item['data'].to_dict(orient='record') + r[item['label']] = item['data'].to_dict('record') return json.dumps(r),200 + +@app.route("/reset",methods=["POST"]) +def reset(): + return "1",200 +@app.route("/data",methods=['GET']) +def get_data (): + """ + This function will return statistical data about the services i.e general statistics about what has/been processed + """ + HEADER = {"Content-type":"application/json"} + _args = SYS_ARGS['config'] + options = dict(proxy.get.files(_args),**proxy.get.processes(_args)) + return json.dumps(options),HEADER +@app.route("/log/",methods=["POST","PUT","GET"]) +def log(id) : + HEADER = {"Content-Type":"application/json; charset=utf8"} + if id == 'params' and request.method in ['PUT', 'POST']: + info = request.json + _args = {"batch":info['batch'] if 'batch' in info else 1,"resume":True} + # + # We should update the configuration + SYS_ARGS['config']['args'] = _args + PATH = os.sep.join([os.environ['HOME'],'.healthcareio','config.json']) + write = lambda content: (open(PATH,'w')).write(json.dumps(content)) + proc = Process(target=write,args=(SYS_ARGS['config'],)) + proc.start() + return "1",HEADER + pass +@app.route("/io/",methods=['POST']) +def io_data(id): + if id == 'params' : + _args = request.json + # + # Expecting batch,folder as parameters + _args = request.json + _args['resume'] = True + print (_args) + # + # We should update the configuration + SYS_ARGS['config']['args'] = _args + # PATH = os.sep.join([os.environ['HOME'],'.healthcareio','config.json']) + try: + write = lambda content: (open(PATH,'w')).write(json.dumps(content)) + proc = Process(target=write,args=(SYS_ARGS['config'],)) + proc.start() + # proc.join() + return "1",200 + except Exception as e : + return "0",403 + pass + elif id == 'stop' : + stop() + pass + elif id == 'run' : + # run() + _args = {"args":SYS_ARGS['config']['args'],"store":SYS_ARGS["config"]["store"]} + proxy.run(_args) + return "1",200 + pass + +@app.route("/export") +def export_form(): + _args = {"context":SYS_ARGS['context']} + return render_template("store.html",**_args) +@app.route("/export",methods=['POST','PUT']) +def apply_etl(): + _info = request.json + m = {'s3':'s3.s3Writer','mongo':'mongo.MongoWriter'} + if _info : + dest_args = {'type':m[_info['type']],"args": _info['content'] } + src_args = SYS_ARGS['config']['store'] + # print (_args) + # writer = transport.factory.instance(**_args) + proxy.publish(src_args,dest_args) + return "1",405 + + else: + return "0",404 +@app.route("/update") +def update(): + pass + return "0",405 @app.route("/reload",methods=['POST']) def reload(): # e = SYS_ARGS['engine'] @@ -74,11 +219,20 @@ if __name__ == '__main__' : PORT = int(SYS_ARGS['port']) if 'port' in SYS_ARGS else 5500 DEBUG= int(SYS_ARGS['debug']) if 'debug' in SYS_ARGS else 0 SYS_ARGS['context'] = SYS_ARGS['context'] if 'context' in SYS_ARGS else '' + # # PATH= SYS_ARGS['config'] if 'config' in SYS_ARGS else os.sep.join([os.environ['HOME'],'.healthcareio','config.json']) + # + # Adjusting configuration with parameters (batch,folder,resume) + if 'args' not in SYS_ARGS['config'] : + SYS_ARGS['config']["args"] = {"batch":1,"resume":True,"folder":"/data"} + + SYS_ARGS['procs'] = [] + + # SYS_ARGS['path'] = os.sep.join([os.environ['HOME'],'.healthcareio','config.json']) e = healthcareio.analytics.engine(PATH) - # e.apply(type='claims',serialize=True) + e.apply(type='claims',serialize=False) SYS_ARGS['engine'] = e app.run(host='0.0.0.0',port=PORT,debug=DEBUG,threaded=True) \ No newline at end of file diff --git a/healthcareio/server/index.py b/healthcareio/server/index.py index c61f130..99f3709 100644 --- a/healthcareio/server/index.py +++ b/healthcareio/server/index.py @@ -12,8 +12,9 @@ def _icon(): def init(): e = SYS_ARGS['engine'] sections = {"remits":e.info['835'],"claims":e.info['837']} - _args = {"sections":sections} - return render_template("index.html",**_args) + _args = {"sections":sections,"store":SYS_ARGS['config']['store']} + print (SYS_ARGS['config']['store']) + return render_template("setup.html",**_args) @app.route("/format//",methods=['POST']) def _format(id,index): diff --git a/healthcareio/server/proxy.py b/healthcareio/server/proxy.py new file mode 100644 index 0000000..2ad95c6 --- /dev/null +++ b/healthcareio/server/proxy.py @@ -0,0 +1,170 @@ +""" + This file serves as proxy to healthcare-io, it will be embedded into the API +""" +import os +import transport +import numpy as np +from healthcareio import x12 +import pandas as pd +import smart +from healthcareio.analytics import Apex +import time +class get : + PROCS = [] + PATH = os.sep.join([os.environ['HOME'],'.healthcareio','config.json']) + @staticmethod + def resume (files,args): + """ + This function will determine the appropriate files to be processed by performing a simple complementary set operation against the logs + @TODO: Support data-stores other than mongodb + :param files list of files within a folder + :param _args configuration + """ + _args = args['store'].copy() + if 'mongo' in _args['type'] : + _args['type'] = 'mongo.MongoReader' + reader = transport.factory.instance(**_args) + _files = [] + try: + pipeline = [{"$match":{"completed":{"$eq":True}}},{"$group":{"_id":"$name"}},{"$project":{"name":"$_id","_id":0}}] + _args = {"aggregate":"logs","cursor":{},"allowDiskUse":True,"pipeline":pipeline} + _files = reader.read(mongo = _args) + _files = [item['name'] for item in _files] + except Exception as e : + pass + print ( [len(list(set(files) - set(_files))),' files to be processed']) + return list(set(files) - set(_files)) + + @staticmethod + def processes(_args): + _info = pd.DataFrame(smart.top.read(name='healthcare-io.py'))[['name','cpu','mem']] + + if _info.shape[0] == 0 : + _info = pd.DataFrame({"name":["healthcare-io.py"],"cpu":[0],"mem":[0]}) + # _info = pd.DataFrame(_info.groupby(['name']).sum()) + # _info['name'] = ['healthcare-io.py'] + m = {'cpu':'CPU','mem':'RAM','name':'name'} + _info.columns = [m[name] for name in _info.columns.tolist()] + _info.index = np.arange(_info.shape[0]) + + charts = [] + for label in ['CPU','RAM'] : + value = _info[label].sum() + df = pd.DataFrame({"name":[label],label:[value]}) + charts.append ( + Apex.apply( + {"data":df, "chart":{"type":"radial","axis":{"x":label,"y":"name"}}} + )['apex'] + ) + # + # This will update the counts for the processes, upon subsequent requests so as to show the change + # + N = 0 + lprocs = [] + for proc in get.PROCS : + if proc.is_alive() : + lprocs.append(proc) + N = len(lprocs) + get.PROCS = lprocs + return {"process":{"chart":charts,"counts":N}} + @staticmethod + def files (_args): + _info = smart.folder.read(path='/data') + N = _info.files.tolist()[0] + if 'mongo' in _args['store']['type'] : + store_args = dict(_args['store'].copy(),**{"type":"mongo.MongoReader"}) + # reader = transport.factory.instance(**_args) + + pipeline = [{"$group":{"_id":"$name","count":{"$sum":{"$cond":[{"$eq":["$completed",True]},1,0]}} }},{"$group":{"_id":None,"count":{"$sum":"$count"}}},{"$project":{"_id":0,"status":"completed","count":1}}] + query = {"mongo":{"aggregate":"logs","allowDiskUse":True,"cursor":{},"pipeline":pipeline}} + # _info = pd.DataFrame(reader.read(mongo={"aggregate":"logs","allowDiskUse":True,"cursor":{},"pipeline":pipeline})) + pipeline = [{"$group":{"_id":"$parse","claims":{"$addToSet":"$name"}}},{"$project":{"_id":0,"type":"$_id","count":{"$size":"$claims"}}}] + _query = {"mongo":{"aggregate":"logs","cursor":{},"allowDiskUse":True,"pipeline":pipeline}} #-- distribution claims/remits + + + else: + store_args = dict(_args['store'].copy(),**{"type":"disk.SQLiteReader"}) + store_args['args']['table'] = 'logs' + query= {"sql":"select count(distinct json_extract(data,'$.name')) as count, 'completed' status from logs where json_extract(data,'$.completed') = true"} + _query={"sql":"select json_extract(data,'$.parse') as type,count(distinct json_extract(data,'$.name')) as count from logs group by type"} #-- distribution claim/remits + reader = transport.factory.instance(**store_args) + _info = pd.DataFrame(reader.read(**query)) + if not _info.shape[0] : + _info = pd.DataFrame({"status":["completed"],"count":[0]}) + _info['count'] = np.round( (_info['count'] * 100 )/N,2) + + charts = [Apex.apply({"data":_info,"chart":{"type":"radial","axis":{"y":"status","x":"count"}}})['apex']] + # + # Let us classify the files now i.e claims / remits + # + + + # pipeline = [{"$group":{"_id":"$parse","claims":{"$addToSet":"$name"}}},{"$project":{"_id":0,"type":"$_id","count":{"$size":"$claims"}}}] + # _args = {"aggregate":"logs","cursor":{},"allowDiskUse":True,"pipeline":pipeline} + # r = pd.DataFrame(reader.read(mongo=_args)) + r = pd.DataFrame(reader.read(**_query)) #-- distribution claims/remits + r = Apex.apply({"chart":{"type":"donut","axis":{"x":"count","y":"type"}},"data":r})['apex'] + r['chart']['height'] = '100%' + r['legend']['position'] = 'bottom' + + charts += [r] + + + return {"files":{"counts":N,"chart":charts}} + + pass +# +# Process handling .... + + +def run (_args) : + """ + This function will run the jobs and insure as processes (as daemons). + :param _args system configuration + """ + FILES = [] + BATCH = int(_args['args']['batch']) #-- number of processes (poorly named variable) + + for root,_dir,f in os.walk(_args['args']['folder']) : + if f : + FILES += [os.sep.join([root,name]) for name in f] + FILES = get.resume(FILES,_args) + FILES = np.array_split(FILES,BATCH) + + for FILE_GROUP in FILES : + + FILE_GROUP = FILE_GROUP.tolist() + # logger.write({"process":index,"parse":_args['parse'],"file_count":len(row)}) + # proc = Process(target=apply,args=(row,info['store'],_info,)) + parser = x12.Parser(get.PATH) #os.sep.join([PATH,'config.json'])) + parser.set.files(FILE_GROUP) + parser.daemon = True + parser.start() + get.PROCS.append(parser) + time.sleep(3) + # + # @TODO:consider submitting an update to clients via publish/subscribe framework + # + return get.PROCS +def stop(_args): + for job in get.PROCS : + if job.is_alive() : + job.terminate() + get.PROCS = [] + # + # @TODO: consider submitting an update to clients via publish/subscribe framework + pass +def write(src_args,dest_args,files) : + # + # @TODO: Support for SQLite + pass +def publish (src_args,dest_args,folder="/data"): + FILES = [] + for root,_dir,f in os.walk(folder) : + if f : + FILES += [os.sep.join([root,name]) for name in f] + # + # @TODO: Add support for SQLite .... + + FILES = np.array_split(FILES,4) + diff --git a/healthcareio/server/static/css/default.css b/healthcareio/server/static/css/default.css index 0bded6c..5ea27b2 100644 --- a/healthcareio/server/static/css/default.css +++ b/healthcareio/server/static/css/default.css @@ -1,3 +1,4 @@ + .active { padding:4px; cursor:pointer; @@ -6,3 +7,51 @@ .active:hover{ border-bottom:2px solid #ff6500; } +input[type=text]{ + border:1px solid transparent; + background-color:#f3f3f3; + outline: 0px; + padding:8px; + font-weight:normal; + font-family:sans-serif; + color:black; +} +.active-button { + display:grid; + grid-template-columns: 32px auto; + gap:2px; + align-items:center; + border:2px solid #CAD5E0; + cursor:pointer; + +} +.active-button i {padding:4px;;} +.active-button:hover { border-color:#ff6500} + +.system {display:grid; grid-template-columns: 45% 1px auto; gap:20px; margin-left:5%; width:90%;} +.system .status .item {display:grid; grid-template-columns: 75px 8px auto; gap:2px;} +.input-form {display:grid; gap:2px;} +.input-form .item {display:grid; grid-template-columns: 125px auto; gap:2px; align-items:center;} +.input-form .item .label { font-weight:bold; padding-left:10px} +.fa-cog {color:#4682B4} +.fa-check {color:#00c6b3} +.fa-times {color:maroon} + +.code { + margin:4px; + background:#000000 ; + padding:8px; + font-family: 'Courier New', Courier, monospace; + color:#d3d3d3; + font-size:12px; + line-height: 2; +} + +.tabs {display:grid; grid-template-columns: repeat(3,1fr) auto; gap:0px; align-items:center; text-align: center;} +.tab {border:1px solid transparent; border-bottom-color:#D3D3D3; font-weight:bold; padding:4px} + +.tabs .selected {border-color:#CAD5E0; border-bottom-color:transparent; } +.system iframe {width:100%; height:100%; border:1px solid transparent;} +.data-info {height:90%; padding:8px;} +.fa-cloud {color:#4682B4} +.fa-database{color:#cc8c91} diff --git a/healthcareio/server/static/dialog.html b/healthcareio/server/static/dialog.html new file mode 100644 index 0000000..93a7600 --- /dev/null +++ b/healthcareio/server/static/dialog.html @@ -0,0 +1,21 @@ +
+
+
+
+
+
+
+ +
+
+
+ +
+
+
+ +
+
Ok
+
+
+
\ No newline at end of file diff --git a/healthcareio/server/static/img/smart-top.png b/healthcareio/server/static/img/smart-top.png new file mode 100644 index 0000000..11ce029 Binary files /dev/null and b/healthcareio/server/static/img/smart-top.png differ diff --git a/healthcareio/server/static/js/io/dialog.js b/healthcareio/server/static/js/io/dialog.js new file mode 100644 index 0000000..af49692 --- /dev/null +++ b/healthcareio/server/static/js/io/dialog.js @@ -0,0 +1,75 @@ +/*** + * This file will handle the dialog boxes as they and their associated configurations and function binding + */ +if (!dialog){ + var dialog = {} +} + +dialog.open = function(title,msg,pointer){ + if (sessionStorage.dialog == null){ + + + var http = HttpClient.instance() + http.get(sessionStorage.io_context+'/static/dialog.html',function(x){ + var html = x.responseText + jx.modal.show({html:html,id:'dialog'}) + $('.dialog .title').text(title) + $('.dialog .message .text').text(msg) + dialog.status.ask() + $('.dialog .action .active-button').on('click',pointer) + $('.dialog .title-bar .close').on('click',function(){dialog.close(0)}) + + }) + }else{ + var html = sessionStorage.dialog + jx.modal.show({html:html,id:'dialog'}) + dialog.status.ask() + $('.dialog .action .active-button').on('click',pointer) + $('.dialog .title-bar .close').on('click',function(){dialog.close(0)}) + + } +} +dialog.bind = function(pointer){ + if (pointer == null){ + pointer = dialog.close + } + $('.dialog .action .active-button').off() + $('.dialog .action .active-button').on('click',pointer) +} +dialog.close = function(delay){ + delay = (delay == null)?1750:delay + setTimeout(function(){ + if ( $('.dialog').length > 0){ + jx.modal.close() + } + },delay) +} +dialog.status = {} +dialog.status.wait = function(){ + $('.dialog .action .active-button').hide() +} +dialog.status.confirm = function(){ + $('.dialog .action .active-button').show() +} +dialog.status.busy = function(){ + $('.dialog .message #msg-icon').removeClass() + $('.dialog .message #msg-icon').addClass('fas fa-cog fa-4x fa-spin') + +} +dialog.status.fail = function(){ + $('.dialog .message #msg-icon').removeClass() + $('.dialog .message #msg-icon').addClass('fas fa-times fa-4x') +} +dialog.status.ask = function(){ + $('.dialog .message #msg-icon').removeClass() + $('.dialog .message #msg-icon').addClass('far fa-question-circle fa-4x') +} +dialog.status.warn = function(){ + $('.dialog .message #msg-icon').removeClass() + $('.dialog .message #msg-icon').addClass('fas fa-exclamation-triangle fa-4x') +} +dialog.status.success = function(){ + $('.dialog .message #msg-icon').removeClass() + $('.dialog .message #msg-icon').addClass('fas fa-check fa-4x') +} + diff --git a/healthcareio/server/static/js/io/healthcare.js b/healthcareio/server/static/js/io/healthcare.js new file mode 100644 index 0000000..8aa159a --- /dev/null +++ b/healthcareio/server/static/js/io/healthcare.js @@ -0,0 +1,15 @@ +if (!healthcare) { + var healthcare = {io:{}} +} +healthcare.io = {'dialog':dialog,'confirmed':confirmed,'reset':reset,'update':update,'run':run,'publish':publish} +healthcare.io.apply = function(){ + var value = $('.input-form .item .procs').val() + var folder= $('.input-form .item .folder').val() + $('.code .batch').html(value) + var http = HttpClient.instance() + http.setData({"batch":value,"resume":true,"folder":folder},"application/json") + http.post(sessionStorage.io_context+'/io/params',function(x){}) +} + + + diff --git a/healthcareio/server/static/js/io/io.js b/healthcareio/server/static/js/io/io.js new file mode 100644 index 0000000..a2d085f --- /dev/null +++ b/healthcareio/server/static/js/io/io.js @@ -0,0 +1,182 @@ +/** + * This file will depend on dialog.js (soft dependency). Some functions here will make calls to resources in dialog.js + */ + +var reset = function(){ + dialog.open('Healthcare/IO::Parser', 'Are you sure you would like to delete all data parsed? Click Ok to confirm',confirmed.reset) + +} +var update= function(){ + dialog.open('Healthcare/IO::Parser','Update will change parsing configuration. Would you like to continue ?',confirmed.update) +} +var run = function(){ + dialog.open('Healthcare/IO::Parser','Preparing parser, confirm to continue',confirmed.run) +} +var _queue = {socket:null} +var confirmed = {} +confirmed.run = function(){ + dialog.status.busy() + dialog.status.wait() + $('.dialog .message .text').html('Initiating Parsing ...') + setTimeout(function(){ + var http = HttpClient.instance() + http.post(sessionStorage.io_context+'/io/run',function(x){ + // dialog.handler = setInterval(function(){monitor.data()},750) + monitor.data() + //dialog.close() + + }) + + },1000) +} +confirmed.reset = function(){ + var uri = sessionStorage.io_context+'/reset' + var http= HttpClient.instance() + dialog.status.busy() + dialog.status.wait() + http.post(uri,function(x){ + setTimeout(function(){ + if (x.status == 200 && x.responseText == "1"){ + dialog.status.success() + $('.dialog .message .text').html('Reset Healthcare/IO::Parser was successful!
Dialog will be closing
') + dialog.close() + }else{ + dialog.status.fail() + + + } + + },2000) + }) +} + +confirmed.update = function(){ + var uri = sessionStorage.io_context+'/update' + var email = $('#email').val() + // + //-- validate the email + if (email.match(/^([^\s]+)@([^\s@]+)\.(org|com|edu|io)$/i)){ + dialog.status.wait() + dialog.status.busy() + var http = HttpClient.instance() + http.setData({"email":email},"application/son") + setTimeout(function(){ + http.post(uri,function(x){ + if(x.status == 200 && x.responseText == "1"){ + dialog.status.success() + }else{ + + dialog.status.fail() + $('.dialog .message .text').html('Error code '+x.status) + dialog.bind() + dialog.status.confirm() + $('.dialog .title-bar .title').html("Error found") + + } + }) + + },1000) + }else{ + dialog.status.fail() + dialog.bind() + $('.dialog .title-bar .title').text("Error found") + $('.dialog .message .text').html('Invvalid Email entered') + dialog.status.confirm() + } + +} + +/** + * This namespace is designed to export data to either the cloud or to a database + */ +var publish={set:{}} +publish.post = function(){ + + if($('.jxmodal').length > 0){ + jx.modal.close() + } + dialog.open('Export/ETL','Please wait') + dialog.status.busy() + + var http = HttpClient.instance() + http.setData(JSON.parse(sessionStorage.export),"application/json") + http.post(sessionStorage.io_context+'/export',function(x){ + if (x.status != 200){ + setTimeout(function(){ + $('.dialog .message .text').html('An error occurred with code '+x.status) + dialog.status.fail() + dialog.status.wait() + + },1500) + + } + // + // @TODO: Have progress be monitored for this bad boy i.e open the connection to socket and read in ... + // + }) + +} +publish.set.file = function(){ + var file = $('#file')[0].files[0] + $('.file .name').html(file.name) + var button = $('.cloud input').prop('disabled',true) + var div = $('.cloud .file .fa-file-upload')[0] + $(div).empty() + $(div).addClass('fas fa-cog fa-spin') + var reader = new FileReader() + reader.readAsText(file) + + + reader.onload = function(){ + _args = {"type":$('.cloud .id').html().trim(),"content":reader.result} + // _args = JSON.stringify(_args) + if (_args.content.match(/^\{.+/i) == null){ + content = _args.content.split('\n')[1].split(',') + _args.content = {'bucket':'healthcareio','access_key':content[0].trim(),'secret_key':content[1].trim()} + } + sessionStorage.export = JSON.stringify(_args) + } + + reader.onloadend = function(){ + setTimeout(function(){ + var div = $('.cloud .file .fa-cog')[0] + $(div).empty() + $(div).addClass('fas fa-check') + $(div).removeClass('fa-spin') + // jx.modal.close() + + //setTimeout(jx.modal.close,1500) + publish.post() + },2000) + } +} +publish.database = {} + +publish.database.init = function(id){ + // + // we are expecting id in {mongo,couch,postgresql,mysql,sqlite} + // @TODO: Account for cloud service brokers like dropbox, box, one-drive and google-drive + sessionStorage.export = "{}" + p = {'id':id} + if (id.match(/(mongodb|postgresql|mysql|sqlite|couchdb)/i)){ + var hide_id = '.store .cloud' + var show_id = '.store .database' + }else{ + // + // @TODO: generate an error message + var show_id = '.store .cloud' + var hide_id = '.store .database' + + } + var http = HttpClient.instance() + http.get(sessionStorage.io_context+'/export',function(x){ + var html = x.responseText + jx.modal.show({'html':html,'id':'dialog'}) + $(hide_id).hide(function(){ + $(show_id).show() + }) + + $('.store .id').text(id) + }) + +} \ No newline at end of file diff --git a/healthcareio/server/static/js/jx/rpc.js b/healthcareio/server/static/js/jx/rpc.js index 8dd08fe..22033f9 100755 --- a/healthcareio/server/static/js/jx/rpc.js +++ b/healthcareio/server/static/js/jx/rpc.js @@ -12,119 +12,128 @@ * Improve on how returned data is handled (if necessary). */ if(!jx){ - var jx = {} -} -/** - * These are a few parsers that can come in handy: - * urlparser: This parser is intended to break down a url parameter string in key,value pairs - */ - -function urlparser(url){ - if(url.toString().match(/\x3F/) != null){ - url = url.split('\x3F')[1] - - } - var p = url.split('&') ; - var r = {} ; - r.meta = [] ; - r.data = {} ; - var entry; - for(var i=0; i < p.length; i++){ - entry = p[i] ; - key = (entry.match('(.*)=') !=null)? entry.match('(.*)=')[1]:null ; - value = (entry.match('=(.*)$') != null)? entry.match('=(.*)$')[1]:null - if(key != null){ - key = key.replace('\x3F','') - r.meta.push(key) ; - r.data[key] = value ; - } - } - - return r.data; -} - -/** -* The following are corrections related to consistency in style & cohesion -*/ -jx.ajax = {} -jx.ajax.get = {} ; -jx.ajax.debug = null; -jx.ajax.get.instance = function(){ - var factory = function(){ - this.obj = {} - this.obj.headers = {} - this.obj.async = true; - this.setHeader = function(key,value){ - if(key.constructor != String && value == null){ - this.obj.headers = key ; - }else{ - this.obj.headers[key] = value; - } - } - this.setData = function(data){ - this.obj.data = data; - } - this.setAsync = function(flag){ - this.obj.async = (flag == true) ; - } - this.send = function(url,callback,method){ - - if(method == null){ - method = 'GET' - } - - p = jx.ajax.debug != null; - q = false; - if(p){ - q = jx.ajax.debug[url] != null; - } - - is_debuggable = p && q - - if(is_debuggable){ - x = {} ; - x.responseText = jx.ajax.debug [url] ; - callback(x) - }else{ - var http = new XMLHttpRequest() ; - http.onreadystatechange = function(){ - if(http.readyState == 4){ - - callback(http) - } - } - // - // In order to insure backward compatibility - // Previous versions allowed the user to set the variable on the wrapper (poor design) - if(this.async != null){ - this.setAsync(this.async) ; - } - http.open(method,url,this.obj.async) ; - for(key in this.obj.headers){ - value = this.obj.headers[key] ; - - http.setRequestHeader(key,value) - } - - http.send(this.obj.data) - } - - - } - this.put = function(url,callback){ - this.send(url,callback,'PUT') ; - } - this.get = function(url,callback){ - this.send(url,callback,'GET') ; - } - this.post = function(url,callback){ - this.send(url,callback,'POST') ; - } - }//-- end of the factory method - return new factory() ; -} - -// -// backward compatibility -jx.ajax.getInstance = jx.ajax.get.instance ; -var HttpClient = jx.ajax.get ; + var jx = {} + } + /** + * These are a few parsers that can come in handy: + * urlparser: This parser is intended to break down a url parameter string in key,value pairs + */ + + function urlparser(url){ + if(url.toString().match(/\x3F/) != null){ + url = url.split('\x3F')[1] + + } + var p = url.split('&') ; + var r = {} ; + r.meta = [] ; + r.data = {} ; + var entry; + for(var i=0; i < p.length; i++){ + entry = p[i] ; + key = (entry.match('(.*)=') !=null)? entry.match('(.*)=')[1]:null ; + value = (entry.match('=(.*)$') != null)? entry.match('=(.*)$')[1]:null + if(key != null){ + key = key.replace('\x3F','') + r.meta.push(key) ; + r.data[key] = value ; + } + } + + return r.data; + } + + /** + * The following are corrections related to consistency in style & cohesion + */ + jx.ajax = {} + jx.ajax.get = {} ; + jx.ajax.debug = null; + jx.ajax.get.instance = function(){ + var factory = function(){ + this.obj = {} + this.obj.headers = {} + this.obj.async = true; + this.setHeader = function(key,value){ + if(key.constructor != String && value == null){ + this.obj.headers = key ; + }else{ + this.obj.headers[key] = value; + } + } + this.setData = function(data,mimetype){ + if(mimetype == null) + this.obj.data = data; + else { + this.obj.headers['Content-Type'] = mimetype + if(mimetype.match(/application\/json/i)){ + this.obj.data = JSON.stringify(data) + } + } + + } + this.setAsync = function(flag){ + this.obj.async = (flag == true) ; + } + this.send = function(url,callback,method){ + + if(method == null){ + method = 'GET' + } + + p = jx.ajax.debug != null; + q = false; + if(p){ + q = jx.ajax.debug[url] != null; + } + + is_debuggable = p && q + + if(is_debuggable){ + x = {} ; + x.responseText = jx.ajax.debug [url] ; + callback(x) + }else{ + var http = new XMLHttpRequest() ; + http.onreadystatechange = function(){ + if(http.readyState == 4){ + + callback(http) + } + } + // + // In order to insure backward compatibility + // Previous versions allowed the user to set the variable on the wrapper (poor design) + if(this.async != null){ + this.setAsync(this.async) ; + } + http.open(method,url,this.obj.async) ; + for(key in this.obj.headers){ + value = this.obj.headers[key] ; + + http.setRequestHeader(key,value) + } + + http.send(this.obj.data) + } + + + } + this.put = function(url,callback){ + this.send(url,callback,'PUT') ; + } + this.get = function(url,callback){ + this.send(url,callback,'GET') ; + } + this.post = function(url,callback){ + this.send(url,callback,'POST') ; + } + }//-- end of the factory method + return new factory() ; + } + + // + // backward compatibility + jx.ajax.getInstance = jx.ajax.get.instance ; + var HttpClient = jx.ajax.get ; + \ No newline at end of file diff --git a/healthcareio/server/templates/header.html b/healthcareio/server/templates/header.html new file mode 100644 index 0000000..7dd35cd --- /dev/null +++ b/healthcareio/server/templates/header.html @@ -0,0 +1,29 @@ + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/healthcareio/server/templates/index.html b/healthcareio/server/templates/index.html index 915cddc..768618e 100644 --- a/healthcareio/server/templates/index.html +++ b/healthcareio/server/templates/index.html @@ -8,6 +8,7 @@ + @@ -15,6 +16,9 @@ + + + Healthcare/IO Analytics -
+
-
Healthcare/IO
-
Analytics Dashboard
+
Healthcare/IO :: Parser
+
Dashboard
-