#import multiprocessing
from threading import Thread, RLock
#from utils import transport
from utils.transport import *
from utils.ml import AnomalyDetection,ML
import time
import monitor
import sys
import os
import datetime


class BasicWorker(Thread):
    """
        Base class of the worker threads: it holds the data-store settings
        (reader/writer class names and their arguments), the lock shared by
        all the workers and the flag used to request a stop.

        :param config   platform configuration, must provide
                        config['store']['class']['read'],
                        config['store']['class']['write'] and config['store']['args']
        :param lock     lock shared by the workers to serialize writes
    """
    def __init__(self, config, lock):
        Thread.__init__(self)
        store = config['store']
        self.reader_class = store['class']['read']
        self.write_class = store['class']['write']
        self.rw_args = store['args']
        # DataSourceFactory is expected to come from the star-import of utils.transport
        self.factory = DataSourceFactory()
        self.lock = lock
        self.quit = False

    def stop(self):
        """
            Requests the worker to stop. The flag is only examined at the top
            of the run loop, i.e. once the current pass and its sleep are over.
        """
        self.quit = True


class Top(BasicWorker):
    """
        This class is intended to collect data given a configuration

        :param _config  platform configuration (store, monitor.processes)
        :param lock     lock shared by the workers to serialize writes
    """
    def __init__(self, _config, lock):
        BasicWorker.__init__(self, _config, lock)
        self.name = 'Zulu-Top'

        processes = _config['monitor']['processes']
        className = ''.join(['monitor.', processes['class'], '()'])
        # NOTE(review): eval() instantiates whatever class name the configuration
        # provides; this is only acceptable as long as the configuration is trusted.
        self.handler = eval(className)
        self.config = processes['config']

    def run(self):
        """
            Every 20 minutes, computes the composite of each configured group
            of apps and writes it to the data store under the group's label.
        """
        ELLAPSED_TIME = 60 * 20
        while not self.quit:
            # single-argument print: same output with python 2 and python 3
            print(' **  %s %s' % (self.name, datetime.datetime.today()))
            for label in self.config:
                #
                # The lock is released even if the handler or the writer fails,
                # otherwise the other workers would be blocked forever.
                # The 5 second pause takes place while holding the lock (as before).
                #
                with self.lock:
                    gwriter = self.factory.instance(type=self.write_class, args=self.rw_args)
                    apps = self.config[label]
                    self.handler.init(apps)
                    r = self.handler.composite()
                    gwriter.write(label=label, row=r)
                    time.sleep(5)
            if 'MONITOR_CONFIG_PATH' in os.environ:
                #
                # This suggests we are in development mode
                #
                break
            time.sleep(ELLAPSED_TIME)
        print("Exiting  %s" % (self.name,))


class Learner(BasicWorker):
    """
        This class expects the platform config (store,learner)
        It will leverage store and learner in order to operate

        :param config   platform configuration (store, learner.anomalies)
        :param lock     lock shared by the workers to serialize writes
    """
    def __init__(self, config, lock):
        BasicWorker.__init__(self, config, lock)
        self.name = 'Zulu-Learner'
        anomalies = config['learner']['anomalies']
        self.features = anomalies['features']
        self.yo = anomalies['label']
        self.apps = anomalies['apps']

    def run(self):
        """
            This function will initiate learning every (x-hour)
            If there is nothing to learn the app will simply go to sleep
        """
        TIME_ELLAPSED = 60 * 120    #-- Every 2 hours
        reader = self.factory.instance(type=self.reader_class, args=self.rw_args)
        data = reader.read()
        #
        # Let's make sure we extract that which has already been learnt
        #
        if 'learn' in data:
            learnt = data['learn']
            del data['learn']
            learnt = ML.Extract('label', learnt)
            known_apps = list(set([row[0] for row in learnt]))
        else:
            known_apps = []
        #
        # In order to address the inefficiencies below, we chose to adopt the following policy
        # We don't learn that which is already learnt, This measure consists in filtering out
        # the list of the apps that already have learning data
        #
        self.apps = list(set(self.apps) - set(known_apps))
        while not self.quit:
            pending = list(self.apps)
            # single-argument print: same output with python 2 and python 3
            print(' **  %s %s' % (self.name, datetime.datetime.today()))
            for key in data:
                logs = data[key]
                #
                # There is poor design at this point, we need to make sure things tested don't get tested again
                # This creates inefficiencies (cartesian product)
                # We iterate over a snapshot because apps are removed from the
                # pending list along the way (removing from the list being
                # iterated would silently skip the next app)
                #
                for app in list(pending):
                    handler = AnomalyDetection()
                    value = handler.learn(logs, 'label', app, self.features, self.yo)
                    if value is not None:
                        pending.remove(app)
                        #
                        # This offers a clean write to the data store upon value retrieved
                        # The removal of the application enables us to improve efficiency (among other things)
                        #
                        value = dict(value, **{"features": self.features})
                        with self.lock:
                            writer = self.factory.instance(type=self.write_class, args=self.rw_args)
                            writer.write(label='learn', row=value)
            #
            # Usually this is used for development
            # @TODO : Remove this and find a healthy way to stop the server
            #
            if 'MONITOR_CONFIG_PATH' in os.environ:
                #
                # This suggests we are in development mode
                #
                break
            time.sleep(TIME_ELLAPSED)
        print("Exiting  %s" % (self.name,))


class FileWatchWorker(BasicWorker):
    """
        This class watches the configured folders and stores a summary
        (distribution of the age and size of the details) in the data store.

        :param config   platform configuration (store, monitor.folders)
        :param lock     lock shared by the workers to serialize writes
    """
    def __init__(self, config, lock):
        BasicWorker.__init__(self, config, lock)
        self.name = "Zulu-FileWatch"
        self.config = config
        self.folder_config = config['monitor']['folders']['config']

    def run(self):
        """
            Every 10 minutes, evaluates each configured set of folders and
            writes the outcome to the data store under the label 'folders'.
        """
        TIME_ELAPSED = 60 * 10
        handler = monitor.FileWatch()
        while not self.quit:
            # single-argument print: same output with python 2 and python 3
            print(' **  %s %s' % (self.name, datetime.datetime.today()))
            for folder_id in self.folder_config:
                folders = self.folder_config[folder_id]
                handler.init(folders)
                xo = handler.composite()
                #
                # We should perform a distribution analysis of the details in order to have usable data
                #
                details = xo[0]['details']
                xo_age = [row['age'] for row in details]
                xo_size = [row['size'] for row in details]
                xo[0]['details'] = {
                    "id": folder_id,
                    "age": ML.distribution(xo_age, self.lock),
                    "size": ML.distribution(xo_size, self.lock)
                }
                #
                # Now we can save the file
                # (the lock is released even if the write fails)
                #
                with self.lock:
                    writer = self.factory.instance(type=self.write_class, args=self.rw_args)
                    writer.write(label='folders', row=xo)
            if 'MONITOR_CONFIG_PATH' in os.environ:
                #
                # This suggests we are in development mode
                #
                break
            time.sleep(TIME_ELAPSED)
        print('Exiting  %s' % (self.name,))


class ThreadManager:
    """
        This class is a singleton designed to start/quit dependent threads
            * monitor is designed to act as a data collection agent
            * learner is designed to be a learner i.e machine learning model(s)
        @TODO:
            - How to move them to processes that can be read by the os (that would allow us to eat our own dog-food)
            - Additionally we also need to have a pruning thread, to control the volume of data we have to deal with.
              This instills the "will to live" in the application
    """
    # The threads that were started, by identifier
    Pool = {}

    @staticmethod
    def start(config):
        """
            Creates the workers (they all share a single re-entrant lock),
            registers them in the pool and starts them.

            :param config   platform configuration handed over to every worker
        """
        lock = RLock()
        ThreadManager.Pool['monitor'] = Top(config, lock)
        ThreadManager.Pool['learner'] = Learner(config, lock)
        ThreadManager.Pool['file-watch'] = FileWatchWorker(config, lock)
        for key in ThreadManager.Pool:
            ThreadManager.Pool[key].start()

    @staticmethod
    def stop():
        """
            Requests every thread of the pool to stop
        """
        for key in ThreadManager.Pool:
            ThreadManager.Pool[key].stop()

    @staticmethod
    def status():
        """
            Returns a dictionary that tells, for every identifier of the pool,
            whether the associated thread is alive or not
        """
        r = {}
        for key in ThreadManager.Pool:
            # is_alive is available with python 2.6+ and python 3 (isAlive was removed in 3.9)
            r[key] = ThreadManager.Pool[key].is_alive()
        return r


class Factory:
    """
        This class will return an instance of an object specified in the configuration file
    """
    @staticmethod
    def instance(id, config):
        """
            :param id       name of an entry of config['monitor']
            :param config   platform configuration
            :return         {"class": <instance>, "config": <entry configuration>}
                            or None if the entry is not found
        """
        if id not in config['monitor']:
            return None
        entry = config['monitor'][id]
        ref = "".join(["monitor.", entry['class'], "()"])
        # NOTE(review): eval() instantiates whatever class name the configuration
        # provides; this is only acceptable as long as the configuration is trusted.
        ref = eval(ref)
        return {"class": ref, "config": entry["config"]}


if __name__ == '__main__':
    import utils.params as SYS_ARGS
    import json
    PARAMS = SYS_ARGS.PARAMS
    # the file is closed even if the content is not valid json
    with open(PARAMS['path']) as f:
        CONFIG = json.load(f)
    ThreadManager.start(CONFIG)