diff --git a/README.md b/README.md
index 4368909..6c0a632 100644
--- a/README.md
+++ b/README.md
@@ -27,7 +27,7 @@ We wrote this frame to be used in both command line or as a library within in yo
 1. signup to get parsing configuration
 
-    The parser is driven by a configuration, file you need by signing up.
+    The parser is driven by a configuration file that specifies the fields to parse and how to parse them. You need to sign up to get a copy of the configuration file.
 
        healthcare-io.py --signup <email> [--store <mongo|sqlite>]
 
@@ -49,7 +49,17 @@ We wrote this frame to be used in both command line or as a library within in yo
        --batch     number of processes to spawn to parse the files
        --resume    tells the parser to resume parsing if all files weren't processed or new files were added into the folder
+
+4. export data to a relational data-store
+
+    The parser will export data into other data-stores as relational tables, allowing users to construct views to support a variety of studies.
+
+        healthcare-io.py --export <835|837> --config <path>
+
+    with:
+        --config    path of the configuration file that describes the target data-store
+
 **Embedded in Code :**
 
 The Healthcare/IO **parser** can be used within your code base as a library and handle storing data in a data store of choice
diff --git a/healthcareio/healthcare-io.py b/healthcareio/healthcare-io.py
index f187d1e..2540a9f 100644
--- a/healthcareio/healthcare-io.py
+++ b/healthcareio/healthcare-io.py
@@ -44,6 +44,7 @@ import numpy as np
 from multiprocessing import Process
 import time
 from healthcareio import x12
+from healthcareio.export import export
 import smart
 from healthcareio.server import proxy
 import pandas as pd
@@ -57,6 +58,24 @@ if not os.path.exists(PATH) :
 import platform
 import sqlite3 as lite
 # PATH = os.sep.join([os.environ['HOME'],'.edi-parser'])
+HELP_MESSAGE = """
+    cli:
+
+    healthcare-io.py --<[signup|init]> <email> --store <mongo|sqlite> [--batch <value>]
+    healthcare-io.py --parse <claims|remits> --folder <path> [--batch <value>] [--resume]
+    healthcare-io.py --check-update
+    healthcare-io.py --export <835|837> --config <path>
+    action :
+        --signup|init       signup user and get configuration file
+        --parse             starts parsing
+        --check-update      checks for updates
+        --export            export data of an 835 or 837 into another database
+    parameters :
+        --<[signup|init]>   signup or get a configuration file from a parsing server
+        --folder            location of the files (the program will recursively traverse it)
+        --store             data store (mongo or sqlite)
+        --resume            will attempt to resume if there was an interruption
+    """
 def signup (**args) :
     """
     :email  user's email address
@@ -125,77 +144,77 @@ def init():
 #
 # Global variables that load the configuration files
-def parse(**args):
-    """
-    This function will parse the content of a claim or remittance (x12 format) give the following parameters
-    :filename   absolute path of the file to be parsed
-    :type       claims|remits in x12 format
-    """
-    global INFO
-    if not INFO :
-        INFO = init()
-    if args['type'] == 'claims' :
-        CONFIG = INFO['parser']['837']
-    elif args['type'] == 'remits' :
-        CONFIG = INFO['parser']['835']
-    else:
-        CONFIG = None
-    if CONFIG :
-        # CONFIG = CONFIG[-1] if 'version' not in args and (args['version'] < len(CONFIG)) else CONFIG[0]
-        CONFIG = CONFIG[int(args['version'])-1] if 'version' in SYS_ARGS and int(SYS_ARGS['version']) < len(CONFIG) else CONFIG[-1]
-        SECTION = CONFIG['SECTION']
-        os.environ['HEALTHCAREIO_SALT'] = INFO['owner']
+# def parse(**args):
+#     """
+#     This function will parse the content of a claim or remittance (x12 format) give the following parameters
+#     :filename   absolute path of the file to be parsed
+#     :type       claims|remits in x12 format
+#     """
+#     global INFO
+#     if not INFO :
+#         INFO = init()
+#     if args['type'] == 'claims' :
+#         CONFIG = INFO['parser']['837']
+#     elif args['type'] == 'remits' :
+#         CONFIG = INFO['parser']['835']
+#     else:
+#         CONFIG = None
+#     if CONFIG :
+#         # CONFIG = CONFIG[-1] if 'version' not in args and (args['version'] < len(CONFIG)) else CONFIG[0]
+#         CONFIG = CONFIG[int(args['version'])-1] if 'version' in SYS_ARGS and int(SYS_ARGS['version']) < len(CONFIG) else CONFIG[-1]
+#         SECTION = CONFIG['SECTION']
+#         os.environ['HEALTHCAREIO_SALT'] = INFO['owner']
 
-        return get_content(args['filename'],CONFIG,SECTION)
-def resume (files,id,config):
-    _args = config['store'].copy()
-    if 'mongo' in config['store']['type'] :
-        _args['type'] = 'mongo.MongoReader'
-    reader = factory.instance(**_args)
-    _files = []
-    if 'resume' in config['analytics'] :
-        _args = config['analytics']['resume'][id]
-        _files = reader.read(**_args)
-        _files = [item['name'] for item in _files if item['name'] != None]
-    return list(set(files) - set(_files))
+#         return get_content(args['filename'],CONFIG,SECTION)
+# def resume (files,id,config):
+#     _args = config['store'].copy()
+#     if 'mongo' in config['store']['type'] :
+#         _args['type'] = 'mongo.MongoReader'
+#     reader = factory.instance(**_args)
+#     _files = []
+#     if 'resume' in config['analytics'] :
+#         _args = config['analytics']['resume'][id]
+#         _files = reader.read(**_args)
+#         _files = [item['name'] for item in _files if item['name'] != None]
+#     return list(set(files) - set(_files))
 
-    return files
-    pass
-def apply(files,store_info,logger_info=None):
-    """
-    :files          list of files to be processed in this given thread/process
-    :store_info     information about data-store, for now disk isn't thread safe
-    :logger_info    information about where to store the logs
-    """
+    # return files
+    # pass
+# def apply(files,store_info,logger_info=None):
+#     """
+#     :files          list of files to be processed in this given thread/process
+#     :store_info     information about data-store, for now disk isn't thread safe
+#     :logger_info    information about where to store the logs
+#     """
 
-    if not logger_info :
-        logger = factory.instance(type='disk.DiskWriter',args={'path':os.sep.join([info['out-folder'],SYS_ARGS['parse']+'.log'])})
-    else:
-        logger = factory.instance(**logger_info)
+#     if not logger_info :
+#         logger = factory.instance(type='disk.DiskWriter',args={'path':os.sep.join([info['out-folder'],SYS_ARGS['parse']+'.log'])})
+#     else:
+#         logger = factory.instance(**logger_info)
 
-    writer = factory.instance(**store_info)
-    for filename in files :
+#     writer = factory.instance(**store_info)
+#     for filename in files :
 
-        if filename.strip() == '':
-            continue
-        # content,logs = get_content(filename,CONFIG,CONFIG['SECTION'])
-        #
-        try:
-            content,logs = parse(filename = filename,type=SYS_ARGS['parse'])
+#         if filename.strip() == '':
+#             continue
+#         # content,logs = get_content(filename,CONFIG,CONFIG['SECTION'])
+#         #
+#         try:
+#             content,logs = parse(filename = filename,type=SYS_ARGS['parse'])
 
-            if content :
-                writer.write(content)
-            if logs :
-                [logger.write(dict(_row,**{"parse":SYS_ARGS['parse']})) for _row in logs]
-            else:
-                logger.write({"parse":SYS_ARGS['parse'],"name":filename,"completed":True,"rows":len(content)})
-        except Exception as e:
+#             if content :
+#                 writer.write(content)
+#             if logs :
+#                 [logger.write(dict(_row,**{"parse":SYS_ARGS['parse']})) for _row in logs]
+#             else:
+#                 logger.write({"parse":SYS_ARGS['parse'],"name":filename,"completed":True,"rows":len(content)})
+#         except Exception as e:
 
-            logger.write({"parse":SYS_ARGS['parse'],"filename":filename,"completed":False,"rows":-1,"msg":e.args[0]})
-            # print ([filename,len(content)])
-        #
-        # @TODO: forward this data to the writer and log engine
+#             logger.write({"parse":SYS_ARGS['parse'],"filename":filename,"completed":False,"rows":-1,"msg":e.args[0]})
+#             # print ([filename,len(content)])
+#         #
+#         # @TODO: forward this data to the writer and log engine
 #
 def upgrade(**args):
     """
@@ -360,95 +379,27 @@ if __name__ == '__main__' :
         #
         # this function is designed to export the data to csv
        #
-        format = SYS_ARGS['format'] if 'format' in SYS_ARGS else 'csv'
-        format = format.lower()
-        if set([format]) not in ['xls','csv'] :
-            format = 'csv'
-
-    else:
-        msg = """
-            cli:
-
-            healthcare-io.py --<[signup|init]> <email> --store <mongo|sqlite> [--batch <value>]
-            healthcare-io.py --parse <claims|remits> --folder <path> [--batch <value>] [--resume]
-            healthcare-io.py --check-update
-            action :
-                --signup|init       signup user and get configuration file
-                --parse             starts parsing
-                --check             checks for updates
-            parameters :
-                --<[signup|init]>   signup or get a configuration file from a parsing server
-                --folder            location of the files (the program will recursively traverse it)
-                --store             data store mongo or sqlite or mongodb
-                --resume            will attempt to resume if there was an interruption
-        """
-        print(msg)
-        pass
-        # """
-        # The program was called from the command line thus we are expecting
-        #   parse in [claims,remits]
-        #   config  os.sep.path.exists(path)
-        #   folder  os.sep.path.exists(path)
-        #   store   store ()
-        # """
-        # p = len( set(['store','config','folder']) & set(SYS_ARGS.keys())) == 3 and ('db' in SYS_ARGS or 'path' in SYS_ARGS)
-        # TYPE = {
-        #     'mongo':'mongo.MongoWriter',
-        #     'couch':'couch.CouchWriter',
-        #     'disk':'disk.DiskWriter'
-        # }
-        # INFO = {
-        #     '837':{'scope':'claims','section':'HL'},
-        #     '835':{'scope':'remits','section':'CLP'}
-        # }
-        # if p :
-        #     args = {}
-        #     scope = SYS_ARGS['config'][:-5].split(os.sep)[-1]
-        #     CONTEXT = INFO[scope]['scope']
-        #     #
-        #     # @NOTE:
-        #     # improve how database and data stores are handled.
-        #     if SYS_ARGS['store'] == 'couch' :
-        #         args = {'url': SYS_ARGS['url'] if 'url' in SYS_ARGS else 'http://localhost:5984'}
-        #         args['dbname'] = SYS_ARGS['db']
+        path = SYS_ARGS['config']
+        TYPE = SYS_ARGS['export'] if 'export' in SYS_ARGS else '835'
+        if not os.path.exists(path) or TYPE not in ['835','837']:
+            print (HELP_MESSAGE)
+        else:
+            #
+            # Let's run the export function ... this will push files into a relational data-store of choice (Redshift, PostgreSQL, MySQL ...)
+            #
+            _store = json.loads( (open(path) ).read())
 
-        #     elif SYS_ARGS ['store'] == 'mongo':
-        #         args = {'host':SYS_ARGS['host']if 'host' in SYS_ARGS else 'localhost:27017'}
-        #     if SYS_ARGS['store'] in ['mongo','couch']:
-        #         args['dbname'] = SYS_ARGS['db'] if 'db' in SYS_ARGS else 'claims_outcomes'
-        #         args['doc'] = CONTEXT
+            pipes = export.Factory.instance(type=TYPE,write_store={"type":"sql.SQLWriter","args":{"provider":"postgresql","db":"sample",}}) #"inspect":0,"cast":0}})
+            # pipes[0].run()
+            for thread in pipes:
+                thread.start()
+                time.sleep(1)
+            while pipes :
+                pipes = [thread for thread in pipes if thread.is_alive()]
+                time.sleep(1)
+
-        #     TYPE = TYPE[SYS_ARGS['store']]
-        #     writer = factory.instance(type=TYPE,args=args)
-        #     if SYS_ARGS['store'] == 'disk':
-        #         writer.init(path = 'output-claims.json')
-        #     logger = factory.instance(type=TYPE,args= dict(args,**{"doc":"logs"}))
-        #     files = os.listdir(SYS_ARGS['folder'])
-        #     CONFIG = json.loads(open(SYS_ARGS['config']).read())
-        #     SECTION = INFO[scope]['section']
-        #     for file in files :
-        #         if 'limit' in SYS_ARGS and files.index(file) == int(SYS_ARGS['limit']) :
-        #             break
-        #         else:
-        #             filename = os.sep.join([SYS_ARGS['folder'],file])
-
-        #             try:
-        #                 content,logs = get_content(filename,CONFIG,SECTION)
-        #             except Exception as e:
-        #                 if sys.version_info[0] > 2 :
-        #                     logs = [{"filename":filename,"msg":e.args[0]}]
-        #                 else:
-        #                     logs = [{"filename":filename,"msg":e.message}]
-        #                 content = None
-        #             if content :
-
-        #                 writer.write(content)
-        #             if logs:
-
-        #                 logger.write(logs)
-
-
-        #     pass
-        # else:
-        #     print (__doc__)
+    else:
+
+        print(HELP_MESSAGE)
diff --git a/healthcareio/server/__init__.py b/healthcareio/server/__init__.py
index e548fd8..165e93e 100644
--- a/healthcareio/server/__init__.py
+++ b/healthcareio/server/__init__.py
@@ -9,7 +9,7 @@ import transport
 import pandas as pd
 import numpy as np
 from healthcareio import x12
-
+from healthcareio.export import export
 from multiprocessing import Process
 # from flask_socketio import SocketIO, emit, disconnect,send
 from healthcareio.server import proxy
diff --git a/setup.py b/setup.py
index 8fe3c57..92e2057 100644
--- a/setup.py
+++ b/setup.py
@@ -8,7 +8,7 @@ import sys
 def read(fname):
     return open(os.path.join(os.path.dirname(__file__), fname)).read()
 args = {
-    "name":"healthcareio","version":"1.4.6",
+    "name":"healthcareio","version":"1.4.8",
     "author":"Vanderbilt University Medical Center",
     "author_email":"steve.l.nyemba@vumc.org",
     "include_package_data":True,
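A note on the new --export path: the sketch below shows how the same export pipeline could be driven from Python instead of the CLI, mirroring the thread-based pattern added to the __main__ block of healthcare-io.py above (the CLI additionally validates the --config path before running). The PostgreSQL write-store settings are illustrative placeholders, not values shipped with the package.

    import time
    from healthcareio.export import export

    # Factory.instance returns a list of workers ("pipes"); each one pushes parsed
    # 835 or 837 records into the relational write store.
    pipes = export.Factory.instance(
        type="835",   # or "837", as with --export <835|837>
        write_store={"type": "sql.SQLWriter",
                     "args": {"provider": "postgresql", "db": "sample"}})

    # Start the workers, then poll until every pipe has finished,
    # exactly as the CLI's __main__ block does.
    for thread in pipes:
        thread.start()
        time.sleep(1)
    while pipes:
        pipes = [thread for thread in pipes if thread.is_alive()]
        time.sleep(1)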