Merge branch 'dev' of lab/parse-edi into master

master
steve 4 years ago committed by Gogs
commit db47e9e6b1

@ -39,39 +39,27 @@ We wrote this frame to be used in both command line or as a library within in yo
--batch number of processes to spawn to parse the files --batch number of processes to spawn to parse the files
--resume tells the parser to resume parsing --resume tells the parser to resume parsing
if all files weren't processed or new files were added into the folder if all files weren't processed or new files were added into the folder
**dashboard** 3. dashboard
There is a built-in dashboard that has features There is a built-in dashboard that has displays descriptive analytics in a web browser
healthcare-io.py --server <port> [--context <name>] healthcare-io.py --server <port> [--context <name>]
**Embedded in Code :** **Embedded in Code :**
Use **parse-edi** within your code base as a library and handle storing data in a data store of choice The Healthcare/IO **parser** can be used within your code base as a library and handle storing data in a data store of choice
import edi.parser import healthcareio
import json
import os
ROOT = 'data'
CLAIMS_FOLDER = os.sep.join([ROOT,'837']) #-- data/837 contains all 837 formatted files
CONFIG_FOLDER = os.sep.join([ROOT,'config'])#-- data/config contains 837.json or 835.json
files = os.listdir(CLAIMS_FOLDER)
filename = os.sep.join([CLAIM_FOLDER,files[0]]) #-- selecting the first file in the folder (it's an example)
conf = json.loads(open( os.sep.join([CONFIG_FOLDER,'837.json']) ).read())
info = edi.parser.get_content(file,conf) #-- array of objects claims/remits
## Credits ## Credits
* [Khanhly Nguyen] (<khanhly.t.nguyen@gmail.com>) * [Khanhly Nguyen] (<khanhly.t.nguyen@gmail.com>)
* [Gaylon Stanley] (<gaylon.stanley@vanderbilt.edu>) * [Gaylon Stanley] (<gaylon.stanley@vumc.org>)
* [Cheng Gao] (<cheng.gao@vanderbilt.edu>) * [Cheng Gao] (<cheng.gao@vanderbilt.edu>)
* [Brad Malin] (brad.malin@vanderbilt.edu) * [Brad Malin] (brad.malin@vanderbilt.edu)
* [Steve L. Nyemba] (<steve.l.nyemba@vanderbilt.edu>) * [Steve L. Nyemba] (<steve.l.nyemba@vumc.org>)

@ -16,4 +16,5 @@ Usage :
""" """
from healthcareio import analytics from healthcareio import analytics
import healthcareio.x12 as x12
# from healthcareio import server # from healthcareio import server

@ -42,7 +42,7 @@ import sys
import numpy as np import numpy as np
from multiprocessing import Process from multiprocessing import Process
import time import time
from healthcareio import x12
PATH = os.sep.join([os.environ['HOME'],'.healthcareio']) PATH = os.sep.join([os.environ['HOME'],'.healthcareio'])
OUTPUT_FOLDER = os.sep.join([os.environ['HOME'],'healthcare-io']) OUTPUT_FOLDER = os.sep.join([os.environ['HOME'],'healthcare-io'])
@ -257,34 +257,29 @@ if __name__ == '__main__' :
# @TODO: Log this here so we know what is being processed or not # @TODO: Log this here so we know what is being processed or not
SCOPE = None SCOPE = None
if files and ('claims' in SYS_ARGS['parse'] or 'remits' in SYS_ARGS['parse']): if files : #and ('claims' in SYS_ARGS['parse'] or 'remits' in SYS_ARGS['parse']):
# _map = {'claims':'837','remits':'835'} # logger = factory.instance(type='disk.DiskWriter',args={'path':os.sep.join([info['out-folder'],SYS_ARGS['parse']+'.log'])})
# key = _map[SYS_ARGS['parse']] # if info['store']['type'] == 'disk.DiskWriter' :
# CONFIG = info['parser'][key] # info['store']['args']['path'] += (os.sep + 'healthcare-io.json')
# if 'version' in SYS_ARGS and int(SYS_ARGS['version']) < len(CONFIG) : # elif info['store']['type'] == 'disk.SQLiteWriter' :
# CONFIG = CONFIG[ int(SYS_ARGS['version'])] # # info['store']['args']['path'] += (os.sep + 'healthcare-io.db3')
# else: # pass
# CONFIG = CONFIG[-1]
logger = factory.instance(type='disk.DiskWriter',args={'path':os.sep.join([info['out-folder'],SYS_ARGS['parse']+'.log'])})
if info['store']['type'] == 'disk.DiskWriter' :
info['store']['args']['path'] += (os.sep + 'healthcare-io.json')
elif info['store']['type'] == 'disk.SQLiteWriter' :
# info['store']['args']['path'] += (os.sep + 'healthcare-io.db3')
pass
if info['store']['type'] == 'disk.SQLiteWriter' : # if info['store']['type'] == 'disk.SQLiteWriter' :
info['store']['args']['table'] = SYS_ARGS['parse'].strip().lower() # info['store']['args']['table'] = SYS_ARGS['parse'].strip().lower()
else: # _info = json.loads(json.dumps(info['store']))
# # _info['args']['table']='logs'
# if we are working with no-sql we will put the logs in it (performance )? # else:
# #
# # if we are working with no-sql we will put the logs in it (performance )?
info['store']['args']['doc'] = SYS_ARGS['parse'].strip().lower() # info['store']['args']['doc'] = SYS_ARGS['parse'].strip().lower()
_info = json.loads(json.dumps(info['store'])) # _info = json.loads(json.dumps(info['store']))
_info['args']['doc'] = 'logs' # _info['args']['doc'] = 'logs'
logger = factory.instance(**_info) # logger = factory.instance(**_info)
writer = factory.instance(**info['store']) # writer = factory.instance(**info['store'])
# #
# we need to have batches ready for this in order to run some of these queries in parallel # we need to have batches ready for this in order to run some of these queries in parallel
@ -293,23 +288,19 @@ if __name__ == '__main__' :
# #
BATCH_COUNT = 1 if 'batch' not in SYS_ARGS else int (SYS_ARGS['batch']) BATCH_COUNT = 1 if 'batch' not in SYS_ARGS else int (SYS_ARGS['batch'])
#logger = factory.instance(type='mongo.MongoWriter',args={'db':'healthcareio','doc':SYS_ARGS['parse']+'_logs'})
# schema = info['schema']
# for key in schema :
# sql = schema[key]['create']
# writer.write(sql)
files = np.array_split(files,BATCH_COUNT) files = np.array_split(files,BATCH_COUNT)
procs = [] procs = []
index = 0 index = 0
for row in files : for row in files :
row = row.tolist() row = row.tolist()
logger.write({"process":index,"parse":SYS_ARGS['parse'],"file_count":len(row)}) # logger.write({"process":index,"parse":SYS_ARGS['parse'],"file_count":len(row)})
proc = Process(target=apply,args=(row,info['store'],_info,)) # proc = Process(target=apply,args=(row,info['store'],_info,))
proc.start() parser = x12.Parser(os.sep.join([PATH,'config.json']))
procs.append(proc) parser.set.files(row)
index = index + 1 parser.start()
procs.append(parser)
# index = index + 1
while len(procs) > 0 : while len(procs) > 0 :
procs = [proc for proc in procs if proc.is_alive()] procs = [proc for proc in procs if proc.is_alive()]
time.sleep(2) time.sleep(2)

@ -287,7 +287,7 @@ def get_content(filename,config,section=None) :
# tmp = get_map(row,_info,VERSION) # tmp = get_map(row,_info,VERSION)
# if 'parser' in _info : # if 'parser' in _info :
# pointer = eval(_info['parser']) # pointer = eval(_info['parser'])
# print (pointer(row))
tmp = get_map(row,_info,VERSION) tmp = get_map(row,_info,VERSION)
except Exception as e: except Exception as e:
@ -323,7 +323,7 @@ def get_content(filename,config,section=None) :
if label not in claim: if label not in claim:
claim[label] = [tmp] claim[label] = [tmp]
elif len(list(tmp.keys())) == 1 : elif len(list(tmp.keys())) == 1 :
# print "\t",len(claim[label]),tmp
index = len(claim[label]) -1 index = len(claim[label]) -1
claim[label][index] = dict(claim[label][index],**tmp) claim[label][index] = dict(claim[label][index],**tmp)
else: else:

@ -0,0 +1,466 @@
"""
(c) 2019 Healthcare/IO 1.0
Vanderbilt University Medical Center, Health Information Privacy Laboratory
https://hiplab.mc.vanderbilt.edu/healthcareio
Authors:
Khanhly Nguyen,
Steve L. Nyemba<steve.l.nyemba@vanderbilt.edu>
License:
MIT, terms are available at https://opensource.org/licenses/MIT
This parser was originally written by Khanhly Nguyen for her internship and is intended to parse x12 835,837 and others provided the appropriate configuration
USAGE :
- COMMAND LINE
- EMBEDDED
"""
import hashlib
import json
import os
import sys
from itertools import islice
from multiprocessing import Process
import transport
class void :
pass
class Formatters :
def __init__(self):
# self.config = config
self.get = void()
self.get.config = self.get_config
self.parse = void()
self.parse.sv3 = self.sv3
self.parse.sv2 = self.sv2
self.sv2_parse = self.sv2
self.sv3_parse = self.sv3
self.format_proc = self.procedure
self.format_diag = self.diagnosis
self.parse.procedure = self.procedure
self.parse.diagnosis = self.diagnosis
self.parse.date = self.date
self.format_date = self.date
self.format_pos = self.pos
self.format_time = self.time
def split(self,row,sep='*',prefix='HI') :
"""
This function is designed to split an x12 row and
"""
if row.startswith(prefix) is False:
value = []
for row_value in row.replace('~','').split(sep) :
if '>' in row_value :
if row_value.startswith('HC') or row_value.startswith('AD'):
value += row_value.split('>')[:2]
else:
value += row_value.split('>') if row.startswith('CLM') is False else [row_value]
else :
value.append(row_value.replace('\n',''))
return [xchar.replace('\r','') for xchar in value] #row.replace('~','').split(sep)
else:
return [ [prefix]+ self.split(item,'>') for item in row.replace('~','').split(sep)[1:] ]
def get_config(self,config,row):
"""
This function will return the meaningfull parts of the configuration for a given item
"""
_row = list(row) if type(row[0]) == str else list(row[0])
_info = config[_row[0]] if _row[0] in config else {}
key = None
if '@ref' in _info:
key = list(set(_row) & set(_info['@ref'].keys()))
if key :
key = key[0]
return _info['@ref'][key]
else:
return {}
if not _info and 'SIMILAR' in config:
#
# Let's look for the nearest key using the edit distance
if _row[0] in config['SIMILAR'] :
key = config['SIMILAR'][_row[0]]
_info = config[key]
return _info
def hash(self,value):
salt = os.environ['HEALTHCAREIO_SALT'] if 'HEALTHCAREIO_SALT' in os.environ else ''
_value = str(value)+ salt
if sys.version_info[0] > 2 :
return hashlib.md5(_value.encode('utf-8')).hexdigest()
else:
return hashlib.md5(_value).hexdigest()
def suppress (self,value):
return 'N/A'
def date(self,value):
if len(value) > 8 or '-' in value:
value = value.split('-')[0]
if len(value) == 8 :
year = value[:4]
month = value[4:6]
day = value[6:]
return "-".join([year,month,day])[:10] #{"year":year,"month":month,"day":day}
elif len(value) == 6 :
year = '20' + value[:2]
month = value[2:4]
day = value[4:]
#
# We have a date formatting issue
return "-".join([year,month,day])
def time(self,value):
pass
def sv3(self,value):
if '>' in value [1]:
terms = value[1].split('>')
return {'type':terms[0],'code':terms[1],"amount":float(value[2])}
else:
return {"code":value[2],"type":value[1],"amount":float(value[3])}
def sv2(self,value):
#
# @TODO: Sometimes there's a suffix (need to inventory all the variations)
#
if '>' in value or ':' in value:
xchar = '>' if '>' in value else ':'
_values = value.split(xchar)
modifier = {}
if len(_values) > 2 :
modifier= {"code":_values[2]}
if len(_values) > 3 :
modifier['type'] = _values[3]
_value = {"code":_values[1],"type":_values[0]}
if modifier :
_value['modifier'] = modifier
return _value
else:
return value
def procedure(self,value):
for xchar in [':','<'] :
if xchar in value and len(value.split(xchar)) > 1 :
#_value = {"type":value.split(':')[0].strip(),"code":value.split(':')[1].strip()}
_value = {"type":value.split(xchar)[0].strip(),"code":value.split(xchar)[1].strip()}
break
else:
_value = str(value)
return _value
def diagnosis(self,alue):
return [ {"code":item[2], "type":item[1]} for item in value if len(item) > 1]
def pos(self,value):
"""
formatting place of service information within a segment (REF)
"""
xchar = '>' if '>' in value else ':'
x = value.split(xchar)
x = {"code":x[0],"indicator":x[1],"frequency":x[2]} if len(x) == 3 else {"code":x[0],"indicator":None,"frequency":None}
return x
class Parser (Process):
def __init__(self,path):
Process.__init__(self)
self.utils = Formatters()
self.get = void()
self.get.value = self.get_map
self.get.default_value = self.get_default_value
_config = json.loads(open(path).read())
self.config = _config['parser']
self.store = _config['store']
self.files = []
self.set = void()
self.set.files = self.set_files
def set_files(self,files):
self.files = files
def get_map(self,row,config,version=None):
# label = config['label'] if 'label' in config else None
handler = Formatters()
if 'map' not in config and hasattr(handler,config['apply']):
pointer = getattr(handler,config['apply'])
object_value = pointer(row)
return object_value
omap = config['map'] if not version or version not in config else config[version]
anchors = config['anchors'] if 'anchors' in config else []
if type(row[0]) == str:
object_value = {}
for key in omap :
index = omap[key]
if anchors and set(anchors) & set(row):
_key = list(set(anchors) & set(row))[0]
aindex = row.index(_key)
index = aindex + index
if index < len(row) :
value = row[index]
if 'cast' in config and key in config['cast'] and value.strip() != '' :
if config['cast'][key] in ['float','int'] :
value = eval(config['cast'][key])(value)
elif hasattr(handler,config['cast'][key]):
pointer = getattr(handler,config['cast'][key])
value = pointer(value)
else:
print ("Missing Pointer ",config['cast'][key])
# print (key,value)
if type(value) == dict :
for objkey in value :
if type(value[objkey]) == dict :
continue
if 'syn' in config and value[objkey] in config['syn'] :
value[objkey] = config['syn'][ value[objkey]]
value = {key:value} if key not in value else value
else:
if 'syn' in config and value in config['syn'] :
value = config['syn'][value]
if type(value) == dict :
object_value = dict(object_value, **value)
else:
object_value[key] = value
else:
#
# we are dealing with a complex object
object_value = []
for row_item in row :
value = self.get.value(row_item,config,version)
object_value.append(value)
#
# We need to add the index of the object it matters in determining the claim types
#
# object_value.append( list(get_map(row_item,config,version)))
# object_value = {label:object_value}
return object_value
def apply(self,content,_code) :
"""
:file content i.e a segment with the envelope
:_code 837 or 835 (helps get the appropriate configuration)
"""
util = Formatters()
# header = default_value.copy()
value = {}
for row in content[:] :
row = util.split(row.replace('\n','').replace('~',''))
_info = util.get.config(self.config[_code][0],row)
if _info :
try:
tmp = self.get.value(row,_info)
# if 'P1080351470' in content[0] and 'PLB' in row:
# print (_info)
# print (row)
# print (tmp)
if not tmp :
continue
if 'label' in _info :
label = _info['label']
if type(tmp) == list :
value[label] = tmp if label not in value else value[label] + tmp
else:
if label not in value:
value[label] = [tmp]
elif len(list(tmp.keys())) == 1 :
# print "\t",len(claim[label]),tmp
index = len(value[label]) -1
value[label][index] = dict(value[label][index],**tmp)
else:
value[label].append(tmp)
tmp['_index'] = len(value[label]) -1
# if len(value[label]) > 0 :
# labels = []
# for item in value[label] :
# item['_index'] = len(labels)
# if item not in labels :
# labels.append(item)
# value[label] = labels
elif 'field' in _info :
name = _info['field']
value[name] = tmp
else:
value = dict(value,**tmp)
pass
except Exception as e :
print ('__',e.args)
pass
return value if value else {}
def get_default_value(self,content,_code):
util = Formatters()
TOP_ROW = content[1].split('*')
CATEGORY= content[2].split('*')[1].strip()
VERSION = content[1].split('*')[-1].replace('~','').replace('\n','')
SUBMITTED_DATE = util.parse.date(TOP_ROW[4])
SENDER_ID = TOP_ROW[2]
row = util.split(content[3])
_info = util.get_config(self.config[_code][0],row)
value = self.get.value(row,_info,VERSION) if _info else {}
value['category'] = {"setid": CATEGORY,"version":'X'+VERSION.split('X')[1],"id":VERSION.split('X')[0].strip()}
value["submitted"] = SUBMITTED_DATE
# value['version'] = VERSION
if _code== '835' :
value['payer_id'] = SENDER_ID
else:
value['provider_id'] = SENDER_ID
return value
def read(self,filename) :
"""
:formerly get_content
This function returns the of the EDI file parsed given the configuration specified. it is capable of identifying a file given the content
:section loop prefix (HL, CLP)
:config configuration with formatting rules, labels ...
:filename location of the file
"""
# section = section if section else config['SECTION']
logs = []
claims = []
try:
file = open(filename.strip(),errors='ignore')
INITIAL_ROWS = list(islice(file,4)) #.readlines(4)
_code = "unknown"
if len(INITIAL_ROWS) == 1 :
file = INITIAL_ROWS[0].split('~')
INITIAL_ROWS = file[:4]
if len(INITIAL_ROWS) < 3 :
return None,[{"name":filename,"completed":False}],None
section = 'HL' if INITIAL_ROWS[1].split('*')[1] == 'HC' else 'CLP'
_code = '837' if section == 'HL' else '835'
DEFAULT_VALUE = self.get.default_value(INITIAL_ROWS,_code)
DEFAULT_VALUE['name'] = filename.strip()
#
# In the initial rows, there's redundant information (so much for x12 standard)
# index 1 identifies file type i.e CLM for claim and CLP for remittance
segment = []
index = 0;
for row in file :
if row.startswith(section) and not segment:
segment = [row]
continue
elif segment and not row.startswith(section):
segment.append(row)
if len(segment) > 1 and row.startswith(section):
#
# process the segment somewhere (create a thread maybe?)
#
# default_claim = dict({"index":index},**DEFAULT_VALUE)
_claim = self.apply(segment,_code)
# if _claim['claim_id'] == 'P1080351470' :
# print (_claim)
# _claim = dict(DEFAULT_VALUE,**_claim)
if _claim :
_claim['index'] = index #len(claims)
claims.append(dict(DEFAULT_VALUE,**_claim))
segment = [row]
index += 1
pass
#
# Handling the last claim found
if segment[0].startswith(section) :
default_claim = dict({"name":index},**DEFAULT_VALUE)
claim = self.apply(segment,_code)
if claim :
claim['index'] = len(claims)
claims.append(dict(DEFAULT_VALUE,**claim))
if type(file) != list :
file.close()
# x12_file = open(filename.strip(),errors='ignore').read().split('\n')
except Exception as e:
logs.append ({"parse":_code,"completed":False,"name":filename,"msg":e.args[0]})
return [],logs,None
rate = 0 if len(claims) == 0 else (1 + index)/len(claims)
logs.append ({"parse":"claims" if _code == '837' else 'remits',"completed":True,"name":filename,"rate":rate})
# self.finish(claims,logs,_code)
return claims,logs,_code
def run(self):
for filename in self.files :
content,logs,_code = self.read(filename)
self.finish(content,logs,_code)
def finish(self,content,logs,_code) :
args = self.store
_args = json.loads(json.dumps(self.store))
if args['type'] == 'mongo.MongoWriter' :
args['args']['doc'] = 'claims' if _code == '837' else 'remits'
_args['args']['doc'] = 'logs'
if content :
writer = transport.factory.instance(**args)
writer.write(content)
writer.close()
if logs :
logger = transport.factory.instance(**_args)
logger.write(logs)
logger.close()
# p = Parser('/home/steve/.healthcareio/config.json')
# p.set.files(['../../data/small/claims/ssiUB1122042711220427127438.clm_191122T043504'])
# path = '../../data/small/claims/ssiUB1122042711220427127438.clm_191122T043504'
# path = '../../data/small/claims/problems-with-procs'
# path = '../../data/small/remits/1SG03927258.dat_181018T074559'
# _path = "../../data/small/remits/1TR21426701.dat_180703T074559"
# p.start()
# p.join()
# claims,logs = p.read(path)
# print (json.dumps(claims[3]))
# print (logs)

@ -8,7 +8,7 @@ import sys
def read(fname): def read(fname):
return open(os.path.join(os.path.dirname(__file__), fname)).read() return open(os.path.join(os.path.dirname(__file__), fname)).read()
args = { args = {
"name":"healthcareio","version":"1.3.1", "name":"healthcareio","version":"1.3.7",
"author":"Vanderbilt University Medical Center", "author":"Vanderbilt University Medical Center",
"author_email":"steve.l.nyemba@vumc.org", "author_email":"steve.l.nyemba@vumc.org",
"license":"MIT", "license":"MIT",

Loading…
Cancel
Save