bug fix redesign

data-collector
Steve L. Nyemba 6 years ago
parent 8ecca833b3
commit ccc52fdaea

@ -7,146 +7,125 @@ h="""
python data-collector.py --path config.json python data-collector.py --path config.json
The configuration file is structured as JSON object as follows : The configuration file is structured as JSON object as follows :
{ {
id: node identifier id : identifier
key: customer's identification key key: customer's identification key,
apps:"app_1,app_2,...", api: http://localhost/monitor/1/client
folders:"path_1,path_2, ..." folders:[]
} }
NOTE: You can download a sample configuration file from https://the-phi.com/smart-top
""" """
from utils.params import PARAMS as SYS_ARGS, Logger from utils.params import PARAMS as SYS_ARGS, Logger
import os
import requests import requests
import pickle
import json import json
from threading import Thread, RLock # from threading import Thread, RLock
import monitor from monitor import Apps, Folders
import utils.agents.actor as actor import time
from utils.agents.manager import Manager from datetime import datetime
SYS_ARGS['host']='localhost' class Collector :
ENDPOINT="http://:host/monitor".replace(":host",SYS_ARGS['host']) def __init__(self) :
class Collector(Thread) :
def __init__(self):
Thread.__init__(self)
""" """
This function initializes the data collector with critical information The configuration file is passed to the class for basic initialization of variables
The process will include validating the user's account and plan
@param key customer's key
@param id node identifier
""" """
self.httpclient = requests.Session()
def init(self):
# if 'folders' not in SYS_ARGS :
# #
# # If nothing is set it will monitor the temporary directory
# self.locations = [os.environ[name] for name in ['TEMP','TMP','TMPDIR'] if name in os.environ]
# else:
# self.locations = SYS_ARGS['folders']
# #
# Let's open the file with the key (nothing else should be in the file # -- let's get the list of features we are interested .
#f = open(SYS_ARGS['key']) url = SYS_ARGS['api']+'/1/client/login'
#SYS_ARGS['key'] = f.read() key = SYS_ARGS['key']
#f.close() id = SYS_ARGS['id'] if 'id' in SYS_ARGS else os.environ['HOSTNAME']
headers = {"key":key,"id":id}
headers = {"key":SYS_ARGS["key"],"id":SYS_ARGS["id"]} #,"scope":json.dumps(scope)} #
self.plan = None #-- what features are allowed
self.store= None r = self.httpclient.post(url,headers=headers)
#headers['content-type'] = 'application/json' if r.status_code == 200 :
try: r = r.json()
self.key = SYS_ARGS['key']
# Logger.log(subject='Collector',object='api',action='request',value=ENDPOINT)
# url = "/".join([ENDPOINT,"init/collector"])
Logger.log(subject='Collector',object='api',action='request',value=SYS_ARGS['api'])
url = "/".join([SYS_ARGS['api'],"init/collector"])
r = requests.post(url,headers=headers)
r = json.loads(r.text)
if r :
#
# Persisting plan and data-store ...
self.plan = r['plan']
self.store = r['store']
info = {"store":self.store,"plan":self.plan}
if info['plan'] is not None and info['store'] is not None:
info['plan'] = self.plan['name']
info['store'] = self.store['args']['dbname']
_action = 'init'
self.initialize()
else:
info['plan'] = self.plan is not None
info['store']= self.store is not None
_action = 'init.error'
Logger.log(subject='collector',object='api',action=_action,value=info)
except Exception as e:
print(e)
Logger.log(subject='collector',object='api',action='init.error',value=str(e))
self.monitor = None
def initialize(self):
"""
This function creates a monitoring object with the associated parameters from the plan
plan.metadata = {"agents":...,"folder_size":...,"delay":...,"limit":...,"actors":...}
"""
_agents = [monitor.DetailProcess(),monitor.FileWatch()]
_actors = [actor.Apps(),actor.Folders(),actor.Mailer()]
# Initialiing the agents with the parameter values we know of
r = []
for agent in _agents :
if agent.getName() in SYS_ARGS : self.features = r['features']
values = SYS_ARGS[agent.getName()] self.config = r['config'] #-- contains apps and folders
# print (["init ",agent.getName(),values]) Logger.log(action="login",value=r)
Logger.log(subject='Collector',object=agent.getName(),action='init.agent',value=values ) else:
agent.init(values) self.features = None
r.append(agent) self.config = None
_agents = r Logger.log(action='login',value='error')
def callback(self,channel,method,header,stream):
pass
def listen(self):
factory = DataSourceFactory()
config = {"store":self.store,"plan":self.plan} # self.qlistener = factory.instance(type="QueueListener",args=_args)
#@TODO: add SYS_ARGS content so apps that are configured reboot/delete folders that are marked # self.qlistener.callback = self.callback
# This is an important security measure! # self.qlistener.init(SYS_ARGS['id'])
# def post(self,**args) :
self.manager = Manager() """
self.manager.init(node=SYS_ARGS['id'],agents=_agents,actors=_actors,config=config,key=self.key,host=SYS_ARGS['host']) This function will post data to the endpoint
"""
url = SYS_ARGS['api']+'/1/client/log'
key = SYS_ARGS['key']
id = SYS_ARGS['id'] if 'id' in SYS_ARGS else os.environ['HOSTNAME']
headers = {"key":key,"id":id,"context":args['context'],"content-type":"application/json"}
body = args['data'].to_json(orient='records')
if args.shape[0] > 0 :
r = self.httpclient.post(url,headers=headers,data=body)
Logger.log(action="post."+args['context'],value=r.status_code)
else:
Logger.log(action="data.error",value="no data")
def run(self): def run(self):
""" """
This funtion runs the authorized features and This function will execute the basic functions to monitor folders and apps running on the system
given the configuration specified on the server .
""" """
#self.monitor.start() while True :
try:
# Logger.log(subject='Collector',object='monitor',action='rpc',value=(self.manager is None) ) self.init()
thread = Thread(target=self.manager.run) if self.config and self.features :
thread.start() ELAPSED_TIME = 60* int(self.features['schedule'].replace("min","").strip())
# print self.manager if 'apps' in self.config :
# print self.monitor.config['store'] self.post( data=(Apps()).get(filter=self.config['apps']),context="apps")
# for agent in self.pool : if 'folders' in self.config and self.config['folders'] :
# try: folder = Folders()
# agent = pickle.loads(agent) f = folder.get(path=self.config['folders'])
# agent.start() self.post(data = f ,context="folders")
Logger.log(action='sleeping',value=ELAPSED_TIME)
#
# In case no configuration is provided, the system will simply fall asleep and wait
# @TODO: Evaluate whether to wake up the system or not (security concerns)!
#
time.sleep(ELAPSED_TIME)
except Exception,e:
Logger.log(action='error',value=e.message)
print e
break
# #p = pickle.loads(self.pool[key])
# #p.init(SYS_ARGS[key].split(','))
# #p.start()
# except Exception,e:
# print e
pass
if __name__ == '__main__' : if __name__ == '__main__' :
# #
# #
if 'path' in SYS_ARGS : if 'path' in SYS_ARGS :
path = SYS_ARGS['path'] path = SYS_ARGS['path']
f = open(path) f = open(path)
p = json.loads(f.read()) SYS_ARGS = json.loads(f.read())
f.close() f.close()
else:
for id in ['apps','folders']:
if id in SYS_ARGS :
SYS_ARGS[id] = SYS_ARGS[id].split(',')
p = dict(SYS_ARGS)
Logger.init('data-collector') Logger.init('data-collector')
SYS_ARGS = dict(SYS_ARGS,** p) collector = Collector()
if 'apps' in SYS_ARGS : collector.run()
SYS_ARGS['apps'] += [__file__]
thread = Collector()
thread.start()
else: else:
print (h) print (h)

@ -1,275 +1,113 @@
""" """
This program is designed to inspect an application environment Steve L. Nyemba <steve@the-phi.com>
This program should only be run on unix friendly systems The Phi Technology - Smart Top
We enable the engines to be able to run a several configurations This program is the core for evaluating folders and applications. Each class is specialized to generate a report in a pandas data-frame
Similarly to what a visitor design-pattern would do The classes will focus on Apps, Folders and Protocols
- SmartTop.get(**args)
@TODO:
Protocols (will be used in anomaly detection)
""" """
from __future__ import division from __future__ import division
import os import os
import subprocess import subprocess
from sets import Set
import re
import datetime
import urllib2 as http, base64
from threading import Thread, RLock
import time
import numpy as np import numpy as np
from utils.ml import ML
import sys import sys
from scipy import stats import pandas as pd
import datetime
class SmartTop:
def get(self,**args):
return None
class Analysis: class Apps(SmartTop) :
def __init__(self): def __init__(self):
self.logs = [] """
pass This class will process a system command and parse the outpout accordingly given a parser
def post(self,object): @param parse is a parser pointer
self.logs.append(object) """
def init(self): self.cmd = "ps -eo pid,user,pmem,pcpu,stat,etime,args|awk 'OFS=\";\" {$1=$1; if($5 > 9) print }'"
d = datetime.datetime.now() self.xchar = ';'
self.now = {"month":d.month,"year":d.year, "day":d.day,"hour":d.hour,"minute":d.minute} def get_app(self,stream):
def getNow(self): index = 1 if os.path.exists(" ".join(stream[:1])) else len(stream)-1
d = datetime.datetime.now()
return {"month":d.month,"year":d.year, "day":d.day,"hour":d.hour,"minute":d.minute,"second":d.second}
def getName(self):
return self.__class__.__name__
def reboot(self,row,conf) :
return False
def cleanup(self,text):
return re.sub('[^a-zA-Z0-9\s:]',' ',str(text)).strip()
""" cmd = " ".join(stream[:index]) if index > 0 else " ".join(stream)
This class is designed to analyze environment variables. Environment variables can either be folders, files or simple values
The class returns a quantifiable assessment of the environment variables (expected 100%)
"""
class Env(Analysis):
def __init__(self):
Analysis.__init__(self)
def init(self,values):
#Analysis.init(self)
self.values = values
"""
This function evaluate the validity of an environment variable by returning a 1 or 0 (computable)
The function will use propositional logic (https://en.wikipedia.org/wiki/Propositional_calculus)
"""
def evaluate(self,id):
if id in os.environ : if ' ' in cmd.split('/')[len(cmd.split('/'))-1] :
# p = cmd.split('/')[len(cmd.split('/'))-1].split(' ')
# We can inspect to make sure the environment variable is not a path or filename. name = p[0]
# Using propositional logic we proceed as follows: args = " ".join(p[1:])
# - (p) We determine if the value is an folder or file name (using regex)
# - (q) In case of a file or folder we check for existance
# The final result is a conjuction of p and q
#
value = os.environ[id]
expressions = [os.sep,'(\\.\w+)$']
p = sum([ re.search(xchar,value) is not None for xchar in expressions])
q = os.path.exists(value)
return int(p and q)
else:
return 0
def composite (self):
#Analysis.init(self)
r = [ self.evaluate(id) for id in self.values] ;
N = len(r)
n = sum(r)
value = 100 * round(n/N,2)
missing = [self.values[i] for i in range(0,N) if r[i] == 0]
return dict(self.getNow(),**{"value":value,"missing":missing})
"""
This class is designed to handle analaysis of the a python virtual environment i.e deltas between requirments file and a virtualenv
@TODO: update the virtual environment
"""
class Sandbox(Analysis):
def __init__(self):
Analysis.__init__(self)
def init(self,conf):
#Analysis.init(self)
if os.path.exists(conf['sandbox']) :
self.sandbox_path = conf['sandbox']
else:
self.sandbox_path = None
if os.path.exists(conf['requirements']) :
self.requirements_path = conf['requirements']
else: else:
self.requirements_path = None name = cmd.split('/')[len(cmd.split('/'))-1]
args = " ".join(stream[index:]) if index > 0 else ""
def get_requirements (self): return [name,cmd,args]
f = open(self.requirements_path) def to_pandas(self,m):
return [ name.replace('-',' ').replace('_',' ') for name in f.read().split('\n') if name != ''] """
""" This function will convert the output of ps to a data-frame
This function will return the modules installed in the sandbox (virtual environment) @param m raw matrix i.e list of values like a csv
"""
def get_sandbox_requirements(self):
cmd = ['freeze']
xchar = ''.join([os.sep]*2)
pip_vm = ''.join([self.sandbox_path,os.sep,'bin',os.sep,'pip']).replace(xchar,os.sep)
cmd = [pip_vm]+cmd
r = subprocess.check_output(cmd).split('\n')
return [row.replace('-',' ').replace('_',' ') for row in r if row.strip() != '']
def evaluate(self):
pass
def reboot(self,rows,limit=None) :
return sum([ len(item['missing']) for item in rows ]) > 0
"""
This function returns the ratio of existing modules relative to the ones expected
"""
def composite(self):
Analysis.init(self)
if self.sandbox_path and self.requirements_path :
required_modules= self.get_requirements()
sandbox_modules = self.get_sandbox_requirements()
N = len(required_modules)
n = len(Set(required_modules) - Set(sandbox_modules))
value = round(1 - (n/N),2)*100
missing = list(Set(required_modules) - Set(sandbox_modules))
return dict(self.getNow(),**{"value":value,"missing":missing}) """
else: m = [item for item in m if len(item) != len (m[0])]
return None m = "\n".join(m[1:])
df = pd.read_csv(pd.compat.StringIO(m),sep=self.xchar)
""" df.columns =['pid','user','mem','cpu','status','started','name','cmd','args']
This class performs the analysis of a list of processes and determines return df
The class provides a quantifiable measure of how many processes it found over all def empty(self,name):
""" return pd.DataFrame([{"pid":None,"user":None,"mem":0,"cpu":0,"status":"X","started":None,"name":name,"cmd":None,"args":None}])
class ProcessCounter(Analysis): def parse(self,rows):
def __init__(self): m = []
Analysis.__init__(self) TIME_INDEX = 5
def init(self,names): ARGS_INDEX = 6
#Analysis.init(self)
self.names = names
def evaluate(self,name):
cmd = "".join(['ps -eo comm |grep ',name,' |wc -l'])
handler = subprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE)
return int(handler.communicate()[0].replace("\n","") )
def composite(self):
#Analysis.init(self)
r = {}
for name in self.names :
r[name] = self.evaluate(name)
#N = len(r) for item in rows :
#n = sum(r) if rows.index(item) != 0 :
#return n/N parts = item.split(self.xchar)
return dict(self.getNow(),**r) row = parts[:TIME_INDEX]
row.append(' '.join(parts[TIME_INDEX:ARGS_INDEX]))
""" row += self.get_app(parts[ARGS_INDEX:])
This class returns an application's both memory and cpu usage else:
""" row = item.split(self.xchar)
class DetailProcess(Analysis): row = (self.xchar.join(row)).strip()
def __init__(self): if len(row.replace(";","")) > 0 :
Analysis.__init__(self) m.append(row)
def init (self,names):
#Analysis.init(self)
self.names = names;
def getName(self):
return "apps"
def split(self,name,stream):
pattern = "(\d+.{0,1}\d*)\x20*(\d+.{0,1}\d*)\x20*(\d+.{0,1}\d*)".replace(":name",name).strip() return m
g = re.match(pattern,stream.strip()) def get(self,**args):
if g :
return list(g.groups())+['1']+[name]
else:
return ''
def reboot(self,rows,conf=None) :
return np.sum([int(item['label']=='crash') for item in rows]) > 0
def parse(self,row,fields):
""" """
The last field should be the command in its integrity This function returns a the output of a command to the calling code that is piped into the class
@pre len(fields) > len(row) The output will be stored in a data frame with columns
@
""" """
r = {} try:
now = self.getNow()
r['date'] = now
row = [term for term in row.split() if term.strip() != '']
for name in fields :
index = fields.index(name)
r[name] = row[index] if row else 0 handler = subprocess.Popen(self.cmd,shell=True,stdout=subprocess.PIPE)
if name not in ['user','cmd','status','pid'] : stream = handler.communicate()[0]
r[name] = float(r[name]) rows = stream.split('\n')
r[name] = row[index: ] if row else [] df = self.to_pandas(self.parse(rows))
# r = pd.DataFrame()
# Let's set the status give the data extracted if 'filter' in args :
# pattern = "|".join(args['filter'])
if r['status'] == 0 : r = df[df.name.str.contains(pattern)]
r['status'] = 'crash' for name in args['filter'] :
elif 'Z' in r['status'] : filter = "name == '"+name+"'"
r['status'] = 'zombie' if r.query(filter).size == 0 :
elif r['memory_usage'] > 0 and r['cpu_usage'] > 0: r = r.append(self.empty(name))
r['status'] = 'running' return r
else: except Exception,e:
r['status'] = 'idle' print (e)
return r return None
def evaluate(self,name=None) :
if name is None :
name = ".*"
fields = ["user","pid","memory_usage","cpu_usage","memory_available","status","cmd"]
cmd = "ps -eo user,pid,pmem,pcpu,vsize,stat,command|grep -Ei \":app\"".replace(":app",name)
handler = subprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE)
logs = handler.communicate()[0].split('\n')
logs = [row for row in logs if (row.strip() != '') and ('grep -Ei' in row )== False ]
if len(logs) == 0:
return [dict(self.parse('',fields),**{'label':name}) ]
else :
return [dict(self.parse(row,fields),**{'label':name}) for row in logs if row.strip() != '' and 'grep' not in row and '-Ei' not in row]
def status(self,row):
x = row['memory_usage']
y = row['cpu_usage']
z = row['memory_available']
if z :
if y :
return "running"
return "idle"
else:
return "crash"
#def format(self,row):
# r= {"memory_usage":row[0],"cpu_usage":row[1],"memory_available":row[2]/1000,"proc_count":row[3],"label":self.cleanup(row[4])}
# status = self.status(r)
# r['status'] = status
# return r
def composite(self): class Folders(SmartTop):
ma = [] """
for name in self.names: This class will assess a folder and produce a report in a data-frame that can be later on used for summary statistics
row = self.evaluate(name) """
ma += row
return ma
"""
This class evaluates a list of folders and provides detailed informaiton about age/size of each file
Additionally the the details are summarized in terms of global size, and oldest file.
"""
class FileWatch(Analysis):
def __init__(self): def __init__(self):
pass pass
def init(self,folders): def _get(self,dir_path,r=[]):
self.folders = folders;
self.cache = []
def getName(self):
return "folders"
def evaluate(self,dir_path,r=[]):
"""
This function will recursively scan a folder and retrieve file sizes and age of the files.
The data will be returned as an array of {size,age,label} items
"""
for child in os.listdir(dir_path): for child in os.listdir(dir_path):
path = os.path.join(dir_path, child) path = os.path.join(dir_path, child)
if os.path.isdir(path): if os.path.isdir(path):
self.evaluate(path,r) self._get(path,r)
else: else:
size = os.path.getsize(path) size = os.path.getsize(path)
@ -277,62 +115,25 @@ class FileWatch(Analysis):
file_date = datetime.datetime.fromtimestamp(file_date) file_date = datetime.datetime.fromtimestamp(file_date)
now = datetime.datetime.now() now = datetime.datetime.now()
age = (now - file_date ).days age = (now - file_date ).days
r.append({"label":path,"size":size,"age":age,"date":self.getNow()})
name = os.path.basename(path)
r.append({"name":name,"path":path,"size":size,"age":age})
return r return r
def reboot(self,rows,limit) : def get(self,**args):
return np.sum([ 1 for item in rows if rows['size'] > limit ]) > 0 # path = args['path']
def composite(self):
out = []
for folder in self.folders :
r = self.evaluate(folder,[])
file_count = len(r)
age_mode = [item[0] for item in stats.mode([item['age'] for item in r])]
size_mode= [item[0] for item in stats.mode([item['size'] for item in r])]
age = {"mode":age_mode,"median":np.median([item['age'] for item in r] ),"mean":np.mean([item['age'] for item in r] ),"var":np.mean([item['age'] for item in r])}
size = {"mode":size_mode,"median":np.median([item['size'] for item in r] ), "mean":np.mean([item['size'] for item in r] ),"var":np.mean([item['size'] for item in r])}
out.append({"label":folder,"date":self.getNow(),"stats":{"age":age,"size":size,"file_count":file_count},"logs":r})
return out
# class Monitor (Thread):
# def __init__(self,pConfig,pWriter,id='processes') :
# Thread.__init__(self)
# self.config = pConfig[id] if isinstance(args['path'],list) == False:
# self.writer = pWriter; paths = [args['path']]
# self.logs = [] else:
# self.handler = self.config['class'] paths = args['path']
# self.mconfig = self.config['config'] _out = pd.DataFrame()
for path in paths :
if os.path.exists(path) :
#
# def stop(self): # If the folder does NOT exists it should not be treated.
# self.keep_running = False #
# def run(self): r = pd.DataFrame(self._get(path))
# r = {} r = pd.DataFrame([{"name":path,"files":r.shape[0],"age_in_days":r.age.mean(),"size_in_kb":r.size.sum()}])
# self.keep_running = True _out = _out.append(r)
# lock = RLock() return _out
# while self.keep_running:
# lock.acquire()
# for label in self.mconfig:
# self.handler.init(self.mconfig[label])
# r = self.handler.composite()
# self.writer.write(label=label,row = r)
# time.sleep(2)
# lock.release()
# self.prune()
# TIME_LAPSE = 60*2
# time.sleep(TIME_LAPSE)
# print "Stopped ..."
# def prune(self) :
# MAX_ENTRIES = 100
# if len(self.logs) > MAX_ENTRIES :
# BEG = len(self.logs) - MAX_SIZE -1
# self.logs = self.logs[BEG:]

Loading…
Cancel
Save