parent
024ee9550f
commit
6d8ab4e412
@ -1,266 +0,0 @@
|
||||
#import multiprocessing
|
||||
from threading import Thread, RLock
|
||||
#from utils import transport
|
||||
from utils.transport import *
|
||||
from utils.ml import AnomalyDetection,ML
|
||||
import time
|
||||
import monitor
|
||||
import sys
|
||||
import os
|
||||
import datetime
|
||||
class BasicWorker(Thread):
|
||||
def __init__(self,config,lock):
|
||||
Thread.__init__(self)
|
||||
self.reader_class = config['store']['class']['read']
|
||||
self.write_class = config['store']['class']['write']
|
||||
self.rw_args = config['store']['args']
|
||||
self.factory = DataSourceFactory()
|
||||
self.lock = lock
|
||||
|
||||
"""
|
||||
This class is intended to collect data given a configuration
|
||||
|
||||
"""
|
||||
class Top(Thread):
|
||||
def __init__(self,_config,lock):
|
||||
Thread.__init__(self)
|
||||
self.lock = lock
|
||||
self.reader_class = _config['store']['class']['read']
|
||||
self.write_class = _config['store']['class']['write']
|
||||
self.rw_args = _config['store']['args']
|
||||
self.factory = DataSourceFactory()
|
||||
|
||||
self.name = 'Zulu-Top'
|
||||
self.quit = False
|
||||
|
||||
|
||||
className = ''.join(['monitor.',_config['monitor']['processes']['class'],'()'])
|
||||
self.handler = eval(className)
|
||||
self.config = _config['monitor']['processes']['config']
|
||||
def stop(self):
|
||||
self.quit = True
|
||||
def run(self):
|
||||
while self.quit == False:
|
||||
print ' ** ',self.name,datetime.datetime.today()
|
||||
for label in self.config :
|
||||
self.lock.acquire()
|
||||
gwriter = self.factory.instance(type=self.write_class,args=self.rw_args)
|
||||
apps = self.config[label]
|
||||
self.handler.init(apps)
|
||||
r = self.handler.composite()
|
||||
|
||||
gwriter.write(label=label,row=r)
|
||||
time.sleep(5)
|
||||
self.lock.release()
|
||||
if 'MONITOR_CONFIG_PATH' in os.environ:
|
||||
#
|
||||
# This suggests we are in development mode
|
||||
#
|
||||
break
|
||||
ELLAPSED_TIME = 60*20
|
||||
time.sleep(ELLAPSED_TIME)
|
||||
print "Exiting ",self.name
|
||||
|
||||
class Learner(Thread) :
|
||||
|
||||
"""
|
||||
This function expects paltform config (store,learner)
|
||||
It will leverage store and learner in order to operate
|
||||
"""
|
||||
def __init__(self,config,lock):
|
||||
Thread.__init__(self)
|
||||
self.name = 'Zulu-Learner'
|
||||
self.lock = lock
|
||||
self.reader_class = config['store']['class']['read']
|
||||
self.write_class = config['store']['class']['write']
|
||||
self.rw_args = config['store']['args']
|
||||
self.features = config['learner']['anomalies']['features']
|
||||
self.yo = config['learner']['anomalies']['label']
|
||||
self.apps = config['learner']['anomalies']['apps']
|
||||
self.factory = DataSourceFactory()
|
||||
self.quit = False
|
||||
|
||||
def stop(self):
|
||||
self.quit = True
|
||||
"""
|
||||
This function will initiate learning every (x-hour)
|
||||
If there is nothing to learn the app will simply go to sleep
|
||||
"""
|
||||
def run(self):
|
||||
reader = self.factory.instance(type=self.reader_class,args=self.rw_args)
|
||||
data = reader.read()
|
||||
|
||||
|
||||
#
|
||||
# Let's make sure we extract that which has aleady been learnt
|
||||
#
|
||||
|
||||
if 'learn' in data:
|
||||
r = data['learn']
|
||||
del data['learn']
|
||||
|
||||
r = ML.Extract('label',r)
|
||||
logs = [row[0] for row in r]
|
||||
logs = list(set(logs))
|
||||
|
||||
|
||||
else:
|
||||
logs = []
|
||||
#
|
||||
# In order to address the inefficiencies below, we chose to adopt the following policy
|
||||
# We don't learn that which is already learnt, This measure consists in filtering out the list of the apps that already have learning data
|
||||
#
|
||||
self.apps = list(set(self.apps) - set(logs))
|
||||
while self.quit == False:
|
||||
r = {}
|
||||
lapps = list(self.apps)
|
||||
print ' ** ',self.name,datetime.datetime.today()
|
||||
for key in data :
|
||||
logs = data[key]
|
||||
#
|
||||
# There poor design at this point, we need to make sure things tested don't get tested again
|
||||
# This creates innefficiencies (cartesian product)
|
||||
#
|
||||
for app in lapps:
|
||||
handler = AnomalyDetection()
|
||||
value = handler.learn(logs,'label',app,self.features,self.yo)
|
||||
|
||||
if value is not None:
|
||||
|
||||
if key not in r:
|
||||
r[key] = {}
|
||||
r[key][app] = value
|
||||
i = lapps.index(app)
|
||||
del lapps[i]
|
||||
#
|
||||
# This offers a clean write to the data store upon value retrieved
|
||||
# The removal of the application enables us to improve efficiency (among other things)
|
||||
#
|
||||
value = dict(value,**{"features":self.features})
|
||||
self.lock.acquire()
|
||||
writer = self.factory.instance(type=self.write_class,args=self.rw_args)
|
||||
writer.write(label='learn',row=value)
|
||||
self.lock.release()
|
||||
|
||||
|
||||
#
|
||||
# Usually this is used for development
|
||||
# @TODO : Remove this and find a healthy way to stop the server
|
||||
#
|
||||
if 'MONITOR_CONFIG_PATH' in os.environ:
|
||||
#
|
||||
# This suggests we are in development mode
|
||||
#
|
||||
break
|
||||
|
||||
TIME_ELLAPSED = 60*120 #-- Every 2 hours
|
||||
time.sleep(TIME_ELLAPSED)
|
||||
print "Exiting ",self.name
|
||||
|
||||
class FileWatchWorker(BasicWorker):
|
||||
def __init__(self,config,lock):
|
||||
BasicWorker.__init__(self,config,lock)
|
||||
self.name = "Zulu-FileWatch"
|
||||
self.config = config ;
|
||||
self.folder_config = config['monitor']['folders']['config']
|
||||
self.quit = False
|
||||
def stop(self):
|
||||
self.quit = True
|
||||
def run(self):
|
||||
TIME_ELAPSED = 60 * 10
|
||||
handler = monitor.FileWatch()
|
||||
ml_handler = ML()
|
||||
while self.quit == False :
|
||||
r = []
|
||||
print ' ** ',self.name,datetime.datetime.today()
|
||||
for id in self.folder_config :
|
||||
folders = self.folder_config [id]
|
||||
handler.init(folders)
|
||||
xo = handler.composite()
|
||||
#
|
||||
# We should perform a distribution analysis of the details in order to have usable data
|
||||
#
|
||||
xrow = {}
|
||||
xrow[id] = []
|
||||
for xo_row in xo:
|
||||
xo_age = [row['age'] for row in xo_row['details']]
|
||||
xo_size= [row['size'] for row in xo_row['details']]
|
||||
xo_row['details'] = {"age":ML.distribution(xo_age,self.lock),"size":ML.distribution(xo_size,self.lock)}
|
||||
|
||||
xo_row['id'] = id
|
||||
xrow[id].append(xo_row)
|
||||
#
|
||||
# Now we can save the file
|
||||
#
|
||||
self.lock.acquire()
|
||||
writer = self.factory.instance(type=self.write_class,args=self.rw_args)
|
||||
writer.write(label='folders',row=xrow)
|
||||
self.lock.release()
|
||||
if 'MONITOR_CONFIG_PATH' in os.environ:
|
||||
#
|
||||
# This suggests we are in development mode
|
||||
#
|
||||
break
|
||||
time.sleep(TIME_ELAPSED)
|
||||
print 'Exiting ',self.name
|
||||
|
||||
"""
|
||||
This class is a singleton designed to start quit dependent threads
|
||||
* monitor is designed to act as a data collection agent
|
||||
* learner is designed to be a learner i.e machine learning model(s)
|
||||
@TODO:
|
||||
- How to move them to processes that can be read by the os (that would allow us to eat our own dog-food)
|
||||
- Additionally we also need to have a pruning thread, to control the volume of data we have to deal with.This instills the "will to live" in the application
|
||||
"""
|
||||
class ThreadManager:
|
||||
|
||||
Pool = {}
|
||||
@staticmethod
|
||||
def start(config):
|
||||
lock = RLock()
|
||||
ThreadManager.Pool['monitor'] = Top(config,lock)
|
||||
ThreadManager.Pool['learner'] = Learner(config,lock)
|
||||
if 'folders' not in config :
|
||||
ThreadManager.Pool['file-watch'] = FileWatchWorker(config,lock)
|
||||
for id in ThreadManager.Pool :
|
||||
thread = ThreadManager.Pool[id]
|
||||
thread.start()
|
||||
@staticmethod
|
||||
def stop():
|
||||
for id in ThreadManager.Pool :
|
||||
thread = ThreadManager.Pool[id]
|
||||
thread.stop()
|
||||
@staticmethod
|
||||
def status():
|
||||
r = {}
|
||||
for id in ThreadManager.Pool :
|
||||
thread = ThreadManager.Pool[id]
|
||||
r[id] = thread.isAlive()
|
||||
|
||||
|
||||
|
||||
|
||||
class Factory :
|
||||
"""
|
||||
This function will return an instance of an object in the specified in the configuration file
|
||||
"""
|
||||
@staticmethod
|
||||
def instance(id,config):
|
||||
if id in config['monitor'] :
|
||||
className = config['monitor'][id]['class']
|
||||
ref = "".join(["monitor.",className,"()"])
|
||||
ref = eval(ref)
|
||||
return {"class":ref,"config":config['monitor'][id]["config"]}
|
||||
else:
|
||||
return None
|
||||
|
||||
|
||||
if __name__ =='__main__' :
|
||||
import utils.params as SYS_ARGS
|
||||
import json
|
||||
PARAMS = SYS_ARGS.PARAMS
|
||||
f = open(PARAMS['path'])
|
||||
CONFIG = json.loads(f.read())
|
||||
f.close()
|
||||
ThreadManager.start(CONFIG)
|
||||
|
Loading…
Reference in new issue