"""
 | 
						|
    This file serves as proxy to healthcare-io, it will be embedded into the API
 | 
						|
"""
 | 
						|
import os
 | 
						|
import transport
 | 
						|
import numpy as np
 | 
						|
from healthcareio import x12
 | 
						|
import pandas as pd
 | 
						|
import smart
 | 
						|
from healthcareio.analytics import Apex
 | 
						|
import time
 | 
						|
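
#
# NOTE (derived from the usage below): transport.factory builds readers against the
# configured data store (mongo or SQLite), smart exposes process (smart.top) and
# folder (smart.folder) metrics, and Apex formats data-frames into chart payloads.
#
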
class get :
    # parser processes started by run(); pruned as they terminate
    PROCS = []
    # default location of the healthcare-io configuration file
    PATH = os.sep.join([os.environ['HOME'],'.healthcareio','config.json'])
    @staticmethod
    def resume (files,args):
        """
        Determine which files still need to be processed by taking the complement (set difference)
        of the folder listing against the names already marked as completed in the logs.

        @TODO: Support data-stores other than mongodb

        :param files    list of files within a folder
        :param args     configuration (must contain a 'store' section)
        """
        _args = args['store'].copy()
        if 'mongo' in _args['type'] :
            _args['type'] = 'mongo.MongoReader'
            reader = transport.factory.instance(**_args)
        _files = []
        try:
            pipeline = [{"$match":{"completed":{"$eq":True}}},{"$group":{"_id":"$name"}},{"$project":{"name":"$_id","_id":0}}]
            _args = {"aggregate":"logs","cursor":{},"allowDiskUse":True,"pipeline":pipeline}
            _files = reader.read(mongo = _args)
            _files = [item['name'] for item in _files]
        except Exception as e :
            # if the store is not mongo, reader is undefined and we land here (see @TODO above)
            pass
        print (["found ",len(files),"\tProcessed  ",len(_files)])
        return list(set(files) - set(_files))

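    #
    # Illustrative example (hypothetical file names): if the folder lists
    # ['claim_1.txt','claim_2.txt','claim_3.txt'] and the logs mark 'claim_1.txt'
    # as completed, resume() returns ['claim_2.txt','claim_3.txt'].
    #
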
    @staticmethod
    def processes(_args):
        """
        Summarize CPU/RAM usage of the healthcare-io.py processes as radial charts
        and report how many of the parser processes started by run() are still alive.
        """
        _info = pd.DataFrame(smart.top.read(name='healthcare-io.py'))

        if _info.shape[0] == 0 :
            _info = pd.DataFrame({"name":["healthcare-io.py"],"cpu":[0],"mem":[0]})
        _info = _info[['name','cpu','mem']]
        # _info = pd.DataFrame(_info.groupby(['name']).sum())
        # _info['name'] = ['healthcare-io.py']
        m = {'cpu':'CPU','mem':'RAM','name':'name'}
        _info.columns = [m[name] for name in _info.columns.tolist()]
        _info.index = np.arange(_info.shape[0])

        charts = []
        for label in ['CPU','RAM'] :
            value = _info[label].sum()
            df = pd.DataFrame({"name":[label],label:[value]})
            charts.append (
                Apex.apply(
                    {"data":df, "chart":{"type":"radial","axis":{"x":label,"y":"name"}}}
                    )['apex']
                )
        #
        # Update the process count on each request so that subsequent requests reflect
        # parser processes that have since terminated.
        #
        lprocs = [proc for proc in get.PROCS if proc.is_alive()]
        N = len(lprocs)
        get.PROCS = lprocs
        return {"process":{"chart":charts,"counts":N}}

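    #
    # The payload above has the shape (values illustrative):
    #   {"process": {"chart": [<CPU radial chart>, <RAM radial chart>], "counts": <live parser count>}}
    #
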
    @staticmethod
    def files (_args):
        """
        Summarize processing status: the percentage of files under /data marked as completed
        in the logs (radial chart) and the distribution of claims vs remits (donut chart).
        """
        _info = smart.folder.read(path='/data')
        N = _info.files.tolist()[0]
        if 'mongo' in _args['store']['type'] :
            store_args = dict(_args['store'].copy(),**{"type":"mongo.MongoReader"})

            pipeline = [{"$group":{"_id":"$name","count":{"$sum":{"$cond":[{"$eq":["$completed",True]},1,0]}} }},{"$group":{"_id":None,"count":{"$sum":"$count"}}},{"$project":{"_id":0,"status":"completed","count":1}}]
            query = {"mongo":{"aggregate":"logs","allowDiskUse":True,"cursor":{},"pipeline":pipeline}}

            pipeline = [{"$group":{"_id":"$parse","claims":{"$addToSet":"$name"}}},{"$project":{"_id":0,"type":"$_id","count":{"$size":"$claims"}}}]
            _query = {"mongo":{"aggregate":"logs","cursor":{},"allowDiskUse":True,"pipeline":pipeline}} #-- distribution claims/remits

        else:
            store_args = dict(_args['store'].copy(),**{"type":"disk.SQLiteReader"})
            store_args['args']['table'] = 'logs'
            query = {"sql":"select count(distinct json_extract(data,'$.name')) as count, 'completed' status from logs where json_extract(data,'$.completed') = true"}
            _query = {"sql":"select json_extract(data,'$.parse') as type,count(distinct json_extract(data,'$.name')) as count from logs group by type"} #-- distribution claims/remits

        reader = transport.factory.instance(**store_args)
        _info = pd.DataFrame(reader.read(**query))
        if not _info.shape[0] :
            _info = pd.DataFrame({"status":["completed"],"count":[0]})
        _info['count'] = np.round( (_info['count'] * 100 )/N,2)

        charts = [Apex.apply({"data":_info,"chart":{"type":"radial","axis":{"y":"status","x":"count"}}})['apex']]
        #
        # Classify the files i.e claims / remits
        #
        r = pd.DataFrame(reader.read(**_query)) #-- distribution claims/remits
        r = Apex.apply({"chart":{"type":"donut","axis":{"x":"count","y":"type"}},"data":r})['apex']
        r['chart']['height'] = '100%'
        r['legend']['position'] = 'bottom'

        charts += [r]

        return {"files":{"counts":N,"chart":charts}}

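    #
    # Illustrative numbers: with N=200 files under /data and 80 distinct names marked
    # completed in the logs, the radial chart above would show count = 40.0 (a percentage).
    #
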
#
# Process handling ....
#

def run (_args) :
    """
    Run the parsing jobs as daemon processes.

    :param _args    system configuration
    """
    FILES = []
    BATCH = int(_args['args']['batch']) #-- number of parser processes to spawn (despite the name)

    for root,_dir,f in os.walk(_args['args']['folder']) :
        if f :
            FILES += [os.sep.join([root,name]) for name in f]
    FILES = get.resume(FILES,_args)
    FILES = np.array_split(FILES,BATCH)

    for FILE_GROUP in FILES :

        FILE_GROUP = FILE_GROUP.tolist()
        # logger.write({"process":index,"parse":_args['parse'],"file_count":len(row)})
        # proc = Process(target=apply,args=(row,info['store'],_info,))
        parser = x12.Parser(get.PATH)
        parser.set.files(FILE_GROUP)
        parser.daemon = True
        parser.start()
        get.PROCS.append(parser)
        time.sleep(3)
    #
    # @TODO: consider submitting an update to clients via a publish/subscribe framework
    #
    return get.PROCS
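
#
# Illustrative usage of run() (hypothetical configuration values):
#   run({"args":{"batch":"2","folder":"/data"},"store":{"type":"disk.SQLiteReader","args":{...}}})
# splits the unprocessed files under /data into 2 groups and starts one daemon parser per group.
#
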
def stop(_args):
    for job in get.PROCS :
        if job.is_alive() :
            job.terminate()
    get.PROCS = []
    #
    # @TODO: consider submitting an update to clients via publish/subscribe framework
    #
    pass

def write(src_args,dest_args,files) :
    #
    # @TODO: Support for SQLite
    pass

def publish (src_args,dest_args,folder="/data"):
    FILES = []
    for root,_dir,f in os.walk(folder) :
        if f :
            FILES += [os.sep.join([root,name]) for name in f]
    #
    # @TODO: Add support for SQLite ....
    #
    FILES = np.array_split(FILES,4)