From a377f8ff43b58edfc188c9485374a2c99fbbd6bb Mon Sep 17 00:00:00 2001 From: Steve Nyemba Date: Tue, 21 Nov 2023 13:01:34 -0600 Subject: [PATCH] v2.0: refactor --- healthcareio/x12/plugins/default/_common.py | 456 -------------------- 1 file changed, 456 deletions(-) delete mode 100644 healthcareio/x12/plugins/default/_common.py diff --git a/healthcareio/x12/plugins/default/_common.py b/healthcareio/x12/plugins/default/_common.py deleted file mode 100644 index 4ad687d..0000000 --- a/healthcareio/x12/plugins/default/_common.py +++ /dev/null @@ -1,456 +0,0 @@ -from typing import Any -import numpy as np -import json -from multiprocessing import Process, RLock -import os -import io -import queue -import transport -from transport import providers - -class Store(Process): - """ - This is the data-store service that will handle read/writes - """ - dataStore = None - @staticmethod - def init(self,**_args): - if Store.dataStore is None : - _args = _args['store'] - - else: - pass - @staticmethod - def reset(): - pass - -class X12DOCUMENT (Process): - """ - X12DOCUMENT class encapsulates functions that will be used to format an x12 (835,837) claim into an object - """ - _queue = queue.Queue() - - class MODE : - # - # The following allow us to handle raw content (stream) or a filename - # The raw content will be wrapped into io.StringIO so that it is handled as if it were a file - # - NAMES,STREAM = 'NAMES','STREAM' - class ConfigHandler : - def format(self,**_args): - """ - This function formats variations of an element's parsing rules - :info {index,field|label,map} - """ - - _info = _args['info'] - _ref = {} - - for _item in _info : - _index = str(_item['index']) - _field = _item['field'] if 'field' in _item else None - _label = _item['label'] if 'label' in _item else None - if _field : - _ref[_index] = {'field':_field} - elif _label : - _ref[_index] = {'label':_label} - - return {'@ref':_ref} - def _getColumnsIndexes(self,_columns,_indexes,_map): - """ - This function return columns and indexes related if a parsing map is passed - :param _columns - :param _indexes - :param _map parsing map (field:index) - """ - # @TODO: insure the lengths are the same for adequate usage downstream ... - _xcolumns,_xindexes = list(_map.keys()), list(_map.values()) - keys,values = _xcolumns + _columns,_xindexes + _indexes - _config = dict(zip(keys,values)) - - _outColumns,_outIndexes = list(_config.keys()),list(_config.values()) - return _outColumns,_outIndexes - def _getObjectAtributes(self,_config): - _field = _config['field'] if 'field' in _config else {} - _label = _config['label'] if 'label' in _config else {} - return _field,_label - def merge(self,**_args): - # - # This function overrides the old configuration with the new configuration specifications - # - - # _columns,_indexes = [],[] - _columns,_indexes = _args['columns'],_args['index'] - _map = {} - _config = _args['config'] if 'config' in _args else {} - _field,_label = self._getObjectAtributes(_config) - - if 'map' in _config : - _map = _args['config']['map'] - _columns,_indexes = self._getColumnsIndexes(_columns,_indexes,_map) - - if '@ref' in _config : - # _columns,_indexes = [],[] - _row = _args['row'] - - _ref = _config['@ref'] - - for _anchor in _ref: - # print ([_anchor,_anchor == _row[1].strip()]) - if _anchor == _row[1].strip() : - _field,_label = self._getObjectAtributes(_ref[_anchor]) - - _map = _ref[_anchor]['map'] if 'map' in _ref[_anchor] else {} - - if _map : - _columns,_indexes = self._getColumnsIndexes([],[],_map) - - - break - # _columns,_indexes = _columns + _map.keys() - - return {'columns':_columns,'index':_indexes,'field':_field,'label':_label} - def legacy(self,**_args): - # - # This function returns the legacy configuration (default parsing) - # - - _config = _args['config'] if 'config' in _args else {} - _field,_label = self._getObjectAtributes(_config) - _columns,_indexes = [],[] - if 'map' in _config : - _columns = list(_config['map'].keys()) - _indexes = list(_config['map'].values()) - - return {'columns':_columns,'index':_indexes,'field':_field,'label':_label} - def override(self,**_args): - return _args['columns'],_args['indexes'] - def __init__(self,**_args): - super().__init__() - self._mode = _args['mode'] if 'mode' in _args else 'NAMES' - if 'files' in _args : - self.files = _args['files'] - self._config = _args['config'] if 'config' in _args else {} - self._document = [] - - self._x12FileType = None - self._configHandler = X12DOCUMENT.ConfigHandler() - - # - #-- The files need to be classified, the files need to be either claims or remits - # - if 'store' not in self._config : - self._store_args = _args['store'] if 'store' in _args else {'provider':providers.CONSOLE} - else: - self._store_args = self._config['store'] - - def init(self,_header): - """ - Expected Elements must include ST - """ - pass - - def merge (self,_x,_y): - """ - This function will merge two objects _x, _y - """ - _zcols = list(set(_x.keys()) & set(_y.keys())) #--common columns - - if _zcols : - _out = dict(_x,**{}) - for _key in _y.keys() : - if not _key in _zcols : - - _out[_key] = _y[_key] - else: - if type(_out[_key]) == list : - _out[_key] += _y[_key] - elif type(_out[_key]) == dict: - _out[_key] = dict(_out[_key],**_y[_key]) - else: - _out[_key] = _y[_key] - - return _out - else: - - return dict(_x,**_y) - - def split(self,content): - """ - This function will split the content of an X12 document into blocks and headers - :content x12 document in raw format (text) - """ - #_content = content.split('~') - _content = content.split('HL') - _header = _content[:1][0].split('~') - - _blocks = ['HL'+_item for _item in _content[1:]] - _blocks = [_item.split('~') for _item in _blocks ] - - # for row in _content : - # if not _blocks and not row.startswith('HL') : - # _header.append(row) - # else: - # _blocks.append(row) - return {'header':_header,'blocks':_blocks} - def parse (self,columns,index,**_args): - """ - This function encapulates how an x12 document element will be processed - :columns list of attributes that make up the object - :index indexes of the said items in the element - :_args - - row raw x12 element (string) - - config configuration of the element. his should indicate functions to apply against function - """ - _ELEMENT = _args['row'][0] - # - # get the right configuration from the _config object - _config = _args['config'][_ELEMENT] if _ELEMENT in _args['config'] else {} - - # _field = _config['field'] if 'field' in _config else None - # _label = _config['label'] if 'label' in _config else None - _map = _config['map'] if 'map' in _config else {} - # - # Let's see if overriding the fields/labels isn't necessary - - - # columns, index,_refField,_refLabel = self._configHandler.merge(row=_args['row'],columns=columns,index=index,config=_config) - # _field = _field if not _refField else _refField - # _label = _label if not _refLabel else _refLabel - - _outInfo = self._configHandler.merge(row=_args['row'],columns=columns,index=index,config=_config) - - _field,_label = _outInfo['field'],_outInfo['label'] - _columns,_index = _outInfo['columns'],_outInfo['index'] - - - if 'row' in _args: - _row = _args['row'] if type(_args['row']) == list else _args['row'].split('*') - - _index = np.array(_index) - - # - # Sometimes the _row doesn't have all expected indexes, we will compensate - # This allows to minimize parsing errors as it may relate to disconnects between configuration and x12 element variations (shitty format) - # - if np.max(_index) > len(_row) -1 : - _delta = 1 + np.max(_index) - len(_row) - _row = _row + np.repeat('',_delta).tolist() - _row = np.array(_row) - - # _element = _row[0] - - _configKeys = [] #list(self._config.keys()) - _configTree = [] #list(self._config.values()) - if 'config' in _args : - _config = _args['config'] - _configKeys = list(_config.keys()) - _configTree = list(_config.values()) - else: - _config = {} - - _info = dict(zip(_columns,_row[_index].tolist())) - _document = _args['document'] if 'document' in _args else {} - # - # Extracting configuration (minimal information) - # _config = _args['config'] if 'config' in _args else {} - # _config = self._config - - - # if '@ref' in _config : - # print (_config['@ref']) - # _values = _config['@ref'] - # print (_values) - - if _field : - if not _field in _document : - return {_field:_info} - else: - return self.merge(_document[_field],_info) - elif _label : - if not _label in _document : - return {_label:[_info]} - else: - return _document[_label] + [_info] - else: - return _info - - else: - return columns - def elements(self): - """ - This function returns elements that are supported as specified by X12 standard - """ - return [_name for _name in dir(self) if not _name.startswith('_') and not _name.islower() ] - def pointers(self): - """ - This function returns pointers associated with each element ... - :return Object of Element:Function - """ - _attr = self.elements() - _pointers = [getattr(self,_name) for _name in _attr] - return dict(zip(_attr,_pointers)) - - def set(self,_info,_document,_config): - _attrName,_attrType = None,None - - if 'label' in _config : - _attrType = 'label' - _attrName = _config['label'] - elif 'field' in _config : - _attrType = 'field' - _attrName = _config['field'] - - if _attrName : - if _attrName not in _document : - _document[_attrName] = [] if _attrType == 'label' else {} - - # - # @TODO: make sure we don't have a case of an attribute being overridden - if type(_document[_attrName]) == list : - _document[_attrName] += [_info] - else: - _document[_attrName] = dict(_document[_attrName],**_info) - # _document[_attrName] += [_info] if _attrType == 'label' else dict(_document[_attrName],**_info) - - return _document - - return dict(_document,**_info) - - pass - def log (self,**_args): - pass - def run(self): - """ - This function will trigger the workflow associated with a particular file - """ - _getContent = { - # - # For the sake of testing, the following insures - # that raw string content is handled as if it were a file - # - X12DOCUMENT.MODE.STREAM: (lambda stream : io.StringIO(stream)) , - X12DOCUMENT.MODE.NAMES: (lambda name: open(name)) - - - } - _writer = transport.factory.instance(**self._store_args) - for _filename in self.files : - try: - _documents = [] - _parts = [] - # _content = (open(_filename)).read() - _reader = _getContent[self._mode] - _content = _reader(_filename).read() - _info = self.split(_content) - _fileType=self.init(_content) - _header = self.apply(_info['header']) - - # print (json.dumps(_header)) - for _content in _info['blocks'] : - - _body = self.apply(_content,header=_header) - _doc = self.merge(_header,_body) - - if _doc and 'claim_id' in _doc: - # X12DOCUMENT._queue.put(_document) - - _documents += [_doc] - - - except Exception as e: - # - # @TODO: Log this issue for later analysis ... - print (e) - pass - # - # Let us post this to the documents we have, we should find a place to post it - # - if _documents : - # print (_header['header']) - - self.post(document=_documents,writer=_writer) - break - - def post(self,**_args): - """ - This function is intended to post content to a given location - :param document - :param writer - """ - _writer = _args['writer'] if 'writer' in _args else None - _document = _args['document'] - if not _writer: - X12DOCUMENT._queue.put(_document) - else: - - _writer.write(_document) - def _getConfig(self,_chunk): - # - # Let us determine what kind of file we are dealing with, so we can extract the configuration - # For this we need to look for the ST loop ... - # - - line = [line for line in _chunk if line and line[:2] == 'ST' ] - - if line : - # - # We found the header of the block, so we can set the default configuration - # - self._x12FileType = line[0].split('*')[1].strip() - _config = {} - if self._x12FileType : - _config = self._config[self._x12FileType] - - return _config - - def apply(self,_chunk, header = {}): - """ - _chunks are groups of elements split by HL, within each chunk are x12 loops HL,CLM,ISA - """ - - - _document,_cached = {},{} - _pointers = self.pointers() - _config = self._getConfig(_chunk) - # - # The configuration comes from the file, let's run this in merge mode - # _config = self._configHandler.merge - _pid = None - for line in _chunk : - - segments = line.split('*') - - _ELEMENT = segments[0] - - if _ELEMENT not in _pointers or not _ELEMENT: - continue - if _ELEMENT in ['HL','CLM','ISA'] or not _pid: - _pid = _ELEMENT - if _pid not in _cached : - _cached [_pid] = {} - - _pointer = _pointers[_ELEMENT] - - _args = {'row':segments,'document':_document,'header':header,'config':(_config)} - - - _parsedLine = _pointer(**_args) - # print ([_pid,_ELEMENT,_parsedLine]) - - _cached[_pid] = self.merge(_cached[_pid],_parsedLine) - - - # - # Let's create the documents as we understand them to be - # @TODO: Create a log so there can be visibility into the parser - # - _document = {} - for _id in _cached : - # print ('patient' in _cached[_id] ) - - _document = self.merge(_document,_cached[_id]) - - return _document - - \ No newline at end of file