CO - actor refactor for killing and starting an application

community
Steve L. Nyemba 7 years ago
parent 31ff495f74
commit 2a3a0fcb96

@ -19,19 +19,36 @@ from monitor import ProcessCounter
from utils.transport import QueueListener, QueueWriter, QueueReader
from utils.params import PARAMS
from ngram import NGram as ng
class Actor(Thread):
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
class Actor():
@staticmethod
def instance(name,args):
"""
This function is a singleton that acts as a factory object for all the instances of this subclass
@param name name of the class to instantiate
@param args arguments to be passed in {configuration}
"""
o = None
try:
o = eval("".join([name,"()"]))
o.init(args)
except Exception,e:
print e
return o
def __init__(self):
Thread.__init__(self)
pass
def getIdentifier(self):
return self.__class__.__name__.lower()
"""
Initializing the class with configuration. The configuration will be specific to each subclass
@param args arguments the class needs to be configured
"""
def init(self,config,item=None):
self.config = config
self.item = item
def init(self,args):
self.config = args
def process(self,item):
pass
def isValid(self,item):
@ -45,31 +62,123 @@ class Actor(Thread):
except Exception,e:
pass
def run(self):
if self.item is not None:
self.process(self.item)
# def run(self):
# if self.item is not None:
# self.process(self.item)
"""
Sending a message to a queue with parameters to,from,content
"""
def post(self,**args):
pass
"""
This is designed to handle folders i.e cleaning/archiving the folders
class Apps(Actor) :
"""
This class is designed to handle application, restart, if need be.
conf{app-name:{args}}
"""
def __init__(self):
Actor.__init__(self)
self.ng = None
def init(self,config) :
"""
This function will initialize the the actor with applications and associated arguments
@param args {"apps_o":"","app_x":params}
"""
Actor.init(self,config)
self.ng = ng(self.config.keys())
def can_start(self,name):
"""
This function is intended to determine if it is possible to boot an application
"""
items = self.ng.search(name)
if len(items) == 0 :
return False
else:
return items[0][1] > 0.01
def startup(self,name) :
"""
This function is intended to start a program given the configuration
"""
items = self.ng.search(name)[0]
app = items[0]
args = self.config[app]
cmd = " ".join([app,args])
self.execute([app,args])
def kill(self,name) :
args = "".join(["-eo pid,command|grep -Ei ",name.lower(),'|grep -E "^ {0,1}[0-9]+" -o|xargs kill -9'])
self.execute(["ps",args])
def analyze(self,logs) :
"""
This function is designed to analyze a few logs and take appropriate action
@param logs logs of application/process data; folder analysis or sandbox analysis
"""
for item in self.logs :
name = item['label']
if self.can_start(name) :
self.startup(name)
#
#
class Mailer (Actor):
"""
conf = {uid:<account>,host:<host>,port:<port>,password:<password>}
"""
def __init__(self,conf) :
self.uid = conf['uid']
try:
self.handler = smtplib.SMTP_SSL(conf['host'],conf['port'])
r = self.handler.login(self.uid,conf['password'])
#
# @TODO: Check the status of the authentication
# If not authenticated the preconditions have failed
#
except Exception,e:
print e
self.handler = None
pass
def send(self,**args) :
subject = args['subject']
message = args['message']
to = args['to']
if '<' in message and '>' in message :
message = MIMEText(message,'html')
else:
message = MIMEText(message,'plain')
message['From'] = self.uid
message['To'] = to
message['Subject'] = subject
return self.handler.sendmail(self.uid,to,message.as_string())
def close(self):
self.handler.quit()
"""
class Folders(Actor):
def init(self,config,item):
"""
This is designed to handle folders i.e cleaning/archiving the folders
if the user does NOT have any keys to cloud-view than she will not be able to archive
"""
def init(self,**args):
Actor.init(self,config,item)
self.lfolders = config['folders']
self.config = config['actions']['folders']
self.threshold = self.get_size(self.config['threshold'])
self.lfolders = args['folders'] #config['folders']
self.action = args['action'] #{clear,archive} config['actions']['folders']
self.threshold = self.get_size( args['threshold']) #self.config['threshold'])
self.item = item
def archive(self,item):
"""
This function will archive all files in a given folder
@pre : isValid
"""
"""
This function will archive all files in a given folder
@pre : isValid
"""
folder = item['label']
signature='-'.join([str(item['date']),str(item['count']),'-files'])
tarball=os.sep([folder,signature])
@ -82,12 +191,12 @@ class Folders(Actor):
pass
def clean(self,item):
"""
This function consists in deleting files from a given folder
"""
"""
This function consists in deleting files from a given folder
"""
rpath = item['label']
lists = os.listdir(item['label'])
for name in list() :
files = os.listdir(item['label'])
for name in list(files) :
path = os.sep([item['label'],name])
if os.path.isdir(path) :
shutil.rmtree(path)
@ -105,123 +214,127 @@ class Folders(Actor):
return float(value.upper().replace('MB','').strip()) * units[key]
def isvalid(self,item):
"""
This function returns whether the following :
p : folder exists
q : has_reached threashold
"""
"""
This function returns whether the following :
p : folder exists
q : has_reached threashold
"""
p = os.path.exists(item['label']) and item['label'] in self.lfolders
q = self.get_size(item['size']) >= self.threshold
return p and q
def process(self,item):
if self.isValid(item) :
name = self.config['action']
stream = "".join([name,'(',json.dumps(item),')'])
eval(stream)
class Kill(Actor):
def isValid(self,item):
return (item is not None) and (item in self.config)
def process(self,item):
args = "".join(["-eo pid,command|grep ",item,'|grep -E "^ {0,1}[0-9]+" -o|xargs kill -9'])
self.execute(["ps",args])
#
# We need to make sure we can get assess the process on this server
#
def analyze(self,logs):
r = {'clean':self.clean,'archive':self.archive}
for item in logs :
if self.isValid(item) :
id = self.config['action'].strip()
pointer = r[id]
pointer (item)
# class Kill(Actor):
# def isValid(self,item):
# return (item is not None) and (item in self.config)
# def process(self,item):
# args = "".join(["-eo pid,command|grep ",item,'|grep -E "^ {0,1}[0-9]+" -o|xargs kill -9'])
# self.execute(["ps",args])
# #
# # We need to make sure we can get assess the process on this server
# #
# class Start(Actor):
# def __init__(self):
# Actor.__init__(self)
# self.ng = None
# def init(self,config,item):
# Actor.init(self,config,item)
# self.config = config['apps']
# self.ng = ng(self.config.keys())
# def isValid(self,name):
# items = self.ng.search(name)
# if len(items) == 0 :
# return False
# else:
# return items[0][1] > 0.1
# def process(self,row):
# name = row['label']
# items = self.ng.search(name)[0]
# app = items[0]
# args = self.config[app]
# cmd = " ".join([app,args])
# self.execute([app,args])
# """
# This class is designed to handle applications i.e start/stopping applications
# @TODO: Assess if a reboot is required, by looking at the variance/anomaly detection
# """
# class Apps(Actor):
# def __init__(self):
# Actor.__init__(self)
# self.crashes = []
# self.running = []
# def isValid(self,rows):
# status = [row['status'] for row in rows]
# return 'crash' in status
# def classify(self,rows):
# self.crashes = []
# self.running = []
# for row in rows:
# if row['status'] == 'crash' :
# self.crashes.append(row)
# else:
# self.running.append(row)
# def reboot(self):
# for row_run in self.running:
# pass
# def start(self):
# for row_crash in self.crashes:
# thread = Start()
# thread.init(self.config,row_crash)
# thread.daemon = True
# thread.start()
# def process(self,rows):
# self.classify(rows)
# if self.crashes :
# self.start()
# if self.running:
# self.reboot()
class Start(Actor):
def __init__(self):
Actor.__init__(self)
self.ng = None
def init(self,config,item):
Actor.init(self,config,item)
self.config = config['apps']
self.ng = ng(self.config.keys())
def isValid(self,name):
items = self.ng.search(name)
if len(items) == 0 :
return False
else:
return items[0][1] > 0.1
def process(self,row):
name = row['label']
items = self.ng.search(name)[0]
app = items[0]
args = self.config[app]
cmd = " ".join([app,args])
self.execute([app,args])
"""
This class is designed to handle applications i.e start/stopping applications
@TODO: Assess if a reboot is required, by looking at the variance/anomaly detection
"""
class Apps(Actor):
def __init__(self):
Actor.__init__(self)
self.crashes = []
self.running = []
def isValid(self,rows):
status = [row['status'] for row in rows]
return 'crash' in status
def classify(self,rows):
self.crashes = []
self.running = []
for row in rows:
if row['status'] == 'crash' :
self.crashes.append(row)
else:
self.running.append(row)
def reboot(self):
for row_run in self.running:
pass
def start(self):
for row_crash in self.crashes:
thread = Start()
thread.init(self.config,row_crash)
thread.daemon = True
thread.start()
def process(self,rows):
self.classify(rows)
if self.crashes :
self.start()
if self.running:
self.reboot()
class Event(Thread):
def __init__(self,config):
pass
def run(self):
"""
The orchestrator class is designed to aggregate actions and communicate back to the caller
Mesage passing is structured as follows {from,to,content} The content is designed to be understood by the actor
"""
The orchestrator class is designed to aggregate actions and communicate back to the caller
Mesage passing is structured as follows {from,to,content} The content is designed to be understood by the actor
The orchestrator is implemented using a simple iterator design-pattern
@TODO: action specifications should be provided remotely
"""
The orchestrator is implemented using a simple iterator design-pattern
@TODO: action specifications should be provided remotely
"""
pass
class Orchestrator(Actor):
def __init__(self,config=None):
Actor.__init__(self)
if config is None:
f = open(PARAMS['path'])
config = json.loads(f.read())
f.close()
# if config is None:
# f = open(PARAMS['path'])
# config = json.loads(f.read())
# f.close()
self.config = config
Actor.__init__(self)

Loading…
Cancel
Save