From fcbe36030b91ed3318ca0bebbe9f3fd6e55fa7e7 Mon Sep 17 00:00:00 2001 From: Steve Nyemba Date: Thu, 16 Nov 2017 16:43:04 -0600 Subject: [PATCH] refactoring data-collector --- .gitignore | 0 .gitmodules | 0 init.sh | 14 +---- requirements.txt | 0 src/data-collector.py | 98 +++++++++++++++++++++--------- src/monitor.py | 122 ++++++++++++++++++++++++-------------- src/utils/__init__.py | 0 src/utils/params.py | 2 +- test/TestML.py | 0 test/TestServerMonitor.py | 0 test/data.csv | 0 test/demo.py | 0 12 files changed, 148 insertions(+), 88 deletions(-) mode change 100644 => 100755 .gitignore mode change 100644 => 100755 .gitmodules mode change 100644 => 100755 init.sh mode change 100644 => 100755 requirements.txt mode change 100644 => 100755 src/data-collector.py mode change 100644 => 100755 src/utils/__init__.py mode change 100644 => 100755 src/utils/params.py mode change 100644 => 100755 test/TestML.py mode change 100644 => 100755 test/TestServerMonitor.py mode change 100644 => 100755 test/data.csv mode change 100644 => 100755 test/demo.py diff --git a/.gitignore b/.gitignore old mode 100644 new mode 100755 diff --git a/.gitmodules b/.gitmodules old mode 100644 new mode 100755 diff --git a/init.sh b/init.sh old mode 100644 new mode 100755 index 2192d4f..6f481aa --- a/init.sh +++ b/init.sh @@ -13,16 +13,6 @@ install(){ `sandbox/bin/pip freeze|sort |diff requirements.txt -|grep \<|grep -E " .+$" -o|sandbox/bin/pip install --upgrade` -} -register(){ - # - # $1 uid customer's identifier (email) - # $2 pid customer's plan identifier - # - curl -X POST https://the-phi.com/store/init/monitor-logs -H "uid:$1" -H"pid: $2" - - - } upgrade(){ git pull @@ -46,9 +36,9 @@ stop(){ status(){ pid=`ps -eo pid,command|grep python|grep -E "$PWD"|grep data-collector|grep -E "^ {0,}[0-9]+" -m 1 -o` if [ "$pid" = "" ]; then - echo "DATA-COLLECTOR IS OFFLINE" + echo "Data Collector is Offline" else - echo "DATA-COLLECTOR IS ONLINE $pid" + echo "Data Collector is Online $pid" fi } diff --git a/requirements.txt b/requirements.txt old mode 100644 new mode 100755 diff --git a/src/data-collector.py b/src/data-collector.py old mode 100644 new mode 100755 index e50a84f..28e44f9 --- a/src/data-collector.py +++ b/src/data-collector.py @@ -18,8 +18,10 @@ import requests import pickle import json from threading import Thread, RLock - -ENDPOINT="https://the-phi.com/monitor" +import monitor +import utils.agents.actor as actor +from utils.agents.manager import Manager +ENDPOINT="http://localhost/monitor" class Collector(Thread) : def __init__(self): Thread.__init__(self) @@ -30,47 +32,78 @@ class Collector(Thread) : @param id node identifier """ - for id in ['apps','folders']: - if id in SYS_ARGS : - SYS_ARGS[id] = SYS_ARGS[id].split(',') # # Let's open the file with the key (nothing else should be in the file - f = open(SYS_ARGS['key']) - SYS_ARGS['key'] = f.read() - f.close() + #f = open(SYS_ARGS['key']) + #SYS_ARGS['key'] = f.read() + #f.close() headers = {"key":SYS_ARGS["key"],"id":SYS_ARGS["id"]} #,"scope":json.dumps(scope)} - headers['content-type'] = 'application/json' + self.plan = None + self.store= None + + #headers['content-type'] = 'application/json' try: + self.key = SYS_ARGS['key'] Logger.log(subject='Collector',object='api',action='request',value=ENDPOINT) url = "/".join([ENDPOINT,"init/collector"]) - data = {} - for id in SYS_ARGS : - if id not in ['id','key'] : - data[id] = SYS_ARGS[id] - r = requests.post(url,headers=headers,data=json.dumps(data)) + r = requests.post(url,headers=headers) + r = json.loads(r.text) - self.monitor = pickle.loads(r[0]) - self.monitor.lock = RLock() - #:w - #self.monitor.set('lock',RLock()) - Logger.log(subject='Collector',object='api',action='load',value='') - except Exception,e: + if r : + # + # Persisting plan and data-store ... + self.plan = r['plan'] + self.store = r['store'] + info = {"store":self.store,"plan":self.plan} - Logger.log(subject='Collector',object='api',action='error',value=str(e)) + if info['plan'] is not None and info['store'] is not None: + info['plan'] = self.plan['name'] + info['store'] = self.store['args']['dbname'] + _action = 'init' + self.initialize() + else: + info['plan'] = self.plan is not None + info['store']= self.store is not None + _action = 'init.error' + Logger.log(subject='collector',object='api',action=_action,value=info) + except Exception as e: + print(e) + Logger.log(subject='collector',object='api',action='init.error',value=str(e)) self.monitor = None + def initialize(self): + """ + This function creates a monitoring object with the associated parameters from the plan + plan.metadata = {"agents":...,"folder_size":...,"delay":...,"limit":...,"actors":...} + """ + _agents = [monitor.DetailProcess(),monitor.FileWatch()] + _actors = [actor.Apps(),actor.Folders(),actor.Mailer()] + # Initialiing the agents with the parameter values we know of + r = [] + for agent in _agents : + if agent.getName() in SYS_ARGS : + agent.init(SYS_ARGS[agent.getName()]) + r.append(agent) + _agents = r + + + config = {"store":self.store,"plan":self.plan} + self.manager = Manager() + self.manager.init(node=SYS_ARGS['id'],agents=_agents,actors=_actors,config=config,key=self.key) + def run(self): """ This funtion runs the authorized features and """ #self.monitor.start() - Logger.log(subject='Collector',object='monitor',action='start',value='') - thread = Thread(target=self.monitor.run) + # Logger.log(subject='Collector',object='monitor',action='rpc',value=(self.manager is None) ) + thread = Thread(target=self.manager.run) thread.start() + # print self.manager # print self.monitor.config['store'] # for agent in self.pool : # try: @@ -83,17 +116,24 @@ class Collector(Thread) : # except Exception,e: # print e -if __name__ == '__main__' and 'path' in SYS_ARGS: +if __name__ == '__main__' : # # + if 'path' in SYS_ARGS : + path = SYS_ARGS['path'] + f = open(path) + p = json.loads(f.read()) + f.close() + + else: + for id in ['apps','folders']: + if id in SYS_ARGS : + SYS_ARGS[id] = SYS_ARGS[id].split(',') - path = SYS_ARGS['path'] - f = open(path) - p = json.loads(f.read()) - f.close() + p = dict(SYS_ARGS) Logger.init('data-collector') SYS_ARGS = dict(SYS_ARGS,** p) thread = Collector() thread.start() else: - print (h) \ No newline at end of file + print (h) diff --git a/src/monitor.py b/src/monitor.py index ad85026..b61fae1 100755 --- a/src/monitor.py +++ b/src/monitor.py @@ -16,6 +16,8 @@ from threading import Thread, RLock import time import numpy as np from utils.ml import ML +import sys + class Analysis: def __init__(self): self.logs = [] @@ -30,6 +32,11 @@ class Analysis: return {"month":d.month,"year":d.year, "day":d.day,"hour":d.hour,"minute":d.minute} def getName(self): return self.__class__.__name__ + def reboot(self,row,conf) : + return False + def cleanup(self,text): + return re.sub('[^a-zA-Z0-9\s:]',' ',str(text)).strip() + """ This class is designed to analyze environment variables. Environment variables can either be folders, files or simple values @@ -106,6 +113,8 @@ class Sandbox(Analysis): return [row.replace('-',' ').replace('_',' ') for row in r if row.strip() != ''] def evaluate(self): pass + def reboot(self,rows,limit=None) : + return sum([ len(item['missing']) for item in rows ]) > 0 """ This function returns the ratio of existing modules relative to the ones expected """ @@ -148,14 +157,17 @@ class ProcessCounter(Analysis): #n = sum(r) #return n/N return dict(self.getNow(),**r) + """ This class returns an application's both memory and cpu usage """ class DetailProcess(Analysis): def __init__(self): Analysis.__init__(self) + def init (self,names): #Analysis.init(self) + self.names = names; def getName(self): return "apps" @@ -167,42 +179,52 @@ class DetailProcess(Analysis): return list(g.groups())+['1']+[name] else: return '' - def evaluate(self,name) : - cmd = "ps -eo pmem,pcpu,vsize,command|grep -E \":app\"" - handler = subprocess.Popen(cmd.replace(":app",name),shell=True,stdout=subprocess.PIPE) - ostream = handler.communicate()[0].split('\n') - #xstr = ostream - ostream = [ self.split(name,row) for row in ostream if row != '' and 'grep' not in row] - if len(ostream) == 0 or len(ostream[0]) < 4 : - ostream = [['0','0','0','0',name]] - r = [] - for row in ostream : - # - # Though the comm should only return the name as specified, - # On OSX it has been observed that the fully qualified path is sometimes returned (go figure) - # - row = [float(value) for value in row if value.strip() != '' and name not in value ] +[re.sub('\$|^','',name)] - r.append(row) + def reboot(self,rows,conf=None) : + return np.sum([int(item['label']=='crash') for item in rows]) > 0 + def parse(self,row,fields): + """ + The last field should be the command in its integrity + @pre len(fields) > len(row) + """ + r = {} + + now = self.getNow() + r['date'] = now + row = [term for term in row.split() if term.strip() != ''] + for name in fields : + index = fields.index(name) + + r[name] = row[index] if row else 0 + if name not in ['user','cmd','status','pid'] : + r[name] = float(r[name]) + r[name] = row[index: ] if row else [] # - # At this point we should aggregate results - # The aggregation is intended for applications with several processes (e.g: apache2) + # Let's set the status give the data extracted # - if len(r) > 1: - m = None - for row in r: - if m is None: - m = row - else: - m[3] += row[3] - m[0] += row[0] - m[1] += row[1] - m[2] += row[2] - m[0] = round((m[0] / m[3]),2) - m[1] = round((m[1] / m[3]),2) - m[2] = round((m[2] / m[3]),2) - - r = [m] + if r['status'] == 0 : + r['status'] = 'crash' + elif 'Z' in r['status'] : + r['status'] = 'zombie' + elif r['memory_usage'] > 0 and r['cpu_usage'] > 0: + r['status'] = 'running' + else: + r['status'] = 'idle' return r + + def evaluate(self,name=None) : + if name is None : + name = ".*" + fields = ["user","pid","memory_usage","cpu_usage","memory_available","status","cmd"] + cmd = "ps -eo user,pid,pmem,pcpu,vsize,stat,command|grep -Ei \":app\"".replace(":app",name) + handler = subprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE) + logs = handler.communicate()[0].split('\n') + logs = [row for row in logs if (row.strip() != '') and ('grep -Ei' in row )== False ] + + if len(logs) == 0: + return [dict(self.parse('',fields),**{'label':name}) ] + else : + return [dict(self.parse(row,fields),**{'label':name}) for row in logs if row.strip() != '' and 'grep' not in row and '-Ei' not in row] + def status(self,row): x = row['memory_usage'] y = row['cpu_usage'] @@ -213,20 +235,17 @@ class DetailProcess(Analysis): return "idle" else: return "crash" - def format(self,row): - r= {"memory_usage":row[0],"cpu_usage":row[1],"memory_available":row[2]/1000,"proc_count":row[3],"label":row[4]} - status = self.status(r) - r['status'] = status - return r + #def format(self,row): + # r= {"memory_usage":row[0],"cpu_usage":row[1],"memory_available":row[2]/1000,"proc_count":row[3],"label":self.cleanup(row[4])} + # status = self.status(r) + # r['status'] = status + # return r def composite(self): ma = [] - now = self.getNow() for name in self.names: - - matrix = self.evaluate(name) - - ma += [ dict(now, **self.format(row)) for row in matrix] + row = self.evaluate(name) + ma += row return ma """ @@ -237,6 +256,7 @@ class FileWatch(Analysis): def __init__(self): pass def init(self,folders): + print folders self.folders = folders; def getName(self): return "folders" @@ -277,12 +297,22 @@ class FileWatch(Analysis): def evaluate(self,path): cmd = "find :path -print0|xargs -0 ls -ls |awk '{print $6,$7,$8,$9,$10}'".replace(":path",path) - handler = subprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE) ostream = handler.communicate()[0].split('\n') - + ostream = [row for row in ostream if row.strip() != ''] + print cmd + print ostream[0] + print ostream[1] #return [self.split(stream) for stream in ostream if stream.strip() != '' and '.DS_Store' not in stream and 'total' not in stream] - return [self.split(stream) for stream in ostream if path not in stream and not set(['','total','.DS_Store']) & set(stream.split(' '))] + #return [self.split(stream) for stream in ostream if path not in stream and not set(['','total','.DS_Store']) & set(stream.split(' '))] + return [] + def toMB(self,size): + m = {'GB':1000,'TB':1000000} + v,u = size.split(' ') + return round(float(v)* m[u.upper()],2) + + def reboot(self,rows,limit) : + return np.sum([ int(self.toMB(item['size']) > self.toMB(limit)) for item in rows]) > 0 def composite(self): d = [] #-- vector of details (age,size) diff --git a/src/utils/__init__.py b/src/utils/__init__.py old mode 100644 new mode 100755 diff --git a/src/utils/params.py b/src/utils/params.py old mode 100644 new mode 100755 index 4808f93..ea98de4 --- a/src/utils/params.py +++ b/src/utils/params.py @@ -27,6 +27,6 @@ class Logger : logging.basicConfig(filename=name,level=logging.INFO,format="%(message)s") @staticmethod def log(**args) : - args['date'] = datetime.now().strftime('%d-%m-%Y %M:%H:%S') + args['date'] = datetime.now().strftime('%d-%m-%Y %H:%M:%S') logging.info(json.dumps(args)) diff --git a/test/TestML.py b/test/TestML.py old mode 100644 new mode 100755 diff --git a/test/TestServerMonitor.py b/test/TestServerMonitor.py old mode 100644 new mode 100755 diff --git a/test/data.csv b/test/data.csv old mode 100644 new mode 100755 diff --git a/test/demo.py b/test/demo.py old mode 100644 new mode 100755