From 8bc64dbc8b60168d59256c5713e2fc27f823b4ea Mon Sep 17 00:00:00 2001 From: Steve Nyemba Date: Tue, 12 Jan 2021 16:08:41 -0600 Subject: [PATCH] export functions --- healthcareio/export/__init__.py | 8 ++ healthcareio/export/export.py | 239 ++++++++++++++++++++++++++++++++ healthcareio/export/workers.py | 206 +++++++++++++++++++++++++++ 3 files changed, 453 insertions(+) create mode 100644 healthcareio/export/__init__.py create mode 100644 healthcareio/export/export.py create mode 100644 healthcareio/export/workers.py diff --git a/healthcareio/export/__init__.py b/healthcareio/export/__init__.py new file mode 100644 index 0000000..75d3708 --- /dev/null +++ b/healthcareio/export/__init__.py @@ -0,0 +1,8 @@ +""" + +This module is outside of the scope of the parser but it's a good to have because it allows to move data outside of mongodb into another data-store +dependencies: + - data-transport https://healthcareio.the-phi.com/git/code/transport +""" +from healthcareio.export import export +from healthcareio.export import workers diff --git a/healthcareio/export/export.py b/healthcareio/export/export.py new file mode 100644 index 0000000..d521247 --- /dev/null +++ b/healthcareio/export/export.py @@ -0,0 +1,239 @@ +""" +This file implements exporting data from a mongodb database to another data-store in relational format (csv). Any other use case will have to be performed with mongodb native tools + target: + File/SQLite + PostgreSQL + MySQL +@TODO: + - Insure to support both schemas and table prefixes + +Usage : + +License: + +Copyright 2019, The Phi Technology LLC + +Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +""" +import transport +import numpy as np +import os +import json +import jsonmerge +import sys +# from airflow import DAG +from datetime import timedelta +# import healthcareio.export.workers as workers +from healthcareio.export import workers +import platform +from datetime import datetime +import copy +import requests +import time + +PATH = os.sep.join([os.environ.get('HOME'),'.healthcareio','config.json']) +CONFIG = json.loads((open(PATH)).read()) +STORE_URI = 'http://healthcareio.the-phi.com/store/healthcareio' +# +# let us see if we have any custom configurations ... +PATH = os.sep.join([os.environ.get('HOME'),'.healthcareio','custom']) +CUSTOM_CONFIG = {} +if os.path.exists(PATH) and os.listdir(PATH) : + PATH = os.sep.join([PATH,os.listdir(PATH)[0]]) + CUSTOM_CONFIG = json.loads((open(PATH)).read()) + +_args = dict(CONFIG['store'],**{'type':'mongo.MongoReader'}) + +def get_field_names (_map,label): + fields = list(_map.keys()) + + return fields if not label else [{label:fields}] +def get_field (entry): + label = list(set(['label','field']) & set(entry.keys())) + label = None if not label else entry[label[0]] + if 'map' not in entry : + return None + _map = entry['map'] + return get_field_names(_map,label) + + pass +# +#-- Get the fields to export that will go in the the unwind ; +# +def meta(config) : + """ + This function will return the metadata associated with a given configuraiton 835 or 838 + :params config configuration section + """ + _info = [] + table_count = 1 + for prefix in config : + # if 'map' in config[prefix] : + # label = list(set(['label','field']) & set(config[prefix].keys())) + # label = None if not label else config[prefix][label[0]] + # _map = config[prefix]['map'] + # _info += (field_info(_map,label)) + + if type(config[prefix]) != dict : + continue + if '@ref' in config[prefix] and set(['label','field','map']) & set(config[prefix]['@ref'].keys()): + for subprefix in config[prefix]['@ref'] : + _entry = config[prefix]['@ref'][subprefix] + _info += get_field(_entry) + elif set(['label','field','map']) & set(config[prefix].keys()): + _entry = config[prefix] + if 'map' in _entry : + _info += get_field(_entry) + + + # + # We need to organize the fields appropriately here + # + fields = {"main":[],"rel":{}} + for row in _info : + if type(row) == str : + fields['main'] += [row] + else : + + fields['rel'] = jsonmerge.merge(fields['rel'],row) + + return fields +def create (**_args) : + skip = [] if 'skip' not in _args else _args['skip'] + fields = ([_args['key']] if 'key' in _args else []) + _args['fields'] + table = _args['table'] + sql = ['CREATE TABLE :table ',"(",",\n".join(["\t".join(["\t",name,"VARCHAR(125)"]) for name in fields]),")"] + return " ".join(sql) +def read (**_args) : + """ + This function will read rows with a set number of files and store them into a data-store + """ + files = _args['files'] + fields = _args ['fields'] + name = _args['id'] + pipeline= {"find":name,"filter":{"name":{"$in":files}}} + # + # @TODO: Find a way to write the data into a data-store + # - use dbi interface with pandas or stream it in + # +def init (**_args) : + """ + This function is intended to determine the number of tables to be created, as well as their type. + :param type {835,837} + :param skip list of fields to be skipped + """ + TYPE = _args['type'] + SKIP = _args['skip'] if 'skip' in _args else [] + _config = CONFIG['parser'][TYPE][0] + if TYPE in CUSTOM_CONFIG : + _config = jsonmerge.merge(_config,CUSTOM_CONFIG[TYPE]) + # + # @TODO: implement fields to be skipped ... + # + TABLE_NAME = 'claims' if TYPE== '837' else 'remits' + _info = meta(_config) + # project = dict.fromkeys(["_id","claim_id"]+_info['main'],1) + project = {} + for field_name in _info['main'] : + + _name = "".join(["$",field_name]) + project[field_name] = {"$ifNull":[_name,""]} + project["_id"] = 0 + project = {"$project":project} + + r = [{"table":TABLE_NAME,"mongo":{"aggregate":TABLE_NAME,"pipeline":[project],"cursor":{},"allowDiskUse":True},"sql":create(table=TABLE_NAME,fields=_info['main'])}] + for table in _info['rel'] : + # + # NOTE: Adding _index to the fields + fields = _info['rel'][table] +["_index"] + + project = {"_id":0,"claim_id":1,"_index":1} #dict.fromkeys(["_id","claim_id"]+fields,[ ".".join([table,field_name]) for field_name in fields]) + for field_name in fields : + # project[field_name] = "$"+".".join([table,field_name]) + _name = "$"+".".join([table,field_name]) + project[field_name] = {"$ifNull":[_name,""]} #{"$cond":[{"$eq":[_name,None]},"",_name]} + project["_id"] = 0 + pipeline = [{"$unwind":"$"+table},{"$project":project}] + r += [{"table":table,"mongo":{"aggregate":TABLE_NAME,"cursor":{},"pipeline":pipeline,"allowDiskUse":True},"sql":create(table=table,key='claim_id',fields=fields)}] + + return r + +class Factory: + @staticmethod + def license(**_args): + body = {} + body['email'] = _args['email'] + body['host'] = platform.node() + body['date'] = {"month":datetime.now().month,"year":datetime.now().year,"day":datetime.now().day} + headers = {'uid': body['email'],'content-type':'application/json'} + uri = STORE_URI+'/init' + http = requests.session() + r = http.post(uri,headers=headers,data=body) + + return r.json() if r.status_code == 200 else {} + @staticmethod + def instance(**_args): + """ + The creation process will only require a target store and a type (385,837) + :param type EDI type to be processed i.e 835 or 837 + :param write_store target data-store (redshift, mariadb,mongodb ...) + """ + _features = Factory.license(email=CONFIG['owner']) + store = copy.deepcopy(CONFIG['store']) + store['type']='mongo.MongoReader' + + wstore = _args['write_store'] #-- output data store + TYPE = _args['type'] + PREFIX = 'clm_' if TYPE == '837' else 'era_' + SCHEMA = '' if 'schema' not in wstore['args'] else wstore['args']['schema'] + _config = CONFIG['parser'][TYPE][0] + if TYPE in CUSTOM_CONFIG : + _config = jsonmerge.merge(_config,CUSTOM_CONFIG[TYPE]) + # _info = meta(_config) + job_args = init(type=TYPE) + # print (json.dumps(job_args)) + _jobs = [] + for row in job_args: + # _store = json.loads(json.dumps(wstore)) + _store = copy.deepcopy(wstore) + _store['args']['table'] = row['table'] + _pipe = [ + workers.CreateSQL(prefix=PREFIX,schema=SCHEMA,store=_store,sql=row['sql']), + workers.Reader(prefix=PREFIX,schema=SCHEMA,store=store,mongo=row['mongo'],max_rows=250000,features=_features,table=row['table']), + workers.Writer(prefix=PREFIX,schema=SCHEMA,store=_store) + ] + _jobs += [workers.Subject(observers=_pipe,name=row['table'])] + return _jobs + +# if __name__ == '__main__' : + # pass +# pipes = Factory.instance(type='835',write_store={"type":"sql.SQLWriter","args":{"provider":"postgresql","db":"sample",}}) #"inspect":0,"cast":0}}) +# # pipes[0].run() +# for thread in pipes: +# thread.start() +# time.sleep(1) +# while pipes : +# pipes = [thread for thread in pipes if thread.is_alive()] +# time.sleep(10) +# print (Factory.license(email='steve@the-phi.com')) +# +# check account with basic inormation +# + +# class Observerob: +# def __init__(**_args) : + +# #-- Let us all flatten the table +# # +# TYPE = '835' +# _config = jsonmerge.merge(CONFIG['parser'][TYPE][0],CUSTOM_CONFIG[TYPE]) +# # f = meta(CONFIG['parser'][TYPE][0]) +# # _f = meta(CUSTOM_CONFIG[TYPE]) +# f = meta(_config) +# # print (json.dumps( (f))) +# print (json.dumps(init(type='835'))) +# # print (create(fields=f['rel']['adjudicated'],table='adjudicated',key='claim_id')) diff --git a/healthcareio/export/workers.py b/healthcareio/export/workers.py new file mode 100644 index 0000000..3655c00 --- /dev/null +++ b/healthcareio/export/workers.py @@ -0,0 +1,206 @@ +""" + HealthcareIO - The Phi Technology LLC 2020 + + This file contains functionalities that implement elements of an ETL pipeline that will consist of various workers. + The pipeline is built around an observer design pattern. + + @TODO: Integrate with airflow and other process monitoring tools +""" +import transport +import os +from multiprocessing import Process +import numpy as np +import json +class Subject (Process): + def __init__(self,**_args): + super().__init__() + self.observers = _args['observers'] + self.index = 0 + self.name = _args['name'] + pass + def run(self): + self.notify() + def notify(self): + if self.index < len(self.observers) : + observer = self.observers[self.index] + _observer = None if self.index == 0 else self.observers[self.index -1] + _invalues = None if not _observer else _observer.get() + observer.init(caller=self,invalues = _invalues) + self.index += 1 + observer.execute() + +class Worker : + def __init__(self,**_args): + #PATH = os.sep.join([os.environ['HOME'],'.healthcareio','config.json']) + #CONFIG = json.loads((open(PATH)).read()) + self._info = _args['store'] + self.logs = [] + self.schema = _args['schema'] + self.prefix = _args['prefix'] + + def name(self): + return self.__class__.__name__ + def log (self,**_args): + """ + This function is designed to log to either the console or a data-store + """ + print (_args) + pass + def init(self,**_args): + """ + Initializing a worker with arguments needed for it to perform it's task basic information needed are + :param caller caller to be notified + :param store data-store information i.e (pgsql,couchdb, mongo ...) + """ + self.caller = _args['caller'] + #self._info = _args['store'] + self._invalues = _args['invalues'] if 'invalues' in _args else None + def execute(self): + try: + self._apply() + finally: + + self.caller.notify() + def _apply(self): + pass + def get(self): + pass + def notify(self): + self.caller.notify() + def tablename(self,name) : + PREFIX_SEPARATOR = '_' if '_' not in self.prefix else '' + SCHEMA_SEPARATOR = '' if self.schema.strip() =='' else '.' + TABLE_NAME = PREFIX_SEPARATOR.join([self.prefix,name]) + return SCHEMA_SEPARATOR.join([self.schema,TABLE_NAME]) + + +class CreateSQL(Worker) : + """ + This class is intended to create an SQL Table given the + """ + def __init__(self,**_args): + super().__init__(**_args) + self._sql = _args['sql'] + def init(self,**_args): + super().init(**_args) + + def _apply(self) : + + sqltable = self.tablename(self._info['args']['table']) + # log = {"context":self.name(),"args":{"table":self._info['args']['table'],"sql":self._sql}} + log = {"context":self.name(),"args":{"table":sqltable,"sql":self._sql.replace(":table",sqltable)}} + try: + + + writer = transport.factory.instance(**self._info) + writer.apply(self._sql.replace(":table",sqltable)) + writer.close() + log['status'] = 1 + except Exception as e: + log['status'] = 0 + log['info'] = {"error":e.args[0]} + print (e) + finally: + self.log(**log) + +class Reader(Worker): + """ + read from mongodb and and make the data available to a third party + :param pipeline mongodb command + :param max_rows maximum rows to be written in a single insert + """ + def __init__(self,**_args): + super().__init__(**_args) + + + self.pipeline = _args['mongo'] #-- pipeline in the context of mongodb NOT ETL + self.MAX_ROWS = _args['max_rows'] + self.table = _args['table'] + + # is_demo = 'features' not in _args or ('features' in _args and ('export_etl' not in _args['features'] or _args['features']['export_etl'] == 0)) + # + # @TODO: Bundle the limits with the features so as to insure that it doesn't come across as a magic number + # + # LIMIT = -1 + # if is_demo : + # LIMIT = 10000 + # if set(['find','distinct']) & set(self.pipeline.keys()) : + # self.pipeline['limit'] = LIMIT + # elif 'aggregate' in self.pipeline : + + # self.pipeline['pipeline'] = [{"$limit":LIMIT}] + self.pipeline['pipeline'] + # self.log(**{"context":self.name(),"demo":is_demo,"args":{"limit":LIMIT}}) + + def init(self,**_args): + super().init(**_args) + self.rows = [] + + def _apply(self): + self.reader = transport.factory.instance(**self._info) ; + + self.rows = self.reader.read(mongo=self.pipeline) + + N = len(self.rows) / self.MAX_ROWS if len(self.rows) > self.MAX_ROWS else 1 + N = int(N) + # self.rows = rows + _log = {"context":self.name(),"args":self._info['args']['db'], "status":1,"info":{"rows":len(self.rows),"table":self.table,"segments":N}} + self.rows = np.array_split(self.rows,N) + + + # self.get = lambda : rows #np.array_split(rows,N) + self.reader.close() + + # + self.log(**_log) + # @TODO: Call the caller and notify it that this here is done + def get(self): + return self.rows + + +class Writer(Worker): + def __init__(self,**_args): + super().__init__(**_args) + def init(self,**_args): + """ + :param store output data-store needed for writing + :param invalues input values with to be written somewhere + """ + super().init(**_args) + + self._invalues = _args['invalues'] + + def _apply(self): + + # table = self._info['args']['table'] if 'table' in self._info['args'] else 'N/A' + table = self.tablename(self._info['args']['table']) + + self._info['args']['table'] = table; + writer = transport.factory.instance(**self._info) + index = 0 + + if self._invalues : + for rows in self._invalues : + # print (['segment # ',index,len(rows)]) + self.log(**{"context":self.name(),"segment":(index+1),"args":{"rows":len(rows),"table":table}}) + if len(rows) : + + writer.write(list(rows)) + index += 1 + # for _e in rows : + # writer.write(_e) + + + + else: + print ("No data was passed") + + + writer.close() + +#_args = {"type":"mongo.MongoReader","args":{"db":"parserio","doc":"logs"}} +#reader = Reader() +#reader.init(store = _args,pipeline={"distinct":"claims","key":"name"}) +#reader._apply() +#print (reader.get()) +#for row in reader.get() : +# print (row)