diff --git a/src/api/static/img/uml-design b/src/api/static/img/uml-design index d3b699d..d68bed6 100644 --- a/src/api/static/img/uml-design +++ b/src/api/static/img/uml-design @@ -1,50 +1,50 @@ - + umbrello uml modeller http://umbrello.kde.org 1.6.9 UnicodeUTF8 - + - + - - - - - + + + + + - + - - - - - - - - - - - + + + + + + + + + + + - + - - - - + + + + - + - + @@ -52,129 +52,129 @@ - + - + - - - - - - - - - - - + + + + + + + + + + + - - + + - - - - - + + + + + - - + + - - + + - - + + - + - + - - - - - - - - - - - + + + + + + + + + + + - - - - - + + + + + - + - - + + - + - - + + - + - + - + - - + + - + - + - + - + - + - + - + - + - + - + - + - + @@ -183,17 +183,17 @@ - + - + - + diff --git a/src/api/static/js/jx b/src/api/static/js/jx index 36f9f10..3d00f5a 160000 --- a/src/api/static/js/jx +++ b/src/api/static/js/jx @@ -1 +1 @@ -Subproject commit 36f9f10ff44406cc0418cc5934eb475bc77ebb1e +Subproject commit 3d00f5a126574f2277cdac25d60008ee35dc8740 diff --git a/src/utils/agents/actor.py b/src/utils/agents/actor.py index 24be011..51707d0 100644 --- a/src/utils/agents/actor.py +++ b/src/utils/agents/actor.py @@ -16,18 +16,23 @@ import os import subprocess from monitor import ProcessCounter from utils.transport import QueueListener, QueueWriter, QueueReader +from utils.params import PARAMS class Actor: - def __init__(self,config): - self.config = config + def __init__(self): + pass def getIdentifier(self): return self.__class__.__name__.lower() - - def init(self,litems): - self.items = litems + """ + Initializing the class with configuration. The configuration will be specific to each subclass + + """ + def init(self,config): + self.config = config def process(self,item): pass - def isValid(self,item): - return False + def isValid(self,item): + return False + def execute(self,cmd): stream = None try: @@ -41,24 +46,27 @@ class Actor: Sending a message to a queue with parameters to,from,content """ def post(self,**args): - to = args['to'] - content = args['content'] - message = {"from":self.getIdentifier(),"to":to,"content":content} - host = self.config['api'] - uid = self.config['key'] - qid = to#self.conorfig['id'] - - qwriter = QueueWriter(host=host,uid=uid,qid=qid) - qwriter.init(qid) - qwriter.write(label=qid,row=content) - #qwriter.close() - pass - + # to = args['to'] + # content = args['content'] + # message = {"from":self.getIdentifier(),"to":to,"content":content} + # host = self.config['api'] + # uid = self.config['key'] + # qid = to#self.conorfig['id'] + + # qwriter = QueueWriter(host=host,uid=uid,qid=qid) + # qwriter.init(qid) + # qwriter.write(label=qid,row=content) + # #qwriter.close() + pass +class Folders(Actor): + def isvalid(self,item): + print self.conf + def process(self,item): + print item class Kill(Actor): - def isValid(self,item): - print self.config - return (item is not None) and (item in self.config) + def isValid(self,item): + return (item is not None) and (item in self.config) def process(self,item): cmd = "".join(["ps -eo pid,command|grep ",item,'|grep -E "^ {0,1}[0-9]+" -o|xargs kill -9']) self.execute(cmd) @@ -66,17 +74,31 @@ class Kill(Actor): # We need to make sure we can get assess the process on this server # class Start(Actor): - def __init__(self,config): - Actor.__init__(self,config) + def __init__(self): + Actor.__init__(self) def isValid(self,name): - return name in self.config + return name in self.config def process(self,name): - item = self.config[name] - path = item['path'] - args = item['args'] if 'args' in item else '' - cmd = " ".join([path,args]) - self.execute(cmd) + if name in self.config : + item = self.config['actions']['apps']['start'][name] + path = item['path'] + args = item['args'] if 'args' in item else '' + cmd = " ".join([path,args]) + self.execute(cmd) +class Apps(Actor): + def __init__(self): + Actor.__init__(self) + def isValid(self,rows): + status = [row['status'] for row in rows] + return 'crash' in status + + def process(self,rows): + rows = [row for row in rows if row['status'] == 'crash'] : + handler = Start() + for app in rows: + handler.process(app['label']) + """ The orchestrator class is designed to aggregate actions and communicate back to the caller Mesage passing is structured as follows {from,to,content} The content is designed to be understood by the actor @@ -85,91 +107,95 @@ class Start(Actor): @TODO: action specifications should be provided remotely """ class Orchestrator(Actor,Thread): - def __init__(self,config): - Thread.__init__(self) - Actor.__init__(self,config) - self.actors = {} - self.is_master_node = False - self.items = [] + def __init__(self,config=None): + Thread.__init__(self) + if config is None: + f = open(PARAMS['path']) + config = json.loads(f.read()) + f.close() + self.config = config + Actor.__init__(self) + self.actors = {"apps":Apps(),"folders":Folders()} + self.is_master_node = False + self.items = [] + # + # If the configuration only has id,key then this is NOT the maestro + # + host = config['api'] + qid = config['id'] + if 'actions' in config : # - # If the configuration only has id,key then this is NOT the maestro + # We are to assume this is the maestro/main node, and it will make the configuration available to other nodes + # NOTE: This is has a predefined master thus is subject to known short comings (zombies) # - host = config['api'] - qid = config['id'] - if 'actions' in config : - # - # We are to assume this is the maestro/main node, and it will make the configuration available to other nodes - # NOTE: This is has a predefined master thus is subject to known short comings (zombies) - # - - q = QueueWriter(host=host,uid=config['key'],qid=qid) - q.flush(config['id']) - q.write(label=qid,row=json.dumps(config['actions'])) - q.close() - self.is_master_node = True - - - else: - qid = config['master'] - q = QueueReader(host=host,uid=config['key'],qid=qid) - r = q.read() - q.close() - info = r[qid][0] - self.init(info) - print "Initialized ***** ",self.getIdentifier(), " as ",config['id'] - # - # This object will have to request for the configuration - # - #for id in config['actions'] : - #conf = config['actions'][id] - #item = eval("".join([id,"(",json.dumps(conf),")"])) - #self.actors[id.lower()] = item + q = QueueWriter(host=host,uid=config['key'],qid=qid) + q.flush(config['id']) + q.write(label=qid,row=json.dumps(config['actions'])) + q.close() + self.is_master_node = True + + + else: + qid = config['master'] + q = QueueReader(host=host,uid=config['key'],qid=qid) + r = q.read() + q.close() + info = r[qid][0] + self.init(info) + print "Initialized ***** ",self.getIdentifier(), " as ",config['id'] + + # + # This object will have to request for the configuration + # + #for id in config['actions'] : + #conf = config['actions'][id] + #item = eval("".join([id,"(",json.dumps(conf),")"])) + #self.actors[id.lower()] = item """ This function is designed to provide the orchestrator a configuration @pre """ def init(self,config): - for id in config: - - setup_info = config[id] - item = eval("".join([id,"(",json.dumps(setup_info),")"])) - self.actors[id.lower()] = item + for id in config: + + setup_info = config[id] + item = eval("".join([id,"(",json.dumps(setup_info),")"])) + self.actors[id.lower()] = item def callback(self,channel,method,header,stream): - message = json.loads(stream) - if 'content' in message : - content = message['content'] - sender = message['from'] - to = message['to'] - if content.lower() == 'quit' or to == 'quit': - channel.close() - - elif content.lower() == 'ping' or to == 'ping': - self.post(to=sender,content="1") - else: - id = to.lower() - actor = self.actors[id] - print "\tPerforming ",actor.getIdentifier()," on ",content," status ",actor.isValid(content) - if actor.isValid(content) : - actor.process(content) - else: - content = {"status":"invalid","content":content} - - self.post(to=sender,content=content) + message = json.loads(stream) + if 'content' in message : + content = message['content'] + + #sender = message['from'] + to = message['to'] + if isinstance(content,basestring) and content.lower() in ['quit'] or to=='quit': + if content.lower() == 'quit' or to == 'quit': + channel.close() + else: + id = to.lower() + actor = self.actors[id] + print [id,actor.isValid(content)] + if actor is not None and actor.isValid(content) : + actor.process(content) + else: + content = {"status":"invalid","content":content} + + #self.post(to=sender,content=content) def run(self): info = {} host = self.config['api'] uid = self.config['key'] qid = self.config['id'] - if not self.is_master_node : - qlistener = QueueListener(qid=qid,uid=uid,host=host) - qlistener.callback = self.callback - qlistener.read() - r = [self.process(item) for item in self.items] + + qlistener = QueueListener(qid=qid,uid=uid,host=host) + qlistener.callback = self.callback + qlistener.read() + r = [self.process(item) for item in self.items] """ This class is designed to send a message to a given AMQP enpoint @@ -193,14 +219,7 @@ config = { #thread = Orchestrator(config) #thread.start() -config = { - "master":"demo-000", - "id":"demo-001", - "key":"[0v8]-247&7!v3","api":"localhost" - - - } -thread = Orchestrator(config) +thread = Orchestrator() thread.start() #config = {"id":"demo","key":"[0v8]-247&7!v3","api":"localhost"} diff --git a/src/utils/agents/data-collector.py b/src/utils/agents/data-collector.py index 2704d5d..636dcf3 100644 --- a/src/utils/agents/data-collector.py +++ b/src/utils/agents/data-collector.py @@ -74,6 +74,7 @@ class ICollector(Thread) : write_class = self.config['store']['class']['write'] read_args = self.config['store']['args'] DELAY = self.config['delay'] * 60 + while self.quit == False: for thread in self.pool : @@ -88,10 +89,20 @@ class ICollector(Thread) : else: label = id row = data - # - # At this point we should check for the status and if it prompts an action - # @TODO Use a design pattern for this ... - # + # + # At this point we should check for the status and if it prompts an action + # @TODO Use a design pattern for this ... + # - submit the row to Event for analysis + # - The event orchestrator will handle things from this point on + # + message = {} + + message['to'] = thread.getName() + message['content'] = row + qwriter = QueueWriter(host=self.config['api'],uid=self.config['key'],qid=self.id) + qwriter.write(label=self.id,row = message) + qwriter.close() + self.lock.acquire() store = self.factory.instance(type=write_class,args=read_args) store.flush(size=200) @@ -112,4 +123,4 @@ class ICollector(Thread) : if __name__ == '__main__': thread = ICollector() # thread.daemon = True - thread.start() \ No newline at end of file + thread.start() diff --git a/src/utils/transport.py b/src/utils/transport.py index 13b6406..724975c 100644 --- a/src/utils/transport.py +++ b/src/utils/transport.py @@ -587,7 +587,7 @@ class CouchdbWriter(Couchdb,Writer): self.dbase.save_doc(document) def flush(self,**params) : - size = params['size'] + size = params['size'] if 'size' in params else 0 has_changed = False document = self.dbase.get(self.uid) for key in document: @@ -595,7 +595,7 @@ class CouchdbWriter(Couchdb,Writer): content = document[key] else: continue - if isinstance(content,list): + if isinstance(content,list) and size > 0: index = len(content) - size content = content[index:] document[key] = content @@ -603,8 +603,8 @@ class CouchdbWriter(Couchdb,Writer): else: document[key] = {} has_changed = True - if has_changed: - self.dbase.save_doc(document) + + self.dbase.save_doc(document) def archive(self,params=None): document = self.dbase.get(self.uid)