diff --git a/healthcareio/export/export.py b/healthcareio/export/export.py index f520d6f..ed7baaf 100644 --- a/healthcareio/export/export.py +++ b/healthcareio/export/export.py @@ -34,7 +34,7 @@ from datetime import datetime import copy import requests import time - +from healthcareio.x12 import Parser PATH = os.sep.join([os.environ.get('HOME'),'.healthcareio','config.json']) STORE_URI = 'http://healthcareio.the-phi.com/store/healthcareio' @@ -82,7 +82,7 @@ def meta(config) : key = _cache['key'] if 'map' in config[key]: config[key]['map'][field] = -100 - + add_index = {} #-- tells if we should add _index attribute or not for prefix in config : # if 'map' in config[prefix] : # label = list(set(['label','field']) & set(config[prefix].keys())) @@ -95,22 +95,35 @@ def meta(config) : if '@ref' in config[prefix] : #and set(['label','field','map']) & set(config[prefix]['@ref'].keys()): for subprefix in config[prefix]['@ref'] : + _entry = config[prefix]['@ref'][subprefix] + _id = list(set(['label','field']) & set(config[prefix]['@ref'][subprefix].keys())) + _id = _id[0] + table = config[prefix]['@ref'][subprefix][_id] + add_index[table] = 1 if _id == 'label' else 0 if 'map' in _entry : + _info += get_field(_entry) else: _info += list(_entry.keys()) if set(['label','field','map']) & set(config[prefix].keys()): _entry = config[prefix] + _id = list(set(['label','field']) & set(config[prefix].keys())) + if _id : + _id = _id[0] + table = config[prefix][_id] + add_index[table] = 1 if _id == 'label' else 0 + + if 'map' in _entry : _info += get_field(_entry) - - + + # # We need to organize the fields appropriately here # - + # print (_info) fields = {"main":[],"rel":{}} for row in _info : if type(row) == str : @@ -118,16 +131,36 @@ def meta(config) : fields['main'] = list(set(fields['main'])) fields['main'].sort() else : + _id = list(set(add_index.keys()) & set(row.keys())) + + if _id : + + _id = _id[0] + + if add_index[_id] == 1 : + row[_id]+= ['_index'] + + if _id not in fields['rel']: + + fields['rel'][_id] = row[_id] + else: + fields['rel'][_id] += row[_id] + else: + print ( _entry) + + _id = list(row.keys())[0] + fields['rel'][_id] = row[_id] if _id not in fields['rel'] else fields['rel'][_id] + row[_id] + - fields['rel'] = jsonmerge.merge(fields['rel'],row) - + return fields def create (**_args) : skip = [] if 'skip' not in _args else _args['skip'] fields = ([_args['key']] if 'key' in _args else []) + _args['fields'] fields = ['_id'] + list(set(fields)) + fields.sort() table = _args['table'] - sql = ['CREATE TABLE :table ',"(",",\n".join(["\t".join(["\t",name,"VARCHAR(125)"]) for name in fields]),")"] + sql = ['CREATE TABLE :table ',"(",",\n".join(["\t".join(["\t",name,"VARCHAR(125)"]) for name in fields]),")" ] return " ".join(sql) def read (**_args) : """ @@ -141,32 +174,80 @@ def read (**_args) : # @TODO: Find a way to write the data into a data-store # - use dbi interface with pandas or stream it in # -def init (**_args) : +def init_sql(**_args): + """ + This function expresses how we can generically read data stored in JSON format from a relational table + :param type 835,837 + :param skip list of fields to be skipped + """ + # + # we should acknowledge global variables CONFIG,CUSTOM_CONFIG + TYPE = _args['type'] + _config = CONFIG['parser'][TYPE][0] + TABLE_NAME = 'claims' if TYPE== '837' else 'remits' + if TYPE in CUSTOM_CONFIG : + _config = jsonmerge.merge(_config,CUSTOM_CONFIG[TYPE]) + # + _info = meta(_config) + _projectSQLite = [] #-- sqlite projection + for field_name in _info['main'] : 
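+ # Each parsed claim is stored as one JSON document in the data column;
+ # json_extract pulls a top-level attribute out of it, producing entries
+ # such as: json_extract(data,'$.claim_id') claim_id (illustrative)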
+ _projectSQLite += ["json_extract(data,'$."+field_name+"') "+field_name]
+ _projectSQLite = ",".join(_projectSQLite) #-- wrapping up SQLITE projection on main table
+ # NOTE: :table resolves to claims (837) or remits (835)
+ SQL = "SELECT DISTINCT :table.id _id,:fields FROM :table, json_each(data)".replace(":fields",_projectSQLite).replace(":table",TABLE_NAME)
+ r = [{"table":TABLE_NAME,"read":{"sql":SQL},"sql":create(table=TABLE_NAME,fields=_info['main'])}]
+ for table in _info['rel'] :
+ #
+ # NOTE: meta() appends _index to the fields of label-based tables
+ fields = _info['rel'][table]
+
+ project = [TABLE_NAME+".id _id","json_extract(data,'$.claim_id') as claim_id"]
+ fn_prefix = "json_extract(x.value,'$." if '_index' not in _info['rel'][table] else "json_extract(i.value,'$."
+ for field_name in fields :
+ project += [fn_prefix+field_name+"') "+field_name]
+ SQL = "SELECT DISTINCT :fields FROM "+TABLE_NAME+", json_each(data) x, json_each(x.value) i where x.key = ':table'"
+ SQL = SQL.replace(":table",table).replace(":fields",",".join(project))
+ r += [{"table":table,"read":{"sql":SQL},"sql":create(table=table,key='claim_id',fields=fields)}]
+ return r
+def init(**_args):
+ if 'provider' in CONFIG['store'] and CONFIG['store']['provider'] == 'sqlite' :
+ return init_sql(**_args)
+ else:
+ return init_mongo(**_args)
+def init_mongo (**_args) :
 """
 This function is intended to determine the number of tables to be created, as well as their type.
 :param type {835,837}
 :param skip list of fields to be skipped
 """
 TYPE = _args['type']
- SKIP = _args['skip'] if 'skip' in _args else []
+ # SKIP = _args['skip'] if 'skip' in _args else []
 _config = CONFIG['parser'][TYPE][0]
 if TYPE in CUSTOM_CONFIG :
 _config = jsonmerge.merge(_config,CUSTOM_CONFIG[TYPE])
+ _info = meta(_config)
 #
 # @TODO: implement fields to be skipped ...
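+ # A minimal sketch of the skip, an assumption that is not wired in yet:
+ # SKIP = _args.get('skip',[])
+ # _info['main'] = [name for name in _info['main'] if name not in SKIP]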
# TABLE_NAME = 'claims' if TYPE== '837' else 'remits' - _info = meta(_config) + # project = dict.fromkeys(["_id","claim_id"]+_info['main'],1) project = {} + for field_name in _info['main'] : _name = "".join(["$",field_name]) project[field_name] = {"$ifNull":[_name,""]} + project["_id"] = 1 project = {"$project":project} - r = [{"table":TABLE_NAME,"mongo":{"aggregate":TABLE_NAME,"pipeline":[project],"cursor":{},"allowDiskUse":True},"sql":create(table=TABLE_NAME,fields=_info['main'])}] + # _projectSQLite = ",".join(_projectSQLite) #-- Wrapping up SQLITE projection on main table + + r = [{"table":TABLE_NAME,"read":{"mongo":{"aggregate":TABLE_NAME,"pipeline":[project],"cursor":{},"allowDiskUse":True}},"sql":create(table=TABLE_NAME,fields=_info['main'])}] + + for table in _info['rel'] : # # NOTE: Adding _index to the fields @@ -180,7 +261,8 @@ def init (**_args) : project["_id"] = 1 # pipeline = [{"$match":{"procedures":{"$nin":[None,'']}}},{"$unwind":"$"+table},{"$project":project}] pipeline = [{"$match": {table: {"$nin": [None, ""]}}},{"$unwind":"$"+table},{"$project":project}] - r += [{"table":table,"mongo":{"aggregate":TABLE_NAME,"cursor":{},"pipeline":pipeline,"allowDiskUse":True},"sql":create(table=table,key='claim_id',fields=fields)}] + cmd = {"mongo":{"aggregate":TABLE_NAME,"cursor":{},"pipeline":pipeline,"allowDiskUse":True}} + r += [{"table":table,"read":cmd,"sql":create(table=table,key='claim_id',fields=fields)}] return r @@ -207,9 +289,14 @@ class Factory: global PATH global CONFIG global CUSTOM_CONFIG - PATH = os.sep.join([os.environ.get('HOME'),'.healthcareio','config.json']) - if os.path.exists(PATH): - CONFIG = json.loads((open(PATH)).read()) + + PATH = _args['config'] + + # if 'config' in _args : + # PATH = _args['config'] + # else: + # PATH = os.sep.join([os.environ.get('HOME'),'.healthcareio','config.json']) + CONFIG = Parser.setup(PATH) CUSTOM_PATH = os.sep.join([os.environ.get('HOME'),'.healthcareio','custom']) if os.path.exists(CUSTOM_PATH) and os.listdir(CUSTOM_PATH) : @@ -217,31 +304,49 @@ class Factory: CUSTOM_CONFIG = json.loads((open(CUSTOM_PATH)).read()) _features = Factory.license(email=CONFIG['owner']) - store = copy.deepcopy(CONFIG['store']) - store['type']='mongo.MongoReader' - + X12_TYPE = _args['type'] + store = copy.deepcopy(CONFIG['store']) #-- reading the original data + # + # Formatting accordingly just in case + if 'provider' in store : + if 'table' in store: + store['table'] = 'claims' if X12_TYPE == '837' else 'remits' + store['context'] ='read' + else: + pass + # store['type']='mongo.MongoReader' + wstore = _args['write_store'] #-- output data store - TYPE = _args['type'] - PREFIX = 'clm_' if TYPE == '837' else 'era_' - SCHEMA = '' if 'schema' not in wstore['args'] else wstore['args']['schema'] - _config = CONFIG['parser'][TYPE][0] - if TYPE in CUSTOM_CONFIG : - _config = jsonmerge.merge(_config,CUSTOM_CONFIG[TYPE]) + + PREFIX = 'clm_' if X12_TYPE == '837' else 'era_' + # SCHEMA = '' if 'schema' not in wstore['args'] else wstore['args']['schema'] + SCHEMA = '' if 'schema' not in wstore else wstore['schema'] + _config = CONFIG['parser'][X12_TYPE][0] + if X12_TYPE in CUSTOM_CONFIG : + _config = jsonmerge.merge(_config,CUSTOM_CONFIG[X12_TYPE]) # _info = meta(_config) - job_args = init(type=TYPE) + job_args = init(type=X12_TYPE) #-- getting the queries that will generate the objects we are interested in # print (json.dumps(job_args)) _jobs = [] for row in job_args: # _store = json.loads(json.dumps(wstore)) _store = copy.deepcopy(wstore) - 
_store['args']['table'] = row['table'] + # _store['args']['table'] = row['table'] + if 'type' in _store : + _store['args']['table'] = row['table'] + else: + + _store['table'] = row['table'] _pipe = [ workers.CreateSQL(prefix=PREFIX,schema=SCHEMA,store=_store,sql=row['sql']), - workers.Reader(prefix=PREFIX,schema=SCHEMA,store=store,mongo=row['mongo'],max_rows=250000,features=_features,table=row['table']), + # workers.Reader(prefix=PREFIX,schema=SCHEMA,store=store,mongo=row['mongo'],max_rows=250000,features=_features,table=row['table']), + workers.Reader(prefix=PREFIX,schema=SCHEMA,store=store,read=row['read'],max_rows=250000,features=_features,table=row['table']), + workers.Writer(prefix=PREFIX,schema=SCHEMA,store=_store) ] _jobs += [workers.Subject(observers=_pipe,name=row['table'])] + return _jobs # if __name__ == '__main__' : diff --git a/healthcareio/export/workers.py b/healthcareio/export/workers.py index 7b65b09..234cb3c 100644 --- a/healthcareio/export/workers.py +++ b/healthcareio/export/workers.py @@ -93,9 +93,6 @@ class Worker : self._apply() except Exception as error: pass - # print () - # print (error) - # print () finally: self.caller.notify() @@ -119,17 +116,18 @@ class CreateSQL(Worker) : def __init__(self,**_args): super().__init__(**_args) self._sql = _args['sql'] + def init(self,**_args): super().init(**_args) def _apply(self) : - - sqltable = self.tablename(self._info['args']['table']) + sqltable = self._info['table'] if 'provider' in self._info else self._info['args']['table'] + sqltable = self.tablename(sqltable) # log = {"context":self.name(),"args":{"table":self._info['args']['table'],"sql":self._sql}} log = {"context":self.name(),"args":{"table":sqltable,"sql":self._sql.replace(":table",sqltable)}} try: - + writer = transport.factory.instance(**self._info) writer.apply(self._sql.replace(":table",sqltable)) writer.close() @@ -153,9 +151,13 @@ class Reader(Worker): super().__init__(**_args) - self.pipeline = _args['mongo'] #-- pipeline in the context of mongodb NOT ETL + # self.pipeline = _args['mongo'] #-- pipeline in the context of mongodb NOT ETL + + # self.pipeline = _args['mongo'] if 'mongo' in _args else _args['sql'] + self.pipeline = _args['read'] ; self.MAX_ROWS = _args['max_rows'] - self.table = _args['table'] + self.table = _args['table'] #-- target table + # is_demo = 'features' not in _args or ('features' in _args and ('export_etl' not in _args['features'] or _args['features']['export_etl'] == 0)) # @@ -174,26 +176,36 @@ class Reader(Worker): def init(self,**_args): super().init(**_args) self.rows = [] + def _apply(self): try: + self.reader = transport.factory.instance(**self._info) ; - self.rows = self.reader.read(mongo=self.pipeline) + # print (self.pipeline) + # self.rows = self.reader.read(mongo=self.pipeline) + self.rows = self.reader.read(**self.pipeline) + + if type(self.rows) == pd.DataFrame : + self.rows = self.rows.to_dict(orient='records') + # if 'provider' in self._info and self._info['provider'] == 'sqlite' : + # self.rows = self.rows.apply(lambda row: json.loads(row.data),axis=1).tolist() N = len(self.rows) / self.MAX_ROWS if len(self.rows) > self.MAX_ROWS else 1 N = int(N) # self.rows = rows - _log = {"context":self.name(),"args":self._info['args']['db'], "status":1,"info":{"rows":len(self.rows),"table":self.table,"segments":N}} + _log = {"context":self.name(), "status":1,"info":{"rows":len(self.rows),"table":self.table,"segments":N}} self.rows = np.array_split(self.rows,N) - + # self.get = lambda : rows #np.array_split(rows,N) 
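+ # Illustrative figures: 600,000 rows with MAX_ROWS=250000 gives
+ # N = int(600000/250000) = 2, i.e. two segments of 300,000 rows each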
self.reader.close() self.status = 1 # except Exception as e : - log['status'] = 0 - log['info'] = {"error":e.args[0]} + _log['status'] = 0 + _log['info'] = {"error":e.args[0]} + print (e) self.log(**_log) @@ -206,6 +218,9 @@ class Reader(Worker): class Writer(Worker): def __init__(self,**_args): super().__init__(**_args) + if 'provider' in self._info : + self._info['context'] = 'write' + def init(self,**_args): """ :param store output data-store needed for writing @@ -215,25 +230,35 @@ class Writer(Worker): self._invalues = _args['invalues'] + def _apply(self): # table = self._info['args']['table'] if 'table' in self._info['args'] else 'N/A' - table = self.tablename(self._info['args']['table']) + # table = self.tablename(self._info['args']['table']) + if 'provider' in self._info : + table = self.tablename(self._info['table']) + self._info['table'] = table + else: + table = self.tablename(self._info['args']['table']) + self._info['args']['table'] = table - self._info['args']['table'] = table; writer = transport.factory.instance(**self._info) + index = 0 if self._invalues : for rows in self._invalues : # print (['segment # ',index,len(rows)]) - self.log(**{"context":self.name(),"segment":(index+1),"args":{"rows":len(rows),"table":table}}) - if len(rows) : + + # self.log(**{"context":self.name(),"segment":(index+1),"args":{"rows":len(rows),"table":table}}) + + if len(rows) > 0: # # @TODO: Upgrade to mongodb 4.0+ and remove the line below # Upon upgrade use the operator "$toString" in export.init function # rows = [dict(item,**{"_id":str(item["_id"])}) for item in rows] + writer.write(rows) index += 1 # for _e in rows : diff --git a/healthcareio/healthcare-io.py b/healthcareio/healthcare-io.py index 07279da..0145dda 100644 --- a/healthcareio/healthcare-io.py +++ b/healthcareio/healthcare-io.py @@ -46,6 +46,7 @@ import time from healthcareio import x12 from healthcareio.export import export import smart +import transport from healthcareio.server import proxy import pandas as pd @@ -58,8 +59,19 @@ if not os.path.exists(PATH) : import platform import sqlite3 as lite # PATH = os.sep.join([os.environ['HOME'],'.edi-parser']) +CONFIG_FILE = os.sep.join([PATH,'config.json']) if 'config' not in SYS_ARGS else SYS_ARGS['config'] HELP_MESSAGE = """ cli: + # + # Signup, allows parsing configuration to be downloaded + # + + # Support for SQLite3 + healthcare-io.py --signup steve@the-phi.com --store sqlite + + #or support for mongodb + healthcare-io.py --signup steve@the-phi.com --store mongo + healthcare-io.py --<[signup|init]> --store [--batch ] healthcare-io.py --parse --folder [--batch ] [--resume] @@ -100,7 +112,8 @@ def signup (**args) : # store = {"type":"disk.DiskWriter","args":{"path":OUTPUT_FOLDER}} # if 'store' in args : # store = args['store'] - filename = (os.sep.join([PATH,'config.json'])) + # filename = (os.sep.join([PATH,'config.json'])) + filename = CONFIG_FILE info = r.json() #{"parser":r.json(),"store":store} info = dict({"owner":email},**info) info['store']['args']['path'] =os.sep.join([OUTPUT_FOLDER,'healthcare-io.db3']) #-- sql @@ -121,101 +134,48 @@ def log(**args): pass def init(): """ - read all the configuration from the + read all the configuration from disk. 
+ Requirements for configuration file :
+ {out-folder, store, 837, 835}
 """
- filename = os.sep.join([PATH,'config.json'])
+ # filename = os.sep.join([PATH,'config.json'])
+ filename = CONFIG_FILE
 info = None
 if os.path.exists(filename):
+ #
+ # Loading the configuration file (JSON format)
 file = open(filename)
 info = json.loads(file.read())
- if not os.path.exists(info['out-folder']) :
- os.mkdir(info['out-folder'])
- if info['store']['type'] == 'disk.SQLiteWriter' and not os.path.exists(info['store']['args']['path']) :
- conn = lite.connect(info['store']['args']['path'],isolation_level=None)
+
+ if 'out-folder' not in info and not os.path.exists(OUTPUT_FOLDER) :
+ os.mkdir(OUTPUT_FOLDER)
+ elif 'out-folder' in info and not os.path.exists(info['out-folder']) :
+ os.mkdir(info['out-folder'])
+ # if 'type' in info['store'] :
+ lwriter = None
+ is_sqlite = False
+ if 'type' in info['store'] and info['store']['type'] == 'disk.SQLiteWriter' and not os.path.exists(info['store']['args']['path']) :
+ lwriter = transport.factory.instance(**info['store'])
+ is_sqlite = True
+ elif 'provider' in info['store'] and info['store']['provider'] == 'sqlite' :
+ lwriter = transport.instance(**info['store'])
+ is_sqlite = True
+ if lwriter and is_sqlite:
 for key in info['schema'] :
- _sql = info['schema'][key]['create']
- # r = conn.execute("select * from sqlite_master where name in ('claims','remits')")
- conn.execute(_sql)
- conn.commit()
- conn.close()
-
- return info
-#
-# Global variables that load the configuration files
-
-# def parse(**args):
-# """
-# This function will parse the content of a claim or remittance (x12 format) give the following parameters
-# :filename absolute path of the file to be parsed
-# :type claims|remits in x12 format
-# """
-# global INFO
-# if not INFO :
-# INFO = init()
-# if args['type'] == 'claims' :
-# CONFIG = INFO['parser']['837']
-# elif args['type'] == 'remits' :
-# CONFIG = INFO['parser']['835']
-# else:
-# CONFIG = None
-# if CONFIG :
-# # CONFIG = CONFIG[-1] if 'version' not in args and (args['version'] < len(CONFIG)) else CONFIG[0]
-# CONFIG = CONFIG[int(args['version'])-1] if 'version' in SYS_ARGS and int(SYS_ARGS['version']) < len(CONFIG) else CONFIG[-1]
-# SECTION = CONFIG['SECTION']
-# os.environ['HEALTHCAREIO_SALT'] = INFO['owner']
+ if key != 'logs' :
+ _id = 'claims' if key == '837' else 'remits'
+ else:
+ _id = key
+
+ if not lwriter.has(table=_id) :
+ lwriter.apply(info['schema'][key]['create'])
-
-# return get_content(args['filename'],CONFIG,SECTION)
-# def resume (files,id,config):
-# _args = config['store'].copy()
-# if 'mongo' in config['store']['type'] :
-# _args['type'] = 'mongo.MongoReader'
-# reader = factory.instance(**_args)
-# _files = []
-# if 'resume' in config['analytics'] :
-# _args = config['analytics']['resume'][id]
-# _files = reader.read(**_args)
-# _files = [item['name'] for item in _files if item['name'] != None]
-# return list(set(files) - set(_files))
-
- # return files
- # pass
-# def apply(files,store_info,logger_info=None):
-# """
-# :files list of files to be processed in this given thread/process
-# :store_info information about data-store, for now disk isn't thread safe
-# :logger_info information about where to store the logs
-# """
+ # [lwriter.apply( info['schema'][key]['create']) for key in info['schema'] if not lwriter.has(table=key)]
+ lwriter.close()
-# if not logger_info :
-# logger = factory.instance(type='disk.DiskWriter',args={'path':os.sep.join([info['out-folder'],SYS_ARGS['parse']+'.log'])})
-# else:
-# 
logger = factory.instance(**logger_info) + return info -# writer = factory.instance(**store_info) -# for filename in files : - -# if filename.strip() == '': -# continue -# # content,logs = get_content(filename,CONFIG,CONFIG['SECTION']) -# # -# try: -# content,logs = parse(filename = filename,type=SYS_ARGS['parse']) - -# if content : -# writer.write(content) -# if logs : -# [logger.write(dict(_row,**{"parse":SYS_ARGS['parse']})) for _row in logs] -# else: -# logger.write({"parse":SYS_ARGS['parse'],"name":filename,"completed":True,"rows":len(content)}) -# except Exception as e: - -# logger.write({"parse":SYS_ARGS['parse'],"filename":filename,"completed":False,"rows":-1,"msg":e.args[0]}) -# # print ([filename,len(content)]) -# # -# # @TODO: forward this data to the writer and log engine - # def upgrade(**args): """ :email provide us with who you are @@ -295,7 +255,7 @@ if __name__ == '__main__' : # if the user has specified to resume, we should look into the logs and pull the files processed and those that haven't # if 'resume' in SYS_ARGS : - store_config = json.loads( (open(os.sep.join([PATH,'config.json']))).read() ) + store_config = json.loads( (open(CONFIG_FILE)).read() ) files = proxy.get.resume(files,store_config ) # print (["Found ",len(files)," files unprocessed"]) # @@ -314,7 +274,9 @@ if __name__ == '__main__' : row = row.tolist() # logger.write({"process":index,"parse":SYS_ARGS['parse'],"file_count":len(row)}) # proc = Process(target=apply,args=(row,info['store'],_info,)) - parser = x12.Parser(os.sep.join([PATH,'config.json'])) + # parser = x12.Parser(os.sep.join([PATH,'config.json'])) + + parser = x12.Parser(CONFIG_FILE) parser.set.files(row) parser.start() procs.append(parser) @@ -335,11 +297,11 @@ if __name__ == '__main__' : # PATH= SYS_ARGS['config'] if 'config' in SYS_ARGS else os.sep.join([os.environ['HOME'],'.healthcareio','config.json']) - if os.path.exists(os.sep.join([PATH,'config.json'])) : - e = analytics.engine(os.sep.join([PATH,'config.json'])) #--@TODO: make the configuration file globally accessible + if os.path.exists(CONFIG_FILE) : + e = analytics.engine(CONFIG_FILE) #--@TODO: make the configuration file globally accessible e.apply(type='claims',serialize=True) SYS_ARGS['engine'] = e - SYS_ARGS['config'] = json.loads(open(os.sep.join([PATH,'config.json'])).read()) + SYS_ARGS['config'] = json.loads(open(CONFIG_FILE ).read()) else: SYS_ARGS['config'] = {"owner":None,"store":None} @@ -355,8 +317,8 @@ if __name__ == '__main__' : elif 'check-update' in SYS_ARGS : _args = {"url":SYS_ARGS['url']} try: - if os.path.exists(os.sep.join([PATH,'config.json'])) : - SYS_ARGS['config'] = json.loads((open(os.sep.join([PATH,'config.json']))).read()) + if os.path.exists(CONFIG_FILE) : + SYS_ARGS['config'] = json.loads(open(CONFIG_FILE ).read()) else: SYS_ARGS['config'] = {} if 'version' in SYS_ARGS['config'] : @@ -379,18 +341,23 @@ if __name__ == '__main__' : # # this function is designed to export the data to csv # - path = SYS_ARGS['config'] - TYPE = SYS_ARGS['export'] if 'export' in SYS_ARGS else '835' - if not os.path.exists(path) or TYPE not in ['835','837']: + path = SYS_ARGS['export-config'] + + X12_TYPE = SYS_ARGS['export'] if 'export' in SYS_ARGS else '835' + if not os.path.exists(path) or X12_TYPE not in ['835','837']: print (HELP_MESSAGE) else: # # Let's run the export function ..., This will push files into a data-store of choice Redshift, PostgreSQL, MySQL ... 
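+ # The path now points to a JSON description of the write store itself,
+ # e.g. a hypothetical value: {"provider":"postgresql","schema":"public"}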
# - _store = {"type":"sql.SQLWriter","args":json.loads( (open(path) ).read())} - pipes = export.Factory.instance(type=TYPE,write_store=_store) #"inspect":0,"cast":0}}) + # _store = {"type":"sql.SQLWriter","args":json.loads( (open(path) ).read())} + _store = json.loads( (open(path) ).read()) + + pipes = export.Factory.instance(type=X12_TYPE,write_store=_store,config = CONFIG_FILE) #"inspect":0,"cast":0}}) # pipes[0].run() + # print (pipes) + for thread in pipes: @@ -399,12 +366,7 @@ if __name__ == '__main__' : thread.start() time.sleep(1) thread.join() - - # print (Subject.cache) - # while pipes : - # pipes = [thread for thread in pipes if thread.is_alive()] - # time.sleep(1) - + else: diff --git a/healthcareio/server/__init__.py b/healthcareio/server/__init__.py index 165e93e..861b438 100644 --- a/healthcareio/server/__init__.py +++ b/healthcareio/server/__init__.py @@ -4,7 +4,7 @@ import healthcareio.analytics import os import json import time -import smart +# import smart import transport import pandas as pd import numpy as np diff --git a/healthcareio/x12/__init__.py b/healthcareio/x12/__init__.py index 5f49cca..3760e69 100644 --- a/healthcareio/x12/__init__.py +++ b/healthcareio/x12/__init__.py @@ -54,10 +54,10 @@ class Formatters : """ This function is designed to split an x12 row and """ + value = [] if row.startswith(prefix) is False: - - + for row_value in row.replace('~','').split(sep) : if '>' in row_value and not row_value.startswith('HC'): @@ -184,7 +184,7 @@ class Formatters : else: return value - + def procedure(self,value): for xchar in [':','<','|','>'] : @@ -204,8 +204,10 @@ class Formatters : _value = str(value) return _value def diagnosis(self,value): - return [ {"code":item[2], "type":item[1]} for item in value if len(item) > 1] + def parse_loc(self,value): + if ':' in value : + return dict(zip(['place_of_service','claim_indicator','claim_frequency'],value.split(':'))) def pos(self,value): """ formatting place of service information within a segment (REF) @@ -218,6 +220,23 @@ class Formatters : x = {"place_of_service":x[0],"indicator":x[1],"frequency":x[2]} if len(x) == 3 else {"place_of_service":x[0],"indicator":None,"frequency":None} return x class Parser (Process): + @staticmethod + def setup (path): + # self.config = _config['parser'] + config = json.loads(open(path).read()) + _config = config['parser'] + # + # The parser may need some editing provided, this allows ease of developement and using alternate configurations + # + if type(_config['837']) == str or type(_config['835']) == str : + for _id in ['837','835'] : + if type(_config[_id]) == str and os.path.exists(_config[_id]): + _config[_id] = json.loads(open(_config[_id]).read()) + if type(_config[_id]) == dict : + _config[_id] = [_config[_id]] + config['parser'] = _config + return config + def __init__(self,path): """ :path path of the configuration file (it can be absolute) @@ -227,9 +246,21 @@ class Parser (Process): self.get = void() self.get.value = self.get_map self.get.default_value = self.get_default_value - _config = json.loads(open(path).read()) + # _config = json.loads(open(path).read()) self._custom_config = self.get_custom(path) + # self.config = _config['parser'] + # # + # # The parser may need some editing provided, this allows ease of developement and using alternate configurations + # # + # if type(self.config['837']) == str or type(self.config['835']) == str : + # for _id in ['837','835'] : + # if type(self.config[_id]) == str: + # self.config[_id] = json.loads(open(self.config[_id]).read()) + 
# if type(self.config[_id]) == dict : + # self.config[_id] = [self.config[_id]] + _config = Parser.setup(path) self.config = _config['parser'] + self.store = _config['store'] self.cache = {} self.files = [] @@ -261,10 +292,10 @@ class Parser (Process): def set_files(self,files): self.files = files def get_map(self,row,config,version=None): - + # label = config['label'] if 'label' in config else None handler = Formatters() - + if 'map' not in config and hasattr(handler,config['apply']): pointer = getattr(handler,config['apply']) @@ -274,10 +305,11 @@ class Parser (Process): # # Pull the goto configuration that skips rows # - omap = config['map'] if not version or version not in config else config[version] anchors = config['anchors'] if 'anchors' in config else [] rewrite = config['rewrite'] if 'rewrite' in config else {} + if len(row) == 2 and row[0] == 'HI' : + row = ([row[0]] + row[1].split(':')) if type(row[0]) == str: object_value = {} for key in omap : @@ -290,8 +322,7 @@ class Parser (Process): index = aindex + index if index < len(row) : - value = row[index] - + value = row[index] if 'cast' in config and key in config['cast'] and value.strip() != '' : if config['cast'][key] in ['float','int']: try: @@ -329,29 +360,17 @@ class Parser (Process): value = value[_key] else: value = "" - - - value = {key:value} if key not in value else value - - - else: if 'syn' in config and value in config['syn'] : # value = config['syn'][value] pass - if type(value) == dict : - - # object_value = dict(object_value, **value) - object_value = jsonmerge.merge(object_value, value) - - else: - + if type(value) == dict : + object_value = jsonmerge.merge(object_value, value) + else: object_value[key] = value - - else: # # we are dealing with a complex object @@ -361,14 +380,6 @@ class Parser (Process): value = self.get.value(row_item,config,version) object_value.append(value) - - # - # We need to add the index of the object it matters in determining the claim types - # - - # object_value.append( list(get_map(row_item,config,version))) - # object_value = {label:object_value} - return object_value def set_cache(self,tmp,_info) : """ @@ -379,6 +390,7 @@ class Parser (Process): value=_info['cache']['value'] field = _info['cache']['field'] if value in tmp : + self.cache [key] = {field:tmp[value]} pass def get_cache(self,row) : @@ -398,10 +410,7 @@ class Parser (Process): value = {} for row in content[:] : - - row = util.split(row.replace('\n','').replace('~','')) - _info = util.get.config(self.config[_code][0],row) if self._custom_config and _code in self._custom_config: _cinfo = util.get.config(self._custom_config[_code],row) @@ -458,7 +467,10 @@ class Parser (Process): name = _info['field'] # value[name] = tmp # value = jsonmerge.merge(value,{name:tmp}) - value = dict(value,**{name:tmp}) + if name not in value : + value = dict(value,**{name:tmp}) + else: + value[name] = dict(value[name],**tmp) else: value = dict(value,**tmp) @@ -486,12 +498,12 @@ class Parser (Process): TOP_ROW = content[1].split('*') SUBMITTED_DATE = util.parse.date(TOP_ROW[4]) - - CATEGORY= content[2].split('*')[1].strip() - VERSION = content[1].split('*')[-1].replace('~','').replace('\n','') + CATEGORY= content[2].split('*')[1].strip() + VERSION = content[1].split('*')[-1].replace('~','').replace('\n','') SENDER_ID = TOP_ROW[2] + row = util.split(content[3]) _info = util.get_config(self.config[_code][0],row) @@ -501,7 +513,8 @@ class Parser (Process): value["submitted"] = SUBMITTED_DATE value['sender_id'] = SENDER_ID - value = 
dict(value,**self.apply(content,_code)) + # value = dict(value,**self.apply(content,_code)) + value = jsonmerge.merge(value,self.apply(content,_code)) # Let's parse this for default values return value #jsonmerge.merge(value,self.apply(content,_code)) @@ -555,6 +568,7 @@ class Parser (Process): index = 0; _toprows = [] _default = None + for row in file : row = row.replace('\r','') @@ -665,25 +679,50 @@ class Parser (Process): for filename in self.files : content,logs,_code = self.read(filename) + self.finish(content,logs,_code) def finish(self,content,logs,_code) : args = self.store _args = json.loads(json.dumps(self.store)) - if args['type'] == 'mongo.MongoWriter' : - args['args']['doc'] = 'claims' if _code == '837' else 'remits' - _args['args']['doc'] = 'logs' + ISNEW_MONGO = 'provider' in args and args['provider'] in ['mongo', 'mongodb'] + ISLEG_MONGO = ('type' in args and args['type'] == 'mongo.MongoWriter') + if ISLEG_MONGO or ISNEW_MONGO: + if ISLEG_MONGO: + # Legacy specification ... + args['args']['doc'] = 'claims' if _code == '837' else 'remits' + _args['args']['doc'] = 'logs' + else: + args['doc'] = 'claims' if _code == '837' else 'remits' + _args['doc'] = 'logs' + else: - args['args']['table'] = 'claims' if _code == '837' else 'remits' - _args['args']['table'] = 'logs' - - if content : - writer = transport.factory.instance(**args) - writer.write(content) + if 'type' in args : + # Legacy specification ... + args['args']['table'] = 'claims' if _code == '837' else 'remits' + _args['args']['table'] = 'logs' + table = args['args']['table'] + else: + args['table']= 'claims' if _code == '837' else 'remits' + _args['table'] = 'logs' + table = args['table'] + + + writer = transport.factory.instance(**args) + IS_SQLITE = type(writer) == transport.disk.SQLiteWriter + if content: + if IS_SQLITE : + for row in content : + writer.apply("""insert into :table(data) values (':values')""".replace(":values",json.dumps(row)).replace(":table",table) ) + else: + writer.write(content) writer.close() - if logs : - + if logs : logger = transport.factory.instance(**_args) - logger.write(logs) + if IS_SQLITE: + for row in logs: + logger.apply("""insert into logs values (':values')""".replace(":values",json.dumps(row))) + else: + logger.write(logs) logger.close() if self.emit.post : diff --git a/setup.py b/setup.py index 531029f..05a9596 100644 --- a/setup.py +++ b/setup.py @@ -8,7 +8,7 @@ import sys def read(fname): return open(os.path.join(os.path.dirname(__file__), fname)).read() args = { - "name":"healthcareio","version":"1.6.4.4", + "name":"healthcareio","version":"1.6.4.6", "author":"Vanderbilt University Medical Center", "author_email":"steve.l.nyemba@vumc.org", "include_package_data":True,