You cannot select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
	
	
		
			354 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Python
		
	
			
		
		
	
	
			354 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Python
		
	
"""
 | 
						|
    (c) 2019 EDI-Parser 1.0
 | 
						|
    Vanderbilt University Medical Center, Health Information Privacy Laboratory
 | 
						|
    https://hiplab.mc.vanderbilt.edu/tools
 | 
						|
 | 
						|
 | 
						|
    Authors:
 | 
						|
        Khanhly Nguyen, 
 | 
						|
        Steve L. Nyemba<steve.l.nyemba@vanderbilt.edu>
 | 
						|
 | 
						|
    License:
 | 
						|
        MIT, terms are available at https://opensource.org/licenses/MIT
 | 
						|
 | 
						|
    This parser was originally written by Khanhly Nguyen for her internship and is intended to parse x12 835,837 and others provided the appropriate configuration
 | 
						|
    USAGE :
 | 
						|
        - COMMAND LINE
 | 
						|
        
 | 
						|
        - EMBEDDED
 | 
						|
"""
 | 
						|
import os
 | 
						|
import sys
 | 
						|
import hashlib
 | 
						|
import json
 | 
						|
class X12:
    """
    Placeholder class whose method names match the module-level helpers of this file.

    Every method is an unimplemented stub: it accepts its arguments and returns None.
    """

    def split(self, row, sep='*', prefix='HI'):
        """Stub, not implemented (returns None)."""
        return None

    def get_config(self, config, row):
        """Stub, not implemented (returns None)."""
        return None

    def hash(self, value):
        """Stub, not implemented (returns None)."""
        return None

    def suppress(self, value):
        """Stub, not implemented (returns None)."""
        return None

    def format_date(self, value):
        """Stub, not implemented (returns None)."""
        return None
 | 
						|
    
 | 
						|
def split(row,sep='*',prefix='HI'):
    """
    This function breaks an x12 row (segment) into its elements.

    The segment terminator '~' is removed before the row is split on `sep`.

    - A row that starts with `prefix` yields a list of lists: one entry per element
      after the first, each made of `prefix` followed by the '>' separated parts.
    - Any other row yields a flat list of strings. Elements containing '>' are
      expanded in place, except that elements starting with 'HC' or 'AD' only keep
      their first two parts and elements of a 'CLM' row are kept whole.
      Carriage returns are stripped from every returned element.
    """
    fields = row.replace('~','').split(sep)
    if row.startswith(prefix) :
        return [ [prefix] + split(field,'>') for field in fields[1:] ]

    keep_composites = row.startswith('CLM')
    elements = []
    for field in fields :
        if '>' not in field :
            elements.append(field)
        elif field.startswith('HC') or field.startswith('AD') :
            # only the qualifier and the code are retained
            elements.extend(field.split('>')[:2])
        elif keep_composites :
            elements.append(field)
        else:
            elements.extend(field.split('>'))
    return [element.replace('\r','') for element in elements]
 | 
						|
def get_config(config,row):
    """
    This function will return the meaningful part of the configuration for a given row

    :config     dictionary keyed by segment identifier; an entry may hold an '@ref'
                dictionary keyed by element values, and the configuration may hold a
                'SIMILAR' dictionary mapping a segment identifier to another key
    :row        list of strings, or list of lists (in which case the first item is used)
    :return     the matching configuration dictionary, {} when nothing applies
    """
    if not row :
        return {}
    _row = list(row) if isinstance(row[0],str) else list(row[0])
    if not _row :
        return {}

    segment_id = _row[0]
    _info = config[segment_id] if segment_id in config else {}

    if '@ref' in _info:
        #
        # The first element of the row (in row order) that is a known reference wins.
        # Picking from a set intersection would make the choice depend on set ordering
        # whenever more than one element matches.
        references = _info['@ref']
        for element in _row :
            if element in references :
                return references[element]
        return {}

    if not _info and 'SIMILAR' in config:
        #
        # Fall back on the alias table: it maps this identifier to another configuration key
        similar = config['SIMILAR']
        if segment_id in similar :
            # an alias pointing to a missing key yields an empty configuration
            _info = config.get(similar[segment_id],{})

    return _info
 | 
						|
def hash(value):
    """
    This function returns the hexadecimal MD5 digest of str(value) followed by the salt
    found in the HEALTHCAREIO_SALT environment variable (no salt when it is not set)
    """
    salted = str(value) + os.environ.get('HEALTHCAREIO_SALT','')
    # python 3 hashes bytes, python 2 hashes the string as is
    payload = salted if sys.version_info[0] <= 2 else salted.encode('utf-8')
    return hashlib.md5(payload).hexdigest()
 | 
						|
def suppress(value):
    """
    This function masks a value: the argument is ignored and the constant 'N/A' is returned
    """
    masked = 'N/A'
    return masked
 | 
						|
    
 | 
						|
def format_date(value) :
    """
    This function rewrites a compact date as year-month-day

    - 8 characters are read as YYYYMMDD
    - 6 characters are read as YYMMDD, the year being prefixed with '20'
    - any other length returns None
    """
    size = len(value)
    if size == 8 :
        parts = (value[:4],value[4:6],value[6:])
        return "-".join(parts)[:10]
    if size == 6 :
        parts = ('20' + value[:2],value[2:4],value[4:])
        return "-".join(parts)
    return None
 | 
						|
def format_time(value):
    """
    This function inserts ':' after the first two characters of the value and
    keeps at most the first five characters of the result (i.e. HH:MM)
    """
    hours, remainder = value[:2], value[2:]
    return (hours + ":" + remainder)[:5]
 | 
						|
def sv3_parse(value):
    """
    This function splits a '>' separated value into its type (first part) and code (second part)
    None is returned when the value has no '>'
    """
    if '>' not in value :
        return None
    parts = value.split('>')
    return {'type':parts[0],'code':parts[1]}
 | 
						|
def sv2_parse(value):
    """
    This function parses a composite value delimited by '>' (preferred) or ':'

    The first part is the type and the second the code; a third part is stored as the
    modifier code and a fourth as the modifier type. A value without delimiter is
    returned unchanged.
    """
    #
    # @TODO: Sometimes there's a suffix (need to inventory all the variations)
    #
    if '>' in value :
        delimiter = '>'
    elif ':' in value :
        delimiter = ':'
    else:
        return value

    parts = value.split(delimiter)
    parsed = {"code":parts[1],"type":parts[0]}
    if len(parts) > 2 :
        modifier = {"code":parts[2]}
        if len(parts) > 3 :
            modifier['type'] = parts[3]
        parsed['modifier'] = modifier
    return parsed
 | 
						|
def format_proc(value):
    """
    This function splits a value into its type and code (both stripped) using ':' or,
    when ':' is absent, '<' as delimiter. Without delimiter str(value) is returned.
    """
    # NOTE(review): '<' is tested here whereas the other parsers of this file split on '>' - to be confirmed
    for delimiter in (':','<') :
        if delimiter in value and len(value.split(delimiter)) > 1 :
            pieces = value.split(delimiter)
            return {"type":pieces[0].strip(),"code":pieces[1].strip()}
    return str(value)
 | 
						|
def format_diag(value):
    """
    This function turns a list of items into a list of {"code","type"} dictionaries,
    the type being read at position 1 and the code at position 2 of each item.

    Items that are too short to hold a code (fewer than 3 entries) are skipped.
    Presumably the items are the lists returned by split() for a row with the 'HI' prefix - to be confirmed against the configuration.
    """
    # item[2] is read, hence at least 3 entries are required (a guard on > 1 lets 2-entry items crash)
    return [ {"code":item[2], "type":item[1]} for item in value if len(item) > 2]
 | 
						|
def format_pos(value):
    """
    This function splits a value on '>' (or ':' when there is no '>') into
    code, indicator and frequency. Unless exactly three parts are found, only the
    code (first part) is kept and the indicator and frequency are None.
    """
    delimiter = '>' if '>' in value else ':'
    parts = value.split(delimiter)
    if len(parts) == 3 :
        code, indicator, frequency = parts
    else:
        code, indicator, frequency = parts[0], None, None
    return {"code":code,"indicator":indicator,"frequency":frequency}
 | 
						|
    
 | 
						|
def get_map(row,config,version=None):
    """
    This function maps the elements of a row to named attributes given a configuration

    :row        list of strings, or list of such lists (each one is then mapped separately)
    :config     dictionary with :
                    'map'       attribute name -> position in the row
                    <version>   optional map used instead of 'map' when `version` is a key of the configuration
                    'anchors'   optional values; when one is found in the row, positions are relative to it
                    'cast'      optional attribute name -> expression of a callable applied to the raw value
                    'syn'       optional value -> replacement value
    :version    optional key of a version specific map
    :return     dictionary for a list of strings, list of dictionaries for a list of lists
    """
    omap = config['map'] if not version or version not in config else config[version]
    anchors = config['anchors'] if 'anchors' in config else []

    if not isinstance(row[0],str) :
        #
        # we are dealing with a complex object, every item is mapped on its own
        return [get_map(row_item,config,version) for row_item in row]

    casts = config.get('cast') or {}
    synonyms = config.get('syn')

    #
    # The anchor does not depend on the attribute, it is resolved once for the row.
    # The first element of the row that is an anchor is used, so the outcome does not
    # depend on set ordering when several anchors are present.
    offset = 0
    if anchors :
        known_anchors = set(anchors)
        for position,item in enumerate(row) :
            if item in known_anchors :
                offset = position
                break

    object_value = {}
    for key,index in omap.items() :
        index = offset + index
        if index >= len(row) :
            continue
        value = row[index]

        if key in casts and value.strip() != '' :
            # SECURITY: the expression comes from the configuration and is evaluated as code,
            # the configuration must therefore never originate from an untrusted source
            value = eval(casts[key])(value)

        if isinstance(value,dict) :
            if synonyms is not None :
                for objkey in value :
                    if isinstance(value[objkey],dict) :
                        continue
                    if value[objkey] in synonyms :
                        value[objkey] = synonyms[value[objkey]]
            # a dictionary that does not carry the attribute name is nested under it
            if key not in value :
                value = {key:value}
        elif synonyms is not None and value in synonyms :
            value = synonyms[value]

        # dictionaries are merged into the result, anything else is stored under the attribute name
        if isinstance(value,dict) :
            object_value.update(value)
        else:
            object_value[key] = value
    return object_value
 | 
						|
 | 
						|
def get_locations(x12_file,section='HL') :
    """
    This function returns the positions of the lines that start with a given section prefix
    (leading/trailing whitespace of a line is ignored)

    :x12_file   lines of the file
    :section    loop prefix (HL, CLP)
    :return     list of positions in increasing order
    """
    #
    # The position comes from the enumeration, looking a line up by value would
    # return its first occurrence and report identical lines at the wrong place
    return [position for position,line in enumerate(x12_file) if line.strip().startswith(section)]
 | 
						|
 | 
						|
#def get_claims(filename,config,section) :
 | 
						|
def get_content(filename,config,section=None) :
    """
    This function returns the content of the EDI file parsed given the configuration specified

    :section    loop prefix (HL, CLP), defaults to config['SECTION']
    :config     configuration with formatting rules, labels ...
    :filename   location of the file
    :return     (claims,logs) where claims is the list of parsed claims (only those with a 'claim_id')
                and logs the list of errors met while reading or parsing the file
    """
    def _reason(error):
        # an exception can be raised without arguments, its text is then used
        return error.args[0] if error.args else str(error)

    section = section if section else config['SECTION']
    logs = []
    try:
        # the context manager releases the file handle, even if reading fails
        with open(filename.strip(),errors='ignore') as stream :
            x12_file = stream.read().split('\n')
    except Exception as e:
        #
        # We have an error here that should be logged
        logs.append ({"version":"unknown","filename":filename,"msg":_reason(e)})
        return [],logs

    #
    # A file holding all its segments on a single line is split on the segment terminator.
    # The terminator is put back so the rows of a partition can still be told apart below
    # (blank lines, e.g. a trailing newline, are not counted as content).
    populated = [line for line in x12_file if line.strip()]
    if len(populated) == 1 :
        x12_file = [item + '~' for item in populated[0].split('~')]

    #
    # given locations it is possible to build up the partitions (made of segments)
    # locations are normalised so the slices never overlap or run backwards
    locations = sorted(set(get_locations(x12_file,section)))
    if not locations :
        logs.append ({"version":"unknown","filename":filename,"msg":"no '%s' section found" % section})
        return [],logs

    # the last partition runs from the last location to the end of the file
    partitions = []
    for beg,end in zip(locations,locations[1:] + [len(x12_file)]) :
        partitions.append ("\n".join(x12_file[beg:end]))

    try:
        # the header is read by position: second line, third line and fourth line of the file
        TOP_ROW         = x12_file[1].split('*')
        CATEGORY        = x12_file[2].split('*')[1].strip()
        VERSION         = TOP_ROW[-1].replace('~','').strip()
        SUBMITTED_DATE  = format_date(TOP_ROW[4])
        SENDER_ID       = TOP_ROW[2]
        row = split(x12_file[3])
    except IndexError as e:
        # the file is too short or its header is incomplete
        logs.append ({"version":"unknown","filename":filename,"msg":_reason(e)})
        return [],logs

    _info = get_config(config,row)
    _default_value = get_map(row,_info,VERSION) if _info else {}

    claims = []
    for segment in partitions :
        claim = {}
        # carriage returns are dropped with the line feeds, otherwise a row would start with '\r'
        for row in segment.replace('\n','').replace('\r','').split('~') :
            row = split(row)
            _info = get_config(config,row)
            if not _info :
                continue
            try:
                tmp = get_map(row,_info,VERSION)
            except Exception as e:
                # a row that cannot be mapped invalidates the whole claim
                logs.append ({"version":VERSION,"filename":filename,"msg":_reason(e),"X12":row,"completed":False,"rows":len(row)})
                claim = {}
                break

            if 'label' not in _info :
                #
                # attributes without label belong to the claim itself
                tmp['version']      = VERSION
                tmp['submitted']    = SUBMITTED_DATE
                if TOP_ROW[1] == 'HP' :
                    tmp['payer_id'] = SENDER_ID
                elif TOP_ROW[1] == 'HC':
                    tmp['provider_id'] = SENDER_ID
                tmp['category'] = {"setid": CATEGORY,"version":'X'+VERSION.split('X')[1],"id":VERSION.split('X')[0].strip()}
                claim = dict(claim, **tmp)
            else:
                #
                # labelled attributes are accumulated in a list stored under the label
                label = _info['label']
                if isinstance(tmp,list) :
                    claim[label] = tmp if label not in claim else claim[label] + tmp
                elif label not in claim :
                    claim[label] = [tmp]
                elif len(tmp) == 1 :
                    # a single attribute completes the last item found for this label
                    claim[label][-1] = dict(claim[label][-1],**tmp)
                else:
                    claim[label].append(tmp)

                if len(claim[label]) > 0 :
                    # items are indexed and redundancies removed
                    labels = []
                    for item in claim[label] :
                        item['_index'] = len(labels)
                        if item not in labels :
                            labels.append(item)
                    claim[label] = labels

        if claim and 'claim_id' in claim:
            claim = dict(claim,**_default_value)
            claim['name'] = filename.split(os.sep)[-1]
            claim['index'] = len(claims)
            claims.append(claim)
        #
        # otherwise no claim identifier could be associated with the data, the partition is ignored

    return claims,logs
 |