CO - Bug found and fixed

community
Steve L. Nyemba 7 years ago
parent f2d1e0501a
commit 024ee9550f

@ -49,10 +49,8 @@ class Actor():
def getName(self): def getName(self):
return self.__class__.__name__.lower() return self.__class__.__name__.lower()
def getIdentifier(self): # def getIdentifier(self):
return self.__class__.__name__.lower() # return self.__class__.__name__.lower()
def init(self,args): def init(self,args):
self.config = args self.config = args
@ -254,184 +252,3 @@ class Folders(Actor):
id = self.config['action'].strip() id = self.config['action'].strip()
pointer = r[id] pointer = r[id]
pointer (item) pointer (item)
# class Kill(Actor):
# def isValid(self,item):
# return (item is not None) and (item in self.config)
# def process(self,item):
# args = "".join(["-eo pid,command|grep ",item,'|grep -E "^ {0,1}[0-9]+" -o|xargs kill -9'])
# self.execute(["ps",args])
# #
# # We need to make sure we can get assess the process on this server
# #
# class Start(Actor):
# def __init__(self):
# Actor.__init__(self)
# self.ng = None
# def init(self,config,item):
# Actor.init(self,config,item)
# self.config = config['apps']
# self.ng = ng(self.config.keys())
# def isValid(self,name):
# items = self.ng.search(name)
# if len(items) == 0 :
# return False
# else:
# return items[0][1] > 0.1
# def process(self,row):
# name = row['label']
# items = self.ng.search(name)[0]
# app = items[0]
# args = self.config[app]
# cmd = " ".join([app,args])
# self.execute([app,args])
# """
# This class is designed to handle applications i.e start/stopping applications
# @TODO: Assess if a reboot is required, by looking at the variance/anomaly detection
# """
# class Apps(Actor):
# def __init__(self):
# Actor.__init__(self)
# self.crashes = []
# self.running = []
# def isValid(self,rows):
# status = [row['status'] for row in rows]
# return 'crash' in status
# def classify(self,rows):
# self.crashes = []
# self.running = []
# for row in rows:
# if row['status'] == 'crash' :
# self.crashes.append(row)
# else:
# self.running.append(row)
# def reboot(self):
# for row_run in self.running:
# pass
# def start(self):
# for row_crash in self.crashes:
# thread = Start()
# thread.init(self.config,row_crash)
# thread.daemon = True
# thread.start()
# def process(self,rows):
# self.classify(rows)
# if self.crashes :
# self.start()
# if self.running:
# self.reboot()
class Event(Thread):
def __init__(self,config):
pass
def run(self):
"""
The orchestrator class is designed to aggregate actions and communicate back to the caller
Mesage passing is structured as follows {from,to,content} The content is designed to be understood by the actor
The orchestrator is implemented using a simple iterator design-pattern
@TODO: action specifications should be provided remotely
"""
pass
class Orchestrator(Actor):
def __init__(self,config=None):
Actor.__init__(self)
# if config is None:
# f = open(PARAMS['path'])
# config = json.loads(f.read())
# f.close()
self.config = config
Actor.__init__(self)
self.actors = {"apps":Apps(),"folders":Folders()}
self.is_master_node = False
self.items = []
#
# If the configuration only has id,key then this is NOT the maestro
#
host = config['api']
qid = config['id']
print "Initialized ***** ",self.getIdentifier(), " as ",config['id']
#
# This object will have to request for the configuration
#
#for id in config['actions'] :
#conf = config['actions'][id]
#item = eval("".join([id,"(",json.dumps(conf),")"]))
#self.actors[id.lower()] = item
"""
This function is designed to provide the orchestrator a configuration
@pre
"""
def init(self,config):
for id in config:
setup_info = config[id]
item = eval("".join([id,"(",json.dumps(setup_info),")"]))
self.actors[id.lower()] = item
def callback(self,channel,method,header,stream):
message = json.loads(stream)
if 'content' in message :
content = message['content']
print self.actors
to = message['to']
if isinstance(content,basestring) and content.lower() in ['quit'] or to=='quit':
if content.lower() == 'quit' or to == 'quit':
print '**** closing ',self.getIdentifier()
channel.close()
else:
id = to.lower()
actor = self.actors[id]
if actor is not None and actor.isValid(content) :
actor.init(self.config['actions'])
actor.process(content)
else:
content = {"status":"invalid","content":content}
#self.post(to=sender,content=content)
def run(self):
info = {}
host = self.config['api']
uid = self.config['key']
qid = self.config['id']
qlistener = QueueListener(qid=qid,uid=uid,host=host)
qlistener.callback = self.callback
qlistener.read()
r = [self.process(item) for item in self.items]
"""
This class is designed to send a message to a given AMQP enpoint
The AMQP endpoint is implemented by QueueWriter class
"""
# class Alert(Actor):
# def process(self,item):
# pass
conf = {"/Applications/Google Chrome.app/Contents/MacOS/Google Chrome":"https://the-phi.com"}
a = Actor.instance('Apps',conf)
if __name__ == '__main__':
print dir(a)

Loading…
Cancel
Save