From c3f5759a6aa55b8c9bbd895fa4ae3ef50c6456e6 Mon Sep 17 00:00:00 2001 From: Steve Nyemba Date: Thu, 18 May 2023 09:22:48 -0500 Subject: [PATCH] plugin features and parsing --- healthcareio/x12/parser.py | 45 ++ healthcareio/x12/plugins/__init__.py | 157 +++++++ healthcareio/x12/plugins/default.py | 38 ++ healthcareio/x12/plugins/default/__init__.py | 22 + healthcareio/x12/plugins/default/body.py | 194 ++++++++ healthcareio/x12/plugins/default/common.py | 456 +++++++++++++++++++ healthcareio/x12/plugins/default/header.py | 42 ++ healthcareio/x12/plugins/legacy.py | 0 8 files changed, 954 insertions(+) create mode 100644 healthcareio/x12/parser.py create mode 100644 healthcareio/x12/plugins/__init__.py create mode 100644 healthcareio/x12/plugins/default.py create mode 100644 healthcareio/x12/plugins/default/__init__.py create mode 100644 healthcareio/x12/plugins/default/body.py create mode 100644 healthcareio/x12/plugins/default/common.py create mode 100644 healthcareio/x12/plugins/default/header.py create mode 100644 healthcareio/x12/plugins/legacy.py diff --git a/healthcareio/x12/parser.py b/healthcareio/x12/parser.py new file mode 100644 index 0000000..14725d4 --- /dev/null +++ b/healthcareio/x12/parser.py @@ -0,0 +1,45 @@ +""" +This class refactors the default parsing class (better & streamlined implementation) +""" +from multiprocessing import Process, RLock +import os +import json + + + +class parser (Process) : + _CONFIGURATION = {} + def __init__(self,path=None) : + if not parser._CONFIGURATION : + _path = path if path else os.sep.join([os.environ['HOME'],'.healthcareio/config.json']) + # + # @TODO: Load custom configuration just in case we need to do further processing + config = json.loads(open(path).read()) + parser._CONFIGURATION = config['parser'] + # + # do we have a custom configuration in this location + # + _custompath = _path.replace('config.json','') + _custompath = _custompath if not _custompath.endswith(os.sep) else _custompath[:-1] + _custompath = os.sep.join([_custompath,'custom']) + if os.exists(_custompath) : + files = os.listdir(_custompath) + if files : + _filename = os.sep.join([_custompath,files[0]]) + _customconf = json.loads(open(_filename).read()) + # + # merge with existing configuration + + + else: + pass + + # + # + class getter : + def value(self,) : + pass + class setter : + def files(self,files): + pass + \ No newline at end of file diff --git a/healthcareio/x12/plugins/__init__.py b/healthcareio/x12/plugins/__init__.py new file mode 100644 index 0000000..da6f21e --- /dev/null +++ b/healthcareio/x12/plugins/__init__.py @@ -0,0 +1,157 @@ +""" +This function contains code to load plugins, the idea of a plugin is that it is designed to load, functions written outside of the scope of the parser. +This enables better parsing, and the creation of a pipeline that can perform pre/post conditions given that {x12} is organized in loops + +Contract: + Functions will take in an object (the row of the claim they are intended to parse), and prior rows (parsed) +""" +import os +import importlib as IL +import sys + +DEFAULT_PLUGINS='healthcareio.x12.plugins.default' +class MODE : + TRUST,CHECK,TEST,TEST_AND_CHECK= [0,1,2,3] +def instance(**_args) : + pass +def has(**_args) : + """ + This function will inspect if a function is valid as a plugin or not + name : function name for a given file + path : python file to examine + """ + _pyfile = _args['path'] if 'path' in _args else '' + _name = _args['name'] + # p = os.path.exists(_pyfile) + _module = {} + if os.path.exists(_pyfile): + _info = IL.utils.spec_from_file_location(_name,_pyfile) + if _info : + _module = IL.utils.module_from_spec(_info) + _info.load.exec(_module) + + + else: + _module = sys.modules[DEFAULT_PLUGINS] + return hasattr(_module,_name) +def get(**_args) : + """ + This function will inspect if a function is valid as a plugin or not + name : function name for a given file + path : python file to examine + """ + _pyfile = _args['path'] if 'path' in _args else '' + _name = _args['name'] + # p = os.path.exists(_pyfile) + _module = {} + if os.path.exists(_pyfile): + _info = IL.utils.spec_from_file_location(_name,_pyfile) + if _info : + _module = IL.utils.module_from_spec(_info) + _info.load.exec(_module) + + + else: + _module = sys.modules[DEFAULT_PLUGINS] + return getattr(_module,_name) if hasattr(_module,_name) else None + + +def test (**_args): + """ + This function will test a plugin to insure the plugin conforms to the norm we are setting here + :pointer function to call + """ + _params = {} + try: + if 'pointer' in _args : + _caller = _args['pointer'] + else: + _name = _args['name'] + _path = _args['path'] if 'path' in _args else None + _caller = get(name=_name,path=_path) + _params = _caller() + # + # the expected result is a list of field names [field_o,field_i] + # + return [_item for _item in _params if _item not in ['',None] and type(_item) == str] + except Exception as e : + return [] + pass +def inspect(**_args): + _mode = _args['mode'] + _name= _args['name'] + _path= _args['path'] + if _mode == MODE.CHECK : + _doapply = [has] + elif _mode == MODE.TEST : + _doapply = [test] + elif _mode == MODE.TEST_AND_CHECK : + _doapply = [has,test] + _status = True + _plugin = {"name":_name} + for _pointer in _doapply : + _plugin[_pointer.__name__] = _pointer(name=_name,path=_path) + if not _plugin[_pointer.__name__] : + _status = False + break + _plugin['loaded'] = _status + return _plugin +def load(**_args): + """ + This function will load all the plugins given an set of arguments : + path file + name list of functions to export + mode 1- CHECK ONLY, 2 - TEST ONLY, 3- TEST_AND_CHECK + """ + _path = _args ['path'] + _names= _args['names'] + _mode= _args ['mode'] if 'mode' in _args else MODE.TEST_AND_CHECK + _doapply = [] + if _mode == MODE.CHECK : + _doapply = [has] + elif _mode == MODE.TEST : + _doapply = [test] + elif _mode == MODE.TEST_AND_CHECK : + _doapply = [has,test] + # _plugins = [] + _plugins = {} + for _name in _names : + _plugin = {"name":_name} + if 'inspect' in _args and _args['inspect'] : + _plugin = inspect(name=_name,mode=_mode,path=_path) + else: + _plugin["method"] = "" + _status = True + _plugin['loaded'] = _status + if _plugin['loaded'] : + _plugin['pointer'] = get(name=_name,path=_path) + else: + _plugin['pointer'] = None + + # _plugins.append(_plugin) + _plugins[_name] = _plugin + return _plugins + + +def parse(**_args): + """ + This function will apply a function against a given function, and data + :row claim/remits pre-processed + :plugins list of plugins + :conifg configuration associated with + """ + _row = _args['row'] + _document = _args['document'] + _config = _args['config'] + """ + "apply":"@path:name" + """ + + _info = _args['config']['apply'] + _plug_conf = _args['config']['plugin'] if 'plugin' in _args['config'] else {} + if _info.startswith('@') : + _path = '' #-- get this from general configuration + elif _info.startswith('!'): + _path = _info.split('!')[0][1:] + _name = _info.split(':')[-1] + _name = _args['config']['apply'].split(_path) \ No newline at end of file diff --git a/healthcareio/x12/plugins/default.py b/healthcareio/x12/plugins/default.py new file mode 100644 index 0000000..d3a89bc --- /dev/null +++ b/healthcareio/x12/plugins/default.py @@ -0,0 +1,38 @@ +import datetime + +def date(**_args): + """# + This function will return a data as presented in the {x12} i.e it could be a date-range or a single date + - In the case of a single data it is returned as a string + - In the case of a range a complex object is returned with to,from keys + NOTE: dates will be formatted as they + """ + if not _args : + return ['from','to','type'] + _date = "" + return _date +def procedure (**_args): + """ + This function will parse SVC element and return given the following The return object is as follows : + claim_id,charge_amount, payment_amount,patient_amount,patient_status,claim_status + + """ + cols = ['type','code','amount'] + if not _args : + return cols + _procedure = dict.fromkeys(cols,None) + _row = _args['row'] + # _document = _args['document'] + if len(_row) == 3 : + _procedure = dict(zip(cols,_row[1:4])) + return _procedure + + return _info +def SV2(**_args): + pass +def SV3(**_args): + pass +def HL (**_args): + pass +def HI(**_args): + pass \ No newline at end of file diff --git a/healthcareio/x12/plugins/default/__init__.py b/healthcareio/x12/plugins/default/__init__.py new file mode 100644 index 0000000..ab7dc1e --- /dev/null +++ b/healthcareio/x12/plugins/default/__init__.py @@ -0,0 +1,22 @@ +""" +This file serves as the interface (so to speak) to the x12 parser, plugin interface +@TODO: + - How to write custom plugin + - Provide interface for meta-data (expected) + - Support configuration specification +""" +import os +from . import common +from . import header +from . import body + +EDI = body.BODY + +def instance(**_args): + pass + +# class Parser : +# def __init__(**_args): +# folder = _args['path'] +# files = [ os.sep.join(_name,folder) for _name in os.listdir(folder)] +# pass \ No newline at end of file diff --git a/healthcareio/x12/plugins/default/body.py b/healthcareio/x12/plugins/default/body.py new file mode 100644 index 0000000..b191c9f --- /dev/null +++ b/healthcareio/x12/plugins/default/body.py @@ -0,0 +1,194 @@ +from .common import X12DOCUMENT +from .header import HEADER +import numpy as np + + +class BODY (HEADER): + """ + This class will contain functions that will parse elements associated with the claim body + """ + def BHT (self,**_args): + """ + Expected Element BHT + This functiion will process header submitted (receiver,claim_type,) + """ + _columns= ['receiver_id','submitted_date','submitted_time','claim_type'] + return self.parse(_columns,[3,7],**_args) + + def NM1 (self,**_args): + """ + Expected Element NM1 + ref IL,40,41,82,85,PR ... + Information about entities (doctors, clearing house, provider). we should be mindful of the references + """ + # _CODE_INDEX = 1 + # _map = {_CODE_INDEX:{'41':'submitter','40':'receiver','PR':'payer'}} + _columns = ['type','name','id'] + _indexes = [1,3,-1] + # _info = [{'index':'40','field':'receiver'},{'index':'41','field':'submitter'},{'index':'PR','field':'payer'}] + + _info = self.parse(_columns,_indexes,**_args) + return _info + def N3 (self,**_args): + """ + Expected Element N3 + """ + + _columns = ['address_line_1'] + return self.parse(_columns,[1,2],**_args) + + def N4(self,**_args): + """ + Expected Element N4 + """ + _columns = ['city','state','zip'] + return self.parse(_columns,[1,2,3],**_args) + def HI(self,**_args): + """ + Expected Element HI + This function will parse diagnosis codes ICD 9/10 + """ + _columns = ['code','type'] + return self.parse(_columns,[2,1],**_args) + def AMT (self,**_args): + """ + Expected Element AMT + """ + _columns = ['amount','qualifier'] + return self.parse(_columns,[2,1],**_args) + def SBR (self,**_args): + """ + Expected Element SBR + """ + _index = [9,1] + _columns = ['vendor','individual_code','type'] + return self.parse(_columns,[9,2,1],**_args) + def DMG (self,**_args): + """ + Expected Element DMG + """ + _columns = ['dob','gender','format'] + _info = self.parse(_columns,[2,3,1],**_args) + + return _info + def DTP (self,**_args): + """ + Expected Element DTP + """ + _columns = ['to','from','type'] + return self.parse(_columns,[3],**_args) + def PER (self,**_args): + """ + Expected Element PER + """ + _CODE_INDEX = 1 + _map = {_CODE_INDEX:{'IC':'submitter'}} # attribute to store the data in + _columns = ['contact_name','phone','email'] + _info = self.parse (_columns,[2,4,8],**_args) + # + # @TODO: Inspect the configuration file for the attribute information + # + return _info + def CLM (self,**_args): + """ + Expected Element CLM + """ + _columns = ['claim_id','claim_amount','facility_code','facility_qualifier','frequency_code'] + return self.parse(_columns,[1,2,5,5,5],**_args) + def REF (self,**_args): + _columns = ['identifier','qualifier',''] + _CODE_INDEX = 1 # -- according to x12 standard + _map = {_CODE_INDEX:{'EA':'patient','EI':'provider','6R':'','D9':''}} + return self.parse(_columns,[2],**_args) + + def HI (self,**_args): + """ + Expected Element HI + """ + _columns = ['code','type'] + return self.parse(_columns,[1,2],**_args) + def SV1 (self,**_args): + """ + Expected Element SV1 + """ + _row = _args['row'] if type(_args['row']) == list else _args['row'].split('*') + _columns = ['type','code','amount','modifier_1','modifier_2','modifier_3','modifier_4','place_of_service','units','measurement'] + return self.parse(_columns,[1,1,2,1,1,1,1,5,4,3],**_args) + + def SV3 (self,**_args): + return self.SV1(**_args) + def HL (self,**_args) : + """, + Expected Element HL + The expected block is supposed to be unprocessed (to make things simple) + """ + _row = _args['row'] if type(_args['row']) == list else _args['row'].split('~') + + # _attr = self.elements() #[_name for _name in dir() if not _name.islower() and not _name.startswith('_')] + # _pointers = [getattr(self,_name) for _name in _attr] + # _map = dict(zip(_attr,_pointers)) + _map = self.pointers() + + # + # The index here tells us what we are processing i.e index == 1 something about header + # + _columns = ['_index','parent','code','child'] + _args['row'] = _row[0] + + _info = self.parse (_columns,[1,2,3,4],**_args) + # _field = 'billing_provider' if _info['_index'] == '1' else 'patient' + # _config ={'field':_field} + return _info + # _claim = {_field:_info} + # for _element in _row[1:] : + # _key = _element.split('*')[0] + # if _key in _map and len(_element) > 0: + # _document = _args['document'] + # _pointer = _map[_key] + # if _key not in ['CLM','HI','SV3','SV2','SV1'] : + # _claim = self.merge (_claim,_pointer(row=_element.strip().split('*'),document=_document,config=_config)) + # else: + # _config = _args['config'] if 'config' in _args else {} + # _claim = self.merge (_claim,_pointer(row=_element.strip().split('*'),document=_document,config=_config)) + # else: + # print (['SKIPPING ',_key]) + # pass + + # return _claim + # def apply(self,_block): + # """ + # :_block elements that do not belong to the header block + # """ + # _apply = self.pointers() + + # _header = {} + # if _block : + + # for content in _block : + + # _KEY_ELEMENT = content.split('*')[0] + # if _KEY_ELEMENT not in _apply : + # # + # # @TODO: Log elements that are skipped + # # print ([_KEY_ELEMENT , 'NOT FOUND']) + # continue + + # _info = _apply[_KEY_ELEMENT](row=content,document=_header) + + # if _info : + # if not _header : + # _header = _info + + # else: + + # _header = self.merge(_header,_info) + # else: + # # + # # For some reason the parser failed by returning a null + # # @TODO: Log this event .... + # pass + # else: + # # + # # @TODO: return the meta data for what is expected + # pass + # return _header \ No newline at end of file diff --git a/healthcareio/x12/plugins/default/common.py b/healthcareio/x12/plugins/default/common.py new file mode 100644 index 0000000..4ad687d --- /dev/null +++ b/healthcareio/x12/plugins/default/common.py @@ -0,0 +1,456 @@ +from typing import Any +import numpy as np +import json +from multiprocessing import Process, RLock +import os +import io +import queue +import transport +from transport import providers + +class Store(Process): + """ + This is the data-store service that will handle read/writes + """ + dataStore = None + @staticmethod + def init(self,**_args): + if Store.dataStore is None : + _args = _args['store'] + + else: + pass + @staticmethod + def reset(): + pass + +class X12DOCUMENT (Process): + """ + X12DOCUMENT class encapsulates functions that will be used to format an x12 (835,837) claim into an object + """ + _queue = queue.Queue() + + class MODE : + # + # The following allow us to handle raw content (stream) or a filename + # The raw content will be wrapped into io.StringIO so that it is handled as if it were a file + # + NAMES,STREAM = 'NAMES','STREAM' + class ConfigHandler : + def format(self,**_args): + """ + This function formats variations of an element's parsing rules + :info {index,field|label,map} + """ + + _info = _args['info'] + _ref = {} + + for _item in _info : + _index = str(_item['index']) + _field = _item['field'] if 'field' in _item else None + _label = _item['label'] if 'label' in _item else None + if _field : + _ref[_index] = {'field':_field} + elif _label : + _ref[_index] = {'label':_label} + + return {'@ref':_ref} + def _getColumnsIndexes(self,_columns,_indexes,_map): + """ + This function return columns and indexes related if a parsing map is passed + :param _columns + :param _indexes + :param _map parsing map (field:index) + """ + # @TODO: insure the lengths are the same for adequate usage downstream ... + _xcolumns,_xindexes = list(_map.keys()), list(_map.values()) + keys,values = _xcolumns + _columns,_xindexes + _indexes + _config = dict(zip(keys,values)) + + _outColumns,_outIndexes = list(_config.keys()),list(_config.values()) + return _outColumns,_outIndexes + def _getObjectAtributes(self,_config): + _field = _config['field'] if 'field' in _config else {} + _label = _config['label'] if 'label' in _config else {} + return _field,_label + def merge(self,**_args): + # + # This function overrides the old configuration with the new configuration specifications + # + + # _columns,_indexes = [],[] + _columns,_indexes = _args['columns'],_args['index'] + _map = {} + _config = _args['config'] if 'config' in _args else {} + _field,_label = self._getObjectAtributes(_config) + + if 'map' in _config : + _map = _args['config']['map'] + _columns,_indexes = self._getColumnsIndexes(_columns,_indexes,_map) + + if '@ref' in _config : + # _columns,_indexes = [],[] + _row = _args['row'] + + _ref = _config['@ref'] + + for _anchor in _ref: + # print ([_anchor,_anchor == _row[1].strip()]) + if _anchor == _row[1].strip() : + _field,_label = self._getObjectAtributes(_ref[_anchor]) + + _map = _ref[_anchor]['map'] if 'map' in _ref[_anchor] else {} + + if _map : + _columns,_indexes = self._getColumnsIndexes([],[],_map) + + + break + # _columns,_indexes = _columns + _map.keys() + + return {'columns':_columns,'index':_indexes,'field':_field,'label':_label} + def legacy(self,**_args): + # + # This function returns the legacy configuration (default parsing) + # + + _config = _args['config'] if 'config' in _args else {} + _field,_label = self._getObjectAtributes(_config) + _columns,_indexes = [],[] + if 'map' in _config : + _columns = list(_config['map'].keys()) + _indexes = list(_config['map'].values()) + + return {'columns':_columns,'index':_indexes,'field':_field,'label':_label} + def override(self,**_args): + return _args['columns'],_args['indexes'] + def __init__(self,**_args): + super().__init__() + self._mode = _args['mode'] if 'mode' in _args else 'NAMES' + if 'files' in _args : + self.files = _args['files'] + self._config = _args['config'] if 'config' in _args else {} + self._document = [] + + self._x12FileType = None + self._configHandler = X12DOCUMENT.ConfigHandler() + + # + #-- The files need to be classified, the files need to be either claims or remits + # + if 'store' not in self._config : + self._store_args = _args['store'] if 'store' in _args else {'provider':providers.CONSOLE} + else: + self._store_args = self._config['store'] + + def init(self,_header): + """ + Expected Elements must include ST + """ + pass + + def merge (self,_x,_y): + """ + This function will merge two objects _x, _y + """ + _zcols = list(set(_x.keys()) & set(_y.keys())) #--common columns + + if _zcols : + _out = dict(_x,**{}) + for _key in _y.keys() : + if not _key in _zcols : + + _out[_key] = _y[_key] + else: + if type(_out[_key]) == list : + _out[_key] += _y[_key] + elif type(_out[_key]) == dict: + _out[_key] = dict(_out[_key],**_y[_key]) + else: + _out[_key] = _y[_key] + + return _out + else: + + return dict(_x,**_y) + + def split(self,content): + """ + This function will split the content of an X12 document into blocks and headers + :content x12 document in raw format (text) + """ + #_content = content.split('~') + _content = content.split('HL') + _header = _content[:1][0].split('~') + + _blocks = ['HL'+_item for _item in _content[1:]] + _blocks = [_item.split('~') for _item in _blocks ] + + # for row in _content : + # if not _blocks and not row.startswith('HL') : + # _header.append(row) + # else: + # _blocks.append(row) + return {'header':_header,'blocks':_blocks} + def parse (self,columns,index,**_args): + """ + This function encapulates how an x12 document element will be processed + :columns list of attributes that make up the object + :index indexes of the said items in the element + :_args + - row raw x12 element (string) + - config configuration of the element. his should indicate functions to apply against function + """ + _ELEMENT = _args['row'][0] + # + # get the right configuration from the _config object + _config = _args['config'][_ELEMENT] if _ELEMENT in _args['config'] else {} + + # _field = _config['field'] if 'field' in _config else None + # _label = _config['label'] if 'label' in _config else None + _map = _config['map'] if 'map' in _config else {} + # + # Let's see if overriding the fields/labels isn't necessary + + + # columns, index,_refField,_refLabel = self._configHandler.merge(row=_args['row'],columns=columns,index=index,config=_config) + # _field = _field if not _refField else _refField + # _label = _label if not _refLabel else _refLabel + + _outInfo = self._configHandler.merge(row=_args['row'],columns=columns,index=index,config=_config) + + _field,_label = _outInfo['field'],_outInfo['label'] + _columns,_index = _outInfo['columns'],_outInfo['index'] + + + if 'row' in _args: + _row = _args['row'] if type(_args['row']) == list else _args['row'].split('*') + + _index = np.array(_index) + + # + # Sometimes the _row doesn't have all expected indexes, we will compensate + # This allows to minimize parsing errors as it may relate to disconnects between configuration and x12 element variations (shitty format) + # + if np.max(_index) > len(_row) -1 : + _delta = 1 + np.max(_index) - len(_row) + _row = _row + np.repeat('',_delta).tolist() + _row = np.array(_row) + + # _element = _row[0] + + _configKeys = [] #list(self._config.keys()) + _configTree = [] #list(self._config.values()) + if 'config' in _args : + _config = _args['config'] + _configKeys = list(_config.keys()) + _configTree = list(_config.values()) + else: + _config = {} + + _info = dict(zip(_columns,_row[_index].tolist())) + _document = _args['document'] if 'document' in _args else {} + # + # Extracting configuration (minimal information) + # _config = _args['config'] if 'config' in _args else {} + # _config = self._config + + + # if '@ref' in _config : + # print (_config['@ref']) + # _values = _config['@ref'] + # print (_values) + + if _field : + if not _field in _document : + return {_field:_info} + else: + return self.merge(_document[_field],_info) + elif _label : + if not _label in _document : + return {_label:[_info]} + else: + return _document[_label] + [_info] + else: + return _info + + else: + return columns + def elements(self): + """ + This function returns elements that are supported as specified by X12 standard + """ + return [_name for _name in dir(self) if not _name.startswith('_') and not _name.islower() ] + def pointers(self): + """ + This function returns pointers associated with each element ... + :return Object of Element:Function + """ + _attr = self.elements() + _pointers = [getattr(self,_name) for _name in _attr] + return dict(zip(_attr,_pointers)) + + def set(self,_info,_document,_config): + _attrName,_attrType = None,None + + if 'label' in _config : + _attrType = 'label' + _attrName = _config['label'] + elif 'field' in _config : + _attrType = 'field' + _attrName = _config['field'] + + if _attrName : + if _attrName not in _document : + _document[_attrName] = [] if _attrType == 'label' else {} + + # + # @TODO: make sure we don't have a case of an attribute being overridden + if type(_document[_attrName]) == list : + _document[_attrName] += [_info] + else: + _document[_attrName] = dict(_document[_attrName],**_info) + # _document[_attrName] += [_info] if _attrType == 'label' else dict(_document[_attrName],**_info) + + return _document + + return dict(_document,**_info) + + pass + def log (self,**_args): + pass + def run(self): + """ + This function will trigger the workflow associated with a particular file + """ + _getContent = { + # + # For the sake of testing, the following insures + # that raw string content is handled as if it were a file + # + X12DOCUMENT.MODE.STREAM: (lambda stream : io.StringIO(stream)) , + X12DOCUMENT.MODE.NAMES: (lambda name: open(name)) + + + } + _writer = transport.factory.instance(**self._store_args) + for _filename in self.files : + try: + _documents = [] + _parts = [] + # _content = (open(_filename)).read() + _reader = _getContent[self._mode] + _content = _reader(_filename).read() + _info = self.split(_content) + _fileType=self.init(_content) + _header = self.apply(_info['header']) + + # print (json.dumps(_header)) + for _content in _info['blocks'] : + + _body = self.apply(_content,header=_header) + _doc = self.merge(_header,_body) + + if _doc and 'claim_id' in _doc: + # X12DOCUMENT._queue.put(_document) + + _documents += [_doc] + + + except Exception as e: + # + # @TODO: Log this issue for later analysis ... + print (e) + pass + # + # Let us post this to the documents we have, we should find a place to post it + # + if _documents : + # print (_header['header']) + + self.post(document=_documents,writer=_writer) + break + + def post(self,**_args): + """ + This function is intended to post content to a given location + :param document + :param writer + """ + _writer = _args['writer'] if 'writer' in _args else None + _document = _args['document'] + if not _writer: + X12DOCUMENT._queue.put(_document) + else: + + _writer.write(_document) + def _getConfig(self,_chunk): + # + # Let us determine what kind of file we are dealing with, so we can extract the configuration + # For this we need to look for the ST loop ... + # + + line = [line for line in _chunk if line and line[:2] == 'ST' ] + + if line : + # + # We found the header of the block, so we can set the default configuration + # + self._x12FileType = line[0].split('*')[1].strip() + _config = {} + if self._x12FileType : + _config = self._config[self._x12FileType] + + return _config + + def apply(self,_chunk, header = {}): + """ + _chunks are groups of elements split by HL, within each chunk are x12 loops HL,CLM,ISA + """ + + + _document,_cached = {},{} + _pointers = self.pointers() + _config = self._getConfig(_chunk) + # + # The configuration comes from the file, let's run this in merge mode + # _config = self._configHandler.merge + _pid = None + for line in _chunk : + + segments = line.split('*') + + _ELEMENT = segments[0] + + if _ELEMENT not in _pointers or not _ELEMENT: + continue + if _ELEMENT in ['HL','CLM','ISA'] or not _pid: + _pid = _ELEMENT + if _pid not in _cached : + _cached [_pid] = {} + + _pointer = _pointers[_ELEMENT] + + _args = {'row':segments,'document':_document,'header':header,'config':(_config)} + + + _parsedLine = _pointer(**_args) + # print ([_pid,_ELEMENT,_parsedLine]) + + _cached[_pid] = self.merge(_cached[_pid],_parsedLine) + + + # + # Let's create the documents as we understand them to be + # @TODO: Create a log so there can be visibility into the parser + # + _document = {} + for _id in _cached : + # print ('patient' in _cached[_id] ) + + _document = self.merge(_document,_cached[_id]) + + return _document + + \ No newline at end of file diff --git a/healthcareio/x12/plugins/default/header.py b/healthcareio/x12/plugins/default/header.py new file mode 100644 index 0000000..fae7645 --- /dev/null +++ b/healthcareio/x12/plugins/default/header.py @@ -0,0 +1,42 @@ +""" + +This file contains parsing functions for a claim header, the functions are built within class (Object Orientation) +The intent to leverage Object Orientated designs is to easily support multi-processing + +""" +import numpy as np +import json +# from . im common +from .common import X12DOCUMENT + + +class HEADER (X12DOCUMENT): + # @staticmethod + def ISA(self,**_args): + + _columns = ['mode','version','date','time'] + _index = [15,12,9,10] + return self.parse(_columns,_index,**_args) + # @staticmethod + def GS(self,**_args): + """ + Expected Element GS + """ + _columns = ['type','sender','receiver','date','time','version'] + return self.parse(_columns,[1,2,3,4,5,8],**_args) + # @staticmethod + def ST(self,**_args): + """ + Expected Element ST + According to {x12} standard ST will take precedence over GS + """ + _columns = ['type','control_number','version'] + return self.parse(_columns,[1,2,3],**_args) + # @staticmethod + def BHT (self,**_args): + """ + Expected Element BHT + This functiion will process header submitted (receiver,claim_type,) + """ + _columns= ['app_id','date','time','type'] + return self.parse(_columns,[3,4,5,6],**_args) diff --git a/healthcareio/x12/plugins/legacy.py b/healthcareio/x12/plugins/legacy.py new file mode 100644 index 0000000..e69de29