# Source: src/utils/workers.py (reconstructed from a deleted-file diff whose
# lines had been collapsed; layout below is the conventional formatting).
"""
Worker threads used by the monitoring platform.

* ``Top`` collects process data through a handler from the ``monitor`` module.
* ``Learner`` runs anomaly-detection learning over data read from the store.
* ``FileWatchWorker`` collects folder statistics.
* ``ThreadManager`` starts/stops the workers, which all share one ``RLock``
  that serialises writes to the data store.
"""
#import multiprocessing
from threading import Thread, RLock
#from utils import transport
from utils.transport import *
from utils.ml import AnomalyDetection,ML
import time
import monitor
import sys
import os
import datetime


def _development_mode():
    """Return True when MONITOR_CONFIG_PATH is set (workers then run one cycle only)."""
    return 'MONITOR_CONFIG_PATH' in os.environ


def _create_monitor_handler(class_name):
    """
    Instantiate the class called ``class_name`` found in the ``monitor`` module.

    The name comes from the configuration file.  It is resolved with attribute
    lookups instead of ``eval`` so that a malformed or hostile configuration
    value cannot execute arbitrary code.  Dotted names are followed one
    attribute at a time.  Raises AttributeError when the name does not exist.
    """
    target = monitor
    for part in class_name.split('.'):
        target = getattr(target, part)
    return target()


class BasicWorker(Thread):
    """
    Base thread holding the data-store settings shared by workers.

    :param config: platform configuration; ``config['store']`` must provide
                   ``class.read``, ``class.write`` and ``args``
    :param lock:   lock shared by all workers to serialise store writes
    """
    def __init__(self, config, lock):
        Thread.__init__(self)
        self.reader_class = config['store']['class']['read']
        self.write_class = config['store']['class']['write']
        self.rw_args = config['store']['args']
        self.factory = DataSourceFactory()
        self.lock = lock


class Top(Thread):
    """
    This class is intended to collect data given a configuration.

    The handler class is named by ``_config['monitor']['processes']['class']``
    and the labelled application lists by
    ``_config['monitor']['processes']['config']``.
    """

    # Pause between two collection cycles (20 minutes).
    PAUSE_SECONDS = 60 * 20

    def __init__(self, _config, lock):
        Thread.__init__(self)
        self.lock = lock
        self.reader_class = _config['store']['class']['read']
        self.write_class = _config['store']['class']['write']
        self.rw_args = _config['store']['args']
        self.factory = DataSourceFactory()

        self.name = 'Zulu-Top'
        self.quit = False

        self.handler = _create_monitor_handler(_config['monitor']['processes']['class'])
        self.config = _config['monitor']['processes']['config']

    def stop(self):
        """Ask the thread to exit; takes effect after the current cycle/sleep."""
        self.quit = True

    def run(self):
        """Collect and store one row per configured label, then sleep, until stopped."""
        while not self.quit:
            print(' **  %s %s' % (self.name, datetime.datetime.today()))
            for label in self.config:
                # ``with`` guarantees the shared lock is released even when the
                # handler or the writer raises.
                with self.lock:
                    gwriter = self.factory.instance(type=self.write_class, args=self.rw_args)
                    apps = self.config[label]
                    self.handler.init(apps)
                    row = self.handler.composite()

                    gwriter.write(label=label, row=row)
                    # The pause happens while the lock is held, as before.
                    time.sleep(5)
            if _development_mode():
                #
                # This suggests we are in development mode
                #
                break
            time.sleep(self.PAUSE_SECONDS)
        print('Exiting  %s' % self.name)


class Learner(Thread):
    """
    This class expects the platform config (store, learner).
    It will leverage store and learner in order to operate.

    ``config['learner']['anomalies']`` must provide ``features``, ``label``
    and ``apps``.
    """

    # Pause between two learning cycles (every 2 hours).
    PAUSE_SECONDS = 60 * 120

    def __init__(self, config, lock):
        Thread.__init__(self)
        self.name = 'Zulu-Learner'
        self.lock = lock
        self.reader_class = config['store']['class']['read']
        self.write_class = config['store']['class']['write']
        self.rw_args = config['store']['args']
        self.features = config['learner']['anomalies']['features']
        self.yo = config['learner']['anomalies']['label']
        self.apps = config['learner']['anomalies']['apps']
        self.factory = DataSourceFactory()
        self.quit = False

    def stop(self):
        """Ask the thread to exit; takes effect after the current cycle/sleep."""
        self.quit = True

    def run(self):
        """
        Initiate learning every PAUSE_SECONDS.
        If there is nothing to learn the thread simply goes back to sleep.
        """
        reader = self.factory.instance(type=self.reader_class, args=self.rw_args)
        # NOTE(review): the store is read once, before the loop; later cycles
        # reuse this snapshot rather than re-reading fresh data.
        data = reader.read()

        #
        # Let's make sure we extract that which has already been learnt
        #
        if 'learn' in data:
            learnt = data['learn']
            del data['learn']

            rows = ML.Extract('label', learnt)
            logs = list(set([row[0] for row in rows]))
        else:
            logs = []
        #
        # We don't learn that which is already learnt: filter out the apps
        # that already have learning data.
        #
        self.apps = list(set(self.apps) - set(logs))
        while not self.quit:
            pending = list(self.apps)
            print(' **  %s %s' % (self.name, datetime.datetime.today()))
            for key in data:
                logs = data[key]
                #
                # Poor design at this point: every remaining app is tested
                # against every data set (cartesian product).
                #
                # Iterate over a snapshot: apps are removed from ``pending``
                # inside the loop, and removing from the list being iterated
                # would silently skip the following app.
                for app in list(pending):
                    handler = AnomalyDetection()
                    value = handler.learn(logs, 'label', app, self.features, self.yo)
                    if value is None:
                        continue

                    # Once learnt, the app is not tested against later data sets.
                    pending.remove(app)
                    #
                    # This offers a clean write to the data store upon value retrieved
                    #
                    value = dict(value, **{"features": self.features})
                    with self.lock:
                        writer = self.factory.instance(type=self.write_class, args=self.rw_args)
                        writer.write(label='learn', row=value)

            #
            # Usually this is used for development
            # @TODO : Remove this and find a healthy way to stop the server
            #
            if _development_mode():
                #
                # This suggests we are in development mode
                #
                break

            time.sleep(self.PAUSE_SECONDS)
        print('Exiting  %s' % self.name)


class FileWatchWorker(BasicWorker):
    """
    Collects folder statistics for the folders listed under
    ``config['monitor']['folders']['config']`` and stores them with the
    label ``folders``.
    """

    # Pause between two folder scans (10 minutes).
    PAUSE_SECONDS = 60 * 10

    def __init__(self, config, lock):
        BasicWorker.__init__(self, config, lock)
        self.name = "Zulu-FileWatch"
        self.config = config
        self.folder_config = config['monitor']['folders']['config']
        self.quit = False

    def stop(self):
        """Ask the thread to exit; takes effect after the current cycle/sleep."""
        self.quit = True

    def run(self):
        """Scan every configured folder group, store the summary, sleep, repeat."""
        handler = monitor.FileWatch()
        while not self.quit:
            print(' **  %s %s' % (self.name, datetime.datetime.today()))
            for folder_id in self.folder_config:
                folders = self.folder_config[folder_id]
                handler.init(folders)
                xo = handler.composite()
                #
                # We should perform a distribution analysis of the details in order to have usable data
                #
                xrow = {folder_id: []}
                for xo_row in xo:
                    xo_age = [row['age'] for row in xo_row['details']]
                    xo_size = [row['size'] for row in xo_row['details']]
                    # The raw per-file details are replaced by their distributions.
                    xo_row['details'] = {
                        "age": ML.distribution(xo_age, self.lock),
                        "size": ML.distribution(xo_size, self.lock),
                    }

                    xo_row['id'] = folder_id
                    xrow[folder_id].append(xo_row)
                #
                # Now we can save the result; ``with`` releases the lock even
                # when the writer raises.
                #
                with self.lock:
                    writer = self.factory.instance(type=self.write_class, args=self.rw_args)
                    writer.write(label='folders', row=xrow)
            if _development_mode():
                #
                # This suggests we are in development mode
                #
                break
            time.sleep(self.PAUSE_SECONDS)
        print('Exiting  %s' % self.name)


class ThreadManager:
    """
    This class is a singleton designed to start and stop dependent threads
        * monitor is designed to act as a data collection agent
        * learner is designed to be a learner i.e machine learning model(s)
    @TODO:
        - How to move them to processes that can be read by the os (that would allow us to eat our own dog-food)
        - Additionally we also need to have a pruning thread, to control the volume of data we have to deal with. This instills the "will to live" in the application
    """

    # Workers by identifier: 'monitor', 'learner' and optionally 'file-watch'.
    Pool = {}

    @staticmethod
    def start(config):
        """Create the workers described by ``config`` and start them all."""
        lock = RLock()
        ThreadManager.Pool['monitor'] = Top(config, lock)
        ThreadManager.Pool['learner'] = Learner(config, lock)
        # FileWatchWorker reads config['monitor']['folders'], so that is the
        # key whose presence decides whether the worker can be created.
        if 'folders' in config['monitor']:
            ThreadManager.Pool['file-watch'] = FileWatchWorker(config, lock)
        for thread in ThreadManager.Pool.values():
            thread.start()

    @staticmethod
    def stop():
        """Ask every worker to stop (each exits after its current cycle/sleep)."""
        for thread in ThreadManager.Pool.values():
            thread.stop()

    @staticmethod
    def status():
        """Return a dict mapping each worker identifier to True when it is alive."""
        # ``is_alive`` is the spelling available on both Python 2.6+ and 3.x.
        return dict(
            (worker_id, thread.is_alive())
            for worker_id, thread in ThreadManager.Pool.items()
        )


class Factory:
    """
    Builds monitor handlers from the configuration file.
    """
    @staticmethod
    def instance(id, config):
        """
        Return ``{"class": <handler instance>, "config": <handler config>}``
        for the entry ``id`` of ``config['monitor']``, or None when there is
        no such entry.
        """
        if id not in config['monitor']:
            return None
        entry = config['monitor'][id]
        return {"class": _create_monitor_handler(entry['class']), "config": entry["config"]}


if __name__ == '__main__':
    import utils.params as SYS_ARGS
    import json
    PARAMS = SYS_ARGS.PARAMS
    # The context manager closes the file even if the JSON is invalid.
    with open(PARAMS['path']) as f:
        CONFIG = json.loads(f.read())
    ThreadManager.start(CONFIG)