You cannot select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
	
	
		
			565 lines
		
	
	
		
			22 KiB
		
	
	
	
		
			Python
		
	
			
		
		
	
	
			565 lines
		
	
	
		
			22 KiB
		
	
	
	
		
			Python
		
	
import pandas as pd
 | 
						|
import numpy as np
 | 
						|
import os
 | 
						|
import io
 | 
						|
import json
 | 
						|
from multiprocessing import Process
 | 
						|
import transport
 | 
						|
import sqlite3 as lite
 | 
						|
import numpy as np
 | 
						|
import transport
 | 
						|
import matplotlib.pyplot as plt
 | 
						|
import re, base64
 | 
						|
# from weasyprint import HTML, CSS
 | 
						|
COLORS = ["#f79256","#7dcfb6","#fbd1a2","#00b2ca","#1d4e89","#4682B4","#c5c3c6","#4c5c68","#1985a1","#f72585","#7209b7","#3a0ca3","#4361ee","#4cc9f0","#ff595e","#ffca3a","#8ac926","#1982c4","#6a4c93"]
 | 
						|
class stdev :
 | 
						|
    def __init__(self) :
 | 
						|
        self.values = []
 | 
						|
    def step(self,value):
 | 
						|
        if value : #and type in [np.int64, np.int32,np.float64,np.float32, int]:
 | 
						|
            self.values.append(value)            
 | 
						|
    def finalize(self):
 | 
						|
        return np.std(self.values) if self.values else None
 | 
						|
 | 
						|
 | 
						|
# conn = lite.connect("/home/steve/healthcare-io/healthcare-io.db3")
 | 
						|
# conn.create_aggregate("stdev",1,stdev)
 | 
						|
# df = pd.read_sql("select count(distinct (json_extract(data,'$.patient_id'))) as patient_count, avg(json_array_length(data,'$.procedures')) mean, stdev(json_array_length(data,'$.procedures')) stdev from claims",conn)
 | 
						|
ROOT_FOLDER = 'stats'
 | 
						|
# plt.gcf().subplots_adjust(bottom=0.15)
 | 
						|
# from matplotlib import rcParams
 | 
						|
# rcParams.update({'figure.autolayout': True})
 | 
						|
class Chart :
 | 
						|
    @staticmethod
 | 
						|
    def remove_borders(axes,wedges,labels,item) :
 | 
						|
        # plt.axes()
 | 
						|
        axes.spines["top"].set_visible(False)
 | 
						|
        # plt.axes().
 | 
						|
        axes.spines["right"].set_visible(False)
 | 
						|
        axes.legend(wedges, labels #,title=item['label']
 | 
						|
        ,loc="upper right",fontsize=12,bbox_to_anchor=(1, 0, 0.5, 1),fancybox=True,framealpha=0.2)                
 | 
						|
        # plt.axes().
 | 
						|
        # axes.spines["left"].set_visible(False)
 | 
						|
        if 'axis' in item['chart'] :
 | 
						|
            
 | 
						|
            axes.set_ylabel(item['chart']['axis']['y'])
 | 
						|
            axes.set_xlabel(item['chart']['axis']['x'])
 | 
						|
 | 
						|
    @staticmethod
 | 
						|
    def donut(item,**args) :
 | 
						|
        df = item['data']
 | 
						|
        x = item['chart']['x'] #args['x']
 | 
						|
        labels = item['chart']['y']
 | 
						|
        labels = df[labels]
 | 
						|
 | 
						|
        # figure = plt.figure()
 | 
						|
        figure, axes = plt. subplots()
 | 
						|
        # wedges, texts = plt.pie(df[x],labels=labels)
 | 
						|
        colors = COLORS[:len(labels)] #np.random.choice(COLORS,len(labels),replace=False)
 | 
						|
        wedges = axes.pie(df[x],labels=labels,wedgeprops=dict(width=0.3),colors=colors,autopct=lambda pct: "{:.2f}%\n({:.0f})".format(pct,int((pct/100)*df[x].sum() )))  #,autopct=lambda pct: func(pct, df[x].values))
 | 
						|
        # my_circle=plt.Circle( (0,0), 0.7, color='#ffffff',fill=True)
 | 
						|
        # p=plt.gcf()
 | 
						|
        # p.gca().add_artist(my_circle)   
 | 
						|
        # plt.legend(wedges, labels,title=item['label'],loc="upper right",bbox_to_anchor=(1, 0, 0.5, 1))        
 | 
						|
        # axes.legend(wedges[0], labels,title=item['label'],loc="upper right",bbox_to_anchor=(1, 0, 0.5, 1),framealpha=0,edgecolor='#CAD5E0',
 | 
						|
 | 
						|
        # )        
 | 
						|
#         x = plt.show()
 | 
						|
        Chart.remove_borders(axes,wedges[0],labels,item) 
 | 
						|
        plt.close()
 | 
						|
    
 | 
						|
        return figure
 | 
						|
    @staticmethod
 | 
						|
    def barh(item,**args):
 | 
						|
        """
 | 
						|
        This function will return/render a bar chart (horizontal) which is conducive to showing distributions of things like diagnosis codes        
 | 
						|
        """
 | 
						|
        # figure = plt.figure()
 | 
						|
        figure, axes = plt. subplots()
 | 
						|
        y_labels = item['chart']['y'][0]   
 | 
						|
        x_labels = item['chart']['x'] #[args['x']] if type(args['x']) == str else args['x']
 | 
						|
        df = item['data'].iloc[:9].copy()
 | 
						|
#         odf = item['data'].iloc[9:].copy().mean().to_frame().T
 | 
						|
#         odf[y_labels] = 'Other'
 | 
						|
#         df  = df.append(odf)
 | 
						|
        wedges = []
 | 
						|
        # COLORS = ['#003f5c','#7a5195','#374c80','#bc5090','#ef5675','#ff764a','#ffa600']
 | 
						|
        for x_ in x_labels:
 | 
						|
            index = x_labels.index(x_)
 | 
						|
            color = COLORS[index]
 | 
						|
            w = axes.barh(df[y_labels],df[x_],align='edge',label='counts' ,color=color)    
 | 
						|
            
 | 
						|
            wedges += [w]
 | 
						|
#         labels = [name.replace('_',' ') for name in x_labels]
 | 
						|
        # axes.legend(wedges,[name.replace('_',' ') for name in x_labels],
 | 
						|
        #             title=item['label'],
 | 
						|
        #             framealpha=0,
 | 
						|
        #             edgecolor='#CAD5E0',
 | 
						|
 | 
						|
        #           loc="upper right",bbox_to_anchor=(1, 0, 0.5, 1)
 | 
						|
        #           )   
 | 
						|
        Chart.remove_borders(axes,wedges,[name.replace('_',' ') for name in x_labels],item)
 | 
						|
        plt.close()
 | 
						|
    
 | 
						|
        return figure
 | 
						|
    @staticmethod
 | 
						|
    def spline(item,**args):
 | 
						|
        """
 | 
						|
        """
 | 
						|
        df = item['data']     
 | 
						|
        # figure = plt.figure()
 | 
						|
        figure, axes = plt. subplots()
 | 
						|
        wedges = []
 | 
						|
        item['chart']['x'] = [item['chart']['x']]if type(item['chart']['x']) == str else item['chart']['x']
 | 
						|
        # COLORS = ['#003f5c','#7a5195','#374c80','#bc5090','#ef5675','#ff764a','#ffa600']
 | 
						|
        for xl in item['chart']['x'] :
 | 
						|
            x = df[xl]
 | 
						|
            index = 0
 | 
						|
            for yl in item['chart']['y'] :
 | 
						|
                y  = df[yl]    
 | 
						|
                color = COLORS[index]
 | 
						|
                if 'scatter' in args :
 | 
						|
                    w = plt.plot(x,y,'o',color=color)    
 | 
						|
                else:
 | 
						|
                    w = plt.plot(x,y,color=color,marker='o')
 | 
						|
                
 | 
						|
                wedges += w
 | 
						|
                index += 1
 | 
						|
#         print (item['chart']['x'])
 | 
						|
        # if 'axis' in item :
 | 
						|
        #     axes.set_ylabel(item['axis']['y'])
 | 
						|
        #     axes.set_xlabel(item['axis']['x'])
 | 
						|
#         plt.title(item['label'])
 | 
						|
        # axes.legend(wedges,[name.replace('_',' ') for name in item['chart']['y']],
 | 
						|
        #           title=item['label'],
 | 
						|
        #           framealpha=0,
 | 
						|
        #           edgecolor='#CAD5E0',
 | 
						|
        #           loc="upper right",bbox_to_anchor=(1, 0, 0.5, 1)
 | 
						|
        #           )  
 | 
						|
        axes.grid(b=False,which='major',axis='x')
 | 
						|
        Chart.remove_borders(axes,wedges,[name.replace('_',' ') for name in item['chart']['y']],item)
 | 
						|
        plt.close()
 | 
						|
    
 | 
						|
        return figure
 | 
						|
    @staticmethod
 | 
						|
    def scatter(item,**args):
 | 
						|
        return Chart.spline(item,scatter=True)
 | 
						|
class Apex :
 | 
						|
    """
 | 
						|
    This class will format a data-frame to work with Apex charting engine
 | 
						|
    """
 | 
						|
    @staticmethod
 | 
						|
    def apply(item):
 | 
						|
        pointer = item['chart']['type']
 | 
						|
        if hasattr(Apex,pointer) :
 | 
						|
            pointer = getattr(Apex,pointer)
 | 
						|
            options = pointer(item)
 | 
						|
            options['responsive']= [
 | 
						|
                {
 | 
						|
                'breakpoint': 1,
 | 
						|
                'options': {
 | 
						|
                    'plotOptions':item['plotOptions'] if 'plotOptions' in item else None,
 | 
						|
                    
 | 
						|
                }
 | 
						|
                }
 | 
						|
            ]
 | 
						|
            return options
 | 
						|
        else:
 | 
						|
            print ("Oops")
 | 
						|
        pass
 | 
						|
    @staticmethod
 | 
						|
    def scatter(item):
 | 
						|
        options = Apex.spline(item)
 | 
						|
        options['apex']['chart']['type'] = 'scatter'
 | 
						|
        return options
 | 
						|
    @staticmethod
 | 
						|
    def scalar(item):
 | 
						|
        _df = item['data']
 | 
						|
        print (_df)
 | 
						|
        name = _df.columns.tolist()[0]
 | 
						|
        value = _df[name].values.round(2)[0]
 | 
						|
        html = '<div class="scalar"><div class="value">:value</div><div class="label">:label</div></div>'
 | 
						|
        if value > 999 and value < 1000000 :
 | 
						|
            value = " ".join([str(np.divide(value,1000).round(2)),"K"])
 | 
						|
        elif value > 999999  :
 | 
						|
            #@ Think of considering the case of a billion ...
 | 
						|
            value = " ".join([str(np.divide(value,1000000).round(2)),"M"])
 | 
						|
        else:
 | 
						|
            value = str(value)
 | 
						|
        unit = name.replace('_',' ') if 'unit' not in item else item['unit']
 | 
						|
        return {'html':html.replace(':value',value).replace(":label",unit)}
 | 
						|
    @staticmethod
 | 
						|
    def column(item):
 | 
						|
        df = item['data']
 | 
						|
        N = df.shape[0] if df.shape[0] < 10 else 10
 | 
						|
        axis = item['chart']['axis']
 | 
						|
        x = axis['x']
 | 
						|
        if type(x) == list :
 | 
						|
            x = x[0]
 | 
						|
        axis['y'] = [axis['y']] if type(axis['y']) != list else axis['y']
 | 
						|
        series = []
 | 
						|
        for y in axis['y'] :
 | 
						|
            series += [{"data": df[y].values.tolist()[:N],"name":y.upper().replace('_',' ')}]
 | 
						|
        xtitle,ytitle = Apex.get_labels(item)
 | 
						|
        options = {"chart":{"type":"bar"},"plotOptions":{"bar":{"horizontal":False,"width:":2,"color":["transparent"]}},"dataLabels":{"enabled":False},"legend":{"position":"right"}}
 | 
						|
        options['xaxis'] = {"categories":df[x].values.tolist()[:N],"title":xtitle['title']}
 | 
						|
        options['yaxis'] = ytitle
 | 
						|
        options['series'] = series
 | 
						|
        options['colors']  = COLORS[:df[x].size]
 | 
						|
        return {"apex":options}        
 | 
						|
        # options = Apex.barh(item)
 | 
						|
        # options['chart']['type'] = 'column'
 | 
						|
        # options['plotOptions']['bar'] = {'horizontal':False,'columnWidth':'55%'}
 | 
						|
        # options['stroke']={'show':True,'width':2,'colors':['transparent']}
 | 
						|
        # return {"apex":options}
 | 
						|
    @staticmethod
 | 
						|
    def get_labels(item):
 | 
						|
        xtitle = ytitle = ""
 | 
						|
        if "labels" not in item['chart'] :
 | 
						|
            xtitle = item['chart']['axis']['x']
 | 
						|
            ytitle = item['chart']['axis']['y']
 | 
						|
        else:
 | 
						|
            xtitle = item['chart']['labels']['x']
 | 
						|
            ytitle = item['chart']['labels']['y']
 | 
						|
        xtitle = xtitle if type(xtitle) != list else xtitle[0]
 | 
						|
        ytitle = ytitle if type(ytitle) != list else ytitle[0]
 | 
						|
        return {"title":{"text":xtitle.lower().replace('_',' '),"style":{"fontWeight":"lighter"}}},{"title":{"text":ytitle.lower().replace('_',' '),"style":{"fontWeight":"lighter"}}}
 | 
						|
        
 | 
						|
    @staticmethod
 | 
						|
    def bar(item):
 | 
						|
        return Apex.barh(item)
 | 
						|
    @staticmethod
 | 
						|
    def barh(item):
 | 
						|
        """
 | 
						|
        rendering a horizontal bar chart assuming for now that only one series is involved
 | 
						|
        @TODO: alias this with bar (!= column)
 | 
						|
        """
 | 
						|
        df = item['data']
 | 
						|
        N = df.shape[0] if df.shape[0] < 10 else 10
 | 
						|
        axis = item['chart']['axis']
 | 
						|
        y = axis['y']
 | 
						|
        if type(y) == list :
 | 
						|
            y = y[0]
 | 
						|
        axis['x'] = [axis['x']] if type(axis['x']) != list else axis['x']
 | 
						|
        if not set(axis['x']) & set(df.columns.tolist()) :
 | 
						|
            print (set(axis['x']) & set(df.columns.tolist()))
 | 
						|
            print (axis['x'])
 | 
						|
            print (df.columns)
 | 
						|
            # df.columns = axis['x']
 | 
						|
        series = []
 | 
						|
        _min=_max = 0
 | 
						|
        for x in axis['x'] :
 | 
						|
            
 | 
						|
            series += [{"data": df[x].values.tolist()[:N],"name":x.upper().replace('_',' ')}]
 | 
						|
            _min = df[x].min() if df[x].min() < _min else _min
 | 
						|
            _max = df[x].max() if df[x].max() > _max else _max
 | 
						|
        
 | 
						|
        xtitle , ytitle = Apex.get_labels(item)
 | 
						|
        options = {"chart":{"type":"bar"},"plotOptions":{"bar":{"horizontal":True}},"dataLabels":{"enabled":False},"legend":{"position":"right"}}
 | 
						|
        options['xaxis'] = {"categories":df[y].values.tolist()[:N],"title":xtitle['title']}
 | 
						|
        
 | 
						|
        options['yaxis'] = ytitle
 | 
						|
        options['series'] = series
 | 
						|
        options['colors']  = COLORS[:df[x].size]
 | 
						|
        return {"apex":options}
 | 
						|
        
 | 
						|
    @staticmethod
 | 
						|
    def spline(item):
 | 
						|
        series = []
 | 
						|
 | 
						|
        df = item['data']
 | 
						|
        N = df.shape[0] if df.shape[0] < 10 else 10
 | 
						|
        axis = item['chart']['axis']
 | 
						|
        x = axis['x']
 | 
						|
        _min=_max = 0
 | 
						|
        for y in axis['y'] :
 | 
						|
            series += [{"data":df[y].values[:N].tolist(),"name":y.upper().replace('_',' ')}]
 | 
						|
            _min = df[y].min() if df[y].min() < _min else _min
 | 
						|
            _max = df[y].max() if df[y].max() > _max else _max
 | 
						|
 | 
						|
        colors = COLORS[:len(axis['y'])]    
 | 
						|
        options = {"chart":{"type":"line"},"series":series,"stroke":{"curve":"smooth"},"colors":colors,"legend":{"position":"right"}}
 | 
						|
        xtitle , ytitle = Apex.get_labels(item)
 | 
						|
        
 | 
						|
        options['xaxis'] = {"categories":df[x].values[:N].tolist(),"title":xtitle['title']}
 | 
						|
        options['yaxis'] = ytitle
 | 
						|
 | 
						|
        return {"apex":options}
 | 
						|
    @staticmethod
 | 
						|
    def donut(item):
 | 
						|
        """
 | 
						|
        :pre    data must have more than one item otherwise just make it a scalar
 | 
						|
        here we will use the key as labels and the values as the values (obviously)
 | 
						|
        labels are y-axis
 | 
						|
        values are x-axis
 | 
						|
        """
 | 
						|
        df = item['data']
 | 
						|
        
 | 
						|
        if df.shape [0]> 1 :
 | 
						|
            y_cols,x_cols = item['chart']['axis']['y'],item['chart']['axis']['x']
 | 
						|
            labels = df[y_cols].values.tolist()
 | 
						|
            
 | 
						|
            values = df[x_cols].values.round(2).tolist()
 | 
						|
        else:
 | 
						|
            labels = [name.upper().replace('_',' ') for name in df.columns.tolist()]
 | 
						|
            values = df.values.round(2).tolist()[0] if df.shape[1] > 1 else df.values.round(2).tolist()
 | 
						|
            
 | 
						|
        colors  = COLORS[:len(values)]
 | 
						|
        options = {"series":values,"colors":colors,"labels":labels,"chart":{"type":"donut"},"plotOptions":{"pie":{"customScale":.8}},"legend":{"position":"right"}}
 | 
						|
        return {"apex":options}
 | 
						|
 | 
						|
        pass
 | 
						|
    
 | 
						|
class engine :
 | 
						|
    """
 | 
						|
    This engine is designed to load the configuration and run the queries given they are remittance or claims
 | 
						|
    @TODO:
 | 
						|
        - make sure the readers of the queries are configurable i.e use data-transport
 | 
						|
    """
 | 
						|
    def __init__(self,path) :
 | 
						|
        """
 | 
						|
        Loading configuration file from a designated location ...
 | 
						|
        """
 | 
						|
        f = open(path) ;
 | 
						|
        _config = json.loads(f.read())
 | 
						|
        self.store_config = _config['store']          
 | 
						|
        self.info   = _config['analytics']
 | 
						|
        _args = self.store_config
 | 
						|
        if self.store_config['type'] == 'mongo.MongoWriter' :
 | 
						|
            _args['type'] = 'mongo.MongoReader'
 | 
						|
        else:
 | 
						|
            _args['type'] = 'disk.SQLiteReader'
 | 
						|
        self.reader = transport.factory.instance(**_args)
 | 
						|
 | 
						|
    def apply (self,**args) :
 | 
						|
        """
 | 
						|
            type: claims or remits
 | 
						|
            filter  optional identifier claims, procedures, taxonomy, ...
 | 
						|
        """
 | 
						|
        _m = {'claim':'837','claims':'837','remits':'835','remit':'835'}
 | 
						|
        # key = '837' if args['type'] == 'claims' else '835'
 | 
						|
        table = _m[ args['type']]
 | 
						|
        analytics = self.info[table]
 | 
						|
        if 'index' in args :
 | 
						|
            index = int(args['index'])
 | 
						|
            analytics = [analytics[index]]
 | 
						|
            
 | 
						|
        _info = list(analytics) if 'filter' not in args else [item for item in analytics if args['filter'] == item['id']]
 | 
						|
        # conn = lite.connect(self.store_config['args']['path'],isolation_level=None)
 | 
						|
        # conn.create_aggregate("stdev",1,stdev)
 | 
						|
        DB_TYPE = 'mongo' if (type(self.reader) == transport.mongo.MongoReader) else 'sql'
 | 
						|
        r = []
 | 
						|
        for row in _info :
 | 
						|
            
 | 
						|
            for item in row['pipeline'] :
 | 
						|
                # item['data'] = pd.read_sql(item['sql'],conn)
 | 
						|
                query = {DB_TYPE:item[DB_TYPE]}
 | 
						|
                item['data'] = self.reader.read(**item)
 | 
						|
                if 'serialize' in args :
 | 
						|
                    
 | 
						|
                    item['data'] = json.dumps(item['data'].to_dict(orient='record')) if type(item['data']) == pd.DataFrame else item['data']
 | 
						|
                else:
 | 
						|
                    item['data'] = (pd.DataFrame(item['data']))
 | 
						|
                    
 | 
						|
                    
 | 
						|
                # if 'info' in item:
 | 
						|
                #     item['info'] = item['info'].replace(":rows",str(item["data"].shape[0]))
 | 
						|
        # conn.close()
 | 
						|
        
 | 
						|
        return _info
 | 
						|
 | 
						|
    def _html(self,item) :
 | 
						|
 | 
						|
        figure = None
 | 
						|
        df = item['data']
 | 
						|
        label = ['<div class="label">',item['label'],'</div>']
 | 
						|
        text = ['<div class="grid">',df.describe().iloc[:].round(2).to_html().replace('_',' '),'</div>']
 | 
						|
        info = ['<div class="info">',item['info'],'</div>'] if 'info' in item else []
 | 
						|
        if item['chart']['type'] in ['pie','donut','doughnut'] :
 | 
						|
            figure = Chart.donut(item)
 | 
						|
            text = ['<div class="grid">',df.to_html(index=False).replace('_',' '),'</div>']
 | 
						|
        elif item['chart']['type'] == 'scatter' :
 | 
						|
            figure = Chart.scatter(item)
 | 
						|
        elif item['chart']['type'] == 'spline' :
 | 
						|
            figure = Chart.spline(item)
 | 
						|
        elif item['chart']['type'] in ['barh','hbar'] :
 | 
						|
            figure = Chart.barh(item)
 | 
						|
        elif item['chart']['type'] == 'scalar' :
 | 
						|
            
 | 
						|
            figure  = (item['data'].apply(lambda col: '<div class="scalar"><div class="value bold">'+str(col.values[0].round(2))+'</div><div class="value-text">'+col.name.replace('_', ' ')+'</div></div>' ).tolist())
 | 
						|
            label   = text = []
 | 
						|
 | 
						|
            pass
 | 
						|
        if figure and item['chart']['type'] != 'scalar':
 | 
						|
            stream = io.BytesIO()
 | 
						|
            figure.savefig(stream,format='png',dpi=300,quality=95, bbox_inches = "tight",transparent=True)
 | 
						|
            stream.seek(0)
 | 
						|
            stream = base64.b64encode(stream.getvalue()).decode("utf-8")
 | 
						|
            stream = "data:image/png;base64,"+stream
 | 
						|
            figure = ['<div class="figure"><img src="'+stream+'">',"</div>"]
 | 
						|
            
 | 
						|
            # figure.canvas.draw()
 | 
						|
            # figure = "".join( map(chr,figure.canvas.tostring_argb())) #--bytes
 | 
						|
        # else:
 | 
						|
            # figure = [ ]
 | 
						|
        if item['chart']['type'] != 'scalar':
 | 
						|
            return ['<div class="frame"><div class="chart '+ item['chart']['type']+'">'] + [ " ".join(row) for row in [label,figure,text,info] if row] + ["</div></div>"]
 | 
						|
        else:
 | 
						|
            return [ " ".join(row) for row in [label,figure,text,info] if row] 
 | 
						|
        pass
 | 
						|
    def _csv(self,item):
 | 
						|
 | 
						|
        pass
 | 
						|
    def export(self,item,format):
 | 
						|
        """
 | 
						|
        We have a pipeline here and we should attempt to build a figure using seaborn within an html template using jinja2
 | 
						|
        This is considered a page (or an item) of an analysis where we will have both data and rendering information with accompanying text
 | 
						|
        """
 | 
						|
        html = []
 | 
						|
        for row in item['pipeline'] :
 | 
						|
            p = [ "<h2>",row['label'].replace('_',' '),"</h2>"]
 | 
						|
            y_label = [name for name in row['data'].columns if 'count' in name]
 | 
						|
            x_label = list(set(row['data'].columns) - set(y_label))
 | 
						|
            N = row.shape[0]
 | 
						|
            if 'info' in row :
 | 
						|
                p += ["<div class='info'>",row['info'],'</div>']
 | 
						|
            
 | 
						|
        pass
 | 
						|
 | 
						|
class LogAnalytics :
 | 
						|
    def __init__(self,path):
 | 
						|
        logs = open(path).read().split('\n')        
 | 
						|
        logs = [json.loads(row) for row in logs if row.strip() != '']
 | 
						|
        self.remits = {
 | 
						|
            "completed": np.sum([1 for row in logs if row['completed'] == True]),
 | 
						|
            "files":len(logs)
 | 
						|
        }
 | 
						|
        
 | 
						|
# m = LogAnalytics('/home/steve/healthcare-io/remits.log')
 | 
						|
 | 
						|
 | 
						|
css = """
 | 
						|
    <meta charset="utf-8">            
 | 
						|
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
 | 
						|
 | 
						|
    <title>HealthcareIO - :title </title>
 | 
						|
    <style>
 | 
						|
        body{
 | 
						|
            padding:8px;
 | 
						|
            padding-left:4%;
 | 
						|
            padding-right:4%;
 | 
						|
            
 | 
						|
 | 
						|
        }
 | 
						|
        .pane{
 | 
						|
            padding:4px;
 | 
						|
            display:grid;
 | 
						|
            gap:16px;
 | 
						|
            grid-template-columns:repeat(2,1fr) ;
 | 
						|
            
 | 
						|
            
 | 
						|
            }
 | 
						|
 | 
						|
        .numbers {
 | 
						|
            display:grid;
 | 
						|
            grid-template-columns:repeat(2,1fr);
 | 
						|
            gap:16px;
 | 
						|
            /*padding:2px;*/
 | 
						|
            /*border:1px solid #CAD5E0;*/
 | 
						|
            
 | 
						|
        }
 | 
						|
        .numbers .scalar {
 | 
						|
            padding:8px;
 | 
						|
            background-image: linear-gradient(to bottom, #f3f3f3,#d3d3d3, #ffffff);
 | 
						|
            border:1px solid #CAD5E0;
 | 
						|
            font-family:sans-serif;
 | 
						|
            text-transform:capitalize;
 | 
						|
            text-align:right;
 | 
						|
            font-size:12px;
 | 
						|
            display:grid;
 | 
						|
            grid-template-rows:auto 28px; gap:2px;
 | 
						|
            
 | 
						|
        }
 | 
						|
        .numbers .scalar .value-text {
 | 
						|
            border-top:1px solid #CAD5E0;
 | 
						|
            padding:8px;
 | 
						|
            font-weight:bold;
 | 
						|
            align-items:center;
 | 
						|
            font-size:14px;
 | 
						|
            display:grid;
 | 
						|
            
 | 
						|
            
 | 
						|
        }
 | 
						|
 | 
						|
        .numbers .scalar .value {
 | 
						|
            display:grid;
 | 
						|
            color:#004b79;
 | 
						|
            align-content:center;
 | 
						|
            font-size:48px; text-align:right; font-weight:bold;}
 | 
						|
        .frame {
 | 
						|
            background-image: linear-gradient(to bottom, #f3f3f3,#d3d3d3, #ffffff);
 | 
						|
            padding:2px;
 | 
						|
            border:1px solid #CAD5E0;
 | 
						|
            
 | 
						|
        }
 | 
						|
        .figure {grid-area:figure; width:500px; height:350px; display:grid; align-items:center}
 | 
						|
        .info {height:28px;  width:100%; grid-area:info;
 | 
						|
            display:grid;
 | 
						|
            align-items:center;
 | 
						|
            text-align:center; text-transform:capitalize; padding:4px; font-size:12px; font-family:sans-serif; border-top:1px solid #CAD5E0;}
 | 
						|
        .grid {grid-area:grid; }
 | 
						|
        .label {grid-area:label; font-weight:bold; font-size: 22px; text-align:center; text-transform:capitalize}
 | 
						|
        .chart {
 | 
						|
            padding:4px;
 | 
						|
            padding:8px;
 | 
						|
            display:grid; grid-template-areas:
 | 
						|
                "label  label   label"
 | 
						|
                "figure grid    grid"
 | 
						|
                "info   info    info" ;
 | 
						|
                
 | 
						|
            gap:2px;
 | 
						|
            
 | 
						|
        }
 | 
						|
        img {height:auto; max-width:100% ;}
 | 
						|
        table {width:100%; border-collapse: collapse;}
 | 
						|
        table , TH, TD{ font-size:14px; padding:8px; font-family:sans-serif; border:1px; border:1px solid #CAD5E0;}
 | 
						|
        table thead, tbody th { padding:4px; text-transform:capitalize; background-color:#4682B4; color:#ffffff; text-align:center}
 | 
						|
        table thead tr th {text-align:center}
 | 
						|
        table tbody td {text-align:right; font-weight: lighter}
 | 
						|
        table tbody tr:nth-child(odd) {background: #95bce0}
 | 
						|
        table tbody tr:nth-child(even) {background: #c8e5ff}
 | 
						|
        
 | 
						|
        
 | 
						|
    </style>
 | 
						|
"""    
 | 
						|
# folder = '/home/steve/.healthcareio/config.json'
 | 
						|
# e = engine(path=folder)
 | 
						|
# p = e.apply(type='claims')
 | 
						|
# values = []
 | 
						|
# html = [css]
 | 
						|
# for row in p :
 | 
						|
#     frame = []
 | 
						|
#     for item in row['pipeline'] :
 | 
						|
#         if row['pipeline'].index(item) == 0 :
 | 
						|
#             if item['chart']['type'] != 'scalar' :
 | 
						|
#                 # frame = ['<div class="frame">']
 | 
						|
#                 pass
 | 
						|
#             else:
 | 
						|
#                 frame = ['<div><div class="numbers">']
 | 
						|
        
 | 
						|
#         frame += e._html(item) #p[3]['pipeline'][0])
 | 
						|
#     frame   += ['</div></div>'] if item['chart']['type'] == 'scalar' else []
 | 
						|
#     html    += frame
 | 
						|
 | 
						|
# html = '<div class="pane">' + "\n".join(html) + "</div></div>"
 | 
						|
# f = open('out.html','w')
 | 
						|
# f.write(html.replace(":title","Claims"))
 | 
						|
# 
 | 
						|
# HTML(string=html).write_pdf('out.pdf',stylesheets=[CSS(string=css)])
 | 
						|
# x.write_pdf('./out.pdf')
 | 
						|
# print (p[2]['pipeline'][0]['data'])
 | 
						|
# e.export (p[0])
 | 
						|
# features = ['diagnosis.code']
 | 
						|
# split(folder = folder, features=features)
 | 
						|
 |