new design/architecture

data-collector
Steve L. Nyemba 4 years ago
parent 1a9c4b6630
commit 0a0e606456

@ -0,0 +1,37 @@
"""
"""
from setuptools import setup, find_packages
import os
import sys
def read(fname):
    """
    Return the text content of *fname*, resolved relative to this file's
    directory (an absolute *fname* is used as-is, per os.path.join semantics).
    """
    # use a context manager so the file handle is closed deterministically
    # instead of being leaked until garbage collection
    with open(os.path.join(os.path.dirname(__file__), fname)) as handle:
        return handle.read()
#
# Packaging configuration for the smart-top distribution.
#
args = dict(
    name="smart-top",
    version="1.0.0",
    author="The Phi Technology LLC",
    author_email="info@the-phi.com",
    license="MIT",
    packages=["smart"],
)
args["keywords"] = ['mongodb','couchdb','rabbitmq','file','read','write','s3','sqlite']
args["install_requires"] = ['pandas','numpy','requests']
args["url"] = "https://dev.the-phi.com/git/steve/smart-top.git"
#
# Legacy Python 2 support: run the 2to3 conversion at install time.
if sys.version_info[0] == 2:
    args['use_2to3'] = True
    args['use_2to3_exclude_fixers'] = ['lib2to3.fixes.fix_import']
setup(**args)
# setup(
# name = "data-transport",
# version = "1.0",
# author = "The Phi Technology LLC",
# author_email = "steve@the-phi.com",
# license = "MIT",
# packages=['transport'],
# keywords=['mongodb','couchdb','rabbitmq','file','read','write','s3'],
# install_requires = ['pymongo','numpy','cloudant','pika','boto','flask-session','smart_open'],
# url="https://dev.the-phi.com/git/steve/data-transport.git",
# use_2to3=True,
# long_description=read('README.md'),
# convert_2to3_doctests=['README.md'],
# #use_2to3_fixers=['your.fixers'],
# use_2to3_exclude_fixers=['lib2to3.fixes.fix_import'],
# )

@ -0,0 +1,49 @@
"""
This framework allows data to be logged to a given data store i.e :
- disk, cloud (google, dropbox, box, sugarsync or s3) or a queue server
The intent of the framework is to work as a standalone or embedded in code as a logging framework
usage:
dependencies :
data-transport pip install git+https://dev.the-phi.com/git/steve/data-transport.git
"""
import smart.top
import smart.folder
import smart.logger
# from transport import factory
# class logger :
# """
# This class is a basic logger, it will log data regardless of the types of data, We will have subclasses that will implement various data extraction schemas:
# - processes (top),
# """
# def __init__(self,**args):
# """
# :store data store (disk,mongo,couch,google,dropbox)
# :args arguments to pass for the data-store (read transport documentation)
# :notify function that returns true/false for notification
# """
# self.store = factory.instance(type=store,args=args['args'])
# if 'notify' in args :
# self.notify = args
# pass
# def log(self,row):
# """
# This function will log data to a data store
# :row row to be stored
# """
# self.store.write(row=row)
# if(hasattr(self,'notify')):
# if (self.notify(row)) :
# #
# # Let us notify the backend by generating a report and submitting it
# #
# stream = self.get.report()
# pass
# else:
# pass
# def report(self) :

@ -0,0 +1,15 @@
"""
This file will submit an alert to either a mailbox given a set of parameters, this will perform as following :
- as-a-service
- embedded
"""
# Placeholder for a notification predicate; expected to be assigned elsewhere.
check = None

def post(**args):
    """
    Submit a report to a given target provided some input.

    :key  will perform as-a-service
    :data data that will be submitted to smtp/queue server
    :smtp will send the file to a mailbox

    NOTE: not implemented yet -- this is a stub and performs no action.
    """
    return None

@ -0,0 +1,63 @@
"""
This file is designed to retrieve information on a folder
{files,size,hash}
"""
import subprocess
import sys
import re
import os
import pandas as pd
import io
import datetime
class Util :
    """
    Formatting helpers for the shell-command output gathered by read():
    size normalization (du) and checksum extraction (md5sum).
    """
    def size(self,stream):
        """
        Normalize a `du -sh` size token (e.g. '16K', '532M', '1.2G') to megabytes.

        :stream size token: a number immediately followed by an upper-case unit suffix
        :return {"size": <float, in MB>, "units": "MB"}
        """
        matched = re.match('^(.+)([A-Z]+$)',stream)
        if matched is None:
            # no unit suffix: du reports plain bytes -- convert to MB
            return {"size": float(stream) / 1000000, "units": "MB"}
        value,units = matched.groups()
        value = float(value)
        if 'G' == units :
            value *= 1000
        elif 'K' == units:
            value /= 1000
        elif 'T' == units:
            value *= 1000000
        # report a consistent unit: the previous version left 'G'/'M' suffixes
        # untouched, mislabeling values that had already been converted to MB
        units = 'MB'
        return {"size":value,"units":units}
    def content(self,stream):
        """
        Extract the checksum token from `md5sum` output (format '<hash>  -').
        """
        return {"content":stream.split(' ')[0].strip()}
def read(**args):
    """
    Gather metadata about a folder: total size (du) and a content hash (md5sum
    over all files), plus path/host/timestamp bookkeeping.

    :path path of the folder to inspect; the path can also take in regular expressions
    :return single-row pandas DataFrame with columns
            {size, units, content, path, name, node, date, time}

    NOTE(review): the commands are run through a shell with :path substituted in
    unescaped -- untrusted input in args['path'] could inject shell commands;
    confirm callers only pass trusted paths.
    """
    cmd = {"size":"du -sh :path","content":"find :path -type f -exec md5sum {} + | sort -z|md5sum"}
    r = {}
    util = Util()
    for key in cmd :
        _cmd = cmd[key]
        # shell=True so that patterns/globs inside :path are expanded by the shell
        handler = subprocess.Popen(_cmd.replace(':path',args['path']),shell=True,stdout=subprocess.PIPE,encoding='utf-8')
        stream = handler.communicate()[0]
        if sys.version_info[0] > 2 :
            rows = str(stream).split('\n')
        else:
            rows = stream.split('\n')
        if key == 'size' :
            # `du -sh` prints "<size>\t<path>"; keep only the size token
            rows = rows[0]
            rows = util.size(rows.split('\t')[0])
        elif key == 'content' :
            #
            # There is a hash key that is generated and should be extracted
            rows = rows[0]
            rows = util.content(rows)
        # merge this command's fields into the accumulated record
        r = dict(r, **rows)
    r['path'] = args['path']
    # last path component; empty string if the path ends with a separator
    r['name'] = args['path'].split(os.sep)[-1:][0]
    r['node'] = os.uname()[1]  # hostname (POSIX only)
    r['date'] = datetime.datetime.now().strftime('%m-%d-%Y')
    r['time'] = datetime.datetime.now().strftime('%H:%M:%S')
    return pd.DataFrame([r])
pass

@ -0,0 +1,2 @@
import smart.folder

# Ad-hoc smoke test: print {size,hash,...} metadata for one folder.
# NOTE(review): the path below is machine-specific -- change it before running.
print (smart.folder.read(path='/home/steve/dev/data/vumc/aou'))

@ -0,0 +1,52 @@
import pandas as pd
import numpy as np
import transport
import datetime
import io
import json
import requests
def subscribe (self=None,**args) :
    """
    This function will subscribe an email to a given service (report,notification).
    If already subscribed no further action will be performed.

    :param self vestigial parameter (this looks copied from a method); it now
                defaults to None so the function can be called as
                subscribe(email=..., id=...) while any old call site that passes
                an explicit first argument keeps working
    :email provide a valid email for the free plan. Upgrades will be done via the website
    :id    service identifier; values accepted by the code are
           GOOGLE, DROPBOX, BOX, ONE_DRIVE (anything else is silently ignored)
    """
    url = "https://the-phi.com/store/smart-top/subscribe"
    SERVICES=['GOOGLE','DROPBOX','BOX','ONE_DRIVE']
    if args['id'].upper() in SERVICES :
        data = {"email":args['email']}
        # network side effect: best-effort POST; the response is intentionally ignored
        requests.post(url,data=data)
    pass
def log(**args) :
    """
    This function will write to a designated location provided a set of inputs.

    :store  target store type: mongo, file (alias for disk), couch, sqlite;
            when absent the payload is printed to the console instead
    :row    record to be written (used by the data-store branch)
    :data   payload printed in console mode
    :params arguments forwarded to the data-store factory (read transport documentation)
    """
    #
    # @TODO: Provide facility to write to a given cloud store (google,one-drive ...)
    # This will have to be supported by some sort of subscription service
    #
    STORE_MAP = {"mongo":"MongoWriter","disk":"DiskWriter","couch":"CouchWriter",'sqlite':'SQLiteWriter'}
    if 'store' not in args :
        _id = 'console'
    else:
        # 'file' is an alias for 'disk'
        _id = 'disk' if args['store'] == 'file' else args['store']
        # NOTE(review): remapping sqlite -> disk makes the 'sqlite' entry in
        # STORE_MAP unreachable (sqlite rows go through DiskWriter) -- confirm intent
        _id = 'disk' if _id == 'sqlite' else _id
    if _id == 'console' :
        """
        We are going to print whatever we have to the console ... using the tool in cli mode
        """
        print()
        print (args['data'])
        print ()
        # stream = args['memory']
        # stream.write(json.dumps(args['row']) if isinstance(args['row'],dict) else args['row'])
        # stream.write("\n")
    else:
        # factory path looks like '<store>.<WriterClass>', e.g. 'mongo.MongoWriter';
        # note it is built from args['store'] (not _id), so e.g. 'sqlite.DiskWriter'
        store_type = ".".join([args['store'],STORE_MAP[_id]])
        store_args = args['params']
        store = transport.factory.instance(type=store_type,args=store_args)
        store.write( args['row'])

Binary file not shown.

@ -0,0 +1,124 @@
"""
This file contains class and functions that extract data from running processes like top and stores them into a data store of the calling codes choice
dependencies:
- top (on the os)
@TODO:
Test this thing on windows to see if it works
"""
import pandas as pd
import numpy as np
import subprocess
import os
import datetime
# from transport import factory
import sys
import hashlib
class Util:
    """
    Helpers that reshape the raw `ps` output rows before they are loaded
    into a DataFrame by read().
    """
    def app(self,stream):
        """
        Formatting application name, sometimes the name has parameters os separators ...

        :stream list of command-line tokens (the `args` portion of a ps row)
        :return [name, cmd, args] -- program name, command path, argument string
        """
        # if the first token alone is an existing filesystem path, the command is
        # that single token; otherwise assume all but the last token form the command
        index = 1 if os.path.exists(" ".join(stream[:1])) else len(stream)-1
        cmd = " ".join(stream[:index]) if index > 0 else " ".join(stream)
        if ' ' in cmd.split(os.sep)[len(cmd.split(os.sep))-1] :
            # the last path component itself contains spaces: split it into
            # program name and inline arguments
            p = cmd.split(os.sep)[len(cmd.split(os.sep))-1].split(' ')
            name = p[0]
            args = " ".join(p[1:])
        else:
            # NOTE(review): mixes a literal '/' with os.sep indexing -- equivalent
            # on POSIX, would misbehave on Windows; confirm POSIX-only is intended
            name = cmd.split('/')[len(cmd.split(os.sep))-1]
            args = " ".join(stream[index:]) if index > 0 else ""
        return [name,cmd,args]
    def parse(self,rows,xchar=';'):
        """
        This function parses the document returned by the execution of the command
        returns a document that will have to be parsed and formatted

        :rows  list of ';'-separated lines from ps/awk (first line is the header)
        :xchar field separator used in the raw rows and in the rebuilt output
        :return list of cleaned ';'-joined strings (empty-ish rows dropped)
        """
        m = []
        TIME_INDEX = 5  # pid,user,pmem,pcpu,stat occupy fields 0-4; etime starts here
        ARGS_INDEX = 6  # command + arguments start here
        for item in rows :
            # NOTE(review): rows.index(item) is O(n) per row and matches the first
            # duplicate -- identical rows after the first are treated as the header
            if rows.index(item) != 0 :
                # data row: first five fields, the elapsed time, then [name,cmd,args]
                parts = item.split(xchar)
                row = parts[:TIME_INDEX]
                row.append(' '.join(parts[TIME_INDEX:ARGS_INDEX]))
                row += self.app(parts[ARGS_INDEX:])
            else:
                # header row is carried through unchanged
                row = item.split(xchar)
            row = (xchar.join(row)).strip()
            # drop rows that are nothing but separators/whitespace
            if len(row.replace(";","")) > 0 :
                m.append(row)
        return m
def read(args=None,**kwargs) :
    """
    This function will perform the actual reads of process information via `ps`.

    Backward compatible call styles: read({'name': ...}) (legacy positional dict),
    read(name=...) (keywords, as used by the bundled test script), or read() with
    no arguments; positional dict and keywords are merged, keywords winning.

    :name   (optional) comma-separated list of application names to filter on
    :cols   (optional) list of column names to restrict the output to
    :logger (optional) callable invoked as logger(data=<DataFrame>) before returning
    @return list of records {pid,user,mem,cpu,status,started,name,cmd,args,date,time,node},
            or None when anything fails (best-effort behavior)
    """
    # merge the legacy positional-dict call style with keyword arguments
    args = dict(args or {}, **kwargs)
    import io  # local import: io.StringIO is used below
    cmd = "ps -eo pid,user,pmem,pcpu,stat,etime,args|awk 'OFS=\";\" {$1=$1; if($5 > 9) print }'"
    xchar = ";"
    try:
        handler = subprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE)
        stream = handler.communicate()[0]
        if sys.version_info[0] > 2 :
            # decode the bytes properly instead of splitting the repr() of a
            # bytes object (the old str(stream).split('\\n') hack)
            rows = stream.decode('utf-8').split('\n')
        else:
            rows = stream.split('\n')
        formatter = Util()
        m = formatter.parse(rows)
        d = datetime.datetime.now().strftime('%m-%d-%Y')
        t = datetime.datetime.now().strftime('%H:%M:%S')
        # drop every row with the same length as the header (the header itself included)
        m = [item for item in m if len(item) != len (m[0])]
        m = "\n".join(m[1:])
        # io.StringIO replaces pd.compat.StringIO, which was removed from pandas
        df = pd.read_csv(io.StringIO(m),sep=xchar)
        df['date'] = np.repeat(d,df.shape[0])
        df['time'] = np.repeat(t,df.shape[0])
        df['node'] = np.repeat(os.uname()[1],df.shape[0])
        df.columns =['pid','user','mem','cpu','status','started','name','cmd','args','date','time','node']
        #
        # We should filter the name of the apps we are interested in here (returning the full logs )
        # @TODO: Add filter here to handle filter on different columns
        #
        if 'name' in args :
            names = args['name'].split(',')
            # pd.concat replaces DataFrame.append, which was removed in pandas 2.x
            frames = [df[df.name == name.strip()] for name in names]
            frames = [tmp for tmp in frames if tmp.shape[0]]
            df = pd.concat(frames) if frames else pd.DataFrame()
        #
        # For security reasons let us hash the args column with an MD5
        #
        df.args = [hashlib.md5(str(value).encode('utf-8')).hexdigest() for value in df.args.tolist()]
        STATUS = {'R':'RUNNING','Z':'DEAD','D':'STASIS','S':'SLEEP','Sl':'SLEEP','Ss':'SLEEP','W':'PAGING','T':'DEAD'}
        df.status = df.status.apply(lambda value: STATUS.get(value,'UNKNOWN'))
        if 'cols' in args :
            _cols = list(set(df.columns.tolist()) & set(args['cols']))
            if _cols :
                df = df[_cols]
        #
        # we return a list of objects (no data-frames)
        if 'logger' in args and args['logger'] != None :
            logger = args['logger']
            logger(data=df)
        return df.to_dict(orient='records')
    except Exception as e:
        # best-effort: report the failure and fall through, returning None
        print (e)
    pass
if __name__ == '__main__' :
    #
    # Being directly called (standalone use): dump the full process table.
    # read expects an argument dict, so pass an empty one -- the bare read()
    # call used previously raised a TypeError.
    print(read({}))

@ -0,0 +1,5 @@
import smart.top
import pandas as pd

# Ad-hoc smoke test: aggregate cpu/mem usage per user for selected applications.
df = pd.DataFrame (smart.top.read(name='firefox,code'))
# select the metric columns with a list: the old df.groupby(...)['cpu','mem']
# tuple selection is deprecated and removed in modern pandas
print (df.groupby(['user'])[['cpu','mem']].sum())

@ -0,0 +1,45 @@
"""
"""
import smart
import sys
import json
import pandas as pd
import numpy as np
from multiprocessing import Process
# Minimal command-line parser: '--key value' pairs and bare '--flag' switches.
# A flag with no value is stored as 1; a following non-flag token becomes its value.
SYS_ARGS = {}
if len(sys.argv) > 1:
    N = len(sys.argv)
    for i in range(1, N):
        if not sys.argv[i].startswith('--'):
            # value tokens are consumed when their flag is seen; skip them here
            continue
        key = sys.argv[i][2:]
        SYS_ARGS[key] = 1  # default: bare switch
        # only treat the next token as a value if it is not itself a flag --
        # previously '--a --b x' stored '--b' as the value of 'a'
        if i + 1 < N and not sys.argv[i + 1].startswith('--'):
            value = sys.argv[i + 1].strip()
            if key and value:
                SYS_ARGS[key] = value
#
# Let's determine what data to return ...
# --folders <name> and/or --apps <names>
folders = format('path',smart.folder.read,smart.logger.log,SYS_ARGS['folders'].split(',')) if 'folders' in SYS_ARGS else []
apps = format('name',smart.top.read,smart.logger.log,SYS_ARGS['apps'].split(',')) if 'apps' in SYS_ARGS else []
if 'cols' in SYS_ARGS :
    # optional projection: restrict reader output to these column names
    cols = [name.strip() for name in SYS_ARGS['cols'].split(',')]
else:
    cols = []
nodes = folders + apps
# Fan out: one worker process per folder/app; each worker runs its reader with
# its own argument dict (the 'logger' entry handles persistence).
# NOTE(review): workers are started but never join()-ed, so the parent does not
# wait for them -- confirm fire-and-forget is intentional.
for node in nodes :
    if cols :
        node['args']['cols'] = cols
    pthread = Process(target=node['pointer'],args=(node['args'],))
    pthread.start()
Loading…
Cancel
Save