diff --git a/healthcareio/x12/export/__init___.py b/healthcareio/x12/export/__init___.py deleted file mode 100644 index 1cb5dc9..0000000 --- a/healthcareio/x12/export/__init___.py +++ /dev/null @@ -1,36 +0,0 @@ -__doc__ = """" -This module is designed to perform exports to a relational data stores, We have 2 methods to infer structure possible : -1. learn structure from the data -2. use template: fast, but may not generalize -Note that the There are two possible methods to perform relational exports -""" -# import transport -# from transport import providers -# # from healthcareio.x12 import plugins, util -# print ('hello world') -# class Mode : -# TEMPLATE,LEARN = [0,1] -# def build (): -# pass -# class Template : -# """ -# This class is intended to generate a set of SQL Instructions to to create the tables -# """ -# @staticmethod -# def build (**_args): -# """ -# This function will build SQL statements to create a table (perhaps not needed) -# :plugins loaded plugins -# :x12 837|835 file types -# """ -# _plugins=_args['plugins'] -# _x12 = _args['x12'] -# _template = util.template(plugins=_plugins)[_x12] -# _primaryKey = util.getPrimaryKey(plugins=_plugins,x12=_x12) -# _tables = [] -# for _item in _template : -# if _primaryKey not in _item : -# _item[_primaryKey] = '' -# _tables.append(_item) -# return _tables - diff --git a/healthcareio/x12/publish.py b/healthcareio/x12/publish.py new file mode 100644 index 0000000..6438925 --- /dev/null +++ b/healthcareio/x12/publish.py @@ -0,0 +1,154 @@ +import copy +from . import util +import transport +import numpy as np +import time +import pandas as pd +from multiprocessing import Process + +def build (**_args): + """ + This function will build SQL statements to create a table (perhaps not needed) + :plugins loaded plugins + :x12 837|835 file types + """ + _plugins=_args['plugins'] + _x12 = _args['x12'] + _template = util.template(plugins=_plugins)[_x12] + _primaryKey = util.getPrimaryKey(plugins=_plugins,x12=_x12) + _tables = [] + _main = {} + for _name in _template : + _item = _template[_name] #copy.deepcopy(_template[_name]) + if _primaryKey not in _item and type(_item) == dict: + _item[_primaryKey] = '' + _tables.append({_name:_item}) + else: + _main[_name] = '' + + _name = getContext(_x12) + _tables += [{_name:_main}] + _template[_name] = _main + + return _template #_tables +def getContext(_x12) : + return 'claims' if _x12 == '837' else 'remits' +def format(**_args) : + """ + :rows rows for the + :primary_key primary_key field name + :x12 file format + """ + + # _name = _args['table'] + _rows = _args['rows'] + + _primary_key = _args['primary_key'] + _x12 = _args['x12'] + _mainTableName = getContext(_x12) + _tables = {_mainTableName:[]} + + for _claim in _rows : + # # + # # Turn the claim into a relational model ... + # # + _main = {} + _pkvalue = None + if _primary_key in _claim : + _pkvalue = _claim[_primary_key] + + for _attrName in _claim : + _item = _claim[_attrName] + _item = update(_item,_primary_key,_pkvalue) + + if _attrName not in _tables and type(_item) in [dict,list]: + _tables[_attrName] = [] + if type(_item) in [dict,list] : + _tables[_attrName] += _item if type(_item) == list else [_item] + else: + # + # This section suggests we found a main table attribute + _main[_attrName] = _item + _tables[_mainTableName].append(_main) + + return _tables +def update (_item,key,value): + if type(_item) not in [dict,list] : + return _item + if type(_item) == dict : + _item[key] = value + else: + # + # List, we will go through every item and update accordingly + _index = 0 + for _row in _item : + if type(_row) == dict : + _row['_index'] = _index + _row[key] = value + return _item +def init(**_args): + """ + This function will kick off the export process provided claims/remits and the loaded plugins (not sure why) + It requires the data it is pulling to be consistently formatted (otherwise nothing can be done) + :plugins + :store data store information i.e {source,target} specifications for data-transport + :x12 file type i.e 837|835 + """ + _file_type = _args['x12'] + _plugins = _args['plugins'] + _store = _args['store'] + _default = build(plugins=_plugins,x12=_file_type) + + _df = read(store = _store['source'],x12=_file_type) + + _pkey = util.getPrimaryKey(plugins = _plugins, x12=_file_type) + SEGMENTS = 4 # arbitrary choice + _indexes = np.array_split(np.arange(_df.shape[0]),SEGMENTS) + jobs = [] + for _ii in _indexes : + + _data = format(rows= _df.iloc[_ii].to_dict(orient='records'),x12=_file_type,primary_key=_pkey) + + _thread = Process(target=post,args=({'store':_store['target'],'data':_data,'default':_default},)) + jobs.append(_thread) + if jobs : + jobs[0].start() + jobs[0].join() + while jobs : + jobs = [thread for thread in jobs if thread.is_alive()] + time.sleep(1) + +def read (**_args): + _store = copy.copy(_args['store']) + _x12 = _args['x12'] + _store['table'] = getContext(_x12) #'claims' if _x12 == '837' else 'remits' + reader = transport.factory.instance(**_store) + # + # @TODO: reading should support streaming (for scalability) + _df = reader.read() + + return _df + +def post(_args): + _data = _args['data'] + _store = _args['store'] + _default = _args['default'] + + for _name in _data : + _store['table'] = _name + _store['context']='write' + writer = transport.factory.instance(**_store) + if len(_data[_name]) == 0 and _name in _default: + _rows = [_default[_name]] + else: + _rows = _data[_name] + + + writer.write(_rows) + if hasattr(writer,'close') : + writer.close() + + # _xwriter = trasnport.factory.instance(**_store) + # _xwriter.write(_df) + # _info = format() + pass