Upload files to 'edi'

steve 5 years ago
parent c428143ef1
commit 9d53b15e4d

@ -0,0 +1,16 @@
(c) 2019 EDI Parser Toolkit,
Health Information Privacy Lab, Vanderbilt University Medical Center
Steve L. Nyemba <steve.l.nyemba@vanderbilt.edu>
Khanhly Nguyen <khanhly.t.nguyen@gmail.com>
This code is intended to process and parse healthcare x12 837 (claims) and x12 835 (remittances) into human readable JSON format.
The claims/outpout can be forwarded to a NoSQL Data store like couchdb and mongodb
Usage :
Commandline :
python xreader.py --parse claims|remits --config <path>
Embedded :

@ -0,0 +1,100 @@
(c) 2019 Claims Toolkit,
Health Information Privacy Lab, Vanderbilt University Medical Center
Steve L. Nyemba <steve.l.nyemba@vanderbilt.edu>
Khanhly Nguyen <khanhly.t.nguyen@gmail.com>
This code is intended to process and parse healthcare x12 837 (claims) and x12 835 (remittances) into human readable JSON format.
The claims/outpout can be forwarded to a NoSQL Data store like couchdb and mongodb
Usage :
Commandline :
python edi --scope --config <path> --folder <path> --store <[mongo|disk|couch]> --<db|path]> <id|path>
with :
--scope <claims|remits>
--config path of the x12 to be parsed i.e it could be 835, or 837
--folder location of the files (they must be decompressed)
--store data store could be disk, mongodb, couchdb
--db|path name of the folder to store the output or the database name
Embedded in Code :
import edi.parser
import json
file = '/data/claim_1.x12'
conf = json.loads(open('config/837.json').read())
from params import SYS_ARGS
from transport import factory
from parser import *
import os
import json
import sys
if __name__ == '__main__' :
The program was called from the command line thus we are expecting
parse in [claims,remits]
config os.sep.path.exists(path)
folder os.sep.path.exists(path)
store store ()
p = len( set(['store','config','folder']) & set(SYS_ARGS.keys())) == 3 and ('db' in SYS_ARGS or 'path' in SYS_ARGS)
TYPE = {
INFO = {
if p :
args = {}
scope = SYS_ARGS['config'][:-5].split(os.sep)[-1]
CONTEXT = INFO[scope]['scope']
# @NOTE:
# improve how database and data stores are handled.
if SYS_ARGS['store'] == 'couch' :
args = {'url': SYS_ARGS['url'] if 'url' in SYS_ARGS else 'http://localhost:5984'}
args['dbname'] = SYS_ARGS['db']
elif SYS_ARGS ['store'] == 'mongo':
args = {'host':SYS_ARGS['host']if 'host' in SYS_ARGS else 'localhost:27217'}
if SYS_ARGS['store'] in ['mongo','couch']:
args['dbname'] = SYS_ARGS['db'] if 'db' in SYS_ARGS else 'claims_outcomes'
args['doc'] = CONTEXT
TYPE = TYPE[SYS_ARGS['store']]
writer = factory.instance(type=TYPE,args=args)
logger = factory.instance(type=TYPE,args= dict(args,**{"doc":"logs"}))
files = os.listdir(SYS_ARGS['folder'])
CONFIG = json.loads(open(SYS_ARGS['config']).read())
SECTION= INFO[scope]['section']
for file in files :
if 'limit' in SYS_ARGS and files.index(file) == int(SYS_ARGS['limit']) :
filename = os.sep.join([SYS_ARGS['folder'],file])
content,logs = get_content(filename,CONFIG,SECTION)
except Exception as e:
if sys.version_info[0] > 2 :
logs = [{"filename":filename,"msg":e.args[0]}]
logs = [{"filename":filename,"msg":e.message}]
content = None
if content :
writer.write(row= content)
if logs:
print (__doc__)

@ -0,0 +1,18 @@
import sys
SYS_ARGS = {'context':''}
if len(sys.argv) > 1:
N = len(sys.argv)
for i in range(1,N):
value = None
if sys.argv[i].startswith('--'):
key = sys.argv[i][2:] #.replace('-','')
SYS_ARGS[key] = 1
if i + 1 < N:
value = sys.argv[i + 1] = sys.argv[i+1].strip()
if key and value:
SYS_ARGS[key] = value
i += 2

@ -0,0 +1,199 @@
(c) 2019 EDI-Parser 1.0
Vanderbilt University Medical Center, Health Information Privacy Laboratory
Khanhly Nguyen,
Steve L. Nyemba<steve.l.nyemba@vanderbilt.edu>
MIT, terms are available at https://opensource.org/licenses/MIT
This parser was originally written by Khanhly Nguyen for her internship and is intended to parse x12 835,837 and others provided the appropriate configuration
import os
import sys
def split(row,sep='*',prefix='HI'):
This function is designed to split an x12 row and
if row.startswith(prefix) is False:
value = []
for row_value in row.replace('~','').split(sep) :
if '>' in row_value :
if row_value.startswith('HC') or row_value.startswith('AD'):
value += row_value.split('>')[:2]
value += row_value.split('>')
else :
return [xchar.replace('\r','') for xchar in value] #row.replace('~','').split(sep)
return [ [prefix]+ split(item,'>') for item in row.replace('~','').split(sep)[1:] ]
def get_config(config,row):
This function will return the meaningfull parts of the configuration for a given item
_row = list(row) if type(row[0]) == str else list(row[0])
_info = config[_row[0]] if _row[0] in config else {}
key = None
if '@ref' in _info:
key = list(set(_row) & set(_info['@ref'].keys()))
if key :
key = key[0]
return _info['@ref'][key]
return {}
if not _info and 'SIMILAR' in config:
# Let's look for the nearest key using the edit distance
if _row[0] in config['SIMILAR'] :
key = config['SIMILAR'][_row[0]]
_info = config[key]
return _info
def format_date(value) :
year = value[:4]
month = value[4:6]
day = value[6:]
return "-".join([year,month,day])[:10] #{"year":year,"month":month,"day":day}
def format_time(value):
return ":".join([value[:2],value[2:] ])[:5]
def format_proc(value):
if ':' in value :
return {"procedure_type":value.split(':')[0].strip(),"procedure_code":value.split(':')[1].strip()}
return value
def map(row,config,version):
label = config['label'] if 'label' in config else None
omap = config['map'] if version not in config else config[version]
anchors = config['anchors'] if 'anchors' in config else []
if type(row[0]) == str:
object_value = {}
for key in omap :
index = omap[key]
if anchors and set(anchors) & set(row):
_key = list(set(anchors) & set(row))[0]
aindex = row.index(_key)
index = aindex + index
if index < len(row) :
value = row[index]
if 'cast' in config and key in config['cast'] and value.strip() != '' :
value = eval(config['cast'][key])(value)
if 'syn' in config and value in config['syn'] :
value = config['syn'][value]
if type(value) == dict :
object_value = dict(object_value, **value)
object_value[key] = value
# we are dealing with a complex object
object_value = []
for row_item in row :
object_value.append( list(map(row_item,config,version)))
# object_value = {label:object_value}
return object_value
def get_locations(x12_file,section='HL') :
locations = []
for line in x12_file :
if line.strip().startswith(section) :
i = x12_file.index(line)
return locations
#def get_claims(filename,config,section) :
def get_content(filename,config,section=None) :
This function returns the of the EDI file parsed given the configuration specified
:section loop prefix (HL, CLP)
:config configuration with formatting rules, labels ...
:filename location of the file
section = section if section else config['SECTION']
x12_file = open(filename).read().split('\n')
if len(x12_file) == 1 :
x12_file = x12_file[0].split('~')
locations = get_locations(x12_file,section)
claims = []
logs = []
# VERSION = x12_file[2].split('*')[3].replace('~','')
VERSION = x12_file[1].split('*')[-1].replace('~','')
row = split(x12_file[3])
_info = get_config(config,row)
_default_value = list(map(row,_info,VERSION)) if _info else None
N = len(locations)
for index in range(0,N-1):
beg = locations[index]
end = locations[index+1]
claim = {}
for row in x12_file[beg:end] :
row = split(row)
_info = get_config(config,row)
if _info :
# tmp = map(row,_info,VERSION)
tmp = list(map(row,_info,VERSION))
except Exception as e:
if sys.verion_info[0] > 2 :
logs.append ({"version":VERSION,"filename":filename,"msg":e.args[0],"X12":x12_file[beg:end]})
logs.append ({"version":VERSION,"filename":filename,"msg":e.message,"X12":x12_file[beg:end]})
claim = {}
if 'label' not in _info :
tmp['version'] = VERSION
claim = dict(claim, **tmp)
label = _info['label']
if type(tmp) == list :
claim[label] = tmp if label not in claim else claim[label] + tmp
if label not in claim:
claim[label] = [tmp]
elif len(list(tmp.keys())) == 1 :
# print "\t",len(claim[label]),tmp
index = len(claim[label]) -1
claim[label][index] = dict(claim[label][index],**tmp)
if claim and 'claim_id' in claim:
claim = dict(claim,**_default_value)
claim['name'] = filename[:-5].split(os.sep)[-1] #.replace(ROOT,'')
claim['index'] = index
return claims,logs