From 2a3a0fcb967d7777ab6c2e999937a3349790053a Mon Sep 17 00:00:00 2001 From: "Steve L. Nyemba" Date: Wed, 23 Aug 2017 12:30:24 -0500 Subject: [PATCH] CO - actor refactor for killing and starting an application --- src/utils/agents/actor.py | 343 +++++++++++++++++++++++++------------- 1 file changed, 228 insertions(+), 115 deletions(-) diff --git a/src/utils/agents/actor.py b/src/utils/agents/actor.py index cda5980..c267bd4 100644 --- a/src/utils/agents/actor.py +++ b/src/utils/agents/actor.py @@ -19,19 +19,36 @@ from monitor import ProcessCounter from utils.transport import QueueListener, QueueWriter, QueueReader from utils.params import PARAMS from ngram import NGram as ng -class Actor(Thread): - def __init__(self): - Thread.__init__(self) +import smtplib +from email.mime.multipart import MIMEMultipart +from email.mime.text import MIMEText + +class Actor(): + @staticmethod + def instance(name,args): + """ + This function is a singleton that acts as a factory object for all the instances of this subclass + @param name name of the class to instantiate + @param args arguments to be passed in {configuration} + """ + o = None + try: + o = eval("".join([name,"()"])) + o.init(args) + except Exception,e: + print e + return o + def __init__(self): pass def getIdentifier(self): return self.__class__.__name__.lower() """ Initializing the class with configuration. The configuration will be specific to each subclass - + @param args arguments the class needs to be configured """ - def init(self,config,item=None): - self.config = config - self.item = item + def init(self,args): + self.config = args + def process(self,item): pass def isValid(self,item): @@ -45,31 +62,123 @@ class Actor(Thread): except Exception,e: pass - def run(self): - if self.item is not None: - self.process(self.item) + # def run(self): + # if self.item is not None: + # self.process(self.item) """ Sending a message to a queue with parameters to,from,content """ def post(self,**args): pass -""" - This is designed to handle folders i.e cleaning/archiving the folders +class Apps(Actor) : + """ + This class is designed to handle application, restart, if need be. + conf{app-name:{args}} + """ + def __init__(self): + Actor.__init__(self) + self.ng = None + def init(self,config) : + """ + This function will initialize the the actor with applications and associated arguments + @param args {"apps_o":"","app_x":params} + """ + Actor.init(self,config) + self.ng = ng(self.config.keys()) -""" + + def can_start(self,name): + """ + This function is intended to determine if it is possible to boot an application + + """ + items = self.ng.search(name) + if len(items) == 0 : + return False + else: + return items[0][1] > 0.01 + + def startup(self,name) : + """ + This function is intended to start a program given the configuration + """ + items = self.ng.search(name)[0] + app = items[0] + args = self.config[app] + cmd = " ".join([app,args]) + self.execute([app,args]) + + def kill(self,name) : + args = "".join(["-eo pid,command|grep -Ei ",name.lower(),'|grep -E "^ {0,1}[0-9]+" -o|xargs kill -9']) + self.execute(["ps",args]) + def analyze(self,logs) : + """ + This function is designed to analyze a few logs and take appropriate action + @param logs logs of application/process data; folder analysis or sandbox analysis + """ + for item in self.logs : + name = item['label'] + if self.can_start(name) : + self.startup(name) + # + # + +class Mailer (Actor): + """ + conf = {uid:,host:,port:,password:} + """ + def __init__(self,conf) : + self.uid = conf['uid'] + + + try: + + self.handler = smtplib.SMTP_SSL(conf['host'],conf['port']) + r = self.handler.login(self.uid,conf['password']) + # + # @TODO: Check the status of the authentication + # If not authenticated the preconditions have failed + # + except Exception,e: + print e + self.handler = None + pass + + + def send(self,**args) : + subject = args['subject'] + message = args['message'] + to = args['to'] + if '<' in message and '>' in message : + message = MIMEText(message,'html') + else: + message = MIMEText(message,'plain') + message['From'] = self.uid + message['To'] = to + message['Subject'] = subject + return self.handler.sendmail(self.uid,to,message.as_string()) + def close(self): + self.handler.quit() + + class Folders(Actor): - def init(self,config,item): + """ + This is designed to handle folders i.e cleaning/archiving the folders + if the user does NOT have any keys to cloud-view than she will not be able to archive + """ + + def init(self,**args): Actor.init(self,config,item) - self.lfolders = config['folders'] - self.config = config['actions']['folders'] - self.threshold = self.get_size(self.config['threshold']) + self.lfolders = args['folders'] #config['folders'] + self.action = args['action'] #{clear,archive} config['actions']['folders'] + self.threshold = self.get_size( args['threshold']) #self.config['threshold']) self.item = item def archive(self,item): - """ - This function will archive all files in a given folder - @pre : isValid - """ + """ + This function will archive all files in a given folder + @pre : isValid + """ folder = item['label'] signature='-'.join([str(item['date']),str(item['count']),'-files']) tarball=os.sep([folder,signature]) @@ -82,12 +191,12 @@ class Folders(Actor): pass def clean(self,item): - """ - This function consists in deleting files from a given folder - """ + """ + This function consists in deleting files from a given folder + """ rpath = item['label'] - lists = os.listdir(item['label']) - for name in list() : + files = os.listdir(item['label']) + for name in list(files) : path = os.sep([item['label'],name]) if os.path.isdir(path) : shutil.rmtree(path) @@ -105,123 +214,127 @@ class Folders(Actor): return float(value.upper().replace('MB','').strip()) * units[key] def isvalid(self,item): - """ - This function returns whether the following : - p : folder exists - q : has_reached threashold - """ + """ + This function returns whether the following : + p : folder exists + q : has_reached threashold + """ - p = os.path.exists(item['label']) and item['label'] in self.lfolders - + p = os.path.exists(item['label']) and item['label'] in self.lfolders q = self.get_size(item['size']) >= self.threshold return p and q - def process(self,item): - if self.isValid(item) : - - name = self.config['action'] - stream = "".join([name,'(',json.dumps(item),')']) - eval(stream) + def analyze(self,logs): + r = {'clean':self.clean,'archive':self.archive} + for item in logs : + if self.isValid(item) : + + id = self.config['action'].strip() + pointer = r[id] + pointer (item) + -class Kill(Actor): +# class Kill(Actor): - def isValid(self,item): - return (item is not None) and (item in self.config) - def process(self,item): - args = "".join(["-eo pid,command|grep ",item,'|grep -E "^ {0,1}[0-9]+" -o|xargs kill -9']) - self.execute(["ps",args]) - # - # We need to make sure we can get assess the process on this server - # +# def isValid(self,item): +# return (item is not None) and (item in self.config) +# def process(self,item): +# args = "".join(["-eo pid,command|grep ",item,'|grep -E "^ {0,1}[0-9]+" -o|xargs kill -9']) +# self.execute(["ps",args]) +# # +# # We need to make sure we can get assess the process on this server +# # -class Start(Actor): - def __init__(self): - Actor.__init__(self) - self.ng = None +# class Start(Actor): +# def __init__(self): +# Actor.__init__(self) +# self.ng = None - def init(self,config,item): - Actor.init(self,config,item) - self.config = config['apps'] - self.ng = ng(self.config.keys()) +# def init(self,config,item): +# Actor.init(self,config,item) +# self.config = config['apps'] +# self.ng = ng(self.config.keys()) - def isValid(self,name): - items = self.ng.search(name) - if len(items) == 0 : - return False - else: - return items[0][1] > 0.1 +# def isValid(self,name): +# items = self.ng.search(name) +# if len(items) == 0 : +# return False +# else: +# return items[0][1] > 0.1 - def process(self,row): - name = row['label'] - items = self.ng.search(name)[0] - app = items[0] +# def process(self,row): +# name = row['label'] +# items = self.ng.search(name)[0] +# app = items[0] - args = self.config[app] - cmd = " ".join([app,args]) +# args = self.config[app] +# cmd = " ".join([app,args]) - self.execute([app,args]) -""" - This class is designed to handle applications i.e start/stopping applications - @TODO: Assess if a reboot is required, by looking at the variance/anomaly detection -""" -class Apps(Actor): - def __init__(self): - Actor.__init__(self) - self.crashes = [] - self.running = [] +# self.execute([app,args]) +# """ +# This class is designed to handle applications i.e start/stopping applications +# @TODO: Assess if a reboot is required, by looking at the variance/anomaly detection +# """ +# class Apps(Actor): +# def __init__(self): +# Actor.__init__(self) +# self.crashes = [] +# self.running = [] - def isValid(self,rows): - status = [row['status'] for row in rows] - return 'crash' in status +# def isValid(self,rows): +# status = [row['status'] for row in rows] +# return 'crash' in status - def classify(self,rows): - self.crashes = [] - self.running = [] - for row in rows: - if row['status'] == 'crash' : - self.crashes.append(row) - else: - self.running.append(row) - def reboot(self): - for row_run in self.running: - pass - def start(self): - for row_crash in self.crashes: - thread = Start() - thread.init(self.config,row_crash) - thread.daemon = True - thread.start() +# def classify(self,rows): +# self.crashes = [] +# self.running = [] +# for row in rows: +# if row['status'] == 'crash' : +# self.crashes.append(row) +# else: +# self.running.append(row) +# def reboot(self): +# for row_run in self.running: +# pass +# def start(self): +# for row_crash in self.crashes: +# thread = Start() +# thread.init(self.config,row_crash) +# thread.daemon = True +# thread.start() - def process(self,rows): - self.classify(rows) - if self.crashes : - self.start() - if self.running: - self.reboot() +# def process(self,rows): +# self.classify(rows) +# if self.crashes : +# self.start() +# if self.running: +# self.reboot() + class Event(Thread): def __init__(self,config): pass def run(self): -""" - The orchestrator class is designed to aggregate actions and communicate back to the caller - Mesage passing is structured as follows {from,to,content} The content is designed to be understood by the actor - - The orchestrator is implemented using a simple iterator design-pattern - @TODO: action specifications should be provided remotely -""" + """ + The orchestrator class is designed to aggregate actions and communicate back to the caller + Mesage passing is structured as follows {from,to,content} The content is designed to be understood by the actor + + The orchestrator is implemented using a simple iterator design-pattern + @TODO: action specifications should be provided remotely + """ + pass class Orchestrator(Actor): def __init__(self,config=None): Actor.__init__(self) - if config is None: - f = open(PARAMS['path']) - config = json.loads(f.read()) - f.close() + # if config is None: + # f = open(PARAMS['path']) + # config = json.loads(f.read()) + # f.close() self.config = config Actor.__init__(self)