Bug fix with data collector

data-collector
Steve Nyemba 7 years ago
parent 20e1de6317
commit 726895d862

@ -21,7 +21,8 @@ from threading import Thread, RLock
import monitor import monitor
import utils.agents.actor as actor import utils.agents.actor as actor
from utils.agents.manager import Manager from utils.agents.manager import Manager
ENDPOINT="http://localhost/monitor" SYS_ARGS['host']='localhost'
ENDPOINT="http://:host/monitor".replace(":host",SYS_ARGS['host'])
class Collector(Thread) : class Collector(Thread) :
def __init__(self): def __init__(self):
Thread.__init__(self) Thread.__init__(self)
@ -84,7 +85,10 @@ class Collector(Thread) :
r = [] r = []
for agent in _agents : for agent in _agents :
if agent.getName() in SYS_ARGS : if agent.getName() in SYS_ARGS :
agent.init(SYS_ARGS[agent.getName()]) values = SYS_ARGS[agent.getName()]
# print (["init ",agent.getName(),values])
Logger.log(subject='Collector',object=agent.getName(),action='init.agent',value=values )
agent.init(values)
r.append(agent) r.append(agent)
_agents = r _agents = r
@ -92,7 +96,7 @@ class Collector(Thread) :
config = {"store":self.store,"plan":self.plan} config = {"store":self.store,"plan":self.plan}
self.manager = Manager() self.manager = Manager()
self.manager.init(node=SYS_ARGS['id'],agents=_agents,actors=_actors,config=config,key=self.key) self.manager.init(node=SYS_ARGS['id'],agents=_agents,actors=_actors,config=config,key=self.key,host=SYS_ARGS['host'])
def run(self): def run(self):
""" """
@ -133,6 +137,9 @@ if __name__ == '__main__' :
p = dict(SYS_ARGS) p = dict(SYS_ARGS)
Logger.init('data-collector') Logger.init('data-collector')
SYS_ARGS = dict(SYS_ARGS,** p) SYS_ARGS = dict(SYS_ARGS,** p)
if 'apps' in SYS_ARGS :
SYS_ARGS['apps'] += [__file__]
thread = Collector() thread = Collector()
thread.start() thread.start()
else: else:

@ -218,7 +218,7 @@ class DetailProcess(Analysis):
cmd = "ps -eo user,pid,pmem,pcpu,vsize,stat,command|grep -Ei \":app\"".replace(":app",name) cmd = "ps -eo user,pid,pmem,pcpu,vsize,stat,command|grep -Ei \":app\"".replace(":app",name)
handler = subprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE) handler = subprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE)
logs = handler.communicate()[0].split('\n') logs = handler.communicate()[0].split('\n')
logs = [row for row in logs if (row.strip() != '') and ('grep -Ei' in row )== False ] logs = [row for row in logs if (row.strip() != '') and ('grep -Ei' in row )== False and (__file__ not in row)]
if len(logs) == 0: if len(logs) == 0:
return [dict(self.parse('',fields),**{'label':name}) ] return [dict(self.parse('',fields),**{'label':name}) ]
@ -256,7 +256,6 @@ class FileWatch(Analysis):
def __init__(self): def __init__(self):
pass pass
def init(self,folders): def init(self,folders):
print folders
self.folders = folders; self.folders = folders;
def getName(self): def getName(self):
return "folders" return "folders"
@ -300,9 +299,6 @@ class FileWatch(Analysis):
handler = subprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE) handler = subprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE)
ostream = handler.communicate()[0].split('\n') ostream = handler.communicate()[0].split('\n')
ostream = [row for row in ostream if row.strip() != ''] ostream = [row for row in ostream if row.strip() != '']
print cmd
print ostream[0]
print ostream[1]
#return [self.split(stream) for stream in ostream if stream.strip() != '' and '.DS_Store' not in stream and 'total' not in stream] #return [self.split(stream) for stream in ostream if stream.strip() != '' and '.DS_Store' not in stream and 'total' not in stream]
#return [self.split(stream) for stream in ostream if path not in stream and not set(['','total','.DS_Store']) & set(stream.split(' '))] #return [self.split(stream) for stream in ostream if path not in stream and not set(['','total','.DS_Store']) & set(stream.split(' '))]
return [] return []

@ -63,7 +63,6 @@ class Actor():
try: try:
# subprocess.call (cmd,shell=False) # subprocess.call (cmd,shell=False)
out = subprocess.Popen(cmd,stdout=subprocess.PIPE) out = subprocess.Popen(cmd,stdout=subprocess.PIPE)
print out
#stream = handler.communicate()[0] #stream = handler.communicate()[0]
except Exception,e: except Exception,e:
pass pass

@ -13,6 +13,7 @@ from datetime import datetime
from utils.transport import * from utils.transport import *
import monitor import monitor
import requests import requests
class Manager() : class Manager() :
def version(self): def version(self):
return 1.1 return 1.1
@ -35,7 +36,7 @@ class Manager() :
self.plan = self.config['plan'] self.plan = self.config['plan']
self.DELAY = int(self.plan['metadata']['delay']) self.DELAY = int(self.plan['metadata']['delay'])
self.host = args['host']
self.update() #-- Initializing status information self.update() #-- Initializing status information
def update(self) : def update(self) :
@ -43,13 +44,15 @@ class Manager() :
This method inspect the plans for the current account and makes sure it can/should proceed This method inspect the plans for the current account and makes sure it can/should proceed
The user must be subscribed and to the service otherwise this is not going to work The user must be subscribed and to the service otherwise this is not going to work
""" """
# url="https://the-phi.com/store/status/monitor"
# r = requests.get(url,headers={"uid":self.key})
# plans = json.loads(r.text)
# meta = [item['metadata'] for item in plans if item['status']=='active' ]
meta = self.plan['metadata'] url="http://:host/monitor/init/collector".replace(':host',self.host)
r = requests.post(url,headers={"key":self.key,"id":self.id})
r = json.loads(r.text)
# meta = [item['metadata'] for item in plans if item['status']=='active' ]
self.plan = r['plan']
meta = self.plan['metadata']
print meta
if meta : if meta :
self.DELAY = 60* int(meta['delay']) self.DELAY = 60* int(meta['delay'])
self.LIMIT = int(meta['limit']) self.LIMIT = int(meta['limit'])
@ -218,6 +221,7 @@ class Manager() :
COUNT_STOP = int(24*60/ self.DELAY) COUNT_STOP = int(24*60/ self.DELAY)
write_class = self.config['store']['class']['write'] write_class = self.config['store']['class']['write']
read_args = self.config['store']['args'] read_args = self.config['store']['args']
print self.agents
while True : while True :
COUNT += 1 COUNT += 1
if COUNT > COUNT_STOP : if COUNT > COUNT_STOP :
@ -230,13 +234,14 @@ class Manager() :
label = agent.getName() label = agent.getName()
node = '@'.join([label,self.id]) node = '@'.join([label,self.id])
row = {} row = {}
if label == 'folders': if label == 'folders':
row = [ dict({"id":self.id}, **_row) for _row in data] row = [ dict({"id":self.id}, **_row) for _row in data]
else: else:
#label = id #label = id
row = data row = data
if type(row)==list and len(row) == 0 : if type(row)==list and len(row) == 0 :
continue continue
# #
@ -245,12 +250,13 @@ class Manager() :
if len(self.actors) > index and self.actors[index].getName() == agent.getName() : if len(self.actors) > index and self.actors[index].getName() == agent.getName() :
actor = self.actors[index] actor = self.actors[index]
print actor.analyze(row) actor.analyze(row)
# self.lock.acquire() # self.lock.acquire()
store = self.factory.instance(type=write_class,args=read_args) store = self.factory.instance(type=write_class,args=read_args)
store.flush(size=self.LIMIT) store.flush(size=self.LIMIT)
store.write(label=node,row=row) store.write(label=node,row=[row])
# self.lock.release() # self.lock.release()
print (["Falling asleep ",self.DELAY/60])
time.sleep(self.DELAY) time.sleep(self.DELAY)

@ -593,11 +593,11 @@ class CouchdbWriter(Couchdb,Writer):
row_is_list = isinstance(row,list) row_is_list = isinstance(row,list)
if label not in document : if label not in document :
document[label] = row if row_is_list else [row] document[label] = row if row_is_list else [row]
elif isinstance(document[label][0],list) : elif isinstance(document[label][0],list) :
document[label].append(row)
else:
document[label] += row document[label] += row
else:
document[label].append(row)
else : else :
if label not in document : if label not in document :
document[label] = {} document[label] = {}

Loading…
Cancel
Save