From a6b52719f9f718116d9cc13b59caceedffb45a9b Mon Sep 17 00:00:00 2001 From: Steve Nyemba Date: Tue, 6 Feb 2024 14:32:59 -0600 Subject: [PATCH] bug fix: export & sql doesn't always have objects casted --- healthcareio/__main__.py | 22 ++++++++++++++++++--- healthcareio/x12/parser.py | 7 +++++-- healthcareio/x12/publish.py | 33 +++++++++++++++++++++++-------- healthcareio/x12/util/document.py | 2 -- 4 files changed, 49 insertions(+), 15 deletions(-) diff --git a/healthcareio/__main__.py b/healthcareio/__main__.py index 0cac63b..38e1235 100644 --- a/healthcareio/__main__.py +++ b/healthcareio/__main__.py @@ -178,12 +178,28 @@ def publish (file_type:str,path:str): path path to export configuration (data transport file) - file_type claims or remits + file_type claims or remits (835 or 837) """ - if file_type in ['837','claims'] : + _type = None + if file_type.strip() in ['837','claims'] : _type = 'claims' - elif file_type in ['835','remits']: + _x12 = '837' + elif file_type.strip() in ['835','remits']: _type = 'remits' + _x12 = '835' + if _type : + print ([f"Exporting {_type}"]) + _store = {'source':os.sep.join([CONFIG_FOLDER,'config.json']),'target':path} + for _key in _store : + f = open(_store[_key]) + _store[_key] = json.loads(f.read()) + f.close() + _store['source'] = _store['source']['store'] + + _plugins,_parents = x12.plugins.instance() + x12.publish.init(plugins=_plugins,x12=_x12,store=_store) + else: + print ("Can not determine type, (837 or 835)") if __name__ == '__main__' : diff --git a/healthcareio/x12/parser.py b/healthcareio/x12/parser.py index 464ebe0..71fd49d 100644 --- a/healthcareio/x12/parser.py +++ b/healthcareio/x12/parser.py @@ -17,6 +17,7 @@ from healthcareio.logger import X12Logger import time import pandas as pd +from transport import providers class BasicParser (Process) : def __init__(self,**_args): @@ -155,8 +156,10 @@ class X12Parser(BasicParser): TABLE = 'claims' if _args['x12'] in ['837','claims'] else 'remits' _store['table'] = TABLE _store['cotnext'] = 'write' - - _writer = transport.factory.instance(**_store) + _writer = transport.factory.instance(**_store) + # if _store['provider'] not in [providers.MONGODB, providers.COUCHDB] : + + _writer.write(_documents,table=TABLE) if getattr(_writer,'close') : _writer.close() diff --git a/healthcareio/x12/publish.py b/healthcareio/x12/publish.py index 1dfa0c3..3f6e289 100644 --- a/healthcareio/x12/publish.py +++ b/healthcareio/x12/publish.py @@ -5,6 +5,7 @@ import numpy as np import time import pandas as pd from multiprocessing import Process +import json def build (**_args): """ @@ -56,19 +57,31 @@ def format(**_args) : _pkvalue = None if _primary_key in _claim : _pkvalue = _claim[_primary_key] - + for _attrName in _claim : _item = _claim[_attrName] _item = update(_item,_primary_key,_pkvalue) - + # + # We have a casting problem, with relational data-store and JSON objects + # + if type(_item) == str and (_item.startswith("{") or _item.startswith("{")) : + try: + + _item = json.loads(_item) + + except Exception as ee : + # print (ee) + pass if _attrName not in _tables and type(_item) in [dict,list]: _tables[_attrName] = [] if type(_item) in [dict,list] : _tables[_attrName] += _item if type(_item) == list else [_item] + pass else: # # This section suggests we found a main table attribute _main[_attrName] = _item + _tables[_mainTableName].append(_main) return _tables @@ -100,20 +113,19 @@ def init(**_args): _default = build(plugins=_plugins,x12=_file_type) _df = read(store = _store['source'],x12=_file_type) - _pkey = util.getPrimaryKey(plugins = _plugins, x12=_file_type) SEGMENTS = 4 # arbitrary choice _indexes = np.array_split(np.arange(_df.shape[0]),SEGMENTS) jobs = [] for _ii in _indexes : try: - _data = format(rows= _df.iloc[_ii].to_dict(orient='records'),x12=_file_type,primary_key=_pkey) - + _data = format(rows= _df.iloc[_ii].to_dict(orient='records'),x12=_file_type,primary_key=_pkey) _thread = Process(target=post,args=({'store':_store['target'],'data':_data,'default':_default,'x12':_file_type},)) jobs.append(_thread) except Exception as e: # # Log: sigment, + print (e) pass if jobs : jobs[0].start() @@ -126,6 +138,7 @@ def read (**_args): _store = copy.copy(_args['store']) _x12 = _args['x12'] _store['table'] = getContext(_x12) #'claims' if _x12 == '837' else 'remits' + _store['context'] = 'read' reader = transport.factory.instance(**_store) # # @TODO: reading should support streaming (for scalability) @@ -136,9 +149,14 @@ def read (**_args): def post(_args): _data = _args['data'] _store = _args['store'] + _store['context'] = 'write' _default = _args['default'] _prefix = 'clm_' if _args['x12'] == '837' else 'rem_' + # if 'claims' in _data or 'remits' in _data : + # _key = 'claims' if 'claims' in _data else 'remits' + # _data = _data[_key] for _name in _data : + _tablename = _prefix+_name _store['table'] = _tablename if _name not in ['remits','claims'] else _name _store['context']='write' @@ -147,12 +165,11 @@ def post(_args): _rows = [_default[_name]] else: _rows = _data[_name] - - writer.write(pd.DataFrame(_rows).fillna('')) if hasattr(writer,'close') : writer.close() - + # + # Have a logger here to log what's going on ... # _xwriter = trasnport.factory.instance(**_store) # _xwriter.write(_df) # _info = format() diff --git a/healthcareio/x12/util/document.py b/healthcareio/x12/util/document.py index c1ffdaa..0818766 100644 --- a/healthcareio/x12/util/document.py +++ b/healthcareio/x12/util/document.py @@ -247,9 +247,7 @@ class Builder: # The element has NOT been specified by the plugin (alas) # For this case we would advise writing a user-defined plugin to handle this case # - print (self._logger) if self._logger : - print (['....................']) self._logger.log(action='missing-plugin',module='build',data={'element':_row[0],'anchor':_row[1]}) return _document