diff --git a/src/utils/agents/actor.py b/src/utils/agents/actor.py index 6008745..b3cfdc3 100644 --- a/src/utils/agents/actor.py +++ b/src/utils/agents/actor.py @@ -17,6 +17,10 @@ import subprocess from monitor import ProcessCounter from utils.transport import QueueListener, QueueWriter class Actor(Thread): + @staticmethod + def instance(id,config): + pass + def __init__(self,config): Thread.__init__(self) self.config = config @@ -78,16 +82,7 @@ class Actor(Thread): qwriter.write(label=qid,row=content) #qwriter.close() pass - def run(self): - info = {} - host = self.config['api'] - uid = self.config['key'] - qid = self.config['id'] - - qlistener = QueueListener(qid=qid,uid=uid,host=host) - qlistener.callback = self.callback - qlistener.read() - r = [self.process(item) for item in self.items] + class Kill(Actor): def __init__(self,config): Actor.__init__(self,config) @@ -106,12 +101,53 @@ class Start(Actor): cmd = " ".join([path,args]) self.execute(cmd) +class Orchestrator(Actor): + def __init__(self,config): + self.actors = {} + for id in config : + _config_ = config[id] + item = Actor.instance(id,config[id]) + self.actors[id] = item + pass + + def callback(self,channel,method,header,stream): + print [self.getIdentifier(),stream] + message = json.loads(stream) + content = message['content'] + sender = message['from'] + if content.lower() == 'quit' : + channel.close() + print " *** ",self.getIdentifier() + elif content.lower() == 'ping': + self.post(to=sender,content="1") + else: + self.process(content) + self.post(to=sender,content=content) + + + def process(self,item): + id = item['cmd'] + actor = self.actors[id] + actor.process(item) ; + + def run(self): + info = {} + host = self.config['api'] + uid = self.config['key'] + qid = self.config['id'] + + qlistener = QueueListener(qid=qid,uid=uid,host=host) + qlistener.callback = self.callback + qlistener.read() + r = [self.process(item) for item in self.items] + """ This class is designed to send a message to a given AMQP enpoint The AMQP endpoint is implemented by QueueWriter class """ class Alert(Actor): def process(self,item): + pass