Merge branch 'dev' of lab/parse-edi into master

steve 4 years ago committed by Gogs
commit ddda9528e3

@@ -27,11 +27,21 @@ We wrote this frame to be used in both command line or as a library within in yo
 1. signup to get parsing configuration
+The parser is driven by a configuration file you get by signing up.
 healthcare-io.py --signup <email> [--store <mongo|sqlite>]
-2. parsing claims in a folder
-healthcare-io.py --parse <claims|remits> --folder <path> [--batch <n>] [--resume]
+2. check version
+Occasionally the attributes in the configuration file may change; this function will determine if there is a new version available.
+healthcare-io.py --check-update
+3. parsing data in a folder
+The parser will recursively traverse a directory with claims and/or remittances
+healthcare-io.py --parse --folder <path> [--batch <n>] [--resume]
 with :
 --parse tells the engine what to parse claims or remits
@@ -39,11 +49,6 @@ We wrote this frame to be used in both command line or as a library within in yo
 --batch number of processes to spawn to parse the files
 --resume tells the parser to resume parsing
 if all files weren't processed or new files were added into the folder
-3. dashboard
-There is a built-in dashboard that has displays descriptive analytics in a web browser
-healthcare-io.py --server <port> [--context <name>]
 **Embedded in Code :**

@@ -17,4 +17,6 @@ Usage :
 from healthcareio import analytics
 import healthcareio.x12 as x12
+import healthcareio.params as params
 # from healthcareio import server

@@ -32,9 +32,10 @@ Usage :
 from healthcareio.params import SYS_ARGS
 from transport import factory
 import requests
 from healthcareio import analytics
 from healthcareio import server
 from healthcareio.parser import get_content
 import os
 import json
@@ -56,10 +57,10 @@ if not os.path.exists(PATH) :
 import platform
 import sqlite3 as lite
 # PATH = os.sep.join([os.environ['HOME'],'.edi-parser'])
-def register (**args) :
+def signup (**args) :
 """
 :email user's email address
-:url url of the provider to register
+:url url of the provider to signup
 """
 email = args['email']
@@ -203,12 +204,27 @@ def upgrade(**args):
 """
 url = args['url'] if 'url' in args else URL+"/upgrade"
 headers = {"key":args['key'],"email":args["email"],"url":url}
+def check(**_args):
+    """
+    This function will check if there is an update available (versions are in the configuration file)
+    :param url
+    """
+    url = _args['url'][:-1] if _args['url'].endswith('/') else _args['url']
+    url = url + "/version"
+    if 'version' not in _args :
+        version = {"_id":"version","current":0.0}
+    else:
+        version = _args['version']
+    http = requests.session()
+    r = http.get(url)
+    return r.json()
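A minimal sketch of calling check directly, assuming the default provider URL used elsewhere in this file; the version argument mirrors the default shape the function builds internally:

    # hypothetical invocation; the CLI normally drives this through --check-update
    remote = check(url="https://healthcareio.the-phi.com", version={"_id": "version", "current": 1.4})
    print(remote.get("current"))   # version advertised by the server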
 if __name__ == '__main__' :
 info = init()
 if 'out-folder' in SYS_ARGS :
 OUTPUT_FOLDER = SYS_ARGS['out-folder']
+SYS_ARGS['url'] = SYS_ARGS['url'] if 'url' in SYS_ARGS else URL
 if set(list(SYS_ARGS.keys())) & set(['signup','init']):
 #
@@ -217,10 +233,10 @@ if __name__ == '__main__' :
 #
 email = SYS_ARGS['signup'].strip() if 'signup' in SYS_ARGS else SYS_ARGS['init']
-url = SYS_ARGS['url'] if 'url' in SYS_ARGS else 'https://healthcareio.the-phi.com'
+url = SYS_ARGS['url'] if 'url' in SYS_ARGS else URL
 store = SYS_ARGS['store'] if 'store' in SYS_ARGS else 'sqlite'
 db='healthcareio' if 'db' not in SYS_ARGS else SYS_ARGS['db']
-register(email=email,url=url,store=store,db=db)
+signup(email=email,url=url,store=store,db=db)
 # else:
 # m = """
 # usage:
@@ -244,11 +260,17 @@ if __name__ == '__main__' :
 if 'file' in SYS_ARGS :
 files = [SYS_ARGS['file']] if not os.path.isdir(SYS_ARGS['file']) else []
 if 'folder' in SYS_ARGS and os.path.exists(SYS_ARGS['folder']):
-names = os.listdir(SYS_ARGS['folder'])
-files += [os.sep.join([SYS_ARGS['folder'],name]) for name in names if not os.path.isdir(os.sep.join([SYS_ARGS['folder'],name]))]
+for root,_dir,f in os.walk(SYS_ARGS['folder']) :
+    if f :
+        files += [os.sep.join([root,name]) for name in f]
+# names = os.listdir(SYS_ARGS['folder'])
+# files += [os.sep.join([SYS_ARGS['folder'],name]) for name in names if not os.path.isdir(os.sep.join([SYS_ARGS['folder'],name]))]
 else:
 #
-# raise an erro
+# raise an error
 pass
 #
 # if the user has specified to resume, we should look into the logs and pull the files processed and those that haven't
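On its own, the traversal that replaces os.listdir above boils down to the following sketch (the folder path is a hypothetical placeholder):

    import os

    files = []
    for root, _dirs, names in os.walk('/data/edi/claims'):   # hypothetical folder
        if names:
            files += [os.sep.join([root, name]) for name in names]
    print(len(files), 'files found')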
@@ -256,40 +278,13 @@ if __name__ == '__main__' :
 if 'resume' in SYS_ARGS :
 store_config = json.loads( (open(os.sep.join([PATH,'config.json']))).read() )
 files = proxy.get.resume(files,store_config )
-print (["Found ",len(files)," files unprocessed"])
+# print (["Found ",len(files)," files unprocessed"])
 #
 # @TODO: Log this here so we know what is being processed or not
 SCOPE = None
 if files : #and ('claims' in SYS_ARGS['parse'] or 'remits' in SYS_ARGS['parse']):
-# logger = factory.instance(type='disk.DiskWriter',args={'path':os.sep.join([info['out-folder'],SYS_ARGS['parse']+'.log'])})
-# if info['store']['type'] == 'disk.DiskWriter' :
-# info['store']['args']['path'] += (os.sep + 'healthcare-io.json')
-# elif info['store']['type'] == 'disk.SQLiteWriter' :
-# # info['store']['args']['path'] += (os.sep + 'healthcare-io.db3')
-# pass
-# if info['store']['type'] == 'disk.SQLiteWriter' :
-# info['store']['args']['table'] = SYS_ARGS['parse'].strip().lower()
-# _info = json.loads(json.dumps(info['store']))
-# _info['args']['table']='logs'
-# else:
-# #
-# # if we are working with no-sql we will put the logs in it (performance )?
-# info['store']['args']['doc'] = SYS_ARGS['parse'].strip().lower()
-# _info = json.loads(json.dumps(info['store']))
-# _info['args']['doc'] = 'logs'
-# logger = factory.instance(**_info)
-# writer = factory.instance(**info['store'])
-#
-# we need to have batches ready for this in order to run some of these queries in parallel
-# @TODO: Make sure it is with a persistence storage (not disk .. not thread/process safe yet)
-# - Make sure we can leverage this on n-cores later on, for now the assumption is a single core
-#
 BATCH_COUNT = 1 if 'batch' not in SYS_ARGS else int (SYS_ARGS['batch'])
 files = np.array_split(files,BATCH_COUNT)
@@ -308,26 +303,6 @@ if __name__ == '__main__' :
 while len(procs) > 0 :
 procs = [proc for proc in procs if proc.is_alive()]
 time.sleep(2)
-# for filename in files :
-# if filename.strip() == '':
-# continue
-# # content,logs = get_content(filename,CONFIG,CONFIG['SECTION'])
-# #
-# try:
-# content,logs = parse(filename = filename,type=SYS_ARGS['parse'])
-# if content :
-# writer.write(content)
-# if logs :
-# [logger.write(dict(_row,**{"parse":SYS_ARGS['parse']})) for _row in logs]
-# else:
-# logger.write({"parse":SYS_ARGS['parse'],"name":filename,"completed":True,"rows":len(content)})
-# except Exception as e:
-# logger.write({"parse":SYS_ARGS['parse'],"filename":filename,"completed":False,"rows":-1,"msg":e.args[0]})
-# # print ([filename,len(content)])
-# #
-# # @TODO: forward this data to the writer and log engine
-# #
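The batching logic above splits the file list into BATCH_COUNT chunks, hands each chunk to its own process, and then waits on the survivors; a self-contained sketch of the same pattern, where handle_batch is a hypothetical worker standing in for the parser process:

    import time
    import numpy as np
    from multiprocessing import Process

    def handle_batch(names):
        # hypothetical worker; the real code runs a Parser process per batch
        print(len(names), 'files in this batch')

    if __name__ == '__main__':
        files = ['a.x12', 'b.x12', 'c.x12', 'd.x12']
        procs = []
        for batch in np.array_split(files, 2):            # e.g. --batch 2
            proc = Process(target=handle_batch, args=(list(batch),))
            proc.start()
            procs.append(proc)
        while procs:                                      # same wait loop as above
            procs = [p for p in procs if p.is_alive()]
            time.sleep(2)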
@@ -358,6 +333,28 @@ if __name__ == '__main__' :
 pointer = lambda : server.app.run(host='0.0.0.0',port=PORT,debug=DEBUG,threaded=False)
 pthread = Process(target=pointer,args=())
 pthread.start()
+elif 'check-update' in SYS_ARGS :
+    _args = {"url":SYS_ARGS['url']}
+    try:
+        if os.path.exists(os.sep.join([PATH,'config.json'])) :
+            SYS_ARGS['config'] = json.loads((open(os.sep.join([PATH,'config.json']))).read())
+        else:
+            SYS_ARGS['config'] = {}
+        if 'version' in SYS_ARGS['config'] :
+            _args['version'] = SYS_ARGS['config']['version']
+        version = check(**_args)
+        _version = {"current":0.0} if 'version' not in SYS_ARGS['config'] else SYS_ARGS['config']['version']
+        if _version['current'] != version['current'] :
+            print ()
+            print ("You need to upgrade your system to version ",version['current'])
+            print ("\t- signup (for new configuration)")
+            print ("\t- use pip to upgrade the codebase")
+        else:
+            print ()
+            print ("You are running the current configuration version ",_version['current'])
+    except Exception as e:
+        print (e)
+        pass
 elif 'export' in SYS_ARGS:
 #
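The comparison above expects the local configuration to carry a version entry shaped like the default built in check; a condensed sketch with hypothetical values:

    local  = {"_id": "version", "current": 1.4}   # as stored in ~/.healthcareio/config.json
    remote = {"current": 1.6}                     # as returned by check()
    if local['current'] != remote['current']:
        print('You need to upgrade your system to version', remote['current'])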
@@ -373,11 +370,15 @@ if __name__ == '__main__' :
 cli:
 healthcare-io.py --<[signup|init]> <email> --store <sqlite|mongo> [--batch <value>]
-healthcare-io.py --parse claims --folder <path> [--batch <value>]
-healthcare-io.py --parse remits --folder <path> [--batch <value>] [--resume]
+healthcare-io.py --parse --folder <path> [--batch <value>] [--resume]
+healthcare-io.py --check-update
+action :
+--signup|init signup user and get configuration file
+--parse starts parsing
+--check checks for updates
 parameters :
 --<[signup|init]> signup or get a configuration file from a parsing server
+--folder location of the files (the program will recursively traverse it)
 --store data store mongo or sqlite or mongodb
 --resume will attempt to resume if there was an interruption
 """

@@ -11,11 +11,11 @@ import numpy as np
 from healthcareio import x12
 from multiprocessing import Process
-from flask_socketio import SocketIO, emit, disconnect,send
+# from flask_socketio import SocketIO, emit, disconnect,send
 from healthcareio.server import proxy
 PATH = os.sep.join([os.environ['HOME'],'.healthcareio','config.json'])
 app = Flask(__name__)
-socket_ = SocketIO(app)
+# socket_ = SocketIO(app)
 def resume (files):
 _args = SYS_ARGS['config']['store'].copy()
@@ -30,7 +30,7 @@ def resume (files):
 _files = [item['name'] for item in _files]
 except Exception as e :
 pass
-print (["found ",len(files),"\tProcessed ",len(_files)])
 return list(set(files) - set(_files))
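This resume logic (and its twin in the proxy module further down) reduces to a set difference between the files found on disk and the names already recorded in the log store; a minimal sketch:

    files = ['a.x12', 'b.x12', 'c.x12']            # gathered from --folder
    processed = ['a.x12']                          # names pulled from the logs
    remaining = list(set(files) - set(processed))  # ['b.x12', 'c.x12'], order not guaranteed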
@@ -66,14 +66,14 @@ def push() :
 _args = {"aggregate":"logs","cursor":{},"allowDiskUse":True,"pipeline":pipeline}
 r = pd.DataFrame(reader.read(mongo=_args))
 r = healthcareio.analytics.Apex.apply({"chart":{"type":"donut","axis":{"x":"count","y":"type"}},"data":r})
-emit("update",r,json=True)
+# emit("update",r,json=True)
 return r
-@socket_.on('connect')
-def client_connect(**r):
-print ('Connection received')
-print (r)
-push()
-pass
+# @socket_.on('connect')
+# def client_connect(**r):
+# print ('Connection received')
+# print (r)
+# push()
+# pass
 @app.route("/favicon.ico")
 def _icon():

@@ -32,7 +32,7 @@ class get :
 _files = [item['name'] for item in _files]
 except Exception as e :
 pass
-print (["found ",len(files),"\tProcessed ",len(_files)])
+print ( [len(list(set(files) - set(_files))),' files to be processed'])
 return list(set(files) - set(_files))
 @staticmethod

@@ -24,6 +24,7 @@ import sys
 from itertools import islice
 from multiprocessing import Process
 import transport
+import jsonmerge
 class void :
 pass
 class Formatters :
@@ -95,7 +96,6 @@ class Formatters :
 if _row[0] in config['SIMILAR'] :
 key = config['SIMILAR'][_row[0]]
 _info = config[key]
 return _info
 def hash(self,value):
@@ -181,13 +181,16 @@ class Formatters :
 return x
 class Parser (Process):
 def __init__(self,path):
+    """
+    :path path of the configuration file (it can be absolute)
+    """
 Process.__init__(self)
 self.utils = Formatters()
 self.get = void()
 self.get.value = self.get_map
 self.get.default_value = self.get_default_value
 _config = json.loads(open(path).read())
+self._custom_config = self.get_custom(path)
 self.config = _config['parser']
 self.store = _config['store']
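For the embedded use case, the constructor above suggests a flow along these lines; a sketch that assumes Parser is importable from healthcareio.x12 (as the earlier import hints) and that a signup-generated config.json already exists:

    import os
    import healthcareio.x12 as x12

    CONFIG = os.sep.join([os.environ['HOME'], '.healthcareio', 'config.json'])  # produced by --signup

    parser = x12.Parser(CONFIG)           # loads the 'parser' and 'store' sections of the configuration
    parser.set_files(['claim_0001.x12'])  # hypothetical file list, e.g. gathered with os.walk
    parser.start()                        # Parser subclasses multiprocessing.Process
    parser.join()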
@@ -197,6 +200,27 @@ class Parser (Process):
 self.emit = void()
 self.emit.pre = None
 self.emit.post = None
+def get_custom(self,path) :
+    """
+    :path path of the configuration file (it can be absolute)
+    """
+    #
+    #
+    _path = path.replace('config.json','')
+    if _path.endswith(os.sep) :
+        _path = _path[:-1]
+    _config = {}
+    _path = os.sep.join([_path,'custom'])
+    if os.path.exists(_path) :
+        files = os.listdir(_path)
+        if files :
+            fullname = os.sep.join([_path,files[0]])
+            _config = json.loads ( (open(fullname)).read() )
+    return _config
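In other words, the custom overrides are expected to live in a custom/ directory next to the configuration file, and the first file found there is loaded; a hypothetical layout:

    ~/.healthcareio/config.json
    ~/.healthcareio/custom/overrides.json   (any JSON file in custom/ is picked up; the name is hypothetical)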
 def set_files(self,files):
 self.files = files
 def get_map(self,row,config,version=None):
@@ -247,15 +271,18 @@ class Parser (Process):
 value = {key:value} if key not in value else value
 else:
 if 'syn' in config and value in config['syn'] :
 value = config['syn'][value]
 if type(value) == dict :
 object_value = dict(object_value, **value)
 else:
 object_value[key] = value
 else:
 #
 # we are dealing with a complex object
@@ -275,26 +302,35 @@ class Parser (Process):
 return object_value
 def apply(self,content,_code) :
 """
-:file content i.e a segment with the envelope
+:content content of a file i.e a segment with the envelope
 :_code 837 or 835 (helps get the appropriate configuration)
 """
 util = Formatters()
 # header = default_value.copy()
 value = {}
 for row in content[:] :
 row = util.split(row.replace('\n','').replace('~',''))
 _info = util.get.config(self.config[_code][0],row)
+if self._custom_config and _code in self._custom_config:
+    _cinfo = util.get.config(self._custom_config[_code],row)
+else:
+    _cinfo = {}
+# _info = self.consolidate(row=row,type=_code,config=_info,util=util)
+# print ([row[0],_info])
+# print ()
+# continue
+# _cinfo = util.get.config(self._custom_config[_code],row)
 if _info :
+try:
 try:
+_info = jsonmerge.merge(_info,_cinfo)
 tmp = self.get.value(row,_info)
+# if 'P1080351470' in content[0] and 'PLB' in row:
+# print (_info)
+# print (row)
+# print (tmp)
 if not tmp :
 continue
 if 'label' in _info :
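jsonmerge.merge, used here to overlay the custom configuration onto the stock one (and further down to accumulate parsed field values), performs a recursive merge of JSON-like objects; a minimal sketch:

    from jsonmerge import merge

    base = {'patient': {'first_name': 'JOHN'}}
    head = {'patient': {'last_name': 'DOE'}}
    print(merge(base, head))   # {'patient': {'first_name': 'JOHN', 'last_name': 'DOE'}}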
@@ -326,7 +362,9 @@ class Parser (Process):
 elif 'field' in _info :
 name = _info['field']
-value[name] = tmp
+# value[name] = tmp
+value = jsonmerge.merge(value,{name:tmp})
 else:
@@ -341,6 +379,7 @@ class Parser (Process):
 return value if value else {}
 def get_default_value(self,content,_code):
 util = Formatters()
 TOP_ROW = content[1].split('*')
 CATEGORY= content[2].split('*')[1].strip()
@@ -359,6 +398,8 @@ class Parser (Process):
 value['payer_id'] = SENDER_ID
 else:
 value['provider_id'] = SENDER_ID
+#
+# Let's parse this for default values
 return value
 def read(self,filename) :
@@ -381,8 +422,14 @@ class Parser (Process):
 INITIAL_ROWS = file[:4]
 if len(INITIAL_ROWS) < 3 :
 return None,[{"name":filename,"completed":False}],None
-section = 'HL' if INITIAL_ROWS[1].split('*')[1] == 'HC' else 'CLP'
-_code = '837' if section == 'HL' else '835'
+# section = 'HL' if INITIAL_ROWS[1].split('*')[1] == 'HC' else 'CLP'
+# _code = '837' if section == 'HL' else '835'
+# print ([_code,section])
+_code = INITIAL_ROWS[2].split('*')[1].strip()
+# section = 'CLP' if _code == '835' else 'HL'
+section = self.config[_code][0]['SECTION'].strip()
+#
+# adjusting the
 DEFAULT_VALUE = self.get.default_value(INITIAL_ROWS,_code)
 DEFAULT_VALUE['name'] = filename.strip()
 #
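The transaction code is now read directly from the third row of the file (the ST segment) rather than inferred from the functional identifier on the second row; a sketch with a hypothetical, truncated X12 header:

    INITIAL_ROWS = [
        'ISA*00*...',            # interchange header (hypothetical, truncated)
        'GS*HP*...',             # functional group header
        'ST*835*000000001',      # transaction set header: 835 = remittance, 837 = claim
    ]
    _code = INITIAL_ROWS[2].split('*')[1].strip()   # -> '835'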
@@ -390,22 +437,30 @@ class Parser (Process):
 # index 1 identifies file type i.e CLM for claim and CLP for remittance
 segment = []
 index = 0;
+_toprows = []
 for row in file :
+row = row.replace('\r','')
+if not segment and not row.startswith(section):
+    _toprows += [row]
 if row.startswith(section) and not segment:
 segment = [row]
 continue
 elif segment and not row.startswith(section):
 segment.append(row)
 if len(segment) > 1 and row.startswith(section):
 #
 # process the segment somewhere (create a thread maybe?)
 #
 # default_claim = dict({"index":index},**DEFAULT_VALUE)
+# print (_toprows)
 _claim = self.apply(segment,_code)
 # if _claim['claim_id'] == 'P1080351470' :
 # print (_claim)
 # _claim = dict(DEFAULT_VALUE,**_claim)
@@ -425,12 +480,14 @@ class Parser (Process):
 claim = self.apply(segment,_code)
 if claim :
 claim['index'] = len(claims)
+claim = jsonmerge.merge(claim,self.apply(_toprows,_code))
 claims.append(dict(DEFAULT_VALUE,**claim))
 if type(file) != list :
 file.close()
 # x12_file = open(filename.strip(),errors='ignore').read().split('\n')
 except Exception as e:
 logs.append ({"parse":_code,"completed":False,"name":filename,"msg":e.args[0]})
 return [],logs,None

@@ -8,14 +8,15 @@ import sys
 def read(fname):
 return open(os.path.join(os.path.dirname(__file__), fname)).read()
 args = {
-"name":"healthcareio","version":"1.4.4",
+"name":"healthcareio","version":"1.4.6",
 "author":"Vanderbilt University Medical Center",
 "author_email":"steve.l.nyemba@vumc.org",
+"include_package_data":True,
 "license":"MIT",
 "packages":find_packages(),
 "keywords":["healthcare","edi","x12","analytics","835","837","data","transport","protocol"]
 }
-args["install_requires"] = ['flask-socketio','seaborn','jinja2', 'weasyprint','data-transport@git+https://dev.the-phi.com/git/steve/data-transport.git','pymongo','numpy','cloudant','pika','boto','flask-session','smart_open','smart-top@git+https://dev.the-phi.com/git/steve/monitor.git@data-collector']
+args["install_requires"] = ['flask-socketio','seaborn','jinja2','jsonmerge', 'weasyprint','data-transport@git+https://healthcareio.the-phi.com/git/code/transport.git','pymongo','numpy','cloudant','pika','boto','botocore','flask-session','smart_open','smart-top@git+https://healthcareio.the-phi.com/git/code/smart-top.git@data-collector']
 args['url'] = 'https://hiplab.mc.vanderbilt.edu'
 args['scripts']= ['healthcareio/healthcare-io.py']
 # args['entry_points'] = {
