From 0a0e6064569e2b0f493506d1759546255f1a9636 Mon Sep 17 00:00:00 2001 From: Steve Nyemba Date: Tue, 10 Nov 2020 13:21:56 -0600 Subject: [PATCH] new design/architecture --- src/setup.py | 37 ++++++++++ src/smart/__init__.py | 49 +++++++++++++ src/smart/alert/__init__.py | 15 ++++ src/smart/folder/__init__.py | 63 +++++++++++++++++ src/smart/folder/__main__.py | 2 + src/smart/logger/__init__.py | 52 ++++++++++++++ src/smart/top/.__init__.py.swp | Bin 0 -> 16384 bytes src/smart/top/__init__.py | 124 +++++++++++++++++++++++++++++++++ src/smart/top/__main__.py | 5 ++ src/smart/view.py | 45 ++++++++++++ 10 files changed, 392 insertions(+) create mode 100644 src/setup.py create mode 100644 src/smart/__init__.py create mode 100644 src/smart/alert/__init__.py create mode 100644 src/smart/folder/__init__.py create mode 100644 src/smart/folder/__main__.py create mode 100644 src/smart/logger/__init__.py create mode 100644 src/smart/top/.__init__.py.swp create mode 100644 src/smart/top/__init__.py create mode 100644 src/smart/top/__main__.py create mode 100644 src/smart/view.py diff --git a/src/setup.py b/src/setup.py new file mode 100644 index 0000000..c5220ca --- /dev/null +++ b/src/setup.py @@ -0,0 +1,37 @@ +""" +""" +from setuptools import setup, find_packages +import os +import sys +def read(fname): + return open(os.path.join(os.path.dirname(__file__), fname)).read() +args = { + "name":"smart-top", + "version":"1.0.0", + "author":"The Phi Technology LLC","author_email":"info@the-phi.com", + "license":"MIT", + "packages":["smart"]} +args["keywords"]=['mongodb','couchdb','rabbitmq','file','read','write','s3','sqlite'] +args["install_requires"] = ['pandas','numpy','requests'] +args["url"] = "https://dev.the-phi.com/git/steve/smart-top.git" + +if sys.version_info[0] == 2 : + args['use_2to3'] = True + args['use_2to3_exclude_fixers']=['lib2to3.fixes.fix_import'] +setup(**args) +# setup( +# name = "data-transport", +# version = "1.0", +# author = "The Phi Technology LLC", +# author_email = "steve@the-phi.com", +# license = "MIT", +# packages=['transport'], +# keywords=['mongodb','couchdb','rabbitmq','file','read','write','s3'], +# install_requires = ['pymongo','numpy','cloudant','pika','boto','flask-session','smart_open'], +# url="https://dev.the-phi.com/git/steve/data-transport.git", +# use_2to3=True, +# long_description=read('README.md'), +# convert_2to3_doctests=['README.md'], +# #use_2to3_fixers=['your.fixers'], +# use_2to3_exclude_fixers=['lib2to3.fixes.fix_import'], +# ) diff --git a/src/smart/__init__.py b/src/smart/__init__.py new file mode 100644 index 0000000..bb40640 --- /dev/null +++ b/src/smart/__init__.py @@ -0,0 +1,49 @@ +""" +This framework allows data to be logged to a given data store i.e : + - disk, cloud (google, dropbox, box, sugarsync or s3) or a queue server +The intent of the framework is to work as a standalone or embedded in code as a logging framework +usage: + +dependencies : + data-transport pip install git+https://dev.the-phi.com/git/steve/data-transport.git +""" +import smart.top +import smart.folder +import smart.logger +# from transport import factory + +# class logger : +# """ +# This class is a basic logger, it will log data regardless of the types of data, We will have subclasses that will implement various data extraction schemas: +# - processes (top), + +# """ +# def __init__(self,**args): +# """ +# :store data store (disk,mongo,couch,google,dropbox) +# :args arguments to pass for the data-store (read transport documentation) +# :notify function that returns true/false for notification +# """ + +# self.store = factory.instance(type=store,args=args['args']) +# if 'notify' in args : +# self.notify = args +# pass + +# def log(self,row): +# """ +# This function will log data to a data store +# :row row to be stored +# """ +# self.store.write(row=row) +# if(hasattr(self,'notify')): +# if (self.notify(row)) : +# # +# # Let us notify the backend by generating a report and submitting it +# # +# stream = self.get.report() +# pass +# else: +# pass +# def report(self) : + diff --git a/src/smart/alert/__init__.py b/src/smart/alert/__init__.py new file mode 100644 index 0000000..b513d3c --- /dev/null +++ b/src/smart/alert/__init__.py @@ -0,0 +1,15 @@ +""" +This file will submit an alert to either a mailbox given a set of parameters, this will perform as following : + - as-a-service + - embedded +""" + +check = None +def post(**args): + """ + This function will submit a report to a given target provided some input + :key will perform as-a-service + :data data that will be submitted to smtp/queue server + :smtp will send the file to a mailbox + """ + pass \ No newline at end of file diff --git a/src/smart/folder/__init__.py b/src/smart/folder/__init__.py new file mode 100644 index 0000000..45c8f75 --- /dev/null +++ b/src/smart/folder/__init__.py @@ -0,0 +1,63 @@ +""" +This file is designed to retrieve information on a folder +{files,size,hash} +""" +import subprocess +import sys +import re +import os +import pandas as pd +import io +import datetime +class Util : + def size(self,stream): + + + PATTERN = '(^.+)([A-Z]+$)' + value,units = re.match('^(.+)([A-Z]+$)',stream).groups() + value = float(value) + if 'G' == units : + value *= 1000 + elif 'K' == units: + value /= 1000 + units = 'MB' + return {"size":value,"units":units} + def content(self,stream): + return {"content":stream.split(' ')[0].strip()} + + +def read(**args): + """ + The path can also take in regular expressions + """ + cmd = {"size":"du -sh :path","content":"find :path -type f -exec md5sum {} + | sort -z|md5sum"} + r = {} + util = Util() + for key in cmd : + _cmd = cmd[key] + handler = subprocess.Popen(_cmd.replace(':path',args['path']),shell=True,stdout=subprocess.PIPE,encoding='utf-8') + stream = handler.communicate()[0] + + if sys.version_info[0] > 2 : + rows = str(stream).split('\n') + else: + rows = stream.split('\n') + if key == 'size' : + rows = rows[0] + rows = util.size(rows.split('\t')[0]) + elif key == 'content' : + # + # There is a hash key that is generated and should be extracted + rows = rows[0] + rows = util.content(rows) + + r = dict(r, **rows) + r['path'] = args['path'] + r['name'] = args['path'].split(os.sep)[-1:][0] + r['node'] = os.uname()[1] + r['date'] = datetime.datetime.now().strftime('%m-%d-%Y') + r['time'] = datetime.datetime.now().strftime('%H:%M:%S') + + return pd.DataFrame([r]) + + pass diff --git a/src/smart/folder/__main__.py b/src/smart/folder/__main__.py new file mode 100644 index 0000000..1166748 --- /dev/null +++ b/src/smart/folder/__main__.py @@ -0,0 +1,2 @@ +import smart.folder +print (smart.folder.read(path='/home/steve/dev/data/vumc/aou')) \ No newline at end of file diff --git a/src/smart/logger/__init__.py b/src/smart/logger/__init__.py new file mode 100644 index 0000000..05496d5 --- /dev/null +++ b/src/smart/logger/__init__.py @@ -0,0 +1,52 @@ +import pandas as pd +import numpy as np +import transport +import datetime +import io +import json +import requests + +def subscribe (self,**args) : + """ + This function will subscribe an email to a given service (report,notification). If already susbcribed no further action will be performed + :email provide a valid email for the free plan. Upgrades will be done via the website + :id service identifier accepted values are GOOGLE_DRIVE,DROPBOX,BOX,ONE_DRIVE + + """ + url = "https://the-phi.com/store/smart-top/subscribe" + SERVICES=['GOOGLE','DROPBOX','BOX','ONE_DRIVE'] + if args['id'].upper() in SERVICES : + data = {"email":args['email']} + requests.post(url,data=data) + pass + +def log(**args) : + """ + This function will write to a designated location provided a set of inputs + :store mongo,file,couch,api + """ + # + # @TODO: Provide facility to write to a given cloud store (google,one-drive ...) + # This will have to be supported by some sort of subscription service + # + STORE_MAP = {"mongo":"MongoWriter","disk":"DiskWriter","couch":"CouchWriter",'sqlite':'SQLiteWriter'} + if 'store' not in args : + _id = 'console' + else: + _id = 'disk' if args['store'] == 'file' else args['store'] + _id = 'disk' if _id == 'sqlite' else _id + if _id == 'console' : + """ + We are going to print whatever we have to the console ... using the tool in cli mode + """ + print() + print (args['data']) + print () + # stream = args['memory'] + # stream.write(json.dumps(args['row']) if isinstance(args['row'],dict) else args['row']) + # stream.write("\n") + else: + store_type = ".".join([args['store'],STORE_MAP[_id]]) + store_args = args['params'] + store = transport.factory.instance(type=store_type,args=store_args) + store.write( args['row']) \ No newline at end of file diff --git a/src/smart/top/.__init__.py.swp b/src/smart/top/.__init__.py.swp new file mode 100644 index 0000000000000000000000000000000000000000..0728c9b03aaa40a4f24a9fa7bf3a8e651166f7c3 GIT binary patch literal 16384 zcmeHOTZkn`8LnK^T{Xri`rrjBXLi$l^qihOYeteY9^KKIU58|6hMgJL8_sB zJAJdOdS`}pBVzPH5Rrfn0SP_`5igJjuP-qu_z(~U6~y@9Q^Y4Bf+%YI{_5`2eP;He z;DbmveAC_4e^veU)n9*A{dH#N%*U25@`tP*!}VsyFVw zZoW1QGz{F9fqU5d&i2lb*i7qw{?@l_-Zn=g&@j+2&@j+2&@j+2&@j+2&@j+2@c+Yr zO73KzLUZmcn&cJtJC596Ev~m0>AOe17q88&VW454VW454VW454VW454VW454VW454 zVW45)e~ia0OTg!V zeP9ppfJcFIz{~eA_FLc!z(;^{00VyTM#jDhJOL!YgTO0qVC;{;kAY`^r+~YFzr3EY zKLEb~eh54Vd>M#=CE#7aZ%#7yHQ*^=5BLypAMnZv#(odH1pFL$9{3&*0uKTm;HR%+ z>^s0Wff3LJP5>{xma(4!&jQZ?PXar@V*m$Uehp*a06q__0WILqcSA>D6<7h5fO+88 zcQN)w;IqJ!z!>-d@NVEA2*A7ud>!}X4_xmT@F`Ae7`|TXHoP+)j5T$FDfD5MTCvHRg82FD}k;$~m^r&b_}X zZt~=Me8AJmE3aKCeX;B$5s_=EFlir?@;#1p3xy|QcxIdTk4fM&N^!lR>sgT<3O*Qc zx?57Eeqy$_r*e4ER7?4hzRM#el17PZE#l`Y=TnqoPOC&&B}9mbp+RrPwk4CmSEkW1 zDp{zVf90r0v(|?p6<3|}a~rFRt9`!Uy4>>vB~m^Xso*LG=aCzTGBu94>w6x)MXKz9 zsz82Ofd4oPCZ$ks>2}5-ka{jj&|}nt0xF>%fnVvkq@AwMOohtQ$d7ilw3h_|4`Ot7 z)uc-6swJSjw1&aZNPHJ0WKyKKg(AdXC&}D*p0THKK%ubQwOCk_)^S?e&rXm@$l4BH(klEHy9w$n# z{A|r1Wvtbtb4w!|#LB6qcHltCio+ z<$i%*%gaYaC^=QS{7iPVi|J?w*Wpw->b|bE+!*_kds*Zt@QbMe{v;78b$#xR9F^Gt zO){?Ja8U(U@JG3IAd|H6u|TT0!qc)QHKtwbA~2z=(|_;+De_|&CX*COr7JAw8Q4f= zjP#`#QAj7R64;I~L3XFrYPZ47nd$1tEYTaL?wEY=Qh|D0(R9P8#QK%X({L?J@rWZJ z+2e*xsg(1*FeMRqq`Qg1q~vhn(uMWW^2*}U$Kf#;$<;69DcfmqBiAp_x3PS2smNCq zvE9C55olnZsbA%)Tya^S7J=xk+Xi-_l(hWF6$h2us?&--pEx*@tufBZ$|{{vW3|kv z^h5I8L|Qw~O5@U;sjz9wL>nY#bebaF*vUegUUM75?6`BNKZpY?inJXUlPUQD%+7_N zUPE;**BOBL(CUtm+c<5+T85r|MyK-}dg%4@g z>yTHWF>%}#P0FV>lL+`}@nX8kJ=;)s3C%m`BmJz+os5we9o!}R^82&C^hA^83PT)u%Knn ze^3z2v2e?zATr*7%D(3B##BzTUJbaM@&V_)lh6zFVc9UY#DOoBG>`UGzlX7^9fh9$ zi`lK)n=n+`5$Sk7pMYsO2gec6j74r(n+%t6sAq=U>NVN4zK89>qA+R0Axk(NddE+- zCATc=SS4hv7 zZ2EopVJTX!UCQ@ZmQU5pS*Ao;m>hDnBudJp1Vau9)H;`y1Z`mHi*b$l34g$lo}VI@ z{0^4gQec6M;eb&br+bp+3v^9D7vB?ZLY~}3j3teWMJU`_{>+fPJgi{LqJ@O!lb{rD4(eyM&;w!cZ6;??f?7OcYhn8{eOAB|1|dbGr%vgfB!!43E**H9_Rso z!~XpR;48qVflXioco;B%cK~k#{z`j#;73Ir-=cb&t6`vFpkbh4pkbh4pkbh4pkbh4 zpkd&)4A2%<$0KnpVY_A#qnhC#7PPr>{S^O1931N14#FxX_EI8^5KcspvfK&fyX+Z; zt=wotd)(284u*`;D8$BWG%|`9(`09xVAoo`=wPTR@>0ZsBND}z^o{n>*iRzvJdrLU zGB|ut@yPWZoJyoV0!ae{ipHrcYXJzfj1E?IMQYOaZQy!UJfQl900YHIhDI)0I|iWu zMv+KS3>{6O?d7yMoeQkQk*EdIX6W>%(A37(5_@258snWS;8JokiuHD~>roJ(7ObyM zeMmdH^l=`}R7N=Yk<$ShBAZfc)l!Z;4JQ?-6cGbuR>wwQOs68W@c_2oN(Xp{1{Gq| zW0TZuoj7T96%eCCAAP_C)+Wys1Pu<&zz%m@n_snqO!WEs#=^$ty0wdXbV|qATzO<= z_40~Q3xnoDDC?SW$>T$d+E-ftLaS@M@TVGzs%R0S0y0o-L&q4h~ z)?O~-d2QiBUepF&bWztC5kwa>FG6F(B3xpH?i`(B 0 else " ".join(stream) + + if ' ' in cmd.split(os.sep)[len(cmd.split(os.sep))-1] : + p = cmd.split(os.sep)[len(cmd.split(os.sep))-1].split(' ') + name = p[0] + args = " ".join(p[1:]) + else: + name = cmd.split('/')[len(cmd.split(os.sep))-1] + args = " ".join(stream[index:]) if index > 0 else "" + + return [name,cmd,args] + def parse(self,rows,xchar=';'): + """ + This function parses the document returned by the execution of the command returns a document that will have to be parsed and formatted + """ + m = [] + TIME_INDEX = 5 + ARGS_INDEX = 6 + + for item in rows : + if rows.index(item) != 0 : + parts = item.split(xchar) + row = parts[:TIME_INDEX] + row.append(' '.join(parts[TIME_INDEX:ARGS_INDEX])) + row += self.app(parts[ARGS_INDEX:]) + else: + row = item.split(xchar) + row = (xchar.join(row)).strip() + if len(row.replace(";","")) > 0 : + m.append(row) + return m + + +def read(args) : + """ + This function will perform the actual reads of process informations. + @return {user,pid,start,status, name, args, mem,cpu} + """ + cmd = "ps -eo pid,user,pmem,pcpu,stat,etime,args|awk 'OFS=\";\" {$1=$1; if($5 > 9) print }'" + xchar = ";" + try: + handler = subprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE) + stream = handler.communicate()[0] + if sys.version_info[0] > 2 : + rows = str(stream).split('\\n') + else: + rows = stream.split('\n') + + formatter = Util() + m = formatter.parse(rows) + + d = datetime.datetime.now().strftime('%m-%d-%Y') + t = datetime.datetime.now().strftime('%H:%M:%S') + m = [item for item in m if len(item) != len (m[0])] + m = "\n".join(m[1:]) + df = pd.read_csv(pd.compat.StringIO(m),sep=xchar) + df['date'] = np.repeat(d,df.shape[0]) + df['time'] = np.repeat(t,df.shape[0]) + df['node'] = np.repeat(os.uname()[1],df.shape[0]) + df.columns =['pid','user','mem','cpu','status','started','name','cmd','args','date','time','node'] + + + # + # We should filter the name of the apps we are interested in here (returning the full logs ) + # @TODO: Add filter here to handle filter on different columns + # + + if 'name' in args : + names = args['name'].split(',') + r = pd.DataFrame() + for name in names : + tmp = df[df.name == name.strip()] + if tmp.shape[0] : + r = r.append(tmp) + df = r + # + # For security reasons lets has the args columns with an MD5 or sha256 + # + + + df.args = [hashlib.md5(str(value).encode('utf-8')).hexdigest() for value in df.args.tolist()] + STATUS = {'R':'RUNNING','Z':'DEAD','D':'STASIS','S':'SLEEP','Sl':'SLEEP','Ss':'SLEEP','W':'PAGING','T':'DEAD'} + df.status = df.status.apply(lambda value: STATUS.get(value,'UNKNOWN')) + if 'cols' in args : + _cols = list(set(df.columns.tolist()) & set(args['cols'])) + if _cols : + df = df[_cols] + # + # we return a list of objects (no data-frames) + if 'logger' in args and args['logger'] != None : + logger = args['logger'] + logger(data=df) + return df.to_dict(orient='records') + + except Exception as e: + print (e) + pass + +if __name__ == '__main__' : + # + # Being directly called (external use of the ) + print(read()) diff --git a/src/smart/top/__main__.py b/src/smart/top/__main__.py new file mode 100644 index 0000000..4871de6 --- /dev/null +++ b/src/smart/top/__main__.py @@ -0,0 +1,5 @@ +import smart.top +import pandas as pd +df = pd.DataFrame (smart.top.read(name='firefox,code')) + +print (df.groupby(['user'])['cpu','mem'].sum()) \ No newline at end of file diff --git a/src/smart/view.py b/src/smart/view.py new file mode 100644 index 0000000..7cca938 --- /dev/null +++ b/src/smart/view.py @@ -0,0 +1,45 @@ +""" +""" +import smart +import sys +import json +import pandas as pd +import numpy as np +from multiprocessing import Process + + +SYS_ARGS = {} +if len(sys.argv) > 1: + + N = len(sys.argv) + for i in range(1,N): + value = None + if sys.argv[i].startswith('--'): + key = sys.argv[i][2:] #.replace('-','') + SYS_ARGS[key] = 1 + if i + 1 < N: + value = sys.argv[i + 1] = sys.argv[i+1].strip() + if key and value: + SYS_ARGS[key] = value + + + i += 2 + +# +# Let's determine what data to return ... +# --folder and/or --app +def format(prefix,pointer,logger,container): + return [{'args':{prefix:term.strip(),'logger':logger},'pointer':pointer} for term in container] + +folders = format('path',smart.folder.read,smart.logger.log,SYS_ARGS['folders'].split(',')) if 'folders' in SYS_ARGS else [] +apps = format('name',smart.top.read,smart.logger.log,SYS_ARGS['apps'].split(',')) if 'apps' in SYS_ARGS else [] +if 'cols' in SYS_ARGS : + cols = [name.strip() for name in SYS_ARGS['cols'].split(',')] +else: + cols = [] +nodes = folders + apps +for node in nodes : + if cols : + node['args']['cols'] = cols + pthread = Process(target=node['pointer'],args=(node['args'],)) + pthread.start() \ No newline at end of file