From 5083ea7c9099dbd94280f3aeb59ce5b37c7a889a Mon Sep 17 00:00:00 2001 From: steve Date: Wed, 3 May 2017 15:14:45 -0500 Subject: [PATCH] Handling of actions @TODO: Folder clean/archive --- requirements.txt | 15 ++- src/utils/agents/actor.py | 264 +++++++++++++++++++------------------- 2 files changed, 139 insertions(+), 140 deletions(-) diff --git a/requirements.txt b/requirements.txt index b7bb58c..c83f924 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,13 +1,16 @@ -aniso8601==1.2.0 -click==6.6 -couchdbkit==0.6.5 Flask==0.11.1 Flask-Session==0.3.0 Flask-SocketIO==2.8.2 -http-parser==0.8.3 -itsdangerous==0.24 Jinja2==2.8 MarkupSafe==0.23 +Werkzeug==0.11.11 +aniso8601==1.2.0 +argparse==1.2.1 +click==6.6 +couchdbkit==0.6.5 +http-parser==0.8.3 +itsdangerous==0.24 +ngram==3.3.0 numpy==1.11.3 pika==0.10.0 python-dateutil==2.6.0 @@ -17,4 +20,4 @@ pytz==2016.10 restkit==4.2.2 six==1.10.0 socketpool==0.5.3 -Werkzeug==0.11.11 +wsgiref==0.1.2 diff --git a/src/utils/agents/actor.py b/src/utils/agents/actor.py index 51707d0..7103bb8 100644 --- a/src/utils/agents/actor.py +++ b/src/utils/agents/actor.py @@ -17,52 +17,46 @@ import subprocess from monitor import ProcessCounter from utils.transport import QueueListener, QueueWriter, QueueReader from utils.params import PARAMS -class Actor: - def __init__(self): - pass - def getIdentifier(self): - return self.__class__.__name__.lower() +from ngram import NGram as ng +class Actor(Thread): + def __init__(self): + Thread.__init__(self) + pass + def getIdentifier(self): + return self.__class__.__name__.lower() """ Initializing the class with configuration. The configuration will be specific to each subclass """ - def init(self,config): + def init(self,config,item=None): self.config = config + self.item = item def process(self,item): pass def isValid(self,item): return False - def execute(self,cmd): - stream = None - try: - handler = subprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE) - stream = handler.communicate()[0] - except Exception,e: - pass - return stream - - """ - Sending a message to a queue with parameters to,from,content - """ - def post(self,**args): - # to = args['to'] - # content = args['content'] - # message = {"from":self.getIdentifier(),"to":to,"content":content} - # host = self.config['api'] - # uid = self.config['key'] - # qid = to#self.conorfig['id'] - - # qwriter = QueueWriter(host=host,uid=uid,qid=qid) - # qwriter.init(qid) - # qwriter.write(label=qid,row=content) - # #qwriter.close() + def execute(self,cmd): + stream = None + try: + subprocess.call (cmd,shell=False) + #stream = handler.communicate()[0] + except Exception,e: pass + + def run(self): + if self.item is not None: + self.process(self.item) + """ + Sending a message to a queue with parameters to,from,content + """ + def post(self,**args): + pass class Folders(Actor): - def isvalid(self,item): - print self.conf - def process(self,item): - print item + def isvalid(self,item): + print self.conf + def process(self,item): + print item class Kill(Actor): def isValid(self,item): @@ -73,31 +67,68 @@ class Kill(Actor): # # We need to make sure we can get assess the process on this server # + class Start(Actor): - def __init__(self): - Actor.__init__(self) - def isValid(self,name): - return name in self.config + def __init__(self): + Actor.__init__(self) + self.ng = None + + def init(self,config,item): + Actor.init(self,config,item) + self.config = config['start'] + self.ng = ng(self.config.keys()) + + def isValid(self,name): + items = self.ng.search(name) + if len(items) == 0 : + return False + else: + return items[0][1] > 0.1 - def process(self,name): - if name in self.config : - item = self.config['actions']['apps']['start'][name] - path = item['path'] - args = item['args'] if 'args' in item else '' - cmd = " ".join([path,args]) - self.execute(cmd) + def process(self,row): + name = row['label'] + items = self.ng.search(name)[0] + app = items[0] + + + args = self.config[app] + cmd = " ".join([app,args]) + + self.execute([app,args]) +""" + This class is designed to handle applications i.e start/stopping applications + @TODO: Assess if a reboot is required, by looking at the variance/anomaly detection +""" class Apps(Actor): def __init__(self): Actor.__init__(self) + self.crashes = [] + self.running = [] + def isValid(self,rows): status = [row['status'] for row in rows] return 'crash' in status - + + def classify(self,rows): + self.crashes = [] + self.running = [] + for row in rows: + if row['status'] == 'crash' : + self.crashes.append(row) + else: + self.running.append(row) + def process(self,rows): - rows = [row for row in rows if row['status'] == 'crash'] : - handler = Start() - for app in rows: - handler.process(app['label']) + #rows = [row for row in rows if row['status'] == 'crash'] : + self.classify(rows) + #handler = Start() + #handler.init(self.config) + #[handler.process(row_crash) for row_crash in self.crashes ] + for row_crash in self.crashes: + thread = Start() + thread.init(self.config,row_crash) + thread.daemon = True + thread.start() """ The orchestrator class is designed to aggregate actions and communicate back to the caller @@ -106,65 +137,46 @@ class Apps(Actor): The orchestrator is implemented using a simple iterator design-pattern @TODO: action specifications should be provided remotely """ -class Orchestrator(Actor,Thread): - def __init__(self,config=None): - Thread.__init__(self) - if config is None: - f = open(PARAMS['path']) - config = json.loads(f.read()) - f.close() - self.config = config - Actor.__init__(self) - self.actors = {"apps":Apps(),"folders":Folders()} - self.is_master_node = False - self.items = [] +class Orchestrator(Actor): + + def __init__(self,config=None): + Actor.__init__(self) + if config is None: + f = open(PARAMS['path']) + config = json.loads(f.read()) + f.close() + self.config = config + Actor.__init__(self) + self.actors = {"apps":Apps(),"folders":Folders()} + self.is_master_node = False + self.items = [] + # + # If the configuration only has id,key then this is NOT the maestro + # + host = config['api'] + qid = config['id'] + print "Initialized ***** ",self.getIdentifier(), " as ",config['id'] + # - # If the configuration only has id,key then this is NOT the maestro + # This object will have to request for the configuration # - host = config['api'] - qid = config['id'] - if 'actions' in config : - # - # We are to assume this is the maestro/main node, and it will make the configuration available to other nodes - # NOTE: This is has a predefined master thus is subject to known short comings (zombies) - # - - q = QueueWriter(host=host,uid=config['key'],qid=qid) - q.flush(config['id']) - q.write(label=qid,row=json.dumps(config['actions'])) - q.close() - self.is_master_node = True - - - else: - qid = config['master'] - q = QueueReader(host=host,uid=config['key'],qid=qid) - r = q.read() - q.close() - info = r[qid][0] - self.init(info) - print "Initialized ***** ",self.getIdentifier(), " as ",config['id'] - - # - # This object will have to request for the configuration - # - #for id in config['actions'] : - #conf = config['actions'][id] - #item = eval("".join([id,"(",json.dumps(conf),")"])) - #self.actors[id.lower()] = item + #for id in config['actions'] : + #conf = config['actions'][id] + #item = eval("".join([id,"(",json.dumps(conf),")"])) + #self.actors[id.lower()] = item """ This function is designed to provide the orchestrator a configuration @pre """ - def init(self,config): - - for id in config: - - setup_info = config[id] - item = eval("".join([id,"(",json.dumps(setup_info),")"])) - self.actors[id.lower()] = item + def init(self,config): + + for id in config: + + setup_info = config[id] + item = eval("".join([id,"(",json.dumps(setup_info),")"])) + self.actors[id.lower()] = item - def callback(self,channel,method,header,stream): + def callback(self,channel,method,header,stream): message = json.loads(stream) if 'content' in message : @@ -174,23 +186,26 @@ class Orchestrator(Actor,Thread): to = message['to'] if isinstance(content,basestring) and content.lower() in ['quit'] or to=='quit': if content.lower() == 'quit' or to == 'quit': + print '**** closing ',self.getIdentifier() channel.close() else: id = to.lower() actor = self.actors[id] - print [id,actor.isValid(content)] - if actor is not None and actor.isValid(content) : + + if actor is not None and actor.isValid(content) : + actor.init(self.config['actions']) actor.process(content) else: content = {"status":"invalid","content":content} #self.post(to=sender,content=content) - def run(self): - info = {} - host = self.config['api'] - uid = self.config['key'] - qid = self.config['id'] + def run(self): + + info = {} + host = self.config['api'] + uid = self.config['key'] + qid = self.config['id'] qlistener = QueueListener(qid=qid,uid=uid,host=host) qlistener.callback = self.callback @@ -201,29 +216,10 @@ class Orchestrator(Actor,Thread): This class is designed to send a message to a given AMQP enpoint The AMQP endpoint is implemented by QueueWriter class """ -class Alert(Actor): - def process(self,item): - - pass - -config = { - "id":"demo-000", - "key":"[0v8]-247&7!v3","api":"localhost", - "actions":{ - "Kill":["firefox"], - "Alert":[] - } - - - } -#thread = Orchestrator(config) -#thread.start() - -thread = Orchestrator() -thread.start() - -#config = {"id":"demo","key":"[0v8]-247&7!v3","api":"localhost"} -#actor = Kill(config) -#actor.start() +# class Alert(Actor): +# def process(self,item): +# pass -#config = {"id":"demo-100","key":"[0v8]-247&7!v3","api":"localhost"} +if __name__ == '__main__': + thread = Orchestrator() + thread.start()