diff --git a/src/utils/agents/actor.py b/src/utils/agents/actor.py index b3cfdc3..04677bb 100644 --- a/src/utils/agents/actor.py +++ b/src/utils/agents/actor.py @@ -16,56 +16,27 @@ import os import subprocess from monitor import ProcessCounter from utils.transport import QueueListener, QueueWriter -class Actor(Thread): - @staticmethod - def instance(id,config): - pass - - def __init__(self,config): - Thread.__init__(self) - self.config = config - self.items = [] - self.__id = config['id'] +class Actor: + def __init__(self,config): + self.config = config def getIdentifier(self): - return self.__id + return self.__class__.__name__.lower() def init(self,litems): self.items = litems def process(self,item): pass + def isValid(self,item): + return False def execute(self,cmd): stream = None try: - print self.getIdentifier() - print cmd handler = subprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE) stream = handler.communicate()[0] except Exception,e: pass return stream - def callback(self,channel,method,header,stream): - print [self.getIdentifier(),stream] - message = json.loads(stream) - content = message['content'] - sender = message['from'] - if content.lower() == 'quit' : - channel.close() - print " *** ",self.getIdentifier() - elif content.lower() == 'ping': - self.post(to=sender,content="1") - else: - self.process(content) - self.post(to=sender,content=content) - - #message = None - #try: - #message = json.loads(stream) - #except Exception, e: - #pass - #if message is not None: - #if 'id' in message : - #if 'payload' in message: - #self.execute(message['payload'] + """ Sending a message to a queue with parameters to,from,content """ @@ -84,8 +55,8 @@ class Actor(Thread): pass class Kill(Actor): - def __init__(self,config): - Actor.__init__(self,config) + def isValid(self,item): + return (item is not None) and (item in self.config) def process(self,item): cmd = "".join(["ps -eo pid,command|grep ",item,'|grep -E "^ {0,1}[0-9]+" -o|xargs kill -9']) self.execute(cmd) @@ -95,41 +66,54 @@ class Kill(Actor): class Start(Actor): def __init__(self,config): Actor.__init__(self,config) - def process(self,item): + def isValid(self,name): + return name in self.config: + + def process(self,name): + item = self.config[name] path = item['path'] args = item['args'] if 'args' in item else '' cmd = " ".join([path,args]) self.execute(cmd) - -class Orchestrator(Actor): +""" + The orchestrator class is designed to aggregate actions and communicate back to the caller + Mesage passing is structured as follows {from,to,content} The content is designed to be understood by the actor + + The orchestrator is implemented using a simple iterator design-pattern + @TODO: action specifications should be provided remotely +""" +class Orchestrator(Actor,Thread): def __init__(self,config): + Thread.__init__(self) + Actor.__init__(self,config) self.actors = {} - for id in config : - _config_ = config[id] - item = Actor.instance(id,config[id]) - self.actors[id] = item - pass + for id in config['actions'] : + conf = config['actions'][id] + item = eval("".join([id,"(",json.dumps(conf),")"])) + self.actors[id.lower()] = item + def callback(self,channel,method,header,stream): - print [self.getIdentifier(),stream] + message = json.loads(stream) content = message['content'] sender = message['from'] - if content.lower() == 'quit' : + to = message['to'] + if content.lower() == 'quit' or to == 'quit': channel.close() print " *** ",self.getIdentifier() - elif content.lower() == 'ping': + elif content.lower() == 'ping' or to == 'ping': self.post(to=sender,content="1") else: - self.process(content) + id = to.lower() + actor = self.actors[id] + if actor.isValid(content) : + actor.process(content) + else: + content = {"status":"invalid","content":content} + self.post(to=sender,content=content) - - def process(self,item): - id = item['cmd'] - actor = self.actors[id] - actor.process(item) ; - def run(self): info = {} host = self.config['api'] @@ -139,7 +123,7 @@ class Orchestrator(Actor): qlistener = QueueListener(qid=qid,uid=uid,host=host) qlistener.callback = self.callback qlistener.read() - r = [self.process(item) for item in self.items] + #r = [self.process(item) for item in self.items] """ This class is designed to send a message to a given AMQP enpoint @@ -150,11 +134,20 @@ class Alert(Actor): pass - -config = {"id":"demo","key":"[0v8]-247&7!v3","api":"localhost"} -actor = Kill(config) -actor.start() +config = { + "id":"demo-000", + "key":"[0v8]-247&7!v3","api":"localhost", + "actions":{ + "Kill":["firefox"], + "Alert":[] + } + + + } +thread = Orchestrator(config) +thread.start() +#config = {"id":"demo","key":"[0v8]-247&7!v3","api":"localhost"} +#actor = Kill(config) +#actor.start() -config = {"id":"demo-100","key":"[0v8]-247&7!v3","api":"localhost"} -actor_1 = Kill(config) -actor_1.start() \ No newline at end of file +#config = {"id":"demo-100","key":"[0v8]-247&7!v3","api":"localhost"}