plug-n-play design

__author__ = 'The Phi Technology LLC'
__version__ = '1.0'
__license__ = """
(c) 2019 EDI Parser Toolkit,
Health Information Privacy Lab, Vanderbilt University Medical Center & The Phi Technology
Steve L. Nyemba <>
Khanhly Nguyen <>
This code is intended to process and parse healthcare x12 837 (claims) and x12 835 (remittances) into human readable JSON format.
The claims/outpout can be forwarded to a NoSQL Data store like couchdb and mongodb
Usage :
Commandline :
python --parse claims|remits --config <path>
Embedded :

This module is designed to perform exports to a relational data stores
Note that there are two possible methods to perform relational exports
import transport
from transport import providers
import healthcareio.x12.plugins
# We start by loading all the plugins
def primary_key (**_args) :
_plugins = _args['plugins']
for key in _plugins :
_lpointers =
def init (**_args):
if 'path' in _args :
_path = _args['path']
_plugins,_parents = healthcareio.x12.plugins.instance(path=_path)
_plugins,_parents = healthcareio.x12.plugins.instance()
for key in _plugins :
_lpointers = _plugins[key]
_foreign = {}
_table = {}
for _pointer in _lpointers :
_meta = _pointer.meta
if 'map' in _meta :
_attr = list(_meta['map'].values())
if 'field' in _meta :
_name = _meta['field']
_foreign[_name] = _attr

import datetime
def date(**_args):
This function will return a date as presented in the {x12}, i.e. it could be a date range or a single date
- In the case of a single date it is returned as a string
- In the case of a range a complex object is returned with to,from keys
NOTE: dates will be formatted as they
if not _args :
return ['from','to','type']
_date = ""
return _date
def procedure (**_args):
    """
    This function will parse an SVC element into a procedure object

    Called without arguments it returns the list of columns it produces, so callers can
    discover the shape of the object without parsing anything.
    :row    raw {x12} row, already split into its elements
    :return list of column names (no arguments) or an object keyed by those columns
    """
    cols = ['type','code','amount']
    if not _args :
        return cols
    _procedure = dict.fromkeys(cols,None)
    _row = _args['row']
    if len(_row) == 3 :
        # Fill in what the row provides on top of the defaults, so a column the row
        # does not carry is still present (as None) instead of missing from the object
        _procedure.update(zip(cols,_row[1:4]))
    return _procedure
return _info
def SV2(**_args):
def SV3(**_args):
def HL (**_args):
def HI(**_args):

@ -7,20 +7,49 @@ In addition to the allow custom plugins to be written/loaded and these will be g
- Support configuration specification
import os
from . import common
from . import header
from . import body
import sys
# from . import common
# from . import header
# from . import body
import importlib as IL
# import imp
from .. import parser
# from .claims import *
# from .remits import *
# EDI = body.BODY
# X12Handler = body.BODY
from healthcareio.x12.plugins.default import claims
from healthcareio.x12.plugins.default import remits
# import .remits
EDI = body.BODY
__version__ = '0.01'
__author__ = 'The Phi Technology'
def instance(**_args):
@parser(element='ISA',x12='837',field='header', map={15:'mode',12:'version',9:'date',10:'time'})
def ISA(**_args):
:row raw {x12} row
:data parsed data
:meta elements containing map {index:field_name}
@parser(element='GS', map={1:'type',2:'sender',3:'receiver',4:'date',5:'time',8:'version'},field='receiver')
def GS(**_args):
@parser(element='ST', x12='837', field='header', map={1:'x12',2:'control_number'})
def ST(**_args):
:row raw {x12} row
:data parsed data
:meta elements containing map {index:field_name}
def BHT (**_args):
:row raw {x12} row
:data parsed data
:meta elements containing map {index:field_name}
# defining common functions that can/should be used across the board
# # class Parser :
# def __init__(**_args):
# folder = _args['path']
# files = [ os.sep.join(_name,folder) for _name in os.listdir(folder)]
# pass

from typing import Any
import numpy as np
import json
from multiprocessing import Process, RLock
import os
import io
import queue
import transport
from transport import providers
class Store(Process):
This is the data-store service that will handle read/writes
dataStore = None
def init(self,**_args):
if Store.dataStore is None :
_args = _args['store']
def reset():
class X12DOCUMENT (Process):
X12DOCUMENT class encapsulates functions that will be used to format an x12 (835,837) claim into an object
_queue = queue.Queue()
class MODE :
# The following allow us to handle raw content (stream) or a filename
# The raw content will be wrapped into io.StringIO so that it is handled as if it were a file
class ConfigHandler :
def format(self,**_args):
    """
    This function formats variations of an element's parsing rules
    :info   list of {index,field|label} entries; an entry with neither a field nor a label is ignored
    :return {'@ref': {index: {'field': name}}} or {'@ref': {index: {'label': name}}}, index is cast to a string
    """
    _ref = {}
    for _item in _args['info'] :
        _index = str(_item['index'])
        # a field takes precedence over a label when both are provided
        if _item.get('field') :
            _ref[_index] = {'field':_item['field']}
        elif _item.get('label') :
            _ref[_index] = {'label':_item['label']}
    return {'@ref':_ref}
def _getColumnsIndexes(self,_columns,_indexes,_map):
This function return columns and indexes related if a parsing map is passed
:param _columns
:param _indexes
:param _map parsing map (field:index)
# @TODO: insure the lengths are the same for adequate usage downstream ...
_xcolumns,_xindexes = list(_map.keys()), list(_map.values())
keys,values = _xcolumns + _columns,_xindexes + _indexes
_config = dict(zip(keys,values))
_outColumns,_outIndexes = list(_config.keys()),list(_config.values())
return _outColumns,_outIndexes
def _getObjectAtributes(self,_config):
_field = _config['field'] if 'field' in _config else {}
_label = _config['label'] if 'label' in _config else {}
return _field,_label
def merge(self,**_args):
# This function overrides the old configuration with the new configuration specifications
# Reads from _args : 'columns', 'index', optionally 'config', and 'row' (only read when the configuration has an '@ref' entry)
# Returns an object with the keys 'columns', 'index', 'field', 'label'
# NOTE(review): the indentation of this block was lost, the nesting described below is inferred -- confirm against the original file
# _columns,_indexes = [],[]
_columns,_indexes = _args['columns'],_args['index']
_map = {}
_config = _args['config'] if 'config' in _args else {}
_field,_label = self._getObjectAtributes(_config)
# A 'map' in the configuration is combined with the caller's columns/indexes
if 'map' in _config :
_map = _args['config']['map']
_columns,_indexes = self._getColumnsIndexes(_columns,_indexes,_map)
# An '@ref' entry holds per-anchor rules, the anchor is compared against the second element of the row (stripped)
if '@ref' in _config :
# _columns,_indexes = [],[]
_row = _args['row']
_ref = _config['@ref']
for _anchor in _ref:
# print ([_anchor,_anchor == _row[1].strip()])
if _anchor == _row[1].strip() :
# The field/label of the matching anchor replace the ones taken from the configuration
_field,_label = self._getObjectAtributes(_ref[_anchor])
_map = _ref[_anchor]['map'] if 'map' in _ref[_anchor] else {}
if _map :
# The anchor's map is used on its own here (empty columns/indexes are passed in)
_columns,_indexes = self._getColumnsIndexes([],[],_map)
# _columns,_indexes = _columns + _map.keys()
return {'columns':_columns,'index':_indexes,'field':_field,'label':_label}
def legacy(self,**_args):
    """
    This function returns the legacy configuration (default parsing)
    :config optional configuration, its 'map' keys become the columns and its values the indexes
    :return {'columns','index','field','label'}
    """
    _config = _args.get('config',{})
    _field,_label = self._getObjectAtributes(_config)
    _columns,_indexes = [],[]
    if 'map' in _config :
        _map = _config['map']
        _columns,_indexes = list(_map.keys()),list(_map.values())
    return {'columns':_columns,'index':_indexes,'field':_field,'label':_label}
def override(self,**_args):
    """
    This function hands back the columns and indexes provided by the caller, as a (columns, indexes) pair
    """
    _columns = _args['columns']
    _indexes = _args['indexes']
    return _columns,_indexes
def __init__(self,**_args):
self._mode = _args['mode'] if 'mode' in _args else 'NAMES'
if 'files' in _args :
self.files = _args['files']
self._config = _args['config'] if 'config' in _args else {}
self._document = []
self._x12FileType = None
self._configHandler = X12DOCUMENT.ConfigHandler()
#-- The files need to be classified, the files need to be either claims or remits
if 'store' not in self._config :
self._store_args = _args['store'] if 'store' in _args else {'provider':providers.CONSOLE}
self._store_args = self._config['store']
def init(self,_header):
Expected Elements must include ST
def merge (self,_x,_y):
    """
    This function will merge two objects _x, _y and return the result as a new object
    :_x     base object
    :_y     object merged on top of _x
    :return new object; for the keys found in both, lists are concatenated, objects are
            merged (values of _y win) and anything else is replaced by the value from _y
    """
    _zcols = set(_x.keys()) & set(_y.keys()) #--common columns
    if not _zcols :
        return dict(_x,**_y)
    _out = dict(_x)
    for _key in _y.keys() :
        if _key not in _zcols :
            _out[_key] = _y[_key]
        elif type(_out[_key]) == list :
            # work on a copy, _out only holds a shallow copy of _x and an in-place
            # extension would silently grow the list owned by the caller
            _merged = list(_out[_key])
            _merged += _y[_key]
            _out[_key] = _merged
        elif type(_out[_key]) == dict:
            _out[_key] = dict(_out[_key],**_y[_key])
        else:
            _out[_key] = _y[_key]
    return _out
def split(self,content):
    """
    This function will split the content of an X12 document into blocks and headers
    :content    x12 document in raw format (text)
    :return     {'header': segments found before the first HL segment,
                 'blocks': one list of segments per HL segment, the HL segment comes first}

    The document is cut on the segment terminator (~) and a block starts only where a
    segment begins with HL*. Cutting on the bare text 'HL' would also cut in the middle of
    any value that happens to contain these two letters (a name for instance).
    """
    _header,_blocks = [],[]
    for _segment in content.split('~') :
        # leading whitespace is ignored for detection, files may have a line feed after each ~
        _candidate = _segment.lstrip()
        if _candidate.startswith('HL*') :
            _blocks.append([_candidate])
        elif _blocks :
            _blocks[-1].append(_segment)
        else:
            _header.append(_segment)
    return {'header':_header,'blocks':_blocks}
def parse (self,columns,index,**_args):
This function encapulates how an x12 document element will be processed
:columns list of attributes that make up the object
:index indexes of the said items in the element
- row raw x12 element (string)
- config configuration of the element. his should indicate functions to apply against function
_ELEMENT = _args['row'][0]
# get the right configuration from the _config object
_config = _args['config'][_ELEMENT] if _ELEMENT in _args['config'] else {}
# _field = _config['field'] if 'field' in _config else None
# _label = _config['label'] if 'label' in _config else None
_map = _config['map'] if 'map' in _config else {}
# Let's see if overriding the fields/labels isn't necessary
# columns, index,_refField,_refLabel = self._configHandler.merge(row=_args['row'],columns=columns,index=index,config=_config)
# _field = _field if not _refField else _refField
# _label = _label if not _refLabel else _refLabel
_outInfo = self._configHandler.merge(row=_args['row'],columns=columns,index=index,config=_config)
_field,_label = _outInfo['field'],_outInfo['label']
_columns,_index = _outInfo['columns'],_outInfo['index']
if 'row' in _args:
_row = _args['row'] if type(_args['row']) == list else _args['row'].split('*')
_index = np.array(_index)
# Sometimes the _row doesn't have all expected indexes, we will compensate
# This allows to minimize parsing errors as it may relate to disconnects between configuration and x12 element variations (shitty format)
if np.max(_index) > len(_row) -1 :
_delta = 1 + np.max(_index) - len(_row)
_row = _row + np.repeat('',_delta).tolist()
_row = np.array(_row)
# _element = _row[0]
_configKeys = [] #list(self._config.keys())
_configTree = [] #list(self._config.values())
if 'config' in _args :
_config = _args['config']
_configKeys = list(_config.keys())
_configTree = list(_config.values())
_config = {}
_info = dict(zip(_columns,_row[_index].tolist()))
_document = _args['document'] if 'document' in _args else {}
# Extracting configuration (minimal information)
# _config = _args['config'] if 'config' in _args else {}
# _config = self._config
# if '@ref' in _config :
# print (_config['@ref'])
# _values = _config['@ref']
# print (_values)
if _field :
if not _field in _document :
return {_field:_info}
return self.merge(_document[_field],_info)
elif _label :
if not _label in _document :
return {_label:[_info]}
return _document[_label] + [_info]
return _info
return columns
def elements(self):
    """
    This function returns elements that are supported as specified by X12 standard
    :return attribute names of this object that neither start with an underscore nor are
            entirely lowercase, in the order given by dir()
    """
    return [_name for _name in dir(self) if not (_name.startswith('_') or _name.islower())]
def pointers(self):
    """
    This function returns pointers associated with each element ...
    :return Object of Element:Function, built from the names returned by self.elements()
    """
    return {_name:getattr(self,_name) for _name in self.elements()}
def set(self,_info,_document,_config):
    """
    This function stores parsed information in a document, at the place designated by the configuration
    :_info      parsed information (object)
    :_document  document being built, it is updated in place when a label or field is configured
    :_config    configuration, a 'label' designates a list attribute and a 'field' an object attribute
    :return     the document; without a label or field a new object merging _document and _info is returned
    """
    if 'label' in _config :
        _attrType,_attrName = 'label',_config['label']
    elif 'field' in _config :
        _attrType,_attrName = 'field',_config['field']
    else:
        _attrType,_attrName = None,None
    if not _attrName :
        return dict(_document,**_info)
    if _attrName not in _document :
        _document[_attrName] = [] if _attrType == 'label' else {}
    # @TODO: make sure we don't have a case of an attribute being overridden
    if type(_document[_attrName]) == list :
        _document[_attrName] += [_info]
    else:
        _document[_attrName] = dict(_document[_attrName],**_info)
    return _document
def log (self,**_args):
def run(self):
This function will trigger the workflow associated with a particular file
_getContent = {
# For the sake of testing, the following insures
# that raw string content is handled as if it were a file
X12DOCUMENT.MODE.STREAM: (lambda stream : io.StringIO(stream)) ,
X12DOCUMENT.MODE.NAMES: (lambda name: open(name))
_writer = transport.factory.instance(**self._store_args)
for _filename in self.files :
_documents = []
_parts = []
# _content = (open(_filename)).read()
_reader = _getContent[self._mode]
_content = _reader(_filename).read()
_info = self.split(_content)
_header = self.apply(_info['header'])
# print (json.dumps(_header))
for _content in _info['blocks'] :
_body = self.apply(_content,header=_header)
_doc = self.merge(_header,_body)
if _doc and 'claim_id' in _doc:
# X12DOCUMENT._queue.put(_document)
_documents += [_doc]
except Exception as e:
# @TODO: Log this issue for later analysis ...
print (e)
# Let us post this to the documents we have, we should find a place to post it
if _documents :
# print (_header['header']),writer=_writer)
def post(self,**_args):
This function is intended to post content to a given location
:param document
:param writer
_writer = _args['writer'] if 'writer' in _args else None
_document = _args['document']
if not _writer:
def _getConfig(self,_chunk):
# Let us determine what kind of file we are dealing with, so we can extract the configuration
# For this we need to look for the ST loop ...
line = [line for line in _chunk if line and line[:2] == 'ST' ]
if line :
# We found the header of the block, so we can set the default configuration
self._x12FileType = line[0].split('*')[1].strip()
_config = {}
if self._x12FileType :
_config = self._config[self._x12FileType]
return _config
def apply(self,_chunk, header = None):
    """
    _chunks are groups of elements split by HL, within each chunk are x12 loops HL,CLM,ISA
    :_chunk list of raw segments
    :header parsed header handed over to every element parser (defaults to an empty object)
    :return document obtained by merging what the element parsers returned

    Parsed lines are grouped under the last HL, CLM or ISA element seen (or the first
    supported element of the chunk) and the groups are merged at the end.
    """
    # a fresh object per call, a default created in the signature would be shared by all calls
    header = {} if header is None else header
    _document,_cached = {},{}
    _pointers = self.pointers()
    _config = self._getConfig(_chunk)
    _pid = None
    for line in _chunk :
        segments = line.split('*')
        _ELEMENT = segments[0]
        if not _ELEMENT or _ELEMENT not in _pointers :
            # nothing can parse this line, it is skipped
            # @TODO: Create a log so there can be visibility into the parser
            continue
        if _ELEMENT in ['HL','CLM','ISA'] or not _pid:
            _pid = _ELEMENT
        if _pid not in _cached :
            _cached[_pid] = {}
        _pointer = _pointers[_ELEMENT]
        _args = {'row':segments,'document':_document,'header':header,'config':_config}
        _parsedLine = _pointer(**_args)
        _cached[_pid] = self.merge(_cached[_pid],_parsedLine)
    # Let's create the documents as we understand them to be
    _document = {}
    for _id in _cached :
        _document = self.merge(_document,_cached[_id])
    return _document

_args ['plugin-context'] = {'@ref':CONTEXT_MAP}
return self.parse(_columns,[9,2,1],**_args)
def DMG (self,**_args):
Expected Element DMG
Expected Element DMG, these need to be stored in a patient object
_columns = ['dob','gender','format']
_info = self.parse(_columns,[2,3,1],**_args)
return _info
return {'patient':_info}
def DTP (self,**_args):
Expected Element DTP
Expected Element HL
The expected block is supposed to be unprocessed (to make things simple)
_row = _args['row'] if type(_args['row']) == list else _args['row'].split('~')
# _row = _args['row'] if type(_args['row']) == list else _args['row'].split('~')
# _attr = self.elements() #[_name for _name in dir() if not _name.islower() and not _name.startswith('_')]
# _pointers = [getattr(self,_name) for _name in _attr]
# The index here tells us what we are processing i.e index == 1 something about header
_columns = ['_index','parent','code','child']
_args['row'] = _row[0]
# _args['row'] = _row[0]
_info = self.parse (_columns,[1,2,3,4],**_args)
# _field = 'billing_provider' if _info['_index'] == '1' else 'patient'
# _config ={'field':_field}
return _info
# _claim = {_field:_info}
# for _element in _row[1:] :
# _key = _element.split('*')[0]
# if _key in _map and len(_element) > 0:
# _document = _args['document']
# _pointer = _map[_key]
# if _key not in ['CLM','HI','SV3','SV2','SV1'] :
# _claim = self.merge (_claim,_pointer(row=_element.strip().split('*'),document=_document,config=_config))
# else:
# _config = _args['config'] if 'config' in _args else {}
# _claim = self.merge (_claim,_pointer(row=_element.strip().split('*'),document=_document,config=_config))
# else:
# print (['SKIPPING ',_key])
# pass
# return _claim
# def apply(self,_block):
# """
# :_block elements that do not belong to the header block
# """
# _apply = self.pointers()
# _header = {}
# if _block :
# for content in _block :
# _KEY_ELEMENT = content.split('*')[0]
# if _KEY_ELEMENT not in _apply :
# #
# # @TODO: Log elements that are skipped
# # print ([_KEY_ELEMENT , 'NOT FOUND'])
# continue
# _info = _apply[_KEY_ELEMENT](row=content,document=_header)
# if _info :
# if not _header :
# _header = _info
# else:
# _header = self.merge(_header,_info)
# else:
# #
# # For some reason the parser failed by returning a null
# # @TODO: Log this event ....
# pass
# else:
# #
# # @TODO: return the meta data for what is expected
# pass
# return _header

import numpy as np
from .. import parser
from datetime import datetime
@parser(element='NM1',x12='*', anchor={'41':'submitter','40':'receiver','82':'rendering_provider','85':'billing_provider','87':'pay_to_provider','IL':'patient','PR':'payer','QC':'patient','DN':'referring_provider','77':'provider','2':'billing_provider'}, map={1:'type',3:'name',-1:'id'})
def NM1 (**_args):
Expected Element NM1
ref IL,40,41,82,85,PR ...
Information about entities (doctors, clearing house, provider). we should be mindful of the references
# '2':{'field':'payer'},
# 'PR':{'field':'payer'},
# '41':{'field':'header'},
# '45':{'field':'ambulance_location'},
# 'IL':{'field':'patient','map':{'type':2,'first_name':4,'last_name':3}},
# 'P5':{'field':'plan_sponsor'},
# '82':{'field':'rendering_provider','map':{'type':2,'first_name':4,'last_name':3}},
# '85':{'field':'billing_provider'},
# }
# _args ['plugin-context'] = {'@ref':CONTEXT_MAP}
# # _map = {_CODE_INDEX:{'41':'submitter','40':'receiver','PR':'payer'}}
# _columns = ['type','name','id']
# _indexes = [1,3,-1]
# # _info = [{'index':'40','field':'receiver'},{'index':'41','field':'submitter'},{'index':'PR','field':'payer'}]
# _pointer = _args['parser']
# _info = _pointer(_columns,_indexes,**_args)
# self.lastelement = _info
# return _info
@parser(element='N3',x12='837', parent='NM1',map={1:'address_line_1',2:'address_line_2'})
def N3 (**_args):
Expected Element N3
# _columns = ['address_line_1']
# return self.parse(_columns,[1,2],**_args)
def N4(**_args):
Expected Element N4
# _columns = ['city','state','zip']
# return self.parse(_columns,[1,2,3],**_args)
@parser(element='HI',x12='837', map={1:'type',2:'code'})
def HI(**_args):
Expected Element HI
This function will parse diagnosis codes ICD 9/10
# _columns = ['code','type']
# return self.parse(_columns,[2,1],**_args)
def AMT (**_args):
Expected Element AMT
# _columns = ['amount','qualifier']
# return self.parse(_columns,[2,1],**_args)
def SBR (**_args):
Expected Element SBR
# _index = [9,1]
# _columns = ['vendor','individual_code','type']
# return self.parse(_columns,[9,2,1],**_args)
@parser(element='DMG',x12='837', field='patient',map={2:'dob',3:'gender',1:'format'})
def DMG (**_args):
    """
    Expected Element DMG, these need to be stored in a patient object
    :data   parsed data, 'dob' is read as YYYYMMDD (trailing whitespace is tolerated)
    :return the same object with 'dob' replaced by a datetime
    """
    _data = _args['data']
    _dob = _data['dob']
    _data['dob'] = datetime(year=int(_dob[:4]), month=int(_dob[4:6]),day=int(_dob[6:].strip()))
    return _data
# _columns = ['dob','gender','format']
# _info = self.parse(_columns,[2,3,1],**_args)
# return {'patient':_info}
@parser(element='DTP', x12='837', field='date',map={3:['to','from']})
def DTP (**_args):
    """
    Expected Element DTP
    :data   parsed data, the 'to' and 'from' values are cut as YYYYMMDD
    :return the same object with 'to' and 'from' rewritten as YYYY-MM-DD
    """
    _data = _args['data']
    for _key in ('to','from') :
        _value = _data[_key]
        _data[_key] = '-'.join([_value[:4],_value[4:6],_value[6:]])
    return _data
def PER (**_args):
Expected Element PER
# _map = {_CODE_INDEX:{'IC':'submitter'}} # attribute to store the data in
# _columns = ['contact_name','phone','email']
# _info = self.parse (_columns,[2,4,8],**_args)
# @TODO: Inspect the configuration file for the attribute information
# return _info
def CLM (**_args):
    """
    Expected Element CLM
    :data   parsed data
    :return the same object with 'claim_amount' cast to a numpy float64
    """
    _data = _args['data']
    _data['claim_amount'] = np.float64(_data['claim_amount'])
    return _data
# _columns = ['claim_id','claim_amount','facility_code','facility_qualifier','frequency_code']
# return self.parse(_columns,[1,2,5,5,5],**_args)
# @parser(element='REF', field='ref',map={2:'id'})
def REF (**_args):
# print (_args)
_columns = ['identifier','qualifier','']
# _CODE_INDEX = 1 # -- according to x12 standard
# _map = {_CODE_INDEX:{'EA':'patient','EI':'provider','6R':'','D9':''}}
# return self.parse(_columns,[2],**_args)
def HI (**_args):
    """
    Expected Element HI
    :data   parsed data, 'code' may hold a composite value whose components are separated by ':'
    :return the same object; for a composite value 'type' is its first component and 'code' its second
    """
    _data = _args['data']
    if ':' in _data['code'] :
        # a composite can carry more than two components, only the first two are kept;
        # unpacking the whole split would fail as soon as a third one is present
        _data['type'],_data['code'] = _data['code'].split(':')[:2]
    return _data
def SV1 (**_args):
Expected Element SV1
# _row = _args['row'] if type(_args['row']) == list else _args['row'].split('*')
# _columns = ['type','code','amount','modifier_1','modifier_2','modifier_3','modifier_4','place_of_service','units','measurement']
# return self.parse(_columns,[1,1,2,1,1,1,1,5,4,3],**_args)
_data = _args['data']
if 'type' in _data :
_data['type'] = _data['type'].split(':')[0]
_data['code'] = _data['code'].split(':')[1]
_data['amount']= np.float64(_data['amount'])
return _data
@parser (element='HL',x12='837', field='patient', map={1:'_index',2:'parent_code',3:'level_code',4:'child_code'})
def HL (**_args) :
Expected Element HL
The expected block is supposed to be unprocessed (to make things simple)
# _data = _args['data']
# _data['_index'] = int(_data['_index'])
# return _data
# # _row = _args['row'] if type(_args['row']) == list else _args['row'].split('~')
# # _attr = self.elements() #[_name for _name in dir() if not _name.islower() and not _name.startswith('_')]
# # _pointers = [getattr(_name) for _name in _attr]
# # _map = dict(zip(_attr,_pointers))
# _map = self.pointers()
# #
# # The index here tells us what we are processing i.e index == 1 something about header
# #
# _columns = ['_index','parent','code','child']
# # _args['row'] = _row[0]
# _info = self.parse (_columns,[1,2,3,4],**_args)
# # _field = 'billing_provider' if _info['_index'] == '1' else 'patient'
# # _config ={'field':_field}
# return _info

_field,_label = self._getObjectAtributes(_config)
_columns,_indexes = [],[]
if 'map' in _config :
_columns = list(_config['map'].keys())
_indexes = list(_config['map'].values())
#_content = content.split('~')
_content = content.split('HL')
_header = _content[:1][0].split('~')
xchar = '~\n' if '~\n' in _content[0] else '~'
_header = _content[:1][0].split(xchar) #.split('~')
_blocks = ['HL'+_item for _item in _content[1:]]
_blocks = [_item.split('~') for _item in _blocks ]
_blocks = ['HL*'+_item for _item in _content[1:]]
# xchar = '~\n' if '~\n' in _blocks[0] else '~'
_blocks = [_item.split(xchar) for _item in _blocks ]
# for row in _content :
# if not _blocks and not row.startswith('HL') :
# _header.append(row)
# else:
# _blocks.append(row)
return {'header':_header,'blocks':_blocks}
def parse (self,columns,index,**_args):
- config configuration of the element. his should indicate functions to apply against function
_ELEMENT = _args['row'][0]
# get the right configuration from the _config object
_config = _args['config'][_ELEMENT] if _ELEMENT in _args['config'] else {}
_row = _row + np.repeat('',_delta).tolist()
_row = np.array(_row)
# _element = _row[0]
# _configKeys = [] #list(self._config.keys())
# _configTree = [] #list(self._config.values())
# if 'config' in _args :
# _config = _args['config']
# _configKeys = list(_config.keys())
# _configTree = list(_config.values())
# else:
# _config = {}
_info = dict(zip(_columns,_row[_index].tolist()))
_document = _args['document'] if 'document' in _args else {}
_item = _document[_label] + [_info]
_item = _info
if _ELEMENT in self._hierarchy and _field:
# print ([_field,_item])
self.lastelement = _item
@ -307,9 +305,17 @@ class X12DOCUMENT (Process):
_ikey = list(self.lastelement.keys())[0]
_oldinfo = self.lastelement[_ikey]
_item = {_ikey: self.merge(_oldinfo,_item)}
if type(_oldinfo) != dict :
# This is we should log somewhere to suggest an issue happened
# self.log(action='error',input=_row)
_item = {_ikey: self.merge(_oldinfo,_item)}
return _item
def log (self,**_args):
def parseBlocks (self,_blocks,_header):
This function extracts blocks and returns them to the caller,
Blocks of a document are made of transactional loops, that constitute a patients claim
_tmp = {}
_documents = []
for _content in _blocks :
_body = self.apply(_content,header=_header)
_doc = self.merge(_header,_body)
# self.log(action='parse',section='body',input=_content[0])
if _doc and 'claim_id' in _doc:
# X12DOCUMENT._queue.put(_document)
# self.log(action='parse',section='document')
_documents += [self.merge(_tmp,_doc)]
_tmp = {}
# The document is being built and not yet ready
_tmp = self.merge(_tmp,_doc)
return _documents
def run(self):
This function will trigger the workflow associated with a particular file
_getContent = {
# For the sake of testing, the following insures
# that raw string content is handled as if it were a file
X12DOCUMENT.MODE.STREAM: (lambda stream : io.StringIO(stream)) ,
X12DOCUMENT.MODE.NAMES: (lambda name: open(name))
_writer = transport.factory.instance(**self._store_args)
for _filename in self.files :
# self.log(action='parse',section='file',input=_filename)
_documents = []
_parts = []
# _content = (open(_filename)).read()
_reader = _getContent[self._mode]
_content = _reader(_filename).read()
if os.sep in _filename and os.path.exists(_filename) :
_reader = open(_filename)
# This is a stream, we are wrapping it into an appropriate structure
_reader = io.StringIO(_filename)
# Let us log the mode we have set ...
_content =
if hasattr(_reader,'close') :
_info = self.split(_content)
_header = self.apply(_info['header'])
# print (json.dumps(_header))
_tmp = {}
for _content in _info['blocks'] :
_body = self.apply(_content,header=_header)
_doc = self.merge(_header,_body)
if _doc and 'claim_id' in _doc:
# X12DOCUMENT._queue.put(_document)
if _info['blocks'] :
# processing blocks for the current claim
_documents = self.parseBlocks(_info['blocks'],_header)
_documents += [self.merge(_tmp,_doc)]
_tmp = {}
# The document is being built and not yet ready
_tmp = self.merge(_tmp,_doc)
except Exception as e:
# @TODO: Log this issue for later analysis ...
print (e)
# Let us post this to the documents we have, we should find a place to post it
# #
# # Let us post this to the documents we have, we should find a place to post it
# #
if _documents :
# print (_header['header'])
_writer = transport.factory.instance(**self._store_args),writer=_writer)
def post(self,**_args):
segments = line.split('*')
_ELEMENT = segments[0]
if _ELEMENT not in _pointers or not _ELEMENT:
if _ELEMENT in ['HL','CLM','ISA'] or not _pid:
_parsedLine = _pointer(**_args)
# print ([_pid,_ELEMENT,_parsedLine])
_cached[_pid] = self.merge(_cached[_pid],_parsedLine)
if _pid in _cached :
_cached[_pid] = self.merge(_cached[_pid],_parsedLine)
_cached[_pid] = _parsedLine

_columns= ['app_id','date','time','type']
return self.parse(_columns,[3,4,5,6],**_args)
# let us perform this

import numpy as np
from .. import parser
@parser(element='ISA',x12='835',field='header', map={6:'submitter_id',8:'receiver_id',9:'date',10:'time'})
def ISA(**_args):
:row raw {x12} row
:data parsed data
:meta elements containing map {index:field_name}
# print (_args['data'])
@parser(element='ST',x12='835', field='ISA', map={1:'x12',2:'control_number'})
def ST(**_args):
:row raw {x12} row
:data parsed data
:meta elements containing map {index:field_name}
@parser (element='BPR',x12='835',map={2:'transaction_amount',3:'transaction_type',4:'method',6:'depository'})
def BPR (**_args):
def CLP (**_args):
    """
    Expected Element CLP
    :data   parsed data
    :return the same object with charge_amount, payment_amount and patient_amount cast to numpy float64
    """
    _data = _args['data']
    _amounts = ('charge_amount','payment_amount','patient_amount')
    for _name in _amounts :
        _value = _data[_name]
        _data[_name] = np.float64(_value)
    return _data
@parser (element='PER',x12='835',field="billing_provider",map={2:'name',4:'phone_number'})
def PER (**_args):
def N1(**_args):
def DTM(**_args):
@parser(element='PLB',x12='835',container='provider', map={1:'id',2:'adjustment_fiscal_year',-1:'adjustment_amount'})
def PLB(**_args):
def CAS(**_args):
def SVC (**_args):
    """
    Expected Element SVC
    :data   parsed data, 'type' and 'code' hold a value whose parts are separated by '|'
    :return the same object with 'type' reduced to its first part, 'code' to its second part
            and charge_amount, paid_amount cast to numpy float64
    """
    _data = _args['data']
    _data['type'] = _data['type'].partition('|')[0]
    _parts = _data['code'].split('|')
    _data['code'] = _parts[1]
    for _name in ('charge_amount','paid_amount') :
        _data[_name] = np.float64(_data[_name])
    return _data
def N1(**_args):
def N3(**_args):
def N4(**_args):
@parser (element='AMT',x12='835',container='amounts', map={2:'amount',1:'type'})
def AMT (**_args):
_data = _args['data']
if _data['type'] in _map :
_data['type'] = _map[_data['type']]
_data['amount'] = np.float64(_data['amount'])
return _data

This package contains tools used across the various modules, these tools actually "do the work"
We intend to have these tools be Object-Oriented by design so as to not run into any concurrency issues
from . import file, document, common
from healthcareio import x12
from multiprocessing import Process
# class X12Engine(Process):
# def __init__(self,**_args):
# """
# :files group of files to be processed
# """
# self.files = _args['files']
# self._cHandler = file.Content()
# self._dHandler = document.Builder(plugins=_args['plugins'],parents=_args['plugins'])
# def run(self):
# """
# This function performs parsing given
# """
# for _location in self.files :
# _content =
# _content = self._cHandler.split(_content)
# pass
def merge (_x,_y):
    """
    This function will merge two objects _x, _y and return the result as a new object
    :_x     base object
    :_y     object merged on top of _x
    :return new object; for the keys found in both, lists are extended with the values they do
            not hold yet, objects are merged (values of _y win) and anything else is replaced
            by the value from _y
    """
    _zcols = set(_x.keys()) & set(_y.keys()) #--common columns
    if not _zcols :
        return dict(_x,**_y)
    _out = dict(_x)
    for _key,_value in _y.items() :
        if _key not in _zcols :
            # an empty key that is new to _x is left out, there is nothing to merge it with
            if _key :
                _out[_key] = _value
            continue
        if type(_out[_key]) == list :
            # work on a copy so the list owned by _x is left untouched
            _merged = list(_out[_key])
            for _item in _value :
                if _item not in _merged :
                    _merged.append(_item)
            _out[_key] = _merged
        elif type(_out[_key]) == dict:
            _out[_key] = dict(_out[_key],**_value)
        else:
            _out[_key] = _value
    return _out
def template(**_args) :
    """
    This function generates an object template to be used in object assignment and export functionalities
    We chose to proceed in this manner so as to enforce consistency of the parser
    :plugins    {*,837,835} with element and pointers associated
    :return     {'837':{...},'835':{...}} where every attribute is initialized with an empty string;
                the elements registered under '*' are added to both templates
    """
    def _attributes(_meta):
        # Names of the attributes declared by a plugin (flattens the entries that are lists)
        _names = []
        for _item in list(_meta['map'].values() if 'map' in _meta else _meta['columns']) :
            if isinstance(_item,list) :
                _names += _item
            else:
                _names.append(_item)
        return _names

    _plugins = _args['plugins']
    _object = {'837':{},'835':{}}
    for _x12 in _plugins :
        _pointers = _plugins[_x12]
        for _element in _pointers :
            _meta = _pointers[_element].meta
            _attr = _attributes(_meta)
            #
            # where do the attributes go ..
            #
            _field = []
            if 'field' in _meta or 'container' in _meta :
                _field = _meta['field'] if 'field' in _meta else _meta['container']
            if 'anchor' in _meta : #-- No parents are expected
                _field = list(_meta['anchor'].values())
            elif _meta.get('parent') :
                #
                # It means the attributes will be stored with the attributes of the parent element
                _parentPlug = x12.plugins.filter(elements=[_meta['parent']],plugins=_plugins)
                _pid = list(_parentPlug.keys())[0]
                _parentMeta = _parentPlug[_pid][_meta['parent']].meta
                #
                # The attributes of the parent are added to (never substituted for) the element's own
                _attr = _attr + _attributes(_parentMeta)
                if 'anchor' in _parentMeta :
                    _field = list(_parentMeta['anchor'].values())
            _field = [_field] if isinstance(_field,str) else _field
            _targets = ['837','835'] if _x12 == '*' else [_x12]
            for _target in _targets :
                #
                # Every field of every template gets its own dictionary, nothing is shared
                _blank = dict.fromkeys(_attr,'')
                _info = {_name:dict(_blank) for _name in _field} if _field else _blank
                _object[_target] = merge(_object.get(_target,{}),_info)
    return _object
# def template(**_args) :
# """
# This function generates an object template to be used in object assignment and export functionalities
# We chose to proceed in this manner so as to enforce consistency of the parser
# :plugins {*,837,835} with element and pointers associated
# """
# _plugins = _args['plugins']
# _object = {'837':{},'835':{}}
# for _x12 in _plugins :
# _pointers = _plugins[_x12]
# for _element in _pointers :
# _meta = _pointers[_element].meta
# _values = _meta['map'].values() if 'map' in _meta else _meta['columns']
# #
# # where do the attributes go ..
# #
# _attr = []
# for _item in list(_values) :
# if type(_item) == list :
# _attr = _attr + _item
# else:
# _attr.append(_item)
# _field = []
# if 'field' in _meta or 'container' in _meta :
# _field = _meta['field'] if 'field' in _meta else _meta['container']
# if 'anchor' in _meta : #-- No parents are expected
# _field = _meta['anchor'].values()
# elif _meta['parent'] :
# #
# # It means the attributes will be
# _parentPlug = filter(elements=[_meta['parent']],plugins=_plugins)
# _pid = list(_parentPlug.keys())[0]
# _parentMeta = _parentPlug[_pid][_meta['parent']].meta
# _attr = _attr + list(_parentMeta['map'].values()) if 'map' in _parentMeta else _parentMeta['columns']
# if 'anchor' in _parentMeta :
# _field = list(_parentMeta['anchor'].values())
# _field = [_field] if type(_field) == str else _field
# _attr = dict.fromkeys(_attr,'')
# if not _field :
# _info = (_attr)
# else:
# _info = (dict.fromkeys(_field,_attr))
# if _x12 == '*' :
# _object['837']= merge(_object['837'], _info)
# _object['835']= merge (_object['835'], _info)
# else:
# _object[_x12] = merge(_object[_x12],_info)
# return _object

# class Common :
# def parent(self,**_args):
# """
# This function returns the "parent" pointer associated with a given element
# :meta meta data of a decorated/annotated function
# """
# _meta = _args['meta']
# _item = None
# if 'parent' in _meta : #hasattr(_meta,'parent'):
# _hasField = 'field' in _meta
# _hasParent= _meta['element'] in self._parents
# if _hasField and _hasParent: #_meta.element in self._parents and hasattr(_meta,'field'):
# self._last = _item
# pass
# else:
# for key in self._parents :
# if _meta.element in self._parents[key] :
# _ikey = list(self._last.keys())[0]
# _oldinfo = self._last[_ikey]
# if type(_oldinfo) != dict :
# #
# # Only applicable against a dictionary not a list (sorry)
# pass
# else:
# _item = {_ikey: self.merge(_oldinfo,_item)}
# break
# pass
# return _item

"""
This file encapsulates the functions needed to build a document
"""
import numpy as np
import copy
class Builder:
    """
    This class is intended to create and manipulate objects (documents) out of x12 rows
        :merge  The class merges two objects and accounts for attributes that are lists
        :parent returns the parent for a given object
    A plugin (pointer) is a callable that carries a `meta` dictionary, the keys read here are
    map, field, container, anchor and parent
    """
    def __init__(self,**_args):
        """
        :plugins    {x12 type: {element: pointer}} (a deep copy is stored)
        :parents    collection of the elements that are parents of other elements (a deep copy is stored)
        """
        self._last = {}     #-- last data/field seen for every parent element
        self._plugins = copy.deepcopy(_args['plugins'])
        self._parents = copy.deepcopy(_args['parents'])
        self._loop = {}     #-- number of occurrences per element, see count

    def reset (self):
        """
        This function clears the state accumulated while processing rows (parents and counters)
        """
        self._last = {}
        self._loop = {}

    def parent(self,**_args):
        """
        This function returns the parent item of an object
        :meta   meta data of a decorated/annotated function
        :return {'data':..,'field':..} stored by update for the parent, None if there is none (yet)
        """
        _id = _args['meta']['parent']
        return self._last.get(_id) if _id else None

    def count(self,_element):
        """
        This function increments the number of times an element has been counted
        """
        self._loop[_element] = self._loop.get(_element,0) + 1

    def pointer(self,**_args):
        """
        This function returns a pointer associated with a row element
        The plugins of the file type are looked-up first, the other plugins serve as a fall-back
        @TODO: Make sure we know what kind of file we are processing (it would help suppress the loop)
        :row        raw x12 row (the first item is the element), or
        :element    name of the element
        :x12        file type i.e 837 or 835
        """
        _id = _args['row'][0] if 'row' in _args else _args['element']
        _filetype = _args['x12']
        if _id in self._plugins.get(_filetype,{}) :
            return self._plugins[_filetype][_id]
        _pointer = None
        #
        # NOTE(review): when several groups of plugins have the element, the last one is returned
        for _x12 in self._plugins :
            if _id in self._plugins[_x12] :
                _pointer = self._plugins[_x12][_id]
        return _pointer

    def field(self,**_args) :
        """
        This function returns the name of the attribute of the document where the data of a row is stored
        In order of precedence (lowest first): field of the parent, field or container, matching anchor
        :row    raw x12 row
        :meta   meta data of the plugin function
        :return name of the field or None
        """
        _row = _args['row']
        _meta= _args['meta']
        _field = None
        if _meta.get('parent') :
            #
            # The parent may not have been seen yet, in which case there is no field to inherit
            _parent = self.parent(meta=_meta)
            if _parent :
                _field = _parent['field']
        if 'field' in _meta or 'container' in _meta :
            _field = _meta['field'] if 'field' in _meta else _meta['container']
        if 'anchor' in _meta and len(_row) > 1:
            _anchor = _meta['anchor']
            _value = _row[1].strip()
            for key in _anchor :
                if key == _value :
                    _field = _anchor[key]
        return _field

    def merge (self,_x,_y):
        """
        This function will merge two objects _x, _y and return a new object (neither input is modified)
        For the attributes found in both objects :
            - lists are combined, only the values of _y not already present are added
            - dictionaries are combined (shallow), the values of _y take precedence
            - anything else is replaced by the value found in _y
        Attributes only found in _y are copied, unless the name of the attribute is empty
        """
        _zcols = set(_x.keys()) & set(_y.keys()) #--common columns
        if not _zcols :
            return {**_x,**_y}
        _out = dict(_x)
        for _key,_value in _y.items() :
            if _key not in _zcols :
                if _key :
                    _out[_key] = _value
                continue
            _current = _out[_key]
            if isinstance(_current,list) :
                #
                # We work on a copy of the list so the list held by _x is left untouched
                _merged = list(_current)
                for _item in _value :
                    if _item not in _merged :
                        _merged.append(_item)
                _out[_key] = _merged
            elif isinstance(_current,dict) :
                _out[_key] = dict(_current,**_value)
            else:
                _out[_key] = _value
        return _out

    def parse (self,**_args):
        """
        This function will perform parsing on behalf of the plugin by relying on map function
        :row    raw x12 row
        :meta   meta data of the plugin function, meta['map'] is {index:name} or {index:[names]}
        :return {name:value}, the values missing from the row are empty strings
        """
        _map = _args['meta']['map']
        _columns = []
        _index = []
        for _id,_name in _map.items() :
            #
            # An index associated with several names is repeated for each one of the names
            _names = _name if isinstance(_name,list) else [_name]
            _columns += _names
            _index += [int(_id)] * len(_names)
        if not _index :
            return {}
        _row = list(_args['row'])
        #
        # Sometimes the row doesn't have all the expected indexes, we compensate with empty values
        _delta = 1 + max(_index) - len(_row)
        if _delta > 0 :
            _row = _row + [''] * _delta
        try:
            return dict(zip(_columns,[_row[_i] for _i in _index]))
        except IndexError:
            #
            # A negative index that reaches before the start of the row, nothing can be extracted
            return {}

    def meta (self,**_args):
        """
        This function returns the meta data of the plugin associated with a row (None if there is none)
        :row    raw x12 row
        """
        _id = _args['row'][0]
        _meta = None
        #
        # NOTE(review): when several groups of plugins have the element, the last one is returned
        for key in self._plugins :
            _items = self._plugins[key]
            if _id in _items :
                _meta = _items[_id].meta
        return _meta

    def update(self,**_args):
        """
        This function stores the data and field of a row if the element is a parent,
        so the elements that follow can be attached to it (see parent)
        :row    raw x12 row
        :data   data extracted from the row
        """
        _element = _args['row'][0]
        if _element in self._parents :
            _meta = self.meta(row=_args['row'])
            if 'field' not in _meta :
                _field = self.field(row=_args['row'],meta=_meta)
            else:
                _field = _meta['field']
            self._last[_element] = {'data':_args['data'],'field':_field}

    def bind(self,**_args):
        """
        This function is intended to make an object out of an element
        :row    raw row of x12
        :x12    file type i.e 837 or 835
        :return (data,meta) of the row or (None,None) if no plugin handles the element
        """
        _row = _args['row']
        _pointer = self.pointer(row=_row,x12=_args['x12'])
        if not _pointer :
            return None,None
        #
        # Should we use the built-in parser or not
        _data = {}
        if 'map' in _pointer.meta :
            _data = self.parse(row=_row,meta=_pointer.meta)
        #
        # This function will be used as formatter (at least)
        # The mapped data is kept when the plugin returns nothing
        _out = _pointer(row=_row,data=_data, meta=_pointer.meta)
        _data = _data if _out is None else _out
        self.update(row = _row, data=_data) #-- If this element is considered a parent, we store it
        return _data, _pointer.meta

    def build (self,**_args):
        """
        This function attempts to place a piece of data within a document
        :meta       meta data of the plugin function
        :data       data extracted from the row (see bind)
        :row        raw x12 row
        :document   dictionary that is the document being built
        :return     the document, updated or merged with the data
        """
        _meta = _args['meta']
        _data = _args['data']
        _row = _args['row']
        _document = _args['document']
        _field = self.field(meta=_meta,row=_row)
        _isContainer = 'container' in _meta
        #
        # A container is always a list, it is (re)initialized if missing or of another type
        if _field and _isContainer and not isinstance(_document.get(_field),list) :
            _document[_field] = []
        if _field and _document:
            if _field not in _document :
                _document[_field] = _data
            elif _isContainer :
                _document[_field].append(_data)
            else:
                _document[_field] = self.merge(_document[_field],_data)
        else:
            #
            # No field is associated with the element (or the document is empty),
            # the attributes are placed at the top level of the document
            _document = self.merge(_document,_data)
        return _document

import os
import numpy as np
from io import StringIO
# from .common import Common
class Content :
    """
    This class implements functions that will manipulate content of a file
    :split  splits the content
    :read   reads the content of a file given a filename
    :parse  parses the content of a file given a map {index:field_name}
    """
    def __init__(self,**_args):
        self._parents = {}
        self._lastelement = {}

    def split(self,_content):
        """
        This function splits the content of an x12 file into chunks of rows
        The first chunk has the rows that precede the first claim, every other chunk starts with a claim (CLM or CLP)
        :_content   content of an x12 file (string)
        :return     (chunks,x12) where chunks is a list of lists of rows, a row is a list of elements
                    and x12 is 837 or 835; ([],None) if the content is not a string or has no claim
        """
        if not isinstance(_content,str) :
            return [],None
        _x12 = '837' if 'CLM*' in _content else ('835' if 'CLP*' in _content else None)
        if _x12 is None :
            #
            # Neither claims nor remittances were found, there is nothing to split
            return [],None
        _claim_mark = {'835':'CLP','837':'CLM'}[_x12]
        #
        # The rows end with a tilde (followed by a new line or not) or a new line
        _xchar = next((_char for _char in ('~\n','~','\n') if _char in _content),None)
        _chunks = []
        #
        # NOTE(review): the content is split on the mark alone (e.g 'CLP'), wherever it is found
        for _i,_block in enumerate(_content.split(_claim_mark)) :
            if _i > 0 :
                _block = _claim_mark + _block
            _rows = _block.split(_xchar) if _xchar else [_block]
            _chunks.append([row.strip().split('*') for row in _rows if row.strip()])
        return _chunks,_x12

    def read(self,**_args):
        """
        This function will read the content of a file
        :filename   path of the file or an instance of io.StringIO
        :return     content of the file (string)
        """
        _filename = _args['filename']
        if isinstance(_filename,StringIO) :
            return _filename.read()
        with open(_filename) as f :
            return f.read()

    def _ix_parse (self,columns,index,**_args):
        """
        This function encapsulates how an x12 document element will be processed
        :columns    list of attributes that make up the object (not used, the map of the pointer is)
        :index      indexes of the said items in the element (not used, the map of the pointer is)
        :_args
            - row       raw x12 element (string or list)
            - pointer   decorated function
            - document
        :return     {name:value} given the map of the pointer, otherwise whatever the pointer returns
        """
        _pointer = _args['pointer']
        _document = _args['document']
        if 'map' not in _pointer.meta :
            #
            # We should call the function that is intended to perform the parsing
            # @TODO: We should look into the object created and enforce the specifications are met
            return _pointer(row=_args['row'],document=_document,meta=_pointer.meta)
        _map = _pointer.meta['map']
        if not _map :
            return {}
        _index = [int(_id) for _id in _map]
        _columns = list(_map.values())
        _row = list(_args['row']) if isinstance(_args['row'],list) else _args['row'].split('*')
        #
        # Sometimes the _row doesn't have all expected indexes, we will compensate
        # This allows to minimize parsing errors as it may relate to disconnects between configuration and x12 element variations
        _delta = 1 + max(_index) - len(_row)
        if _delta > 0 :
            _row = _row + [''] * _delta
        return dict(zip(_columns,[_row[_i] for _i in _index]))
# def consolidate(self,**_args):
# """
# This function takes an object and addit to the document given meta data
# :document document associated associated with a claim (processing the loops)
# :object
# :caller attributes within the decorator
# """
# _document = _args['document'] if 'document' in _args else {}
# _info = _args['object']
# _meta = _args['meta']
# #
# # @TODO:
# # Apply parsing/casting function to the object retrieved
# # _apply(_info) #-- the object will be processed accordingly
# #
# #
# # @TODO:
# # The objects parsed must be augmented against the appropriate ones e.g: NM1 <- N1,N2,N3,N4
# # - Find a way to drive this from a configuration ...
# #
# if 'field' in _meta : #hasattr(_meta,'field') :
# _field = _meta['field']
# if not _field in _document :
# _item = {_field:_info}
# else:
# _item = self.merge(_document[_field],_info)
# elif 'container' in _meta: # hasattr(_meta,'container') :
# _label = _meta.container
# if not _label in _document :
# _item = {_label:[_info]}
# else:
# _item = _document[_label] + [_info]
# else:
# _item = _info
# if 'parent' in _meta : #hasattr(_meta,'parent'):
# _hasField = 'field' in _meta
# _hasParent= _meta['element'] in self._parents
# if _hasField and _hasParent: #_meta.element in self._parents and hasattr(_meta,'field'):
# self_last = _item
# pass
# else:
# for key in self._parents :
# if _meta.element in self._parents[key] :
# _ikey = list(self_last.keys())[0]
# _oldinfo = self_last[_ikey]
# if type(_oldinfo) != dict :
# #
# # Only applicable against a dictionary not a list (sorry)
# pass
# else:
# _item = {_ikey: self.merge(_oldinfo,_item)}
# break
# pass
# return _item
class Location :
    """
    This class resolves a path into the list of files to be processed
    """
    @staticmethod
    def get(**_args):
        """
        This function returns the files found (recursively) in a folder, or the path itself if it is not a folder
        :path   path of a folder or a file
        :chunks optional, number of groups the list of files is divided into
        :return list of files, or the output of numpy.array_split (list of arrays) if chunks is provided and not zero
        """
        _path = _args['path']
        if os.path.isdir(_path):
            files = []
            for root,_dir,f in os.walk(_path) :
                files += [os.path.join(root,name) for name in f]
            files = [path for path in files if os.path.isfile(path)]
        else:
            #
            # The path is taken as-is, whether the file exists or not
            files = [_path]
        _chunks = int(_args['chunks']) if 'chunks' in _args else 0
        return files if not _chunks else np.array_split(files,_chunks)