From ccc52fdaea65d542304b3be5db9acd34f2998d22 Mon Sep 17 00:00:00 2001 From: "Steve Nyemba, The Architect" Date: Mon, 8 Oct 2018 11:45:31 -0500 Subject: [PATCH] bug fix redesign --- src/data-collector.py | 205 ++++++++++----------- src/monitor.py | 405 +++++++++++------------------------------- 2 files changed, 195 insertions(+), 415 deletions(-) diff --git a/src/data-collector.py b/src/data-collector.py index 09c88ae..cd225c5 100755 --- a/src/data-collector.py +++ b/src/data-collector.py @@ -7,146 +7,125 @@ h=""" python data-collector.py --path config.json The configuration file is structured as JSON object as follows : { - id: node identifier - key: customer's identification key - apps:"app_1,app_2,...", - folders:"path_1,path_2, ..." + id : identifier + key: customer's identification key, + api: http://localhost/monitor/1/client + folders:[] } + NOTE: You can download a sample configuration file from https://the-phi.com/smart-top """ from utils.params import PARAMS as SYS_ARGS, Logger +import os import requests -import pickle import json -from threading import Thread, RLock -import monitor -import utils.agents.actor as actor -from utils.agents.manager import Manager -SYS_ARGS['host']='localhost' -ENDPOINT="http://:host/monitor".replace(":host",SYS_ARGS['host']) -class Collector(Thread) : - def __init__(self): - Thread.__init__(self) +# from threading import Thread, RLock +from monitor import Apps, Folders +import time +from datetime import datetime +class Collector : + def __init__(self) : """ - This function initializes the data collector with critical information - The process will include validating the user's account and plan - @param key customer's key - @param id node identifier - + The configuration file is passed to the class for basic initialization of variables """ + self.httpclient = requests.Session() + + def init(self): + # if 'folders' not in SYS_ARGS : + # # + # # If nothing is set it will monitor the temporary directory + # self.locations = [os.environ[name] for name in ['TEMP','TMP','TMPDIR'] if name in os.environ] + # else: + # self.locations = SYS_ARGS['folders'] + # - # Let's open the file with the key (nothing else should be in the file - #f = open(SYS_ARGS['key']) - #SYS_ARGS['key'] = f.read() - #f.close() + # -- let's get the list of features we are interested . + url = SYS_ARGS['api']+'/1/client/login' + key = SYS_ARGS['key'] + id = SYS_ARGS['id'] if 'id' in SYS_ARGS else os.environ['HOSTNAME'] + headers = {"key":key,"id":id} - headers = {"key":SYS_ARGS["key"],"id":SYS_ARGS["id"]} #,"scope":json.dumps(scope)} - self.plan = None - self.store= None + # + #-- what features are allowed + r = self.httpclient.post(url,headers=headers) - #headers['content-type'] = 'application/json' - try: - self.key = SYS_ARGS['key'] - # Logger.log(subject='Collector',object='api',action='request',value=ENDPOINT) - # url = "/".join([ENDPOINT,"init/collector"]) - Logger.log(subject='Collector',object='api',action='request',value=SYS_ARGS['api']) - url = "/".join([SYS_ARGS['api'],"init/collector"]) - - r = requests.post(url,headers=headers) - - r = json.loads(r.text) - - if r : - # - # Persisting plan and data-store ... - self.plan = r['plan'] - self.store = r['store'] - info = {"store":self.store,"plan":self.plan} - - if info['plan'] is not None and info['store'] is not None: - info['plan'] = self.plan['name'] - info['store'] = self.store['args']['dbname'] - _action = 'init' - self.initialize() - else: - info['plan'] = self.plan is not None - info['store']= self.store is not None - _action = 'init.error' - Logger.log(subject='collector',object='api',action=_action,value=info) - except Exception as e: - print(e) - Logger.log(subject='collector',object='api',action='init.error',value=str(e)) - self.monitor = None - def initialize(self): - """ - This function creates a monitoring object with the associated parameters from the plan - plan.metadata = {"agents":...,"folder_size":...,"delay":...,"limit":...,"actors":...} - """ - _agents = [monitor.DetailProcess(),monitor.FileWatch()] - _actors = [actor.Apps(),actor.Folders(),actor.Mailer()] - # Initialiing the agents with the parameter values we know of - r = [] - for agent in _agents : + if r.status_code == 200 : + r = r.json() - if agent.getName() in SYS_ARGS : - values = SYS_ARGS[agent.getName()] - # print (["init ",agent.getName(),values]) - Logger.log(subject='Collector',object=agent.getName(),action='init.agent',value=values ) - agent.init(values) - r.append(agent) - _agents = r - + self.features = r['features'] + self.config = r['config'] #-- contains apps and folders + Logger.log(action="login",value=r) + else: + self.features = None + self.config = None + Logger.log(action='login',value='error') + def callback(self,channel,method,header,stream): + pass + def listen(self): + factory = DataSourceFactory() - config = {"store":self.store,"plan":self.plan} - #@TODO: add SYS_ARGS content so apps that are configured reboot/delete folders that are marked - # This is an important security measure! - # - self.manager = Manager() - self.manager.init(node=SYS_ARGS['id'],agents=_agents,actors=_actors,config=config,key=self.key,host=SYS_ARGS['host']) + # self.qlistener = factory.instance(type="QueueListener",args=_args) + # self.qlistener.callback = self.callback + # self.qlistener.init(SYS_ARGS['id']) + def post(self,**args) : + """ + This function will post data to the endpoint + """ + url = SYS_ARGS['api']+'/1/client/log' + key = SYS_ARGS['key'] + id = SYS_ARGS['id'] if 'id' in SYS_ARGS else os.environ['HOSTNAME'] + headers = {"key":key,"id":id,"context":args['context'],"content-type":"application/json"} + body = args['data'].to_json(orient='records') + if args.shape[0] > 0 : + r = self.httpclient.post(url,headers=headers,data=body) + Logger.log(action="post."+args['context'],value=r.status_code) + else: + Logger.log(action="data.error",value="no data") def run(self): """ - This funtion runs the authorized features and + This function will execute the basic functions to monitor folders and apps running on the system + given the configuration specified on the server . """ - #self.monitor.start() - - # Logger.log(subject='Collector',object='monitor',action='rpc',value=(self.manager is None) ) - thread = Thread(target=self.manager.run) - thread.start() - # print self.manager - # print self.monitor.config['store'] - # for agent in self.pool : - # try: - # agent = pickle.loads(agent) - # agent.start() + while True : + try: + self.init() + if self.config and self.features : + ELAPSED_TIME = 60* int(self.features['schedule'].replace("min","").strip()) + if 'apps' in self.config : + self.post( data=(Apps()).get(filter=self.config['apps']),context="apps") + if 'folders' in self.config and self.config['folders'] : + folder = Folders() + f = folder.get(path=self.config['folders']) + self.post(data = f ,context="folders") + + Logger.log(action='sleeping',value=ELAPSED_TIME) + # + # In case no configuration is provided, the system will simply fall asleep and wait + # @TODO: Evaluate whether to wake up the system or not (security concerns)! + # + time.sleep(ELAPSED_TIME) + + except Exception,e: + Logger.log(action='error',value=e.message) + print e + break - # #p = pickle.loads(self.pool[key]) - # #p.init(SYS_ARGS[key].split(',')) - # #p.start() - # except Exception,e: - # print e + + + pass if __name__ == '__main__' : # # if 'path' in SYS_ARGS : path = SYS_ARGS['path'] f = open(path) - p = json.loads(f.read()) + SYS_ARGS = json.loads(f.read()) f.close() - else: - for id in ['apps','folders']: - if id in SYS_ARGS : - SYS_ARGS[id] = SYS_ARGS[id].split(',') - - p = dict(SYS_ARGS) Logger.init('data-collector') - SYS_ARGS = dict(SYS_ARGS,** p) - if 'apps' in SYS_ARGS : - SYS_ARGS['apps'] += [__file__] - - thread = Collector() - thread.start() + collector = Collector() + collector.run() else: print (h) diff --git a/src/monitor.py b/src/monitor.py index 0f3bbbe..203862c 100755 --- a/src/monitor.py +++ b/src/monitor.py @@ -1,275 +1,113 @@ """ - This program is designed to inspect an application environment - This program should only be run on unix friendly systems + Steve L. Nyemba + The Phi Technology - Smart Top - We enable the engines to be able to run a several configurations - Similarly to what a visitor design-pattern would do + This program is the core for evaluating folders and applications. Each class is specialized to generate a report in a pandas data-frame + The classes will focus on Apps, Folders and Protocols + - SmartTop.get(**args) + @TODO: + Protocols (will be used in anomaly detection) """ from __future__ import division import os import subprocess -from sets import Set -import re -import datetime -import urllib2 as http, base64 -from threading import Thread, RLock -import time import numpy as np -from utils.ml import ML import sys -from scipy import stats +import pandas as pd +import datetime +class SmartTop: + def get(self,**args): + return None -class Analysis: +class Apps(SmartTop) : def __init__(self): - self.logs = [] - pass - def post(self,object): - self.logs.append(object) - def init(self): - d = datetime.datetime.now() - self.now = {"month":d.month,"year":d.year, "day":d.day,"hour":d.hour,"minute":d.minute} - def getNow(self): - d = datetime.datetime.now() - return {"month":d.month,"year":d.year, "day":d.day,"hour":d.hour,"minute":d.minute,"second":d.second} - def getName(self): - return self.__class__.__name__ - def reboot(self,row,conf) : - return False - def cleanup(self,text): - return re.sub('[^a-zA-Z0-9\s:]',' ',str(text)).strip() - + """ + This class will process a system command and parse the outpout accordingly given a parser + @param parse is a parser pointer + """ + self.cmd = "ps -eo pid,user,pmem,pcpu,stat,etime,args|awk 'OFS=\";\" {$1=$1; if($5 > 9) print }'" + self.xchar = ';' + def get_app(self,stream): + index = 1 if os.path.exists(" ".join(stream[:1])) else len(stream)-1 -""" - This class is designed to analyze environment variables. Environment variables can either be folders, files or simple values - The class returns a quantifiable assessment of the environment variables (expected 100%) -""" -class Env(Analysis): - def __init__(self): - Analysis.__init__(self) - def init(self,values): - #Analysis.init(self) - self.values = values - """ - This function evaluate the validity of an environment variable by returning a 1 or 0 (computable) - The function will use propositional logic (https://en.wikipedia.org/wiki/Propositional_calculus) - """ - def evaluate(self,id): + cmd = " ".join(stream[:index]) if index > 0 else " ".join(stream) - if id in os.environ : - # - # We can inspect to make sure the environment variable is not a path or filename. - # Using propositional logic we proceed as follows: - # - (p) We determine if the value is an folder or file name (using regex) - # - (q) In case of a file or folder we check for existance - # The final result is a conjuction of p and q - # - value = os.environ[id] - expressions = [os.sep,'(\\.\w+)$'] - p = sum([ re.search(xchar,value) is not None for xchar in expressions]) - q = os.path.exists(value) - - return int(p and q) - else: - return 0 - - def composite (self): - #Analysis.init(self) - r = [ self.evaluate(id) for id in self.values] ; - N = len(r) - n = sum(r) - value = 100 * round(n/N,2) - - missing = [self.values[i] for i in range(0,N) if r[i] == 0] - return dict(self.getNow(),**{"value":value,"missing":missing}) -""" - This class is designed to handle analaysis of the a python virtual environment i.e deltas between requirments file and a virtualenv - @TODO: update the virtual environment -""" -class Sandbox(Analysis): - def __init__(self): - Analysis.__init__(self) - def init(self,conf): - #Analysis.init(self) - if os.path.exists(conf['sandbox']) : - self.sandbox_path = conf['sandbox'] - else: - self.sandbox_path = None - if os.path.exists(conf['requirements']) : - self.requirements_path = conf['requirements'] + if ' ' in cmd.split('/')[len(cmd.split('/'))-1] : + p = cmd.split('/')[len(cmd.split('/'))-1].split(' ') + name = p[0] + args = " ".join(p[1:]) else: - self.requirements_path = None - - def get_requirements (self): - f = open(self.requirements_path) - return [ name.replace('-',' ').replace('_',' ') for name in f.read().split('\n') if name != ''] - """ - This function will return the modules installed in the sandbox (virtual environment) - """ - def get_sandbox_requirements(self): - cmd = ['freeze'] - xchar = ''.join([os.sep]*2) - pip_vm = ''.join([self.sandbox_path,os.sep,'bin',os.sep,'pip']).replace(xchar,os.sep) - cmd = [pip_vm]+cmd - r = subprocess.check_output(cmd).split('\n') - return [row.replace('-',' ').replace('_',' ') for row in r if row.strip() != ''] - def evaluate(self): - pass - def reboot(self,rows,limit=None) : - return sum([ len(item['missing']) for item in rows ]) > 0 - """ - This function returns the ratio of existing modules relative to the ones expected - """ - def composite(self): - Analysis.init(self) - if self.sandbox_path and self.requirements_path : - required_modules= self.get_requirements() - sandbox_modules = self.get_sandbox_requirements() - N = len(required_modules) - n = len(Set(required_modules) - Set(sandbox_modules)) - value = round(1 - (n/N),2)*100 - missing = list(Set(required_modules) - Set(sandbox_modules)) + name = cmd.split('/')[len(cmd.split('/'))-1] + args = " ".join(stream[index:]) if index > 0 else "" + return [name,cmd,args] + def to_pandas(self,m): + """ + This function will convert the output of ps to a data-frame + @param m raw matrix i.e list of values like a csv - return dict(self.getNow(),**{"value":value,"missing":missing}) - else: - return None - -""" - This class performs the analysis of a list of processes and determines - The class provides a quantifiable measure of how many processes it found over all -""" -class ProcessCounter(Analysis): - def __init__(self): - Analysis.__init__(self) - def init(self,names): - #Analysis.init(self) - self.names = names - def evaluate(self,name): - cmd = "".join(['ps -eo comm |grep ',name,' |wc -l']) - handler = subprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE) - - return int(handler.communicate()[0].replace("\n","") ) - def composite(self): - #Analysis.init(self) - r = {} - for name in self.names : - r[name] = self.evaluate(name) + """ + m = [item for item in m if len(item) != len (m[0])] + m = "\n".join(m[1:]) + df = pd.read_csv(pd.compat.StringIO(m),sep=self.xchar) + df.columns =['pid','user','mem','cpu','status','started','name','cmd','args'] + return df + def empty(self,name): + return pd.DataFrame([{"pid":None,"user":None,"mem":0,"cpu":0,"status":"X","started":None,"name":name,"cmd":None,"args":None}]) + def parse(self,rows): + m = [] + TIME_INDEX = 5 + ARGS_INDEX = 6 - #N = len(r) - #n = sum(r) - #return n/N - return dict(self.getNow(),**r) - -""" - This class returns an application's both memory and cpu usage -""" -class DetailProcess(Analysis): - def __init__(self): - Analysis.__init__(self) - - def init (self,names): - #Analysis.init(self) - self.names = names; - def getName(self): - return "apps" - def split(self,name,stream): + for item in rows : + if rows.index(item) != 0 : + parts = item.split(self.xchar) + row = parts[:TIME_INDEX] + row.append(' '.join(parts[TIME_INDEX:ARGS_INDEX])) + row += self.get_app(parts[ARGS_INDEX:]) + else: + row = item.split(self.xchar) + row = (self.xchar.join(row)).strip() + if len(row.replace(";","")) > 0 : + m.append(row) - pattern = "(\d+.{0,1}\d*)\x20*(\d+.{0,1}\d*)\x20*(\d+.{0,1}\d*)".replace(":name",name).strip() - g = re.match(pattern,stream.strip()) - if g : - return list(g.groups())+['1']+[name] - else: - return '' - def reboot(self,rows,conf=None) : - return np.sum([int(item['label']=='crash') for item in rows]) > 0 - def parse(self,row,fields): + return m + def get(self,**args): """ - The last field should be the command in its integrity - @pre len(fields) > len(row) + This function returns a the output of a command to the calling code that is piped into the class + The output will be stored in a data frame with columns + @ """ - r = {} - - now = self.getNow() - r['date'] = now - row = [term for term in row.split() if term.strip() != ''] - for name in fields : - index = fields.index(name) + try: - r[name] = row[index] if row else 0 - if name not in ['user','cmd','status','pid'] : - r[name] = float(r[name]) - r[name] = row[index: ] if row else [] - # - # Let's set the status give the data extracted - # - if r['status'] == 0 : - r['status'] = 'crash' - elif 'Z' in r['status'] : - r['status'] = 'zombie' - elif r['memory_usage'] > 0 and r['cpu_usage'] > 0: - r['status'] = 'running' - else: - r['status'] = 'idle' - return r - - def evaluate(self,name=None) : - if name is None : - name = ".*" - fields = ["user","pid","memory_usage","cpu_usage","memory_available","status","cmd"] - cmd = "ps -eo user,pid,pmem,pcpu,vsize,stat,command|grep -Ei \":app\"".replace(":app",name) - handler = subprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE) - logs = handler.communicate()[0].split('\n') - logs = [row for row in logs if (row.strip() != '') and ('grep -Ei' in row )== False ] - - if len(logs) == 0: - return [dict(self.parse('',fields),**{'label':name}) ] - else : - return [dict(self.parse(row,fields),**{'label':name}) for row in logs if row.strip() != '' and 'grep' not in row and '-Ei' not in row] + handler = subprocess.Popen(self.cmd,shell=True,stdout=subprocess.PIPE) + stream = handler.communicate()[0] + rows = stream.split('\n') + df = self.to_pandas(self.parse(rows)) + r = pd.DataFrame() + if 'filter' in args : + pattern = "|".join(args['filter']) + r = df[df.name.str.contains(pattern)] + for name in args['filter'] : + filter = "name == '"+name+"'" + if r.query(filter).size == 0 : + r = r.append(self.empty(name)) + return r + except Exception,e: + print (e) + return None - def status(self,row): - x = row['memory_usage'] - y = row['cpu_usage'] - z = row['memory_available'] - if z : - if y : - return "running" - return "idle" - else: - return "crash" - #def format(self,row): - # r= {"memory_usage":row[0],"cpu_usage":row[1],"memory_available":row[2]/1000,"proc_count":row[3],"label":self.cleanup(row[4])} - # status = self.status(r) - # r['status'] = status - # return r - def composite(self): - ma = [] - for name in self.names: - row = self.evaluate(name) - ma += row - - return ma -""" - This class evaluates a list of folders and provides detailed informaiton about age/size of each file - Additionally the the details are summarized in terms of global size, and oldest file. -""" -class FileWatch(Analysis): +class Folders(SmartTop): + """ + This class will assess a folder and produce a report in a data-frame that can be later on used for summary statistics + """ def __init__(self): pass - def init(self,folders): - self.folders = folders; - self.cache = [] - def getName(self): - return "folders" - - def evaluate(self,dir_path,r=[]): - """ - This function will recursively scan a folder and retrieve file sizes and age of the files. - The data will be returned as an array of {size,age,label} items - """ + def _get(self,dir_path,r=[]): for child in os.listdir(dir_path): path = os.path.join(dir_path, child) if os.path.isdir(path): - self.evaluate(path,r) + self._get(path,r) else: size = os.path.getsize(path) @@ -277,62 +115,25 @@ class FileWatch(Analysis): file_date = datetime.datetime.fromtimestamp(file_date) now = datetime.datetime.now() age = (now - file_date ).days - r.append({"label":path,"size":size,"age":age,"date":self.getNow()}) + + name = os.path.basename(path) + r.append({"name":name,"path":path,"size":size,"age":age}) return r - def reboot(self,rows,limit) : - return np.sum([ 1 for item in rows if rows['size'] > limit ]) > 0 - def composite(self): - out = [] - for folder in self.folders : - - r = self.evaluate(folder,[]) - file_count = len(r) - age_mode = [item[0] for item in stats.mode([item['age'] for item in r])] - size_mode= [item[0] for item in stats.mode([item['size'] for item in r])] - age = {"mode":age_mode,"median":np.median([item['age'] for item in r] ),"mean":np.mean([item['age'] for item in r] ),"var":np.mean([item['age'] for item in r])} - size = {"mode":size_mode,"median":np.median([item['size'] for item in r] ), "mean":np.mean([item['size'] for item in r] ),"var":np.mean([item['size'] for item in r])} - out.append({"label":folder,"date":self.getNow(),"stats":{"age":age,"size":size,"file_count":file_count},"logs":r}) - return out - - -# class Monitor (Thread): -# def __init__(self,pConfig,pWriter,id='processes') : -# Thread.__init__(self) + def get(self,**args): + # path = args['path'] -# self.config = pConfig[id] -# self.writer = pWriter; -# self.logs = [] -# self.handler = self.config['class'] -# self.mconfig = self.config['config'] - - - -# def stop(self): -# self.keep_running = False -# def run(self): -# r = {} -# self.keep_running = True -# lock = RLock() -# while self.keep_running: -# lock.acquire() -# for label in self.mconfig: - -# self.handler.init(self.mconfig[label]) -# r = self.handler.composite() -# self.writer.write(label=label,row = r) - -# time.sleep(2) -# lock.release() - -# self.prune() -# TIME_LAPSE = 60*2 -# time.sleep(TIME_LAPSE) -# print "Stopped ..." -# def prune(self) : - -# MAX_ENTRIES = 100 -# if len(self.logs) > MAX_ENTRIES : -# BEG = len(self.logs) - MAX_SIZE -1 -# self.logs = self.logs[BEG:] - + if isinstance(args['path'],list) == False: + paths = [args['path']] + else: + paths = args['path'] + _out = pd.DataFrame() + for path in paths : + if os.path.exists(path) : + # + # If the folder does NOT exists it should not be treated. + # + r = pd.DataFrame(self._get(path)) + r = pd.DataFrame([{"name":path,"files":r.shape[0],"age_in_days":r.age.mean(),"size_in_kb":r.size.sum()}]) + _out = _out.append(r) + return _out